Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
sysio::connection Class Reference
Inheritance diagram for sysio::connection:
Collaboration diagram for sysio::connection:

Public Member Functions

 connection (const string &endpoint)
 
 connection ()
 
 ~connection ()=default
 
bool start_session ()
 
bool socket_is_open () const
 
const string & peer_address () const
 
void set_connection_type (const string &peer_addr)
 
bool is_transactions_only_connection () const
 
bool is_blocks_only_connection () const
 
void set_heartbeat_timeout (std::chrono::milliseconds msec)
 
connection_status get_status () const
 
bool connected ()
 
bool current ()
 
void close (bool reconnect=true, bool shutdown=false)
 
bool populate_handshake (handshake_message &hello)
 
bool resolve_and_connect ()
 
void connect (const std::shared_ptr< tcp::resolver > &resolver, tcp::resolver::results_type endpoints)
 
void start_read_message ()
 
bool process_next_message (uint32_t message_length)
 Process the next message from the pending message buffer.
 
void send_handshake ()
 
void blk_send_branch (const block_id_type &msg_head_id)
 
void blk_send_branch_impl (uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num)
 
void blk_send (const block_id_type &blkid)
 
void stop_send ()
 
void enqueue (const net_message &msg)
 
void enqueue_block (const signed_block_ptr &sb, bool to_sync_queue=false)
 
void enqueue_buffer (const std::shared_ptr< std::vector< char > > &send_buffer, go_away_reason close_after_send, bool to_sync_queue=false)
 
void cancel_sync (go_away_reason)
 
void flush_queues ()
 
bool enqueue_sync_block ()
 
void request_sync_blocks (uint32_t start, uint32_t end)
 
void cancel_wait ()
 
void sync_wait ()
 
void fetch_wait ()
 
void sync_timeout (boost::system::error_code ec)
 
void fetch_timeout (boost::system::error_code ec)
 
void queue_write (const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue=false)
 
void do_queue_write ()
 
bool is_valid (const handshake_message &msg) const
 
void handle_message (const handshake_message &msg)
 
void handle_message (const chain_size_message &msg)
 
void handle_message (const go_away_message &msg)
 
void handle_message (const notice_message &msg)
 
void handle_message (const request_message &msg)
 
void handle_message (const sync_request_message &msg)
 
void handle_message (const signed_block &msg)=delete
 
void handle_message (const block_id_type &id, signed_block_ptr msg)
 
void handle_message (const packed_transaction &msg)=delete
 
void handle_message (packed_transaction_ptr msg)
 
void process_signed_block (const block_id_type &id, signed_block_ptr msg, block_state_ptr bsp)
 
fc::variant_object get_logger_variant () const
 

Public Attributes

boost::asio::io_context::strand strand
 
std::shared_ptr< tcp::socket > socket
 
fc::message_buffer< 1024 *1024 > pending_message_buffer
 
std::atomic< std::size_t > outstanding_read_bytes {0}
 
queued_buffer buffer_queue
 
fc::sha256 conn_node_id
 
string short_conn_node_id
 
string log_p2p_address
 
string log_remote_endpoint_ip
 
string log_remote_endpoint_port
 
string local_endpoint_ip
 
string local_endpoint_port
 
std::atomic< uint32_t > trx_in_progress_size {0}
 
fc::time_point last_dropped_trx_msg_time
 
const uint32_t connection_id
 
int16_t sent_handshake_count = 0
 
std::atomic< bool > connecting {true}
 
std::atomic< bool > syncing {false}
 
std::atomic< uint16_t > protocol_version = 0
 
uint16_t net_version = net_version_max
 
block_status_monitor block_status_monitor_
 
std::atomic< uint16_t > consecutive_immediate_connection_close = 0
 
std::mutex response_expected_timer_mtx
 
boost::asio::steady_timer response_expected_timer
 
std::atomic< go_away_reason > no_retry {no_reason}
 
std::mutex conn_mtx
 
std::optional< request_message > last_req
 
handshake_message last_handshake_recv
 
handshake_message last_handshake_sent
 
block_id_type fork_head
 
uint32_t fork_head_num {0}
 
fc::time_point last_close
 
string remote_endpoint_ip
 
tstamp latest_msg_time {0}
 
tstamp hb_timeout {std::chrono::milliseconds{def_keepalive_interval}.count()}
 
tstamp latest_blk_time {0}
 

Peer Timestamps

Time message handling

tstamp org {0}
 originate timestamp
 
tstamp rec {0}
 receive timestamp
 
tstamp dst {0}
 destination timestamp
 
tstamp xmt {0}
 transmit timestamp
 
void check_heartbeat (tstamp current_time)
 Check heartbeat time and send time_message.
 
void send_time ()
 Populate and queue time_message.
 
void send_time (const time_message &msg)
 Populate and queue time_message immediately using incoming time_message.
 
void handle_message (const time_message &msg)
 Process time_message.
 
static tstamp get_time ()
 Read system time and convert to a 64 bit integer.
 

Detailed Description

Definition at line 581 of file net_plugin.cpp.

Constructor & Destructor Documentation

◆ connection() [1/2]

sysio::connection::connection ( const string & endpoint)
explicit

Definition at line 878 of file net_plugin.cpp.

879 : peer_addr( endpoint ),
880 strand( my_impl->thread_pool->get_executor() ),
881 socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
882 log_p2p_address( endpoint ),
884 response_expected_timer( my_impl->thread_pool->get_executor() ),
887 {
888 fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
889 }
handshake_message last_handshake_recv
const uint32_t connection_id
boost::asio::steady_timer response_expected_timer
boost::asio::io_context::strand strand
handshake_message last_handshake_sent
std::shared_ptr< tcp::socket > socket
std::atomic< uint32_t > current_connection_id
std::optional< sysio::chain::named_thread_pool > thread_pool
#define fc_ilog(LOGGER, FORMAT,...)
Definition logger.hpp:83

◆ connection() [2/2]

sysio::connection::connection ( )

Definition at line 891 of file net_plugin.cpp.

892 : peer_addr(),
893 strand( my_impl->thread_pool->get_executor() ),
894 socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
896 response_expected_timer( my_impl->thread_pool->get_executor() ),
899 {
900 fc_dlog( logger, "new connection object created" );
901 }
#define fc_dlog(LOGGER, FORMAT,...)
Definition logger.hpp:77

◆ ~connection()

sysio::connection::~connection ( )
default

Member Function Documentation

◆ blk_send()

void sysio::connection::blk_send ( const block_id_type & blkid)

Definition at line 1104 of file net_plugin.cpp.

1104 {
1105 connection_wptr weak = shared_from_this();
1106 app().post( priority::medium, [blkid, weak{std::move(weak)}]() {
1107 connection_ptr c = weak.lock();
1108 if( !c ) return;
1109 try {
1110 controller& cc = my_impl->chain_plug->chain();
1111 signed_block_ptr b = cc.fetch_block_by_id( blkid );
1112 if( b ) {
1113 fc_dlog( logger, "fetch_block_by_id num ${n}, connection ${cid}",
1114 ("n", b->block_num())("cid", c->connection_id) );
1115 my_impl->dispatcher->add_peer_block( blkid, c->connection_id );
1116 c->strand.post( [c, b{std::move(b)}]() {
1117 c->enqueue_block( b );
1118 } );
1119 } else {
1120 fc_ilog( logger, "fetch block by id returned null, id ${id}, connection ${cid}",
1121 ("id", blkid)("cid", c->connection_id) );
1122 }
1123 } catch( const assert_exception& ex ) {
1124 fc_elog( logger, "caught assert on fetch_block_by_id, ${ex}, id ${id}, connection ${cid}",
1125 ("ex", ex.to_string())("id", blkid)("cid", c->connection_id) );
1126 } catch( ... ) {
1127 fc_elog( logger, "caught other exception fetching block id ${id}, connection ${cid}",
1128 ("id", blkid)("cid", c->connection_id) );
1129 }
1130 });
1131 }
auto post(int priority, Func &&func)
signed_block_ptr fetch_block_by_id(block_id_type id) const
unique_ptr< dispatch_manager > dispatcher
chain_plugin * chain_plug
client::connection_ptr connection_ptr
#define fc_elog(LOGGER, FORMAT,...)
Definition logger.hpp:95
application & app()
std::shared_ptr< signed_block > signed_block_ptr
Definition block.hpp:105
std::weak_ptr< connection > connection_wptr
static constexpr int medium
Here is the call graph for this function:
Here is the caller graph for this function:

◆ blk_send_branch()

void sysio::connection::blk_send_branch ( const block_id_type & msg_head_id)

Definition at line 1029 of file net_plugin.cpp.

1029 {
1030 uint32_t head_num = 0;
1031 std::tie( std::ignore, std::ignore, head_num,
1032 std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
1033
1034 peer_dlog(this, "head_num = ${h}",("h",head_num));
1035 if(head_num == 0) {
1036 notice_message note;
1037 note.known_blocks.mode = normal;
1038 note.known_blocks.pending = 0;
1039 enqueue(note);
1040 return;
1041 }
1042 std::unique_lock<std::mutex> g_conn( conn_mtx );
1043 if( last_handshake_recv.generation >= 1 ) {
1044 peer_dlog( this, "maybe truncating branch at = ${h}:${id}",
1046 }
1047
1049 g_conn.unlock();
1050 const auto lib_num = block_header::num_from_id(lib_id);
1051 if( lib_num == 0 ) return; // if last_irreversible_block_id is null (we have not received handshake or reset)
1052
1053 app().post( priority::medium, [chain_plug = my_impl->chain_plug, c = shared_from_this(),
1054 lib_num, head_num, msg_head_id]() {
1055 auto msg_head_num = block_header::num_from_id(msg_head_id);
1056 bool on_fork = msg_head_num == 0;
1057 bool unknown_block = false;
1058 if( !on_fork ) {
1059 try {
1060 const controller& cc = chain_plug->chain();
1061 block_id_type my_id = cc.get_block_id_for_num( msg_head_num );
1062 on_fork = my_id != msg_head_id;
1063 } catch( const unknown_block_exception& ) {
1064 unknown_block = true;
1065 } catch( ... ) {
1066 on_fork = true;
1067 }
1068 }
1069 if( unknown_block ) {
1070 c->strand.post( [msg_head_num, c]() {
1071 peer_ilog( c, "Peer asked for unknown block ${mn}, sending: benign_other go away", ("mn", msg_head_num) );
1072 c->no_retry = benign_other;
1073 c->enqueue( go_away_message( benign_other ) );
1074 } );
1075 } else {
1076 if( on_fork ) msg_head_num = 0;
1077 // if peer on fork, start at their last lib, otherwise we can start at msg_head+1
1078 c->strand.post( [c, msg_head_num, lib_num, head_num]() {
1079 c->blk_send_branch_impl( msg_head_num, lib_num, head_num );
1080 } );
1081 }
1082 } );
1083 }
block_id_type get_block_id_for_num(uint32_t block_num) const
std::mutex conn_mtx
void enqueue(const net_message &msg)
std::tuple< uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type > get_chain_info() const
@ normal
Definition protocol.hpp:96
@ benign_other
reasons such as a timeout; not fatal, but they warrant resetting
Definition protocol.hpp:56
#define peer_dlog(PEER, FORMAT,...)
#define peer_ilog(PEER, FORMAT,...)
unsigned int uint32_t
Definition stdint.h:126
static uint32_t num_from_id(const block_id_type &id)
block_id_type head_id
Definition protocol.hpp:39
block_id_type last_irreversible_block_id
Definition protocol.hpp:37
Here is the call graph for this function:
Here is the caller graph for this function:

◆ blk_send_branch_impl()

void sysio::connection::blk_send_branch_impl ( uint32_t msg_head_num,
uint32_t lib_num,
uint32_t head_num )

Definition at line 1086 of file net_plugin.cpp.

1086 {
1087 if( !peer_requested ) {
1088 auto last = msg_head_num != 0 ? msg_head_num : lib_num;
1089 peer_requested = peer_sync_state( last+1, head_num, last );
1090 } else {
1091 auto last = msg_head_num != 0 ? msg_head_num : std::min( peer_requested->last, lib_num );
1092 uint32_t end = std::max( peer_requested->end_block, head_num );
1093 peer_requested = peer_sync_state( last+1, end, last );
1094 }
1095 if( peer_requested->start_block <= peer_requested->end_block ) {
1096 peer_ilog( this, "enqueue ${s} - ${e}", ("s", peer_requested->start_block)("e", peer_requested->end_block) );
1098 } else {
1099 peer_ilog( this, "nothing to enqueue" );
1100 peer_requested.reset();
1101 }
1102 }
Here is the call graph for this function:

◆ cancel_sync()

void sysio::connection::cancel_sync ( go_away_reason reason)

Definition at line 1262 of file net_plugin.cpp.

1262 {
1263 peer_dlog( this, "cancel sync reason = ${m}, write queue size ${o} bytes",
1264 ("m", reason_str( reason ))("o", buffer_queue.write_queue_size()) );
1265 cancel_wait();
1266 flush_queues();
1267 switch (reason) {
1268 case validation :
1269 case fatal_other : {
1270 no_retry = reason;
1271 enqueue( go_away_message( reason ));
1272 break;
1273 }
1274 default:
1275 peer_ilog(this, "sending empty request but not calling sync wait");
1276 enqueue( ( sync_request_message ) {0,0} );
1277 }
1278 }
std::atomic< go_away_reason > no_retry
queued_buffer buffer_queue
uint32_t write_queue_size() const
@ fatal_other
a catch-all for errors we have not discriminated
Definition protocol.hpp:57
@ validation
the peer sent a block that failed validation
Definition protocol.hpp:55
constexpr auto reason_str(go_away_reason rsn)
Definition protocol.hpp:61
Here is the call graph for this function:

◆ cancel_wait()

void sysio::connection::cancel_wait ( )

Definition at line 1457 of file net_plugin.cpp.

1457 {
1458 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
1459 response_expected_timer.cancel();
1460 }
std::mutex response_expected_timer_mtx

◆ check_heartbeat()

void sysio::connection::check_heartbeat ( tstamp current_time)

Definition at line 1156 of file net_plugin.cpp.

1156 {
1157 if( latest_msg_time > 0 ) {
1158 if( current_time > latest_msg_time + hb_timeout ) {
1160 if( !peer_address().empty() ) {
1161 peer_wlog(this, "heartbeat timed out for peer address");
1162 close(true);
1163 } else {
1164 peer_wlog(this, "heartbeat timed out");
1165 close(false);
1166 }
1167 return;
1168 } else {
1169 const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count());
1170 if ( current_time > latest_blk_time + timeout ) {
1172 return;
1173 }
1174 }
1175 }
1176
1177 send_time();
1178 }
void send_time()
Populate and queue time_message.
void close(bool reconnect=true, bool shutdown=false)
const string & peer_address() const
int * count
std::chrono::system_clock::duration::rep tstamp
Definition protocol.hpp:11
#define peer_wlog(PEER, FORMAT,...)
Here is the call graph for this function:

◆ close()

void sysio::connection::close ( bool reconnect = true,
bool shutdown = false )
Parameters
reconnect: true if we should try to reconnect immediately after close
shutdown: true only if the plugin is shutting down

Definition at line 985 of file net_plugin.cpp.

985 {
986 strand.post( [self = shared_from_this(), reconnect, shutdown]() {
987 connection::_close( self.get(), reconnect, shutdown );
988 });
989 }
@ self
the connection is to itself
Definition protocol.hpp:48
Here is the caller graph for this function:

◆ connect()

void sysio::connection::connect ( const std::shared_ptr< tcp::resolver > & resolver,
tcp::resolver::results_type endpoints )

Definition at line 2343 of file net_plugin.cpp.

2343 {
2344 connecting = true;
2347 boost::asio::async_connect( *socket, endpoints,
2348 boost::asio::bind_executor( strand,
2349 [resolver, c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
2350 if( !err && socket->is_open() && socket == c->socket ) {
2351 if( c->start_session() ) {
2352 c->send_handshake();
2353 }
2354 } else {
2355 fc_elog( logger, "connection failed to ${host}:${port} ${error}",
2356 ("host", endpoint.address().to_string())("port", endpoint.port())( "error", err.message()));
2357 c->close( false );
2358 }
2359 } ) );
2360 }
std::atomic< bool > connecting
fc::message_buffer< 1024 *1024 > pending_message_buffer
Here is the call graph for this function:

◆ connected()

bool sysio::connection::connected ( )

Definition at line 973 of file net_plugin.cpp.

973 {
974 return socket_is_open() && !connecting;
975 }
bool socket_is_open() const
Here is the call graph for this function:
Here is the caller graph for this function:

◆ current()

bool sysio::connection::current ( )

Definition at line 977 of file net_plugin.cpp.

977 {
978 return (connected() && !syncing);
979 }
std::atomic< bool > syncing
Here is the call graph for this function:

◆ do_queue_write()

void sysio::connection::do_queue_write ( )

Definition at line 1212 of file net_plugin.cpp.

1212 {
1214 return;
1215 connection_ptr c(shared_from_this());
1216
1217 std::vector<boost::asio::const_buffer> bufs;
1219
1220 strand.post( [c{std::move(c)}, bufs{std::move(bufs)}]() {
1221 boost::asio::async_write( *c->socket, bufs,
1222 boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
1223 try {
1224 c->buffer_queue.clear_out_queue();
1225 // May have closed connection and cleared buffer_queue
1226 if( !c->socket_is_open() || socket != c->socket ) {
1227 peer_ilog( c, "async write socket ${r} before callback", ("r", c->socket_is_open() ? "changed" : "closed") );
1228 c->close();
1229 return;
1230 }
1231
1232 if( ec ) {
1233 if( ec.value() != boost::asio::error::eof ) {
1234 peer_elog( c, "Error sending to peer: ${i}", ( "i", ec.message() ) );
1235 } else {
1236 peer_wlog( c, "connection closure detected on write" );
1237 }
1238 c->close();
1239 return;
1240 }
1241
1242 c->buffer_queue.out_callback( ec, w );
1243
1244 c->enqueue_sync_block();
1245 c->do_queue_write();
1246 } catch ( const std::bad_alloc& ) {
1247 throw;
1248 } catch ( const boost::interprocess::bad_alloc& ) {
1249 throw;
1250 } catch( const fc::exception& ex ) {
1251 peer_elog( c, "fc::exception in do_queue_write: ${s}", ("s", ex.to_string()) );
1252 } catch( const std::exception& ex ) {
1253 peer_elog( c, "std::exception in do_queue_write: ${s}", ("s", ex.what()) );
1254 } catch( ... ) {
1255 peer_elog( c, "Unknown exception in do_queue_write" );
1256 }
1257 }));
1258 });
1259 }
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_string(log_level ll=log_level::info) const
bool ready_to_send() const
void fill_out_buffer(std::vector< boost::asio::const_buffer > &bufs)
catch(...)
#define peer_elog(PEER, FORMAT,...)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ enqueue()

void sysio::connection::enqueue ( const net_message & msg)

Definition at line 1414 of file net_plugin.cpp.

1414 {
1416 go_away_reason close_after_send = no_reason;
1417 if (std::holds_alternative<go_away_message>(m)) {
1418 close_after_send = std::get<go_away_message>(m).reason;
1419 }
1420
1421 buffer_factory buff_factory;
1422 auto send_buffer = buff_factory.get_send_buffer( m );
1423 enqueue_buffer( send_buffer, close_after_send );
1424 }
void enqueue_buffer(const std::shared_ptr< std::vector< char > > &send_buffer, go_away_reason close_after_send, bool to_sync_queue=false)
#define __func__
void verify_strand_in_this_thread(const Strand &strand, const char *func, int line)
go_away_reason
Definition protocol.hpp:46
@ no_reason
no reason to go away
Definition protocol.hpp:47
Here is the call graph for this function:
Here is the caller graph for this function:

◆ enqueue_block()

void sysio::connection::enqueue_block ( const signed_block_ptr & sb,
bool to_sync_queue = false )

Definition at line 1427 of file net_plugin.cpp.

1427 {
1428 peer_dlog( this, "enqueue block ${num}", ("num", b->block_num()) );
1430
1431 block_buffer_factory buff_factory;
1432 auto sb = buff_factory.get_send_buffer( b );
1434 enqueue_buffer( sb, no_reason, to_sync_queue);
1435 }
static tstamp get_time()
Read system time and convert to a 64 bit integer.
Here is the call graph for this function:

◆ enqueue_buffer()

void sysio::connection::enqueue_buffer ( const std::shared_ptr< std::vector< char > > & send_buffer,
go_away_reason close_after_send,
bool to_sync_queue = false )

Definition at line 1438 of file net_plugin.cpp.

1441 {
1442 connection_ptr self = shared_from_this();
1443 queue_write(send_buffer,
1444 [conn{std::move(self)}, close_after_send](boost::system::error_code ec, std::size_t ) {
1445 if (ec) return;
1446 if (close_after_send != no_reason) {
1447 fc_ilog( logger, "sent a go away message: ${r}, closing connection ${cid}",
1448 ("r", reason_str(close_after_send))("cid", conn->connection_id) );
1449 conn->close();
1450 return;
1451 }
1452 },
1453 to_sync_queue);
1454 }
void queue_write(const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue=false)
Here is the call graph for this function:

◆ enqueue_sync_block()

bool sysio::connection::enqueue_sync_block ( )

Definition at line 1281 of file net_plugin.cpp.

1281 {
1282 if( !peer_requested ) {
1283 return false;
1284 } else {
1285 peer_dlog( this, "enqueue sync block ${num}", ("num", peer_requested->last + 1) );
1286 }
1287 uint32_t num = ++peer_requested->last;
1288 if(num == peer_requested->end_block) {
1289 peer_requested.reset();
1290 peer_dlog( this, "completing enqueue_sync_block ${num}", ("num", num) );
1291 }
1292 connection_wptr weak = shared_from_this();
1293 app().post( priority::medium, [num, weak{std::move(weak)}]() {
1294 connection_ptr c = weak.lock();
1295 if( !c ) return;
1296 controller& cc = my_impl->chain_plug->chain();
1298 try {
1299 sb = cc.fetch_block_by_number( num );
1300 } FC_LOG_AND_DROP();
1301 if( sb ) {
1302 c->strand.post( [c, sb{std::move(sb)}]() {
1303 c->enqueue_block( sb, true );
1304 });
1305 } else {
1306 c->strand.post( [c, num]() {
1307 peer_ilog( c, "enqueue sync, unable to fetch block ${num}, sending benign_other go away", ("num", num) );
1308 c->peer_requested.reset(); // unable to provide requested blocks
1309 c->no_retry = benign_other;
1310 c->enqueue( go_away_message( benign_other ) );
1311 });
1312 }
1313 });
1314
1315 return true;
1316 }
signed_block_ptr fetch_block_by_number(uint32_t block_num) const
#define FC_LOG_AND_DROP(...)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ fetch_timeout()

void sysio::connection::fetch_timeout ( boost::system::error_code ec)

Definition at line 1495 of file net_plugin.cpp.

1495 {
1496 if( !ec ) {
1497 my_impl->dispatcher->retry_fetch( shared_from_this() );
1498 } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
1499 peer_elog( this, "setting timer for fetch request got error ${ec}", ("ec", ec.message() ) );
1500 }
1501 }

◆ fetch_wait()

void sysio::connection::fetch_wait ( )

Definition at line 1474 of file net_plugin.cpp.

1474 {
1475 connection_ptr c( shared_from_this() );
1476 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
1477 response_expected_timer.expires_from_now( my_impl->resp_expected_period );
1478 response_expected_timer.async_wait(
1479 boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) {
1480 c->fetch_timeout(ec);
1481 } ) );
1482 }
boost::asio::steady_timer::duration resp_expected_period

◆ flush_queues()

void sysio::connection::flush_queues ( )

Definition at line 981 of file net_plugin.cpp.

981 {
983 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ get_logger_variant()

fc::variant_object sysio::connection::get_logger_variant ( ) const
inline

Definition at line 790 of file net_plugin.cpp.

790 {
792 mvo( "_name", log_p2p_address)
793 ( "_cid", connection_id )
794 ( "_id", conn_node_id )
795 ( "_sid", short_conn_node_id )
796 ( "_ip", log_remote_endpoint_ip )
797 ( "_port", log_remote_endpoint_port )
798 ( "_lip", local_endpoint_ip )
799 ( "_lport", local_endpoint_port );
800 return mvo;
801 }
An order-preserving dictionary of variants.
fc::sha256 conn_node_id
string log_remote_endpoint_port
string log_remote_endpoint_ip

◆ get_status()

connection_status sysio::connection::get_status ( ) const

Definition at line 943 of file net_plugin.cpp.

943 {
944 connection_status stat;
945 stat.peer = peer_addr;
946 stat.connecting = connecting;
947 stat.syncing = syncing;
948 std::lock_guard<std::mutex> g( conn_mtx );
949 stat.last_handshake = last_handshake_recv;
950 return stat;
951 }

◆ get_time()

static tstamp sysio::connection::get_time ( )
inline static

There are only two calls to this routine in the program: one when a packet arrives from the network and the other when a packet is placed on the send queue. It calls the kernel time-of-day routine and converts the result to an (at least) 64-bit integer.

Definition at line 729 of file net_plugin.cpp.

729 {
730 return std::chrono::system_clock::now().time_since_epoch().count();
731 }
Here is the caller graph for this function:

◆ handle_message() [1/11]

void sysio::connection::handle_message ( const block_id_type & id,
signed_block_ptr msg )

Definition at line 3136 of file net_plugin.cpp.

3136 {
3137 peer_dlog( this, "received signed_block ${num}, id ${id}", ("num", ptr->block_num())("id", id) );
3138
3139 controller& cc = my_impl->chain_plug->chain();
3140 block_state_ptr bsp;
3141 bool exception = false;
3142 try {
3143 if( cc.fetch_block_state_by_id( id ) ) {
3144 my_impl->dispatcher->add_peer_block( id, connection_id );
3145 my_impl->sync_master->sync_recv_block( shared_from_this(), id, ptr->block_num(), false );
3146 return;
3147 }
3148 // this may return null if block is not immediately ready to be processed
3149 bsp = cc.create_block_state( id, ptr );
3150 } catch( const fc::exception& ex ) {
3151 exception = true;
3152 peer_elog(this, "bad block exception: #${n} ${id}...: ${m}",
3153 ("n", ptr->block_num())("id", id.str().substr(8,16))("m",ex.to_string()));
3154 } catch( ... ) {
3155 exception = true;
3156 peer_elog(this, "bad block: #${n} ${id}...: unknown exception",
3157 ("n", ptr->block_num())("id", id.str().substr(8,16)));
3158 }
3159 if( exception ) {
3160 my_impl->sync_master->rejected_block( shared_from_this(), ptr->block_num() );
3161 my_impl->dispatcher->rejected_block( id );
3162 return;
3163 }
3164
3165 bool signal_producer = !!bsp; // ready to process immediately, so signal producer to interrupt start_block
3166 app().post(priority::medium, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c = shared_from_this()]() mutable {
3167 c->process_signed_block( id, std::move(ptr), std::move(bsp) );
3168 });
3169
3170 if( signal_producer )
3171 my_impl->producer_plug->received_block();
3172 }
block_state_ptr create_block_state(const block_id_type &id, const signed_block_ptr &b) const
block_state_ptr fetch_block_state_by_id(block_id_type id) const
unique_ptr< sync_manager > sync_master
producer_plugin * producer_plug
uint64_t id
Definition code_cache.cpp:0
std::shared_ptr< block_state > block_state_ptr
Here is the call graph for this function:

◆ handle_message() [2/11]

void sysio::connection::handle_message ( const chain_size_message & msg)

Definition at line 2759 of file net_plugin.cpp.

2759 {
2760 peer_dlog(this, "received chain_size_message");
2761 }

◆ handle_message() [3/11]

void sysio::connection::handle_message ( const go_away_message & msg)

Definition at line 2922 of file net_plugin.cpp.

2922 {
2923 peer_wlog( this, "received go_away_message, reason = ${r}", ("r", reason_str( msg.reason )) );
2924
2925 bool retry = no_retry == no_reason; // if no previous go away message
2926 no_retry = msg.reason;
2927 if( msg.reason == duplicate ) {
2928 conn_node_id = msg.node_id;
2929 }
2930 if( msg.reason == wrong_version ) {
2931 if( !retry ) no_retry = fatal_other; // only retry once on wrong version
2932 }
2933 else if ( msg.reason == benign_other ) {
2934 if ( retry ) peer_dlog( this, "received benign_other reason, retrying to connect");
2935 }
2936 else {
2937 retry = false;
2938 }
2939 flush_queues();
2940
2941 close( retry ); // reconnect if wrong_version
2942 }
@ duplicate
the connection is redundant
Definition protocol.hpp:49
@ wrong_version
the peer's network version doesn't match
Definition protocol.hpp:51
Here is the call graph for this function:

◆ handle_message() [4/11]

void sysio::connection::handle_message ( const handshake_message & msg)

Definition at line 2763 of file net_plugin.cpp.

2763 {
2764 peer_dlog( this, "received handshake_message" );
2765 if( !is_valid( msg ) ) {
2766 peer_elog( this, "bad handshake message");
2768 enqueue( go_away_message( fatal_other ) );
2769 return;
2770 }
2771 peer_dlog( this, "received handshake gen ${g}, lib ${lib}, head ${head}",
2772 ("g", msg.generation)("lib", msg.last_irreversible_block_num)("head", msg.head_num) );
2773
2774 std::unique_lock<std::mutex> g_conn( conn_mtx );
2775 last_handshake_recv = msg;
2776 g_conn.unlock();
2777
2778 connecting = false;
2779 if (msg.generation == 1) {
2780 if( msg.node_id == my_impl->node_id) {
2781 peer_elog( this, "Self connection detected node_id ${id}. Closing connection", ("id", msg.node_id) );
2783 enqueue( go_away_message( go_away_reason::self ) );
2784 return;
2785 }
2786
2787 log_p2p_address = msg.p2p_address;
2788 if( peer_address().empty() ) {
2789 set_connection_type( msg.p2p_address );
2790 }
2791
2792 std::unique_lock<std::mutex> g_conn( conn_mtx );
2793 if( peer_address().empty() || last_handshake_recv.node_id == fc::sha256()) {
2794 auto c_time = last_handshake_sent.time;
2795 g_conn.unlock();
2796 peer_dlog( this, "checking for duplicate" );
2797 std::shared_lock<std::shared_mutex> g_cnts( my_impl->connections_mtx );
2798 for(const auto& check : my_impl->connections) {
2799 if(check.get() == this)
2800 continue;
2801 std::unique_lock<std::mutex> g_check_conn( check->conn_mtx );
2802 fc_dlog( logger, "dup check: connected ${c}, ${l} =? ${r}",
2803 ("c", check->connected())("l", check->last_handshake_recv.node_id)("r", msg.node_id) );
2804 if(check->connected() && check->last_handshake_recv.node_id == msg.node_id) {
2806 // It's possible that both peers could arrive here at relatively the same time, so
2807 // we need to avoid the case where they would both tell a different connection to go away.
2808 // Using the sum of the initial handshake times of the two connections, we will
2809 // arbitrarily (but consistently between the two peers) keep one of them.
2810
2811 auto check_time = check->last_handshake_sent.time + check->last_handshake_recv.time;
2812 g_check_conn.unlock();
2813 if (msg.time + c_time <= check_time)
2814 continue;
2815 } else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) {
2816 if (my_impl->p2p_address < msg.p2p_address) {
2817 fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
2818 ("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) );
2819 // only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate,
2820 // so there is no chance for both connections to be closed
2821 continue;
2822 }
2823 } else if (my_impl->node_id < msg.node_id) {
2824 fc_dlog( logger, "not duplicate, my_impl->node_id '${lhs}' < msg.node_id '${rhs}'",
2825 ("lhs", my_impl->node_id)("rhs", msg.node_id) );
2826 // only the connection from lower node_id to higher node_id will be considered as a duplicate,
2827 // so there is no chance for both connections to be closed
2828 continue;
2829 }
2830
2831 g_cnts.unlock();
2832 peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) );
2833 go_away_message gam(duplicate);
2834 gam.node_id = conn_node_id;
2835 enqueue(gam);
2837 return;
2838 }
2839 }
2840 } else {
2841 peer_dlog( this, "skipping duplicate check, addr == ${pa}, id = ${ni}",
2842 ("pa", peer_address())( "ni", last_handshake_recv.node_id ) );
2843 g_conn.unlock();
2844 }
2845
2846 if( msg.chain_id != my_impl->chain_id ) {
2847 peer_elog( this, "Peer on a different chain. Closing connection" );
2849 enqueue( go_away_message(go_away_reason::wrong_chain) );
2850 return;
2851 }
2852 protocol_version = my_impl->to_protocol_version(msg.network_version);
2853 if( protocol_version != net_version ) {
2854 peer_ilog( this, "Local network version different: ${nv} Remote version: ${mnv}",
2855 ("nv", net_version)("mnv", protocol_version.load()) );
2856 } else {
2857 peer_ilog( this, "Local network version: ${nv}", ("nv", net_version) );
2858 }
2859
2860 conn_node_id = msg.node_id;
2861 short_conn_node_id = conn_node_id.str().substr( 0, 7 );
2862
2863 if( !my_impl->authenticate_peer( msg ) ) {
2864 peer_elog( this, "Peer not authenticated. Closing connection." );
2866 enqueue( go_away_message( go_away_reason::authentication ) );
2867 return;
2868 }
2869
2870 uint32_t peer_lib = msg.last_irreversible_block_num;
2871 connection_wptr weak = shared_from_this();
2872 app().post( priority::medium, [peer_lib, chain_plug = my_impl->chain_plug, weak{std::move(weak)},
2873 msg_lib_id = msg.last_irreversible_block_id]() {
2874 connection_ptr c = weak.lock();
2875 if( !c ) return;
2876 controller& cc = chain_plug->chain();
2877 uint32_t lib_num = cc.last_irreversible_block_num();
2878
2879 fc_dlog( logger, "handshake check for fork lib_num = ${ln}, peer_lib = ${pl}, connection ${cid}",
2880 ("ln", lib_num)("pl", peer_lib)("cid", c->connection_id) );
2881
2882 if( peer_lib <= lib_num && peer_lib > 0 ) {
2883 bool on_fork = false;
2884 try {
2885 block_id_type peer_lib_id = cc.get_block_id_for_num( peer_lib );
2886 on_fork = (msg_lib_id != peer_lib_id);
2887 } catch( const unknown_block_exception& ) {
2888 // allow this for now, will be checked on sync
2889 fc_dlog( logger, "peer last irreversible block ${pl} is unknown, connection ${cid}",
2890 ("pl", peer_lib)("cid", c->connection_id) );
2891 } catch( ... ) {
2892 fc_wlog( logger, "caught an exception getting block id for ${pl}, connection ${cid}",
2893 ("pl", peer_lib)("cid", c->connection_id) );
2894 on_fork = true;
2895 }
2896 if( on_fork ) {
2897 c->strand.post( [c]() {
2898 peer_elog( c, "Peer chain is forked, sending: forked go away" );
2899 c->no_retry = go_away_reason::forked;
2900 c->enqueue( go_away_message( go_away_reason::forked ) );
2901 } );
2902 }
2903 }
2904 });
2905
2906 // we don't support the 2.1 packed_transaction & signed_block, so tell 2.1 clients we are 2.0
2911 return;
2912 }
2913
2914 if( sent_handshake_count == 0 ) {
2916 }
2917 }
2918
2919 my_impl->sync_master->recv_handshake( shared_from_this(), msg );
2920 }
string str() const
Definition sha256.cpp:26
void set_connection_type(const string &peer_addr)
int16_t sent_handshake_count
bool is_valid(const handshake_message &msg) const
std::atomic< uint16_t > protocol_version
bool authenticate_peer(const handshake_message &msg) const
Determine if a peer is allowed to connect.
std::shared_mutex connections_mtx
static constexpr uint16_t to_protocol_version(uint16_t v)
std::set< connection_ptr > connections
constexpr uint16_t proto_explicit_sync
constexpr uint16_t proto_dup_goaway_resolution
constexpr uint16_t proto_wire_sysio_initial
@ wrong_chain
the peer's chain id doesn't match
Definition protocol.hpp:50
@ authentication
peer failed authentication
Definition protocol.hpp:58
constexpr uint16_t proto_pruned_types
constexpr uint16_t proto_dup_node_id_goaway
fc::sha256 node_id
used to identify peers and prevent self-connect
Definition protocol.hpp:30
int64_t time
time message created in nanoseconds from epoch
Definition protocol.hpp:32
Here is the call graph for this function:

◆ handle_message() [5/11]

void sysio::connection::handle_message ( const notice_message & msg)

Definition at line 2984 of file net_plugin.cpp.

// Handles a notice_message received from a peer. Clears `connecting`, closes the
// connection with close( false ) when more than one known block id is supplied, then
// acts first on known_trx.mode and afterwards, independently, on known_blocks.mode.
// NOTE(review): listing line 2997 is missing from this extract, so the brace closed at
// 3002 has no visible opener -- presumably a log-level guard; confirm against net_plugin.cpp.
2984 {
2985 // peer tells us about one or more blocks or txns. When done syncing, forward on
2986 // notices of previously unknown blocks or txns,
2987 //
2988 peer_dlog( this, "received notice_message" );
2989 connecting = false;
2990 if( msg.known_blocks.ids.size() > 1 ) {
2991 peer_elog( this, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection",
2992 ("s", msg.known_blocks.ids.size()) );
2993 close( false );
2994 return;
2995 }
// NOTE(review): this log is guarded by known_trx.mode but prints known_blocks fields -- confirm intended.
2996 if( msg.known_trx.mode != none ) {
2998 const block_id_type& blkid = msg.known_blocks.ids.empty() ? block_id_type{} : msg.known_blocks.ids.back();
2999 peer_dlog( this, "this is a ${m} notice with ${n} pending blocks: ${num} ${id}...",
3000 ("m", modes_str( msg.known_blocks.mode ))("n", msg.known_blocks.pending)
3001 ("num", block_header::num_from_id( blkid ))("id", blkid.str().substr( 8, 16 )) );
3002 }
3003 }
// Transaction part of the notice.
3004 switch (msg.known_trx.mode) {
3005 case none:
3006 break;
3007 case last_irr_catch_up: {
// Record the peer's reported pending count as its head number, under conn_mtx.
3008 std::unique_lock<std::mutex> g_conn( conn_mtx );
3009 last_handshake_recv.head_num = msg.known_blocks.pending;
3010 g_conn.unlock();
3011 break;
3012 }
3013 case catch_up : {
3014 break;
3015 }
// Last case of the switch, so the absent break has no effect.
3016 case normal: {
3017 my_impl->dispatcher->recv_notice( shared_from_this(), msg, false );
3018 }
3019 }
3020
// Block part of the notice.
3021 if( msg.known_blocks.mode != none ) {
3022 peer_dlog( this, "this is a ${m} notice with ${n} blocks",
3023 ("m", modes_str( msg.known_blocks.mode ))( "n", msg.known_blocks.pending ) );
3024 }
3025 switch (msg.known_blocks.mode) {
3026 case none : {
3027 break;
3028 }
// Both catch-up modes are handed to the sync manager.
3029 case last_irr_catch_up:
3030 case catch_up: {
3031 my_impl->sync_master->sync_recv_notice( shared_from_this(), msg );
3032 break;
3033 }
3034 case normal : {
3035 my_impl->dispatcher->recv_notice( shared_from_this(), msg, false );
3036 break;
3037 }
// Unknown mode values are only logged; the connection is left open.
3038 default: {
3039 peer_elog( this, "bad notice_message : invalid known_blocks.mode ${m}",
3040 ("m", static_cast<uint32_t>(msg.known_blocks.mode)) );
3041 }
3042 }
3043 }
bool is_enabled(log_level e) const
Definition logger.cpp:58
@ last_irr_catch_up
Definition protocol.hpp:95
@ catch_up
Definition protocol.hpp:94
constexpr auto modes_str(id_list_modes m)
Definition protocol.hpp:99
Here is the call graph for this function:

◆ handle_message() [6/11]

void sysio::connection::handle_message ( const packed_transaction & msg)
delete

◆ handle_message() [7/11]

void sysio::connection::handle_message ( const request_message & msg)

Definition at line 3045 of file net_plugin.cpp.

// Handles a request_message from a peer. A request naming more than one block id is
// rejected and the connection is closed with the default close() arguments. Otherwise the
// block request is served according to req_blocks.mode and the transaction request is
// validated according to req_trx.mode.
3045 {
3046 if( msg.req_blocks.ids.size() > 1 ) {
3047 peer_elog( this, "Invalid request_message, req_blocks.ids.size ${s}, closing",
3048 ("s", msg.req_blocks.ids.size()) );
3049 close();
3050 return;
3051 }
3052
3053 switch (msg.req_blocks.mode) {
3054 case catch_up :
3055 peer_dlog( this, "received request_message:catch_up" );
// With no id supplied, a default-constructed block_id_type is passed.
3056 blk_send_branch( msg.req_blocks.ids.empty() ? block_id_type() : msg.req_blocks.ids.back() );
3057 break;
3058 case normal :
3059 peer_dlog( this, "received request_message:normal" );
3060 if( !msg.req_blocks.ids.empty() ) {
3061 blk_send( msg.req_blocks.ids.back() );
3062 }
3063 break;
// Any other block mode is ignored here.
3064 default:;
3065 }
3066
3067
3068 switch (msg.req_trx.mode) {
3069 case catch_up :
3070 break;
3071 case none :
// Neither blocks nor transactions requested: stop sending to this peer.
3072 if( msg.req_blocks.mode == none ) {
3073 stop_send();
3074 }
// Deliberate fall-through: `none` shares the id check below with `normal`.
3075 // no break
3076 case normal :
// Any transaction id in the request is treated as invalid and closes the connection.
3077 if( !msg.req_trx.ids.empty() ) {
3078 peer_elog( this, "Invalid request_message, req_trx.ids.size ${s}", ("s", msg.req_trx.ids.size()) );
3079 close();
3080 return;
3081 }
3082 break;
3083 default:;
3084 }
3085 }
void blk_send(const block_id_type &blkid)
void blk_send_branch(const block_id_type &msg_head_id)
Here is the call graph for this function:

◆ handle_message() [8/11]

void sysio::connection::handle_message ( const signed_block & msg)
delete

◆ handle_message() [9/11]

void sysio::connection::handle_message ( const sync_request_message & msg)

Definition at line 3087 of file net_plugin.cpp.

// Handles a sync_request_message. An end_block of 0 clears any outstanding peer request
// and flushes the queues; otherwise the requested range is recorded in peer_requested,
// or an already recorded range has its end extended.
// NOTE(review): listing line 3102 is missing from this extract; whatever follows the
// update of peer_requested is not visible here -- confirm against net_plugin.cpp.
3087 {
3088 peer_dlog( this, "peer requested ${start} to ${end}", ("start", msg.start_block)("end", msg.end_block) );
3089 if( msg.end_block == 0 ) {
3090 peer_requested.reset();
3091 flush_queues();
3092 } else {
3093 if (peer_requested) {
3094 // This happens when peer already requested some range and sync is still in progress
3095 // It could be higher in case of peer requested head catchup and current request is lib catchup
3096 // So to make sure peer will receive all requested blocks we assign end_block to highest value
3097 peer_requested->end_block = std::max(msg.end_block, peer_requested->end_block);
3098 }
3099 else {
// NOTE(review): start_block-1 would wrap if start_block is 0 and unsigned -- confirm callers never send 0.
3100 peer_requested = peer_sync_state( msg.start_block, msg.end_block, msg.start_block-1);
3101 }
3103 }
3104 }
Here is the call graph for this function:

◆ handle_message() [10/11]

void sysio::connection::handle_message ( const time_message & msg)

Calculate offset, delay and dispersion. Note carefully the implied processing. The first-order difference is done directly in 64-bit arithmetic, then the result is converted to floating double. All further processing is in floating-double arithmetic with rounding done by the hardware. This is necessary in order to avoid overflow and preserve precision.

Definition at line 2944 of file net_plugin.cpp.

// Handles a time_message. Stamps the message with the local receive time, discards it
// when xmt is zero or repeats the previously seen xmt, and stores xmt/rec/dst on the
// connection. When msg.org is zero it replies with send_time( msg ) and returns;
// otherwise it computes and logs a clock offset and then resets org and rec.
// NOTE(review): listing lines 2971 and 2980 are missing from this extract; the guard
// around the log call and the action taken when generation == 0 are not visible.
2944 {
2945 peer_ilog( this, "received time_message" );
2946
2947 /* We've already lost however many microseconds it took to dispatch
2948 * the message, but it can't be helped.
2949 */
2950 msg.dst = get_time();
2951
2952 // If the transmit timestamp is zero, the peer is horribly broken.
2953 if(msg.xmt == 0)
2954 return; /* invalid timestamp */
2955
2956 if(msg.xmt == xmt)
2957 return; /* duplicate packet */
2958
2959 xmt = msg.xmt;
2960 rec = msg.rec;
2961 dst = msg.dst;
2962
2963 if( msg.org == 0 ) {
2964 send_time( msg );
2965 return; // We don't have enough data to perform the calculation yet.
2966 }
2967
// Offset is the mean of the two one-way differences, each converted to double before summing.
2968 double offset = (double(rec - org) + double(msg.xmt - dst)) / 2;
2969 double NsecPerUsec{1000};
2970
2972 logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)",
2973 ("o", offset)( "us", offset / NsecPerUsec ) ) );
2974 org = 0;
2975 rec = 0;
2976
// last_handshake_recv is read under conn_mtx.
2977 std::unique_lock<std::mutex> g_conn( conn_mtx );
2978 if( last_handshake_recv.generation == 0 ) {
2979 g_conn.unlock();
2981 }
2982 }
void log(log_message m)
Definition logger.cpp:62
tstamp rec
receive timestamp
tstamp xmt
transmit timestamp
tstamp org
originate timestamp
tstamp dst
destination timestamp
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
Here is the call graph for this function:

◆ handle_message() [11/11]

void sysio::connection::handle_message ( packed_transaction_ptr msg)

Definition at line 3110 of file net_plugin.cpp.

// Handles a received packed transaction by passing it to chain_plug->accept_transaction
// with a completion callback that logs the outcome and then subtracts the transaction's
// size from trx_in_progress_size if the connection still exists.
// NOTE(review): the body names the parameter `trx` while the heading above shows `msg`;
// listing line 3114 is also missing from this extract -- confirm against net_plugin.cpp.
3110 {
3111 const auto& tid = trx->id();
3112 peer_dlog( this, "received packed_transaction ${id}", ("id", tid) );
3113
3115 my_impl->chain_plug->accept_transaction( trx,
// Only a weak reference to the connection is captured.
3116 [weak = weak_from_this(), trx](const std::variant<fc::exception_ptr, transaction_trace_ptr>& result) mutable {
3117 // next (this lambda) called from application thread
3118 if (std::holds_alternative<fc::exception_ptr>(result)) {
3119 fc_dlog( logger, "bad packed_transaction : ${m}", ("m", std::get<fc::exception_ptr>(result)->what()) );
3120 } else {
3121 const transaction_trace_ptr& trace = std::get<transaction_trace_ptr>(result);
3122 if( !trace->except ) {
3123 fc_dlog( logger, "chain accepted transaction, bcast ${id}", ("id", trace->id) );
3124 } else {
3125 fc_elog( logger, "bad packed_transaction : ${m}", ("m", trace->except->what()));
3126 }
3127 }
// The in-progress byte count is released whether the transaction succeeded or failed.
3128 connection_ptr conn = weak.lock();
3129 if( conn ) {
3130 conn->trx_in_progress_size -= calc_trx_size( trx );
3131 }
3132 });
3133 }
void accept_transaction(const chain::packed_transaction_ptr &trx, chain::plugin_interface::next_function< chain::transaction_trace_ptr > next)
std::atomic< uint32_t > trx_in_progress_size
std::shared_ptr< transaction_trace > transaction_trace_ptr
Definition trace.hpp:20
size_t calc_trx_size(const packed_transaction_ptr &trx)
Here is the call graph for this function:

◆ is_blocks_only_connection()

bool sysio::connection::is_blocks_only_connection ( ) const
inline

Definition at line 595 of file net_plugin.cpp.

// Returns true when connection_type equals blocks_only.
595{ return connection_type == blocks_only; }
Here is the caller graph for this function:

◆ is_transactions_only_connection()

bool sysio::connection::is_transactions_only_connection ( ) const
inline

Definition at line 594 of file net_plugin.cpp.

// Returns true when connection_type equals transactions_only.
594{ return connection_type == transactions_only; }
Here is the caller graph for this function:

◆ is_valid()

bool sysio::connection::is_valid ( const handshake_message & msg) const

Definition at line 2720 of file net_plugin.cpp.

// Validates the fields of an incoming handshake_message. Every check is evaluated (there
// is no early return); each failure logs a warning and makes the result false.
// Returns true only when all checks pass.
2720 {
2721 // Do some basic validation of an incoming handshake_message, so things
2722 // that really aren't handshake messages can be quickly discarded without
2723 // affecting state.
2724 bool valid = true;
// The last irreversible block number must not exceed the head block number.
2725 if (msg.last_irreversible_block_num > msg.head_num) {
2726 peer_wlog( this, "Handshake message validation: last irreversible block (${i}) is greater than head block (${h})",
2727 ("i", msg.last_irreversible_block_num)("h", msg.head_num) );
2728 valid = false;
2729 }
// p2p_address must be non-empty and at most max_handshake_str_length characters.
2730 if (msg.p2p_address.empty()) {
2731 peer_wlog( this, "Handshake message validation: p2p_address is null string" );
2732 valid = false;
2733 } else if( msg.p2p_address.length() > max_handshake_str_length ) {
2734 // see max_handshake_str_length comment in protocol.hpp
2735 peer_wlog( this, "Handshake message validation: p2p_address too large: ${p}",
2736 ("p", msg.p2p_address.substr(0, max_handshake_str_length) + "...") );
2737 valid = false;
2738 }
// os must be non-empty and within the same length limit.
2739 if (msg.os.empty()) {
2740 peer_wlog( this, "Handshake message validation: os field is null string" );
2741 valid = false;
2742 } else if( msg.os.length() > max_handshake_str_length ) {
2743 peer_wlog( this, "Handshake message validation: os field too large: ${p}",
2744 ("p", msg.os.substr(0, max_handshake_str_length) + "...") );
2745 valid = false;
2746 }
// agent may be empty but is subject to the same length limit.
2747 if( msg.agent.length() > max_handshake_str_length ) {
2748 peer_wlog( this, "Handshake message validation: agent field too large: ${p}",
2749 ("p", msg.agent.substr(0, max_handshake_str_length) + "...") );
2750 valid = false;
2751 }
// When either sig or token is non-default, token must equal the sha256 hash of msg.time.
2752 if ((msg.sig != chain::signature_type() || msg.token != sha256()) && (msg.token != fc::sha256::hash(msg.time))) {
2753 peer_wlog( this, "Handshake message validation: token field invalid" );
2754 valid = false;
2755 }
2756 return valid;
2757 }
static sha256 hash(const char *d, uint32_t dlen)
Definition sha256.cpp:44
fc::crypto::signature signature_type
Definition types.hpp:78
constexpr size_t max_handshake_str_length
Definition protocol.hpp:25
Here is the call graph for this function:
Here is the caller graph for this function:

◆ peer_address()

const string & sysio::connection::peer_address ( ) const
inline

Definition at line 591 of file net_plugin.cpp.

// Returns a const reference to the stored peer_addr string.
591{ return peer_addr; } // thread safe, const
Here is the caller graph for this function:

◆ populate_handshake()

bool sysio::connection::populate_handshake ( handshake_message & hello)

Definition at line 3504 of file net_plugin.cpp.

// Fills `hello` with this node's current chain info, identity, authentication data,
// advertised p2p address, OS name and agent name. Always returns true.
3504 {
3505 namespace sc = std::chrono;
3506 hello.network_version = net_version_base + net_version;
3507 uint32_t lib, head;
// Only the 1st, 3rd, 4th and 6th elements of get_chain_info() are used; the rest are ignored.
3508 std::tie( lib, std::ignore, head,
3509 hello.last_irreversible_block_id, std::ignore, hello.head_id ) = my_impl->get_chain_info();
3510 hello.last_irreversible_block_num = lib;
3511 hello.head_num = head;
3512 hello.chain_id = my_impl->chain_id;
3513 hello.node_id = my_impl->node_id;
3514 hello.key = my_impl->get_authentication_key();
// Timestamp in nanoseconds since the system_clock epoch; the token is its sha256 hash.
3515 hello.time = sc::duration_cast<sc::nanoseconds>(sc::system_clock::now().time_since_epoch()).count();
3516 hello.token = fc::sha256::hash(hello.time);
3517 hello.sig = my_impl->sign_compact(hello.key, hello.token);
3518 // If we couldn't sign, don't send a token.
3519 if(hello.sig == chain::signature_type())
3520 hello.token = sha256();
// Advertised address: configured p2p address, an optional ":trx"/":blk" suffix for the
// connection type, then " - " and the first 7 characters of the node id string.
3521 hello.p2p_address = my_impl->p2p_address;
3522 if( is_transactions_only_connection() ) hello.p2p_address += ":trx";
3523 if( is_blocks_only_connection() ) hello.p2p_address += ":blk";
3524 hello.p2p_address += " - " + hello.node_id.str().substr(0,7);
// OS name is chosen at compile time from the platform macros.
3525#if defined( __APPLE__ )
3526 hello.os = "osx";
3527#elif defined( __linux__ )
3528 hello.os = "linux";
3529#elif defined( _WIN32 )
3530 hello.os = "win32";
3531#else
3532 hello.os = "other";
3533#endif
3534 hello.agent = my_impl->user_agent_name;
3535
3536 return true;
3537 }
bool is_blocks_only_connection() const
bool is_transactions_only_connection() const
chain::signature_type sign_compact(const chain::public_key_type &signer, const fc::sha256 &digest) const
Returns a signature of the digest using the corresponding private key of the signer.
chain::public_key_type get_authentication_key() const
Retrieve public key used to authenticate with peers.
constexpr uint16_t net_version_base
Here is the call graph for this function:

◆ process_next_message()

bool sysio::connection::process_next_message ( uint32_t message_length)

Process the next message from the pending_message_buffer. message_length is the already determined length of the data part of the message to be handled. Returns true if successful. Returns false if an error was encountered unpacking or processing the message.

Definition at line 2560 of file net_plugin.cpp.

// Reads the message type index from the buffered message and dispatches on it: signed
// blocks and packed transactions go to their dedicated process_next_*_message functions,
// every other type is unpacked into a net_message and visited with msg_handler.
// Returns false (after logging and close()) when an fc::exception is thrown, true otherwise.
// NOTE(review): listing lines 2562, 2565, 2569 and 2576 are missing from this extract, so
// the declarations of `peek_ds` and `ds` are not visible -- confirm against net_plugin.cpp.
2560 {
2561 try {
2563
2564 // if next message is a block we already have, exit early
2566 unsigned_int which{};
2567 fc::raw::unpack( peek_ds, which );
2568 if( which == signed_block_which ) {
2570 return process_next_block_message( message_length );
2571
2572 } else if( which == packed_transaction_which ) {
2573 return process_next_trx_message( message_length );
2574
2575 } else {
2577 net_message msg;
2578 fc::raw::unpack( ds, msg );
2579 msg_handler m( shared_from_this() );
2580 std::visit( m, msg );
2581 }
2582
// Only fc::exception is caught here; other exception types propagate to the caller.
2583 } catch( const fc::exception& e ) {
2584 peer_elog( this, "Exception in handling message: ${s}", ("s", e.to_detail_string()) );
2585 close();
2586 return false;
2587 }
2588 return true;
2589 }
std::string to_detail_string(log_level ll=log_level::all) const
mb_datastream< buffer_len > create_datastream()
mb_peek_datastream< buffer_len > create_peek_datastream()
static const Segment ds(Segment::ds)
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
constexpr uint32_t signed_block_which
std::variant< handshake_message, chain_size_message, go_away_message, time_message, notice_message, request_message, sync_request_message, signed_block, packed_transaction > net_message
Definition protocol.hpp:138
constexpr uint32_t packed_transaction_which
Here is the call graph for this function:

◆ process_signed_block()

void sysio::connection::process_signed_block ( const block_id_type & id,
signed_block_ptr msg,
block_state_ptr bsp )

Definition at line 3175 of file net_plugin.cpp.

// Processes a received signed block. Returns immediately if the socket is closed. If the
// controller already has the block, the peer is recorded as having it and the sync
// manager is notified with `false`. Otherwise the block is handed to
// chain_plug->accept_block; acceptance and rejection are both reported to the dispatcher
// and sync manager through posted tasks.
// NOTE(review): the body names the id parameter `blk_id` while the heading above shows `id`.
3175 {
3176 controller& cc = my_impl->chain_plug->chain();
3177 uint32_t blk_num = msg->block_num();
3178 // use c in this method instead of this to highlight that all methods called on c-> must be thread safe
3179 connection_ptr c = shared_from_this();
3180
3181 // if we have closed connection then stop processing
3182 if( !c->socket_is_open() )
3183 return;
3184
3185 try {
// Block already known: notify on the connection strand and stop.
3186 if( cc.fetch_block_by_id(blk_id) ) {
3187 c->strand.post( [sync_master = my_impl->sync_master.get(),
3188 dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
3189 dispatcher->add_peer_block( blk_id, c->connection_id );
3190 sync_master->sync_recv_block( c, blk_id, blk_num, false );
3191 });
3192 return;
3193 }
3194 } catch(...) {
3195 // should this even be caught?
3196 fc_elog( logger, "Caught an unknown exception trying to recall block ID" );
3197 }
3198
3199 fc::microseconds age( fc::time_point::now() - msg->timestamp);
3200 fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection ${cid}",
3201 ("n", blk_num)("age", age.to_seconds())("cid", c->connection_id) );
3202
// `reason` stays fatal_other unless the block is accepted or a more specific handler sets it.
// Within this function it is only compared against no_reason.
3203 go_away_reason reason = fatal_other;
3204 try {
3205 bool accepted = my_impl->chain_plug->accept_block(msg, blk_id, bsp);
3206 my_impl->update_chain_info();
// accept_block returned false: return without notifying the dispatcher or sync manager.
3207 if( !accepted ) return;
3208 reason = no_reason;
3209 } catch( const unlinkable_block_exception &ex) {
3210 fc_elog(logger, "unlinkable_block_exception connection ${cid}: #${n} ${id}...: ${m}",
3211 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
3212 reason = unlinkable;
3213 } catch( const block_validate_exception &ex ) {
3214 fc_elog(logger, "block_validate_exception connection ${cid}: #${n} ${id}...: ${m}",
3215 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
3216 reason = validation;
// The remaining handlers only log; reason keeps its initial fatal_other value.
3217 } catch( const assert_exception &ex ) {
3218 fc_elog(logger, "block assert_exception connection ${cid}: #${n} ${id}...: ${m}",
3219 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
3220 } catch( const fc::exception &ex ) {
3221 fc_elog(logger, "bad block exception connection ${cid}: #${n} ${id}...: ${m}",
3222 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
3223 } catch( ... ) {
3224 fc_elog(logger, "bad block connection ${cid}: #${n} ${id}...: unknown exception",
3225 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16)));
3226 }
3227
3228 if( reason == no_reason ) {
// Accepted: add_peer_block is posted to the thread pool executor, while recv_block and
// sync_recv_block( ..., true ) are posted to the connection strand.
3229 boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
3230 fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) );
3231 dispatcher->add_peer_block( blk_id, cid );
3232 });
3233 c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
3234 dispatcher->recv_block( c, blk_id, blk_num );
3235 sync_master->sync_recv_block( c, blk_id, blk_num, true );
3236 });
3237 } else {
// Rejected: both the sync manager and the dispatcher are told, on the connection strand.
3238 c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
3239 sync_master->rejected_block( c, blk_num );
3240 dispatcher->rejected_block( blk_id );
3241 });
3242 }
3243 }
static time_point now()
Definition time.cpp:14
bool accept_block(const chain::signed_block_ptr &block, const chain::block_id_type &id, const chain::block_state_ptr &bsp)
key Invalid authority Invalid transaction Invalid block ID Invalid packed transaction Invalid chain ID Invalid symbol Signature type is not a currently activated type Block can not be found block_validate_exception
@ unlinkable
the peer sent a block we couldn't use
Definition protocol.hpp:53
Here is the call graph for this function:

◆ queue_write()

void sysio::connection::queue_write ( const std::shared_ptr< vector< char > > & buff,
std::function< void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue = false )

Definition at line 1200 of file net_plugin.cpp.

// Adds `buff` and its completion `callback` to buffer_queue. If the queue refuses the
// entry, logs the current queue size and closes the connection.
// NOTE(review): listing line 1208 is missing from this extract; what happens after a
// successful add is not visible here -- confirm against net_plugin.cpp.
1202 {
1203 if( !buffer_queue.add_write_queue( buff, callback, to_sync_queue )) {
1204 peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) );
1205 close();
1206 return;
1207 }
1209 }
bool add_write_queue(const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue)
Here is the call graph for this function:

◆ request_sync_blocks()

void sysio::connection::request_sync_blocks ( uint32_t start,
uint32_t end )

Definition at line 1504 of file net_plugin.cpp.

// Enqueues a sync_request_message for the range {start, end} and then calls sync_wait().
1504 {
1505 sync_request_message srm = {start,end};
1506 enqueue( net_message(srm) );
1507 sync_wait();
1508 }

◆ resolve_and_connect()

bool sysio::connection::resolve_and_connect ( )

Definition at line 2287 of file net_plugin.cpp.

// Starts an asynchronous resolve-and-connect to peer_address(). Returns false when the
// stored no_retry reason forbids reconnecting or the address has no usable "host:port"
// form; returns true otherwise, including when the attempt is deferred.
// NOTE(review): listing line 2307 is missing from this extract; it opens the scope closed
// at 2313, and its condition is not visible -- confirm against net_plugin.cpp.
2287 {
// Only these go_away reasons permit another connection attempt.
2288 switch ( no_retry ) {
2289 case no_reason:
2290 case wrong_version:
2291 case benign_other:
2292 case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
2293 break;
2294 default:
2295 fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
2296 return false;
2297 }
2298
// A missing colon, or a colon at position 0 (empty host), is rejected.
2299 string::size_type colon = peer_address().find(':');
2300 if (colon == std::string::npos || colon == 0) {
2301 fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address()) );
2302 return false;
2303 }
2304
2305 connection_ptr c = shared_from_this();
2306
2308 auto connector_period_us = std::chrono::duration_cast<std::chrono::microseconds>( my_impl->connector_period );
2309 std::lock_guard<std::mutex> g( c->conn_mtx );
// Skip this attempt when last_close is unset or falls within the last connector_period.
2310 if( last_close == fc::time_point() || last_close > fc::time_point::now() - fc::microseconds( connector_period_us.count() ) ) {
2311 return true; // true so doesn't remove from valid connections
2312 }
2313 }
2314
// The remaining work is posted to the connection strand.
2315 strand.post([c]() {
// Split "host:port[:type]"; the port ends at the second colon when one is present.
2316 string::size_type colon = c->peer_address().find(':');
2317 string::size_type colon2 = c->peer_address().find(':', colon + 1);
2318 string host = c->peer_address().substr( 0, colon );
2319 string port = c->peer_address().substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
2320 c->set_connection_type( c->peer_address() );
2321
2322 auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool->get_executor() );
2323 connection_wptr weak_conn = c;
2324 // Note: need to add support for IPv6 too
// The resolver is captured by the handler so that it outlives the asynchronous call.
2325 resolver->async_resolve( tcp::v4(), host, port, boost::asio::bind_executor( c->strand,
2326 [resolver, weak_conn, host, port]( const boost::system::error_code& err, tcp::resolver::results_type endpoints ) {
2327 auto c = weak_conn.lock();
2328 if( !c ) return;
2329 if( !err ) {
2330 c->connect( resolver, endpoints );
2331 } else {
// Resolve failure: log it, clear `connecting` and count an immediate close.
2332 fc_elog( logger, "Unable to resolve ${host}:${port} ${error}",
2333 ("host", host)("port", port)( "error", err.message() ) );
2334 c->connecting = false;
2335 ++c->consecutive_immediate_connection_close;
2336 }
2337 } ) );
2338 } );
2339 return true;
2340 }
fc::time_point last_close
std::atomic< uint16_t > consecutive_immediate_connection_close
boost::asio::steady_timer::duration connector_period
constexpr auto def_max_consecutive_immediate_connection_close
Here is the call graph for this function:

◆ send_handshake()

void sysio::connection::send_handshake ( )

Definition at line 1137 of file net_plugin.cpp.

// Posts a task to the connection strand that populates last_handshake_sent under
// conn_mtx, assigns it the next generation number, and enqueues a copy after unlocking.
// NOTE(review): listing lines 1147-1148 are missing from this extract; they presumably
// carry the "g" and "lib" log arguments -- confirm against net_plugin.cpp.
1137 {
1138 strand.post( [c = shared_from_this()]() {
1139 std::unique_lock<std::mutex> g_conn( c->conn_mtx );
1140 if( c->populate_handshake( c->last_handshake_sent ) ) {
1141 static_assert( std::is_same_v<decltype( c->sent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" );
// At INT16_MAX the counter is reset to 1, so the pre-increment below yields generation 2.
1142 if( c->sent_handshake_count == INT16_MAX ) c->sent_handshake_count = 1; // do not wrap
1143 c->last_handshake_sent.generation = ++c->sent_handshake_count;
// Copy taken while the lock is held; the log and enqueue below use the copy.
1144 auto last_handshake_sent = c->last_handshake_sent;
1145 g_conn.unlock();
1146 peer_ilog( c, "Sending handshake generation ${g}, lib ${lib}, head ${head}, id ${id}",
1149 ("head", last_handshake_sent.head_num)("id", last_handshake_sent.head_id.str().substr(8,16)) );
1150 c->enqueue( last_handshake_sent );
1151 }
1152 });
1153 }
signed short int16_t
Definition stdint.h:122
#define INT16_MAX
Definition stdint.h:181
uint32_t last_irreversible_block_num
Definition protocol.hpp:36
Here is the call graph for this function:
Here is the caller graph for this function:

◆ send_time() [1/2]

void sysio::connection::send_time ( )

Definition at line 1181 of file net_plugin.cpp.

// Builds and enqueues a time_message from the connection's stored timestamps:
// org <- rec, rec <- dst, xmt <- current time. The connection's own `org` is then set to
// the transmit time just sent.
1181 {
1182 time_message xpkt;
1183 xpkt.org = rec;
1184 xpkt.rec = dst;
1185 xpkt.xmt = get_time();
1186 org = xpkt.xmt;
1187 enqueue(xpkt);
1188 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ send_time() [2/2]

void sysio::connection::send_time ( const time_message & msg)

Definition at line 1191 of file net_plugin.cpp.

// Builds and enqueues a time_message in reply to `msg`: org <- msg.xmt, rec <- msg.dst,
// xmt <- current time. Unlike the no-argument overload, this one does not update the
// connection's `org`.
1191 {
1192 time_message xpkt;
1193 xpkt.org = msg.xmt;
1194 xpkt.rec = msg.dst;
1195 xpkt.xmt = get_time();
1196 enqueue(xpkt);
1197 }
Here is the call graph for this function:

◆ set_connection_type()

void sysio::connection::set_connection_type ( const string & peer_addr)

Definition at line 918 of file net_plugin.cpp.

// Parses an address of the form host:port[:type] and sets connection_type from the
// optional type field: empty -> both, "trx" -> transactions_only, "blk" -> blocks_only.
// Any other type is logged as unknown and connection_type is left unchanged.
// NOTE(review): the body names the parameter `peer_add` while the heading above shows `peer_addr`.
918 {
919 // host:port:[<trx>|<blk>]
920 string::size_type colon = peer_add.find(':');
921 string::size_type colon2 = peer_add.find(':', colon + 1);
// `end` marks the first delimiter character after the type field, if any.
922 string::size_type end = colon2 == string::npos
923 ? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex
// host and port are extracted but not used in the visible body.
924 string host = peer_add.substr( 0, colon );
925 string port = peer_add.substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
// type is the text after the second colon, up to `end` when a delimiter was found.
926 string type = colon2 == string::npos ? "" : end == string::npos ?
927 peer_add.substr( colon2 + 1 ) : peer_add.substr( colon2 + 1, end - (colon2 + 1) );
928
929 if( type.empty() ) {
930 fc_dlog( logger, "Setting connection ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) );
931 connection_type = both;
932 } else if( type == "trx" ) {
933 fc_dlog( logger, "Setting connection ${c} type for: ${peer} to transactions only", ("c", connection_id)("peer", peer_add) );
934 connection_type = transactions_only;
935 } else if( type == "blk" ) {
936 fc_dlog( logger, "Setting connection ${c} type for: ${peer} to blocks only", ("c", connection_id)("peer", peer_add) );
937 connection_type = blocks_only;
938 } else {
939 fc_wlog( logger, "Unknown connection ${c} type: ${t}, for ${peer}", ("c", connection_id)("t", type)("peer", peer_add) );
940 }
941 }
#define fc_wlog(LOGGER, FORMAT,...)
Definition logger.hpp:89
Here is the caller graph for this function:

◆ set_heartbeat_timeout()

void sysio::connection::set_heartbeat_timeout ( std::chrono::milliseconds msec)
inline

Definition at line 596 of file net_plugin.cpp.

// Converts `msec` to std::chrono::system_clock::duration and stores its tick count in
// hb_timeout, so the stored value is in system_clock ticks rather than milliseconds.
596 {
597 std::chrono::system_clock::duration dur = msec;
598 hb_timeout = dur.count();
599 }

◆ socket_is_open()

bool sysio::connection::socket_is_open ( ) const
inline

Definition at line 590 of file net_plugin.cpp.

// Returns the current value of the socket_open flag.
590{ return socket_open.load(); } // thread safe, atomic
Here is the caller graph for this function:

◆ start_read_message()

void sysio::connection::start_read_message ( )

Definition at line 2432 of file net_plugin.cpp.

// Issues one asynchronous read on the socket and, in its handler, processes every
// complete message now in pending_message_buffer before scheduling the next read.
// The connection is closed on read errors, on an unexpected message length, and on
// exceptions other than the two bad_alloc types, which are rethrown.
// NOTE(review): listing line 2465 is missing from this extract; it presumably supplies
// the buffer sequence and completion condition to async_read -- confirm against net_plugin.cpp.
2432 {
2433 try {
// Take the byte count requested by the previous handler, resetting it to 0; when none
// was requested, read at least a message header.
2434 std::size_t minimum_read =
2435 std::atomic_exchange<decltype(outstanding_read_bytes.load())>( &outstanding_read_bytes, 0 );
2436 minimum_read = minimum_read != 0 ? minimum_read : message_header_size;
2437
2438 if (my_impl->use_socket_read_watermark) {
// The receive low-watermark is capped at 4096 bytes; failure to set it is only logged.
2439 const size_t max_socket_read_watermark = 4096;
2440 std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
2441 boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
2442 boost::system::error_code ec;
2443 socket->set_option( read_watermark_opt, ec );
2444 if( ec ) {
2445 peer_elog( this, "unable to set read watermark: ${e1}", ("e1", ec.message()) );
2446 }
2447 }
2448
// Returns how many more bytes are wanted; 0 on error or once minimum_read is reached.
2449 auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
2450 if (ec || bytes_transferred >= minimum_read ) {
2451 return 0;
2452 } else {
2453 return minimum_read - bytes_transferred;
2454 }
2455 };
2456
// Stop reading and close without reconnect when the outgoing queue exceeds its limit.
2457 uint32_t write_queue_size = buffer_queue.write_queue_size();
2458 if( write_queue_size > def_max_write_queue_size ) {
2459 peer_elog( this, "write queue full ${s} bytes, giving up on connection, closing", ("s", write_queue_size) );
2460 close( false );
2461 return;
2462 }
2463
2464 boost::asio::async_read( *socket,
2466 boost::asio::bind_executor( strand,
2467 [conn = shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t bytes_transferred ) {
2468 // may have closed connection and cleared pending_message_buffer
2469 if( !conn->socket_is_open() || socket != conn->socket ) return;
2470
2471 bool close_connection = false;
2472 try {
2473 if( !ec ) {
2474 if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
2475 peer_elog( conn, "async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
2476 ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()) );
2477 }
2478 SYS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, "");
2479 conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
// Drain every complete message currently buffered.
2480 while (conn->pending_message_buffer.bytes_to_read() > 0) {
2481 uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();
2482
2483 if (bytes_in_buffer < message_header_size) {
// Partial header: request the rest of it on the next read.
2484 conn->outstanding_read_bytes = message_header_size - bytes_in_buffer;
2485 break;
2486 } else {
// Peek the length field without consuming it.
2487 uint32_t message_length;
2488 auto index = conn->pending_message_buffer.read_index();
2489 conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);
2490 if(message_length > def_send_buffer_size*2 || message_length == 0) {
2491 peer_elog( conn, "incoming message length unexpected (${i})", ("i", message_length) );
2492 close_connection = true;
2493 break;
2494 }
2495
2496 auto total_message_bytes = message_length + message_header_size;
2497
2498 if (bytes_in_buffer >= total_message_bytes) {
// Whole message present: consume the header and process the payload.
2499 conn->pending_message_buffer.advance_read_ptr(message_header_size);
2500 conn->consecutive_immediate_connection_close = 0;
// On failure, return without scheduling another read.
2501 if (!conn->process_next_message(message_length)) {
2502 return;
2503 }
2504 } else {
// Partial message: grow the buffer if needed and request the remaining bytes.
2505 auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
2506 auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
2507 if (outstanding_message_bytes > available_buffer_bytes) {
2508 conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
2509 }
2510
2511 conn->outstanding_read_bytes = outstanding_message_bytes;
2512 break;
2513 }
2514 }
2515 }
2516 if( !close_connection ) conn->start_read_message();
2517 } else {
// Read error: end-of-file is logged at info level, anything else at error level.
2518 if (ec.value() != boost::asio::error::eof) {
2519 peer_elog( conn, "Error reading message: ${m}", ( "m", ec.message() ) );
2520 } else {
2521 peer_ilog( conn, "Peer closed connection" );
2522 }
2523 close_connection = true;
2524 }
2525 }
// Allocation failures are rethrown; every other exception closes the connection.
2526 catch ( const std::bad_alloc& )
2527 {
2528 throw;
2529 }
2530 catch ( const boost::interprocess::bad_alloc& )
2531 {
2532 throw;
2533 }
2534 catch(const fc::exception &ex)
2535 {
2536 peer_elog( conn, "Exception in handling read data ${s}", ("s",ex.to_string()) );
2537 close_connection = true;
2538 }
2539 catch(const std::exception &ex) {
2540 peer_elog( conn, "Exception in handling read data: ${s}", ("s",ex.what()) );
2541 close_connection = true;
2542 }
2543 catch (...) {
2544 peer_elog( conn, "Undefined exception handling read data" );
2545 close_connection = true;
2546 }
2547
2548 if( close_connection ) {
2549 peer_elog( conn, "Closing connection" );
2550 conn->close();
2551 }
2552 }));
// Any exception thrown while setting up the read closes the connection.
2553 } catch (...) {
2554 peer_elog( this, "Undefined exception in start_read_message, closing connection" );
2555 close();
2556 }
2557 }
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
std::vector< boost::asio::mutable_buffer > get_buffer_sequence_for_boost_async_read()
uint32_t bytes_to_write() const
std::atomic< std::size_t > outstanding_read_bytes
constexpr auto message_header_size
constexpr auto def_send_buffer_size
constexpr auto def_max_write_queue_size
Here is the call graph for this function:
Here is the caller graph for this function:

◆ start_session()

bool sysio::connection::start_session ( )

Definition at line 954 of file net_plugin.cpp.

// Finishes setup of a socket that has just been connected: refreshes the
// endpoint information, sets the TCP no_delay option, and marks the socket open.
// Returns true on success. If setting the socket option fails, the error is
// logged, the connection is closed, and false is returned.
// NOTE(review): this listing jumps from 954 to 956 and from 967 to 969, so
// source lines 955 and 968 are not shown here — check net_plugin.cpp for them.
954 {
956
957 update_endpoints();
958 boost::asio::ip::tcp::no_delay nodelay( true ); // Boost.Asio option that disables the Nagle algorithm
959 boost::system::error_code ec;
960 socket->set_option( nodelay, ec ); // error_code overload: failure is reported through ec rather than thrown
961 if( ec ) {
962 peer_elog( this, "connection failed (set_option): ${e1}", ( "e1", ec.message() ) );
963 close(); // uses close()'s defaults (reconnect=true, shutdown=false)
964 return false;
965 } else {
966 peer_dlog( this, "connected" );
967 socket_open = true;
969 return true;
970 }
971 }
Here is the call graph for this function:

◆ stop_send()

void sysio::connection::stop_send ( )

Definition at line 1133 of file net_plugin.cpp.

// Clears the `syncing` flag (a std::atomic<bool> member); does nothing else.
1133 {
1134 syncing = false;
1135 }
Here is the caller graph for this function:

◆ sync_timeout()

void sysio::connection::sync_timeout ( boost::system::error_code ec)

Definition at line 1485 of file net_plugin.cpp.

// Completion handler for the wait started in sync_wait() on response_expected_timer.
//   ec - result of the timer wait as delivered by Boost.Asio.
1485 {
1486 if( !ec ) { // no error: the timer expired without being cancelled
1487 my_impl->sync_master->sync_reassign_fetch( shared_from_this(), benign_other ); // presumably lets the sync manager move the request to another peer — confirm
1488 close(true); // first argument is `reconnect`
1489 } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
1490 peer_elog( this, "setting timer for sync request got error ${ec}", ("ec", ec.message()) );
1491 }
1492 }
Here is the call graph for this function:

◆ sync_wait()

void sysio::connection::sync_wait ( )

Definition at line 1463 of file net_plugin.cpp.

// Arms response_expected_timer to expire my_impl->resp_expected_period from now
// and registers sync_timeout() as its completion handler.
1463 {
1464 connection_ptr c(shared_from_this()); // owning handle; copied into the handler below so the connection outlives the pending wait
1465 std::lock_guard<std::mutex> g( response_expected_timer_mtx ); // held until the end of the function while the timer is modified
1466 response_expected_timer.expires_from_now( my_impl->resp_expected_period );
1467 response_expected_timer.async_wait(
1468 boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) { // handler is dispatched through this connection's strand
1469 c->sync_timeout( ec );
1470 } ) );
1471 }

Member Data Documentation

◆ block_status_monitor_

block_status_monitor sysio::connection::block_status_monitor_

Definition at line 645 of file net_plugin.cpp.

◆ buffer_queue

queued_buffer sysio::connection::buffer_queue

Definition at line 626 of file net_plugin.cpp.

◆ conn_mtx

std::mutex sysio::connection::conn_mtx
mutable

Definition at line 653 of file net_plugin.cpp.

◆ conn_node_id

fc::sha256 sysio::connection::conn_node_id

Definition at line 628 of file net_plugin.cpp.

◆ connecting

std::atomic<bool> sysio::connection::connecting {true}

Definition at line 640 of file net_plugin.cpp.

640{true};

◆ connection_id

const uint32_t sysio::connection::connection_id

Definition at line 638 of file net_plugin.cpp.

◆ consecutive_immediate_connection_close

std::atomic<uint16_t> sysio::connection::consecutive_immediate_connection_close = 0

Definition at line 646 of file net_plugin.cpp.

◆ dst

tstamp sysio::connection::dst {0}

Definition at line 671 of file net_plugin.cpp.

671{0};

◆ fork_head

block_id_type sysio::connection::fork_head

Definition at line 657 of file net_plugin.cpp.

◆ fork_head_num

uint32_t sysio::connection::fork_head_num {0}

Definition at line 658 of file net_plugin.cpp.

658{0};

◆ hb_timeout

tstamp sysio::connection::hb_timeout {std::chrono::milliseconds{def_keepalive_interval}.count()}

Definition at line 676 of file net_plugin.cpp.

676{std::chrono::milliseconds{def_keepalive_interval}.count()};
constexpr auto def_keepalive_interval

◆ last_close

fc::time_point sysio::connection::last_close

Definition at line 659 of file net_plugin.cpp.

◆ last_dropped_trx_msg_time

fc::time_point sysio::connection::last_dropped_trx_msg_time

Definition at line 637 of file net_plugin.cpp.

◆ last_handshake_recv

handshake_message sysio::connection::last_handshake_recv

Definition at line 655 of file net_plugin.cpp.

◆ last_handshake_sent

handshake_message sysio::connection::last_handshake_sent

Definition at line 656 of file net_plugin.cpp.

◆ last_req

std::optional<request_message> sysio::connection::last_req

Definition at line 654 of file net_plugin.cpp.

◆ latest_blk_time

tstamp sysio::connection::latest_blk_time {0}

Definition at line 677 of file net_plugin.cpp.

677{0};

◆ latest_msg_time

tstamp sysio::connection::latest_msg_time {0}

Definition at line 675 of file net_plugin.cpp.

675{0};

◆ local_endpoint_ip

string sysio::connection::local_endpoint_ip

Definition at line 633 of file net_plugin.cpp.

◆ local_endpoint_port

string sysio::connection::local_endpoint_port

Definition at line 634 of file net_plugin.cpp.

◆ log_p2p_address

string sysio::connection::log_p2p_address

Definition at line 630 of file net_plugin.cpp.

◆ log_remote_endpoint_ip

string sysio::connection::log_remote_endpoint_ip

Definition at line 631 of file net_plugin.cpp.

◆ log_remote_endpoint_port

string sysio::connection::log_remote_endpoint_port

Definition at line 632 of file net_plugin.cpp.

◆ net_version

uint16_t sysio::connection::net_version = net_version_max

Definition at line 644 of file net_plugin.cpp.

◆ no_retry

std::atomic<go_away_reason> sysio::connection::no_retry {no_reason}

Definition at line 651 of file net_plugin.cpp.

651{no_reason};

◆ org

tstamp sysio::connection::org {0}

Definition at line 669 of file net_plugin.cpp.

669{0};

◆ outstanding_read_bytes

std::atomic<std::size_t> sysio::connection::outstanding_read_bytes {0}

Definition at line 624 of file net_plugin.cpp.

624{0}; // accessed only from strand threads

◆ pending_message_buffer

fc::message_buffer<1024*1024> sysio::connection::pending_message_buffer

Definition at line 623 of file net_plugin.cpp.

◆ protocol_version

std::atomic<uint16_t> sysio::connection::protocol_version = 0

Definition at line 643 of file net_plugin.cpp.

◆ rec

tstamp sysio::connection::rec {0}

Definition at line 670 of file net_plugin.cpp.

670{0};

◆ remote_endpoint_ip

string sysio::connection::remote_endpoint_ip

Definition at line 660 of file net_plugin.cpp.

◆ response_expected_timer

boost::asio::steady_timer sysio::connection::response_expected_timer

Definition at line 649 of file net_plugin.cpp.

◆ response_expected_timer_mtx

std::mutex sysio::connection::response_expected_timer_mtx

Definition at line 648 of file net_plugin.cpp.

◆ sent_handshake_count

int16_t sysio::connection::sent_handshake_count = 0

Definition at line 639 of file net_plugin.cpp.

◆ short_conn_node_id

string sysio::connection::short_conn_node_id

Definition at line 629 of file net_plugin.cpp.

◆ socket

std::shared_ptr<tcp::socket> sysio::connection::socket

Definition at line 621 of file net_plugin.cpp.

◆ strand

boost::asio::io_context::strand sysio::connection::strand

Definition at line 620 of file net_plugin.cpp.

◆ syncing

std::atomic<bool> sysio::connection::syncing {false}

Definition at line 641 of file net_plugin.cpp.

641{false};

◆ trx_in_progress_size

std::atomic<uint32_t> sysio::connection::trx_in_progress_size {0}

Definition at line 636 of file net_plugin.cpp.

636{0};

◆ xmt

tstamp sysio::connection::xmt {0}

Definition at line 672 of file net_plugin.cpp.

672{0};

The documentation for this class was generated from the following file: