24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/ip/host_name.hpp>
26#include <boost/asio/steady_timer.hpp>
29#include <shared_mutex>
38 using boost::asio::ip::tcp;
39 using boost::asio::ip::address_v4;
40 using boost::asio::ip::host_name;
41 using boost::multi_index_container;
53 template <
typename Strand>
55 if( !strand.running_in_this_thread() ) {
56 elog(
"wrong strand: ${f} : line ${n}, exiting", (
"f", func)(
"n", line) );
69 typedef multi_index_container<
75 member<node_transaction_state, transaction_id_type, &node_transaction_state::id>,
76 member<node_transaction_state, uint32_t, &node_transaction_state::connection_id>
78 composite_key_compare< sha256_less, std::less<uint32_t> >
82 member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >
94 struct by_peer_block_id;
97 typedef multi_index_container<
100 ordered_unique< tag<by_id>,
102 member<peer_block_state, uint32_t, &sysio::peer_block_state::connection_id>,
103 member<peer_block_state, block_id_type, &sysio::peer_block_state::id>
105 composite_key_compare< std::less<uint32_t>,
sha256_less >
107 ordered_non_unique< tag<by_peer_block_id>,
109 member<peer_block_state, block_id_type, &sysio::peer_block_state::id>,
110 member<peer_block_state, bool, &sysio::peer_block_state::have_block>
112 composite_key_compare< sha256_less, std::greater<bool> >
114 ordered_non_unique< tag<by_block_num>, member<sysio::peer_block_state, uint32_t, &sysio::peer_block_state::block_num > >
127 static constexpr int64_t block_interval_ns =
128 std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();
130 mutable std::mutex sync_mtx;
132 uint32_t sync_last_requested_num{0};
136 std::atomic<stages> sync_state{in_sync};
139 constexpr static auto stage_str( stages
s );
140 bool set_state( stages
s );
141 bool is_sync_required(
uint32_t fork_head_block_num );
158 return std::unique_lock<std::mutex>(sync_mtx);
161 sync_last_requested_num = 0;
166 mutable std::mutex blk_state_mtx;
168 mutable std::mutex local_txns_mtx;
290 mutable std::mutex chain_info_mtx;
293 uint32_t chain_fork_head_blk_num{0};
301 std::tuple<uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type>
get_chain_info()
const;
307 void transaction_ack(
const std::pair<fc::exception_ptr, packed_transaction_ptr>&);
310 void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
351 string connect(
const string& host );
359#define peer_dlog( PEER, FORMAT, ... ) \
360 FC_MULTILINE_MACRO_BEGIN \
361 if( logger.is_enabled( fc::log_level::debug ) ) { \
362 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
363 logger.log( FC_LOG_MESSAGE( debug, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
365 FC_MULTILINE_MACRO_END
367#define peer_ilog( PEER, FORMAT, ... ) \
368 FC_MULTILINE_MACRO_BEGIN \
369 if( logger.is_enabled( fc::log_level::info ) ) { \
370 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
371 logger.log( FC_LOG_MESSAGE( info, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
373 FC_MULTILINE_MACRO_END
375#define peer_wlog( PEER, FORMAT, ... ) \
376 FC_MULTILINE_MACRO_BEGIN \
377 if( logger.is_enabled( fc::log_level::warn ) ) { \
378 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
379 logger.log( FC_LOG_MESSAGE( warn, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
381 FC_MULTILINE_MACRO_END
383#define peer_elog( PEER, FORMAT, ... ) \
384 FC_MULTILINE_MACRO_BEGIN \
385 if( logger.is_enabled( fc::log_level::error ) ) { \
386 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
387 logger.log( FC_LOG_MESSAGE( error, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
389 FC_MULTILINE_MACRO_END
392 template<class enum_type, class=typename std::enable_if<std::is_enum<enum_type>::value>::type>
395 using T = std::underlying_type_t <enum_type>;
396 return lhs =
static_cast<enum_type>(
static_cast<T>(lhs) |
static_cast<T>(rhs));
419#pragma GCC diagnostic push
420#pragma GCC diagnostic ignored "-Wunused-variable"
429#pragma GCC diagnostic pop
451 std::lock_guard<std::mutex> g( _mtx );
452 _write_queue.clear();
453 _sync_write_queue.clear();
454 _write_queue_size = 0;
458 std::lock_guard<std::mutex> g( _mtx );
459 while ( _out_queue.size() > 0 ) {
460 _out_queue.pop_front();
465 std::lock_guard<std::mutex> g( _mtx );
466 return _write_queue_size;
470 std::lock_guard<std::mutex> g( _mtx );
471 return _out_queue.empty();
475 std::lock_guard<std::mutex> g( _mtx );
477 return ((!_sync_write_queue.empty() || !_write_queue.empty()) && _out_queue.empty());
482 std::function<
void( boost::system::error_code, std::size_t )> callback,
483 bool to_sync_queue ) {
484 std::lock_guard<std::mutex> g( _mtx );
485 if( to_sync_queue ) {
486 _sync_write_queue.push_back( {buff, callback} );
488 _write_queue.push_back( {buff, callback} );
490 _write_queue_size += buff->size();
498 std::lock_guard<std::mutex> g( _mtx );
499 if( _sync_write_queue.size() > 0 ) {
503 SYS_ASSERT( _write_queue_size == 0, plugin_exception,
"write queue size expected to be zero" );
508 std::lock_guard<std::mutex> g( _mtx );
509 for(
auto& m : _out_queue ) {
517 deque<queued_write>& w_queue ) {
518 while ( w_queue.size() > 0 ) {
519 auto& m = w_queue.front();
520 bufs.push_back( boost::asio::buffer( *m.buff ));
521 _write_queue_size -= m.buff->size();
522 _out_queue.emplace_back( m );
528 struct queued_write {
529 std::shared_ptr<vector<char>> buff;
530 std::function<void( boost::system::error_code, std::size_t )> callback;
533 mutable std::mutex _mtx;
535 deque<queued_write> _write_queue;
536 deque<queued_write> _sync_write_queue;
537 deque<queued_write> _out_queue;
548 bool in_accepted_state_ {
true};
552 const uint32_t max_consecutive_rejected_windows_{13};
561 uint32_t max_rejected_windows = 13) :
562 window_size_(window_size) {}
581 class connection :
public std::enable_shared_from_this<connection> {
583 explicit connection(
const string& endpoint );
597 std::chrono::system_clock::duration dur = msec;
602 static const string unknown;
604 void update_endpoints();
606 std::optional<peer_sync_state> peer_requested;
608 std::atomic<bool> socket_open{
false};
610 const string peer_addr;
611 enum connection_types :
char {
617 std::atomic<connection_types> connection_type{both};
684 void close(
bool reconnect =
true,
bool shutdown =
false );
686 static void _close(
connection*
self,
bool reconnect,
bool shutdown );
688 bool process_next_block_message(
uint32_t message_length);
689 bool process_next_trx_message(
uint32_t message_length);
695 void connect(
const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints );
730 return std::chrono::system_clock::now().time_since_epoch().count();
741 void enqueue_buffer(
const std::shared_ptr<std::vector<char>>& send_buffer,
743 bool to_sync_queue =
false);
756 std::function<
void(boost::system::error_code, std::size_t)> callback,
757 bool to_sync_queue =
false);
804 const string connection::unknown =
"<unknown>";
813 SYS_ASSERT(
false, plugin_config_exception,
"Not implemented, call handle_message directly instead" );
819 c->handle_message( msg );
825 c->handle_message( msg );
831 c->handle_message( msg );
837 c->handle_message( msg );
843 c->handle_message( msg );
849 c->handle_message( msg );
854 peer_dlog(
c,
"handle sync_request_message" );
855 c->handle_message( msg );
859 template<
typename Function>
863 if( !
f( c ) )
return;
867 template<
typename Function>
871 if( c->is_transactions_only_connection() )
continue;
872 if( !
f( c ) )
return;
879 : peer_addr( endpoint ),
880 strand( my_impl->thread_pool->get_executor() ),
881 socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
882 log_p2p_address( endpoint ),
883 connection_id( ++my_impl->current_connection_id ),
884 response_expected_timer( my_impl->thread_pool->get_executor() ),
885 last_handshake_recv(),
886 last_handshake_sent()
893 strand( my_impl->thread_pool->get_executor() ),
894 socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
895 connection_id( ++my_impl->current_connection_id ),
896 response_expected_timer( my_impl->thread_pool->get_executor() ),
897 last_handshake_recv(),
898 last_handshake_sent()
904 void connection::update_endpoints() {
905 boost::system::error_code ec;
906 boost::system::error_code ec2;
908 auto lep =
socket->local_endpoint(ec2);
913 std::lock_guard<std::mutex> g_conn(
conn_mtx );
920 string::size_type colon = peer_add.find(
':');
921 string::size_type colon2 = peer_add.find(
':', colon + 1);
922 string::size_type end = colon2 == string::npos
923 ? string::npos : peer_add.find_first_of(
" :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 );
924 string host = peer_add.substr( 0, colon );
925 string port = peer_add.substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
926 string type = colon2 == string::npos ?
"" : end == string::npos ?
927 peer_add.substr( colon2 + 1 ) : peer_add.substr( colon2 + 1, end - (colon2 + 1) );
930 fc_dlog(
logger,
"Setting connection ${c} type for: ${peer} to both transactions and blocks", (
"c",
connection_id)(
"peer", peer_add) );
931 connection_type = both;
932 }
else if( type ==
"trx" ) {
934 connection_type = transactions_only;
935 }
else if( type ==
"blk" ) {
937 connection_type = blocks_only;
945 stat.
peer = peer_addr;
948 std::lock_guard<std::mutex> g(
conn_mtx );
958 boost::asio::ip::tcp::no_delay nodelay(
true );
959 boost::system::error_code ec;
960 socket->set_option( nodelay, ec );
962 peer_elog(
this,
"connection failed (set_option): ${e1}", (
"e1", ec.message() ) );
986 strand.post( [
self = shared_from_this(), reconnect, shutdown]() {
987 connection::_close(
self.get(), reconnect, shutdown );
// Tears down the socket of the connection `self` and resets its per-connection
// state.
//   self      - connection being closed (raw pointer; the caller in close()
//               passes self.get() from a shared_ptr captured by the lambda).
//   reconnect - together with !shutdown, gates the final branch below.
//   shutdown  - when true, the retry / sync-reset steps below are skipped.
992 void connection::_close(
connection*
self,
bool reconnect,
bool shutdown ) {
// Mark the socket closed before touching it.
993 self->socket_open =
false;
// Errors from shutdown()/close() are reported through `ec`; it is not inspected
// in the lines visible here.
994 boost::system::error_code ec;
995 if(
self->socket->is_open() ) {
996 self->socket->shutdown( tcp::socket::shutdown_both, ec );
997 self->socket->close( ec );
// Replace the socket object with a fresh one on the thread pool's executor.
// Handlers elsewhere compare their captured socket pointer with c->socket,
// so they can detect this replacement.
999 self->socket.reset(
new tcp::socket( my_impl->
thread_pool->get_executor() ) );
1000 self->flush_queues();
1001 self->connecting =
false;
1002 self->syncing =
false;
1003 self->block_status_monitor_.reset();
1004 ++
self->consecutive_immediate_connection_close;
1005 bool has_last_req =
false;
// last_req is read under conn_mtx.
1007 std::lock_guard<std::mutex> g_conn(
self->conn_mtx );
1008 has_last_req =
self->last_req.has_value();
// Only acts on an outstanding request when this is not a shutdown.
1014 if( has_last_req && !shutdown ) {
1017 self->peer_requested.reset();
1018 self->sent_handshake_count = 0;
// Notify the sync manager about this closing connection (skipped on shutdown).
1019 if( !shutdown) my_impl->
sync_master->sync_reset_lib_num(
self->shared_from_this(),
true );
// Cancel any pending response_expected_timer wait.
1021 self->cancel_wait();
1023 if( reconnect && !shutdown ) {
1031 std::tie( std::ignore, std::ignore, head_num,
1032 std::ignore, std::ignore, std::ignore ) = my_impl->
get_chain_info();
1034 peer_dlog(
this,
"head_num = ${h}",(
"h",head_num));
1042 std::unique_lock<std::mutex> g_conn(
conn_mtx );
1044 peer_dlog(
this,
"maybe truncating branch at = ${h}:${id}",
1051 if( lib_num == 0 )
return;
1054 lib_num, head_num, msg_head_id]() {
1056 bool on_fork = msg_head_num == 0;
1057 bool unknown_block =
false;
1062 on_fork = my_id != msg_head_id;
1063 }
catch(
const unknown_block_exception& ) {
1064 unknown_block =
true;
1069 if( unknown_block ) {
1070 c->strand.post( [msg_head_num, c]() {
1071 peer_ilog( c,
"Peer asked for unknown block ${mn}, sending: benign_other go away", (
"mn", msg_head_num) );
1076 if( on_fork ) msg_head_num = 0;
1078 c->strand.post( [c, msg_head_num, lib_num, head_num]() {
1079 c->blk_send_branch_impl( msg_head_num, lib_num, head_num );
1087 if( !peer_requested ) {
1088 auto last = msg_head_num != 0 ? msg_head_num : lib_num;
1091 auto last = msg_head_num != 0 ? msg_head_num : std::min( peer_requested->last, lib_num );
1092 uint32_t end = std::max( peer_requested->end_block, head_num );
1095 if( peer_requested->start_block <= peer_requested->end_block ) {
1096 peer_ilog(
this,
"enqueue ${s} - ${e}", (
"s", peer_requested->start_block)(
"e", peer_requested->end_block) );
1099 peer_ilog(
this,
"nothing to enqueue" );
1100 peer_requested.reset();
1113 fc_dlog(
logger,
"fetch_block_by_id num ${n}, connection ${cid}",
1114 (
"n", b->block_num())(
"cid", c->connection_id) );
1115 my_impl->
dispatcher->add_peer_block( blkid, c->connection_id );
1116 c->strand.post( [c, b{std::move(b)}]() {
1117 c->enqueue_block( b );
1120 fc_ilog(
logger,
"fetch block by id returned null, id ${id}, connection ${cid}",
1121 (
"id", blkid)(
"cid", c->connection_id) );
1123 }
catch(
const assert_exception& ex ) {
1124 fc_elog(
logger,
"caught assert on fetch_block_by_id, ${ex}, id ${id}, connection ${cid}",
1125 (
"ex", ex.to_string())(
"id", blkid)(
"cid", c->connection_id) );
1127 fc_elog(
logger,
"caught other exception fetching block id ${id}, connection ${cid}",
1128 (
"id", blkid)(
"cid", c->connection_id) );
1138 strand.post( [c = shared_from_this()]() {
1139 std::unique_lock<std::mutex> g_conn( c->conn_mtx );
1140 if( c->populate_handshake( c->last_handshake_sent ) ) {
1141 static_assert( std::is_same_v<
decltype( c->sent_handshake_count ),
int16_t>,
"INT16_MAX based on int16_t" );
1142 if( c->sent_handshake_count ==
INT16_MAX ) c->sent_handshake_count = 1;
1143 c->last_handshake_sent.generation = ++c->sent_handshake_count;
1146 peer_ilog( c,
"Sending handshake generation ${g}, lib ${lib}, head ${head}, id ${id}",
1161 peer_wlog(
this,
"heartbeat timed out for peer address");
1169 const tstamp timeout = std::max(
hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).
count());
1201 std::function<
void(boost::system::error_code, std::size_t)> callback,
1202 bool to_sync_queue) {
1217 std::vector<boost::asio::const_buffer> bufs;
1220 strand.post( [c{std::move(c)}, bufs{std::move(bufs)}]() {
1221 boost::asio::async_write( *c->socket, bufs,
1222 boost::asio::bind_executor( c->strand, [c,
socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
1224 c->buffer_queue.clear_out_queue();
1226 if( !c->socket_is_open() || socket != c->socket ) {
1227 peer_ilog( c,
"async write socket ${r} before callback", (
"r", c->socket_is_open() ?
"changed" :
"closed") );
1233 if( ec.value() != boost::asio::error::eof ) {
1234 peer_elog( c,
"Error sending to peer: ${i}", (
"i", ec.message() ) );
1236 peer_wlog( c,
"connection closure detected on write" );
1242 c->buffer_queue.out_callback( ec, w );
1244 c->enqueue_sync_block();
1245 c->do_queue_write();
1246 }
catch (
const std::bad_alloc& ) {
1248 }
catch (
const boost::interprocess::bad_alloc& ) {
1252 }
catch(
const std::exception& ex ) {
1253 peer_elog( c,
"std::exception in do_queue_write: ${s}", (
"s", ex.
what()) );
1255 peer_elog( c,
"Unknown exception in do_queue_write" );
1263 peer_dlog(
this,
"cancel sync reason = ${m}, write queue size ${o} bytes",
1275 peer_ilog(
this,
"sending empty request but not calling sync wait");
// Advances the peer's outstanding block-range request (peer_requested) by one
// block and schedules that block to be sent on the connection's strand.
// Returns bool; NOTE(review): the return statements are not visible in this
// view - confirm the meaning of the result against the full function.
1281 bool connection::enqueue_sync_block() {
// Guard: nothing to do when the peer has no outstanding range request.
1282 if( !peer_requested ) {
1285 peer_dlog(
this,
"enqueue sync block ${num}", (
"num", peer_requested->last + 1) );
// `last` is incremented first; `num` is the block number handled in this call.
1287 uint32_t num = ++peer_requested->last;
// Reaching end_block completes the request, so the request state is cleared.
1288 if(num == peer_requested->end_block) {
1289 peer_requested.reset();
1290 peer_dlog(
this,
"completing enqueue_sync_block ${num}", (
"num", num) );
// Block available: send it from the connection's strand. The second argument
// (true) is passed to enqueue_block(); presumably it is the to_sync_queue
// flag - TODO confirm against enqueue_block's signature.
1302 c->strand.post( [c, sb{std::move(sb)}]() {
1303 c->enqueue_block( sb,
true );
// Block could not be fetched: log and drop the peer's request on the strand.
1306 c->strand.post( [c, num]() {
1307 peer_ilog( c,
"enqueue sync, unable to fetch block ${num}, sending benign_other go away", (
"num", num) );
1308 c->peer_requested.reset();
1326 if( !send_buffer ) {
1327 send_buffer = create_send_buffer( m );
1339 const char*
const header =
reinterpret_cast<const char* const
>(&payload_size);
1342 auto send_buffer = std::make_shared<vector<char>>(buffer_size);
1350 template<
typename T>
1356 const char*
const header =
reinterpret_cast<const char* const
>(&payload_size);
1359 auto send_buffer = std::make_shared<vector<char>>( buffer_size );
1374 if( !send_buffer ) {
1375 send_buffer = create_send_buffer( sb );
1382 static std::shared_ptr<std::vector<char>> create_send_buffer(
const signed_block_ptr& sb ) {
1386 fc_dlog(
logger,
"sending block ${bn}", (
"bn", sb->block_num()) );
1387 return buffer_factory::create_send_buffer( signed_block_which, *sb );
1395 if( !send_buffer ) {
1396 send_buffer = create_send_buffer( trx );
1407 return buffer_factory::create_send_buffer( packed_transaction_which, *trx );
1417 if (std::holds_alternative<go_away_message>(m)) {
1418 close_after_send = std::get<go_away_message>(m).reason;
1423 enqueue_buffer( send_buffer, close_after_send );
1428 peer_dlog(
this,
"enqueue block ${num}", (
"num", b->block_num()) );
1433 latest_blk_time = get_time();
1434 enqueue_buffer( sb,
no_reason, to_sync_queue);
1438 void connection::enqueue_buffer(
const std::shared_ptr<std::vector<char>>& send_buffer,
1443 queue_write(send_buffer,
1444 [conn{std::move(
self)}, close_after_send](boost::system::error_code ec, std::size_t ) {
1447 fc_ilog(
logger,
"sent a go away message: ${r}, closing connection ${cid}",
1448 (
"r",
reason_str(close_after_send))(
"cid", conn->connection_id) );
// Cancels response_expected_timer. The cancel() call is made while holding
// response_expected_timer_mtx, the same mutex sync_wait()/fetch_wait() take
// before arming the timer.
1457 void connection::cancel_wait() {
1458 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
1459 response_expected_timer.cancel();
// Starts an asynchronous wait on response_expected_timer for a sync request.
// The timer is accessed under response_expected_timer_mtx.
1463 void connection::sync_wait() {
1465 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
// The completion handler is bound to c->strand and forwards the timer's
// error_code to sync_timeout().
1467 response_expected_timer.async_wait(
1468 boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) {
1469 c->sync_timeout( ec );
// Starts an asynchronous wait on response_expected_timer for a fetch request.
// The timer is accessed under response_expected_timer_mtx.
1474 void connection::fetch_wait() {
1476 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
// The completion handler is bound to c->strand and forwards the timer's
// error_code to fetch_timeout().
1478 response_expected_timer.async_wait(
1479 boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) {
1480 c->fetch_timeout(ec);
// Completion handler for the wait armed in sync_wait().
//   ec - result of the timer wait.
// NOTE(review): the first branch of the if/else is not visible in this view;
// only the error-reporting branch is documented here.
1485 void connection::sync_timeout( boost::system::error_code ec ) {
1489 }
else if( ec != boost::asio::error::operation_aborted ) {
// Any error other than operation_aborted is logged at error level.
1490 peer_elog(
this,
"setting timer for sync request got error ${ec}", (
"ec", ec.message()) );
// Completion handler for the wait armed in fetch_wait().
//   ec - result of the timer wait.
1495 void connection::fetch_timeout( boost::system::error_code ec ) {
// First branch: ask the dispatcher to retry the fetch for this connection.
// NOTE(review): the branch condition is not visible here - presumably `!ec`;
// confirm.
1497 my_impl->
dispatcher->retry_fetch( shared_from_this() );
1498 }
else if( ec != boost::asio::error::operation_aborted ) {
// Any error other than operation_aborted is logged at error level.
1499 peer_elog(
this,
"setting timer for fetch request got error ${ec}", (
"ec", ec.message() ) );
// Returns the monitor to the accepted state by setting in_accepted_state_ to
// true. connection::_close() calls this when a connection is torn down.
1511 void block_status_monitor::reset() {
1512 in_accepted_state_ =
true;
// Records a rejected block in the monitor's window bookkeeping.
// NOTE(review): the definition of `now` and the bodies of some branches are
// not visible in this view; comments below cover only the visible statements.
1516 void block_status_monitor::rejected() {
// Already in the rejected state: compare the time since window_start_ with
// window_size_.
1520 if(!in_accepted_state_) {
1521 const auto elapsed = now - window_start_;
1522 if( elapsed < window_size_ ) {
// Start a new window at `now`.
1526 window_start_ = now;
// Leave the accepted state and start a window at `now`.
1531 in_accepted_state_ =
false;
1532 window_start_ = now;
1538 :sync_known_lib_num( 0 )
1539 ,sync_last_requested_num( 0 )
1540 ,sync_next_expected_num( 1 )
1541 ,sync_req_span( req_span )
1543 ,sync_state(in_sync)
// Maps a sync stage to a string literal label; set_state() and the peer log
// calls use it when formatting log messages.
//   s - stage to describe.
// Returns "in sync", "lib catchup" or "head catchup" for the named stages, and
// the default label for any other value.
// NOTE(review): the default label is spelled "unkown" (sic). It is a runtime
// string, so it is left unchanged here; fix it separately if log consumers do
// not depend on the current spelling.
1547 constexpr auto sync_manager::stage_str(stages
s) {
1549 case in_sync :
return "in sync";
1550 case lib_catchup:
return "lib catchup";
1551 case head_catchup :
return "head catchup";
1552 default :
return "unkown";
// Moves sync_state to `newstate`, logging the old and new stage labels.
//   newstate - stage to switch to.
// Returns bool; callers such as `if( set_state( head_catchup ) )` branch on it.
// NOTE(review): the return statements are not visible in this view -
// presumably false when the state is unchanged and true after a transition;
// confirm.
1556 bool sync_manager::set_state(stages newstate) {
// Guard for the no-op case where the state is already `newstate`.
1557 if( sync_state == newstate ) {
1560 fc_ilog(
logger,
"old state ${os} becoming ${ns}", (
"os", stage_str( sync_state ))(
"ns", stage_str( newstate ) ) );
1561 sync_state = newstate;
1567 std::unique_lock<std::mutex> g( sync_mtx );
1568 if( sync_state == in_sync ) {
1569 sync_source.reset();
1573 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1574 if( c->last_handshake_recv.last_irreversible_block_num > sync_known_lib_num ) {
1575 sync_known_lib_num = c->last_handshake_recv.last_irreversible_block_num;
1582 std::lock_guard<std::mutex> g_conn( cc->conn_mtx );
1583 if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) {
1584 highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num;
1588 sync_known_lib_num = highest_lib_num;
1591 if( c == sync_source ) {
1595 std::tie( lib_num, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = my_impl->
get_chain_info();
1596 sync_next_expected_num = lib_num + 1;
1597 request_next_chunk( std::move(g) );
1603 void sync_manager::request_next_chunk( std::unique_lock<std::mutex> g_sync,
const connection_ptr& conn ) {
1606 std::tie( lib_block_num, std::ignore, fork_head_block_num,
1607 std::ignore, std::ignore, std::ignore ) = my_impl->
get_chain_info();
1609 fc_dlog(
logger,
"sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
1610 (
"r", sync_last_requested_num)(
"e", sync_next_expected_num)(
"k", sync_known_lib_num)(
"s", sync_req_span) );
1612 if( fork_head_block_num < sync_last_requested_num && sync_source && sync_source->current() ) {
1613 fc_ilog(
logger,
"ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}",
1614 (
"h", fork_head_block_num)(
"r", sync_last_requested_num)(
"e", sync_next_expected_num)
1615 (
"k", sync_known_lib_num)(
"s", sync_req_span)(
"c", sync_source->connection_id) );
1626 if (conn && conn->current() ) {
1627 new_sync_source = conn;
1631 new_sync_source.reset();
1633 if (!new_sync_source) {
1641 if (new_sync_source) {
1643 cptr = my_impl->
connections.find( new_sync_source );
1647 new_sync_source.reset();
1659 auto cstart_it = cptr;
1662 if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) {
1663 std::lock_guard<std::mutex> g_conn( (*cptr)->conn_mtx );
1664 if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) {
1665 new_sync_source = *cptr;
1671 }
while( cptr != cstart_it );
1678 if( !new_sync_source || !new_sync_source->current() || new_sync_source->is_transactions_only_connection() ) {
1679 fc_elog(
logger,
"Unable to continue syncing at this time");
1680 if( !new_sync_source ) sync_source.reset();
1681 sync_known_lib_num = lib_block_num;
1683 set_state( in_sync );
1687 bool request_sent =
false;
1688 if( sync_last_requested_num != sync_known_lib_num ) {
1691 if( end > sync_known_lib_num )
1692 end = sync_known_lib_num;
1693 if( end > 0 && end >= start ) {
1694 sync_last_requested_num = end;
1695 sync_source = new_sync_source;
1697 request_sent =
true;
1698 new_sync_source->strand.post( [new_sync_source, start, end]() {
1699 peer_ilog( new_sync_source,
"requesting range ${s} to ${e}", (
"s", start)(
"e", end) );
1700 new_sync_source->request_sync_blocks( start, end );
1704 if( !request_sent ) {
1713 if( ci->current() ) {
1714 ci->send_handshake();
// Reports whether syncing still has work to do.
//   fork_head_block_num - our fork head block number, as supplied by the caller.
// Returns true when either
//   - sync_last_requested_num < sync_known_lib_num, or
//   - fork_head_block_num < sync_last_requested_num.
// NOTE(review): this function reads the sync_* counters without locking; the
// caller visible in this file takes sync_mtx first - confirm that holds for
// every caller.
1720 bool sync_manager::is_sync_required(
uint32_t fork_head_block_num ) {
// Debug trace of the counters that feed the decision below.
1721 fc_dlog(
logger,
"last req = ${req}, last recv = ${recv} known = ${known} our head = ${head}",
1722 (
"req", sync_last_requested_num)(
"recv", sync_next_expected_num )(
"known", sync_known_lib_num )
1723 (
"head", fork_head_block_num ) );
1725 return( sync_last_requested_num < sync_known_lib_num ||
1726 fork_head_block_num < sync_last_requested_num );
1731 std::unique_lock<std::mutex> g_sync( sync_mtx );
1732 if( target > sync_known_lib_num) {
1733 sync_known_lib_num = target;
1738 std::tie( lib_num, std::ignore, fork_head_block_num,
1739 std::ignore, std::ignore, std::ignore ) = my_impl->
get_chain_info();
1741 if( !is_sync_required( fork_head_block_num ) || target <= lib_num ) {
1742 peer_dlog( c,
"We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
1743 (
"b", lib_num)(
"h", fork_head_block_num )(
"t", target ) );
1744 c->send_handshake();
1748 if( sync_state == in_sync ) {
1749 set_state( lib_catchup );
1751 sync_next_expected_num = std::max( lib_num + 1, sync_next_expected_num );
1754 peer_ilog( c,
"Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}",
1755 (
"cc", sync_last_requested_num)(
"t", target)(
"n", sync_next_expected_num) );
1757 request_next_chunk( std::move( g_sync ), c );
1762 std::unique_lock<std::mutex> g( sync_mtx );
1763 peer_ilog( c,
"reassign_fetch, our last req is ${cc}, next expected is ${ne}",
1764 (
"cc", sync_last_requested_num)(
"ne", sync_next_expected_num) );
1766 if( c == sync_source ) {
1767 c->cancel_sync(reason);
1769 request_next_chunk( std::move(g) );
1776 if( c->is_transactions_only_connection() )
return;
1782 std::tie( lib_num, std::ignore, head,
1787 auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
1788 int64_t network_latency_ns = current_time_ns - msg.
time;
1789 if( network_latency_ns < 0 ) {
1790 peer_wlog(c,
"Peer sent a handshake with a timestamp skewed by at least ${t}ms", (
"t", network_latency_ns/1000000));
1791 network_latency_ns = 0;
1794 uint32_t nblk_behind_by_net_latency =
static_cast<uint32_t>(network_latency_ns / block_interval_ns);
1796 uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency + 1;
1798 peer_dlog(c,
"Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
1799 (
"lat", network_latency_ns/1000000)(
"num", nblk_behind_by_net_latency)(
"tot_num", nblk_combined_latency));
1816 peer_ilog( c,
"handshake lib ${lib}, head ${head}, head id ${id}.. sync 0",
1826 if (head < peer_lib) {
1827 peer_ilog( c,
"handshake lib ${lib}, head ${head}, head id ${id}.. sync 1",
1830 if (c->sent_handshake_count > 0) {
1831 c->send_handshake();
1835 if (lib_num > msg.
head_num + nblk_combined_latency) {
1836 peer_ilog( c,
"handshake lib ${lib}, head ${head}, head id ${id}.. sync 2",
1850 if (head + nblk_combined_latency < msg.
head_num ) {
1851 peer_ilog( c,
"handshake lib ${lib}, head ${head}, head id ${id}.. sync 3",
1856 }
else if(head >= msg.
head_num + nblk_combined_latency) {
1857 peer_ilog( c,
"handshake lib ${lib}, head ${head}, head id ${id}.. sync 4",
1870 bool on_fork = true;
1872 controller& cc = chain_plug->chain();
1873 on_fork = cc.get_block_id_for_num( msg_head_num ) != msg_head_id;
1876 c->strand.post( [c]() {
1877 request_message req;
1878 req.req_blocks.mode = catch_up;
1879 req.req_trx.mode = none;
1886 peer_dlog( c,
"Block discrepancy is within network latency range.");
1895 std::lock_guard<std::mutex> g_conn( cc->conn_mtx );
1896 if( cc->fork_head_num > num || cc->fork_head ==
id ) {
1904 std::lock_guard<std::mutex> g( sync_mtx );
1905 peer_ilog( c,
"catch_up while in ${s}, fork head num = ${fhn} "
1906 "target LIB = ${lib} next_expected = ${ne}, id ${id}...",
1907 (
"s", stage_str( sync_state ))(
"fhn", num)(
"lib", sync_known_lib_num)
1908 (
"ne", sync_next_expected_num)(
"id",
id.
str().substr( 8, 16 )) );
1912 std::tie( lib, std::ignore, std::ignore,
1914 if( sync_state == lib_catchup || num < lib )
1916 set_state( head_catchup );
1918 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1920 c->fork_head_num =
num;
1925 peer_ilog( c,
"none notice while in ${s}, fork head num = ${fhn}, id ${id}...",
1926 (
"s", stage_str( sync_state ))(
"fhn", num)(
"id",
id.
str().substr(8,16)) );
1927 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1929 c->fork_head_num = 0;
1940 "sync_recv_notice only called on catch_up" );
1943 peer_elog( c,
"got a catch up with ids size = 0" );
1946 peer_ilog( c,
"notice_message, pending ${p}, blk_num ${n}, id ${id}...",
1948 if( !my_impl->
dispatcher->have_block(
id ) ) {
1952 c->send_handshake();
1957 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1958 c->last_handshake_recv.last_irreversible_block_num = msg.
known_trx.
pending;
1967 c->block_status_monitor_.rejected();
1968 std::unique_lock<std::mutex> g( sync_mtx );
1970 if( c->block_status_monitor_.max_events_violated()) {
1971 peer_wlog( c,
"block ${bn} not accepted, closing connection", (
"bn", blk_num) );
1972 sync_source.reset();
1977 c->send_handshake();
1983 std::unique_lock<std::mutex> g_sync( sync_mtx );
1984 if( blk_num <= sync_last_requested_num ) {
1985 peer_dlog( c,
"sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
1986 (
"r", sync_last_requested_num)(
"e", sync_next_expected_num)(
"k", sync_known_lib_num)(
"s", sync_req_span) );
1987 if (blk_num != sync_next_expected_num && !blk_applied) {
1988 auto sync_next_expected = sync_next_expected_num;
1990 peer_dlog( c,
"expected block ${ne} but got ${bn}", (
"ne", sync_next_expected)(
"bn", blk_num) );
1993 sync_next_expected_num = blk_num + 1;
1999 peer_dlog( c,
"got block ${bn}", (
"bn", blk_num) );
2000 if(
app().is_quiting() ) {
2001 c->close(
false,
true );
2004 c->block_status_monitor_.accepted();
2006 std::unique_lock<std::mutex> g_sync( sync_mtx );
2007 stages
state = sync_state;
2009 if(
state == head_catchup ) {
2010 peer_dlog( c,
"sync_manager in head_catchup state" );
2011 sync_source.reset();
2015 bool set_state_to_head_catchup =
false;
2017 std::unique_lock<std::mutex> g_cp_conn( cp->conn_mtx );
2018 uint32_t fork_head_num = cp->fork_head_num;
2021 if( fork_head_id == null_id ) {
2023 }
else if( fork_head_num < blk_num || fork_head_id == blk_id ) {
2024 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
2025 c->fork_head = null_id;
2026 c->fork_head_num = 0;
2028 set_state_to_head_catchup =
true;
2033 if( set_state_to_head_catchup ) {
2034 if( set_state( head_catchup ) ) {
2038 set_state( in_sync );
2041 }
else if(
state == lib_catchup ) {
2042 if( blk_num >= sync_known_lib_num ) {
2043 peer_dlog( c,
"All caught up with last known last irreversible block resending handshake" );
2044 set_state( in_sync );
2047 }
else if( blk_num >= sync_last_requested_num ) {
2048 request_next_chunk( std::move( g_sync) );
2061 std::lock_guard<std::mutex> g( blk_state_mtx );
2062 auto bptr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
2063 bool added = (bptr == blk_state.end());
2066 }
else if( !bptr->have_block ) {
2067 blk_state.modify( bptr, [](
auto& pb ) {
2068 pb.have_block =
true;
2075 std::lock_guard<std::mutex> g(blk_state_mtx);
2076 const auto blk_itr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
2077 return blk_itr != blk_state.end();
2081 std::lock_guard<std::mutex> g(blk_state_mtx);
2083 const auto& index = blk_state.get<by_peer_block_id>();
2084 auto blk_itr = index.find( blkid );
2085 if( blk_itr != index.end() ) {
2086 return blk_itr->have_block;
2093 std::lock_guard<std::mutex> g( local_txns_mtx );
2094 auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref(
id ), connection_id ) );
2095 bool added = (tptr == local_txns.end());
2099 expires = std::min( trx_expires, expires );
2103 .connection_id = connection_id} );
2109 std::lock_guard<std::mutex> g( local_txns_mtx );
2110 const auto tptr = local_txns.get<by_id>().find( tid );
2111 return tptr != local_txns.end();
2115 size_t start_size = 0, end_size = 0;
2117 std::unique_lock<std::mutex> g( local_txns_mtx );
2118 start_size = local_txns.size();
2119 auto& old = local_txns.get<by_expiry>();
2122 old.erase( ex_lo, ex_up );
2125 fc_dlog(
logger,
"expire_local_txns size ${s} removed ${r}", (
"s", start_size)(
"r", start_size - end_size ) );
2129 std::lock_guard<std::mutex> g(blk_state_mtx);
2130 auto& stale_blk = blk_state.get<by_block_num>();
2131 stale_blk.erase( stale_blk.lower_bound(1), stale_blk.upper_bound(lib_num) );
2136 fc_dlog(
logger,
"bcast block ${b}", (
"b", b->block_num()) );
2138 if( my_impl->
sync_master->syncing_with_peer() )
return;
2141 const auto bnum = b->block_num();
2143 fc_dlog(
logger,
"socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}",
2144 (
"s", cp->socket_is_open())(
"c", cp->connecting.load())(
"ss", cp->syncing.load())(
"cid", cp->connection_id) );
2145 if( !cp->current() )
return true;
2148 cp->strand.post( [
this, cp,
id, bnum, sb{std::move(sb)}]() {
2149 cp->latest_blk_time = cp->get_time();
2150 std::unique_lock<std::mutex> g_conn( cp->conn_mtx );
2151 bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum;
2155 peer_dlog( cp,
"not bcast block ${b}", (
"b", bnum) );
2158 peer_dlog( cp,
"bcast block ${b}", (
"b", bnum) );
2168 std::unique_lock<std::mutex> g( c->conn_mtx );
2171 c->last_req->req_blocks.mode !=
none &&
2172 !c->last_req->req_blocks.ids.empty() &&
2173 c->last_req->req_blocks.ids.back() ==
id) {
2175 c->last_req.reset();
2191 if( cp->is_blocks_only_connection() || !cp->current() ) {
2194 if( !
add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) {
2199 fc_dlog(
logger,
"sending trx: ${id}, to connection ${cid}", (
"id", trx->id())(
"cid", cp->connection_id) );
2200 cp->strand.post( [cp, sb{std::move(sb)}]() {
2208 fc_dlog(
logger,
"not sending rejected transaction ${tid}", (
"tid", trx->id()) );
2216 peer_elog( c,
"passed a notice_message with something other than a normal on none known_trx" );
2227 peer_elog( c,
"passed a notice_message with something other than a normal on none known_blocks" );
2238 std::lock_guard<std::mutex> g_c_conn( c->conn_mtx );
2239 if( !c->last_req ) {
2242 peer_wlog( c,
"failed to fetch from peer" );
2243 if( c->last_req->req_blocks.mode ==
normal && !c->last_req->req_blocks.ids.empty() ) {
2244 bid = c->last_req->req_blocks.ids.back();
2246 peer_wlog( c,
"no retry, block mpde = ${b} trx mode = ${t}",
2247 (
"b",
modes_str( c->last_req->req_blocks.mode ))(
"t",
modes_str( c->last_req->req_trx.mode ) ) );
2250 last_req = *c->last_req;
2257 std::lock_guard<std::mutex> guard( conn->conn_mtx );
2258 if( conn->last_req ) {
2265 conn->strand.post( [conn, last_req{std::move(last_req)}]() {
2266 conn->enqueue( last_req );
2268 std::lock_guard<std::mutex> g_conn_conn( conn->conn_mtx );
2269 conn->last_req = last_req;
2278 if( c->connected() ) {
2279 c->enqueue( last_req );
2300 if (colon == std::string::npos || colon == 0) {
2308 auto connector_period_us = std::chrono::duration_cast<std::chrono::microseconds>( my_impl->
connector_period );
2309 std::lock_guard<std::mutex> g( c->conn_mtx );
2316 string::size_type colon = c->peer_address().find(
':');
2317 string::size_type colon2 = c->peer_address().find(
':', colon + 1);
2318 string host = c->peer_address().substr( 0, colon );
2319 string port = c->peer_address().substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
2320 c->set_connection_type( c->peer_address() );
2322 auto resolver = std::make_shared<tcp::resolver>( my_impl->
thread_pool->get_executor() );
2325 resolver->async_resolve( tcp::v4(), host, port, boost::asio::bind_executor( c->strand,
2326 [resolver, weak_conn, host, port](
const boost::system::error_code& err, tcp::resolver::results_type endpoints ) {
2327 auto c = weak_conn.lock();
2330 c->connect( resolver, endpoints );
2332 fc_elog( logger,
"Unable to resolve ${host}:${port} ${error}",
2333 (
"host", host)(
"port", port)(
"error", err.message() ) );
2334 c->connecting = false;
2335 ++c->consecutive_immediate_connection_close;
2343 void connection::connect(
const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints ) {
2347 boost::asio::async_connect( *
socket, endpoints,
2348 boost::asio::bind_executor(
strand,
2349 [resolver, c = shared_from_this(),
socket=
socket](
const boost::system::error_code& err,
const tcp::endpoint& endpoint ) {
2350 if( !err &&
socket->is_open() &&
socket == c->socket ) {
2351 if( c->start_session() ) {
2352 c->send_handshake();
2355 fc_elog(
logger,
"connection failed to ${host}:${port} ${error}",
2356 (
"host", endpoint.address().to_string())(
"port", endpoint.port())(
"error", err.message()));
2364 new_connection->connecting =
true;
2365 new_connection->strand.post( [
this, new_connection = std::move( new_connection )](){
2366 acceptor->async_accept( *new_connection->socket,
2367 boost::asio::bind_executor( new_connection->strand, [new_connection, socket=new_connection->socket,
this]( boost::system::error_code ec ) {
2369 uint32_t visitors = 0;
2370 uint32_t from_addr = 0;
2371 boost::system::error_code rec;
2372 const auto& paddr_add = socket->remote_endpoint( rec ).address();
2375 fc_elog( logger,
"Error getting remote endpoint: ${m}", (
"m", rec.message()));
2377 paddr_str = paddr_add.to_string();
2378 for_each_connection( [&visitors, &from_addr, &paddr_str]( auto& conn ) {
2379 if( conn->socket_is_open()) {
2380 if( conn->peer_address().empty()) {
2382 std::lock_guard<std::mutex> g_conn( conn->conn_mtx );
2383 if( paddr_str == conn->remote_endpoint_ip ) {
2390 if( from_addr < max_nodes_per_host && (max_client_count == 0 || visitors < max_client_count)) {
2391 fc_ilog( logger,
"Accepted new connection: " + paddr_str );
2392 new_connection->set_heartbeat_timeout( heartbeat_timeout );
2393 if( new_connection->start_session()) {
2394 std::lock_guard<std::shared_mutex> g_unique( connections_mtx );
2395 connections.insert( new_connection );
2399 if( from_addr >= max_nodes_per_host ) {
2400 fc_dlog( logger,
"Number of connections (${n}) from ${ra} exceeds limit ${l}",
2401 (
"n", from_addr + 1)(
"ra", paddr_str )(
"l", max_nodes_per_host ));
2403 fc_dlog( logger,
"max_client_count ${m} exceeded", (
"m", max_client_count));
2406 boost::system::error_code ec;
2407 socket->shutdown( tcp::socket::shutdown_both, ec );
2408 socket->close( ec );
2412 fc_elog( logger,
"Error accepting connection: ${m}", (
"m", ec.message()));
2414 switch (ec.value()) {
2434 std::size_t minimum_read =
2439 const size_t max_socket_read_watermark = 4096;
2440 std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
2441 boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
2442 boost::system::error_code ec;
2443 socket->set_option( read_watermark_opt, ec );
2445 peer_elog(
this,
"unable to set read watermark: ${e1}", (
"e1", ec.message()) );
2449 auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
2450 if (ec || bytes_transferred >= minimum_read ) {
2453 return minimum_read - bytes_transferred;
2459 peer_elog(
this,
"write queue full ${s} bytes, giving up on connection, closing", (
"s", write_queue_size) );
2464 boost::asio::async_read( *
socket,
2466 boost::asio::bind_executor(
strand,
2467 [conn = shared_from_this(),
socket=
socket]( boost::system::error_code ec, std::size_t bytes_transferred ) {
2469 if( !conn->socket_is_open() ||
socket != conn->socket )
return;
2471 bool close_connection =
false;
2474 if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
2475 peer_elog( conn,
"async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
2476 (
"bt",bytes_transferred)(
"btw",conn->pending_message_buffer.bytes_to_write()) );
2479 conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
2480 while (conn->pending_message_buffer.bytes_to_read() > 0) {
2481 uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();
2488 auto index = conn->pending_message_buffer.read_index();
2489 conn->pending_message_buffer.peek(&message_length,
sizeof(message_length), index);
2491 peer_elog( conn,
"incoming message length unexpected (${i})", (
"i", message_length) );
2492 close_connection =
true;
2498 if (bytes_in_buffer >= total_message_bytes) {
2500 conn->consecutive_immediate_connection_close = 0;
2501 if (!conn->process_next_message(message_length)) {
2505 auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
2506 auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
2507 if (outstanding_message_bytes > available_buffer_bytes) {
2508 conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
2511 conn->outstanding_read_bytes = outstanding_message_bytes;
2516 if( !close_connection ) conn->start_read_message();
2518 if (ec.value() != boost::asio::error::eof) {
2519 peer_elog( conn,
"Error reading message: ${m}", (
"m", ec.message() ) );
2521 peer_ilog( conn,
"Peer closed connection" );
2523 close_connection =
true;
2526 catch (
const std::bad_alloc& )
2530 catch (
const boost::interprocess::bad_alloc& )
2537 close_connection =
true;
2539 catch(
const std::exception &ex) {
2540 peer_elog( conn,
"Exception in handling read data: ${s}", (
"s",ex.
what()) );
2541 close_connection =
true;
2544 peer_elog( conn,
"Undefined exception handling read data" );
2545 close_connection =
true;
2548 if( close_connection ) {
2549 peer_elog( conn,
"Closing connection" );
2554 peer_elog(
this,
"Undefined exception in start_read_message, closing connection" );
2570 return process_next_block_message( message_length );
2573 return process_next_trx_message( message_length );
2580 std::visit( m, msg );
2592 bool connection::process_next_block_message(
uint32_t message_length) {
2600 const uint32_t blk_num = bh.block_num();
2601 if( my_impl->
dispatcher->have_block( blk_id ) ) {
2602 peer_dlog(
this,
"canceling wait, already received block ${num}, id ${id}...",
2603 (
"num", blk_num)(
"id", blk_id.
str().substr(8,16)) );
2604 my_impl->
sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num,
false );
2610 peer_dlog(
this,
"received block ${num}, id ${id}..., latency: ${latency}",
2611 (
"num",
bh.block_num())(
"id", blk_id.
str().substr(8,16))
2613 if( !my_impl->
sync_master->syncing_with_peer() ) {
2615 std::tie( lib, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = my_impl->
get_chain_info();
2616 if( blk_num < lib ) {
2617 std::unique_lock<std::mutex> g(
conn_mtx );
2620 peer_ilog(
this,
"received block ${n} less than ${which}lib ${lib}",
2621 (
"n", blk_num)(
"which", blk_num < last_sent_lib ?
"sent " :
"")
2622 (
"lib", blk_num < last_sent_lib ? last_sent_lib : lib) );
2635 shared_ptr<signed_block> ptr = std::make_shared<signed_block>();
2641 bool has_webauthn_sig = is_webauthn_sig( ptr->producer_signature );
2644 auto exts = ptr->validate_and_extract_extensions();
2645 if( exts.count( additional_sigs_eid ) ) {
2646 const auto &additional_sigs = std::get<additional_block_signatures_extension>(exts.lower_bound( additional_sigs_eid )->second).signatures;
2647 has_webauthn_sig |= std::any_of( additional_sigs.begin(), additional_sigs.end(), is_webauthn_sig );
2650 if( has_webauthn_sig ) {
2651 peer_dlog(
this,
"WebAuthn signed block received, closing connection" );
2661 bool connection::process_next_trx_message(
uint32_t message_length) {
2663 peer_dlog(
this,
"p2p-accept-transaction=false - dropping txn" );
2673 shared_ptr<packed_transaction> ptr = std::make_shared<packed_transaction>();
2677 snprintf(reason, 72,
"Dropping trx, too many trx in progress %lu bytes", trx_in_progress_sz);
2685 bool have_trx = my_impl->
dispatcher->have_txn( ptr->id() );
2689 peer_dlog(
this,
"got a duplicate transaction - dropping" );
2700 std::lock_guard<std::mutex> g( chain_info_mtx );
2707 fc_dlog(
logger,
"updating chain info lib ${lib}, head ${head}, fork ${fork}",
2708 (
"lib", chain_lib_num)(
"head", chain_head_blk_num)(
"fork", chain_fork_head_blk_num) );
2712 std::tuple<uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type>
2714 std::lock_guard<std::mutex> g( chain_info_mtx );
2715 return std::make_tuple(
2716 chain_lib_num, chain_head_blk_num, chain_fork_head_blk_num,
2717 chain_lib_id, chain_head_blk_id, chain_fork_head_blk_id );
2726 peer_wlog(
this,
"Handshake message validation: last irreversible block (${i}) is greater than head block (${h})",
2731 peer_wlog(
this,
"Handshake message validation: p2p_address is null string" );
2735 peer_wlog(
this,
"Handshake message validation: p2p_address too large: ${p}",
2739 if (msg.
os.empty()) {
2740 peer_wlog(
this,
"Handshake message validation: os field is null string" );
2743 peer_wlog(
this,
"Handshake message validation: os field too large: ${p}",
2748 peer_wlog(
this,
"Handshake message validation: agent field too large: ${p}",
2753 peer_wlog(
this,
"Handshake message validation: token field invalid" );
2760 peer_dlog(
this,
"received chain_size_message");
2764 peer_dlog(
this,
"received handshake_message" );
2766 peer_elog(
this,
"bad handshake message");
2771 peer_dlog(
this,
"received handshake gen ${g}, lib ${lib}, head ${head}",
2774 std::unique_lock<std::mutex> g_conn(
conn_mtx );
2781 peer_elog(
this,
"Self connection detected node_id ${id}. Closing connection", (
"id", msg.
node_id) );
2792 std::unique_lock<std::mutex> g_conn(
conn_mtx );
2796 peer_dlog(
this,
"checking for duplicate" );
2797 std::shared_lock<std::shared_mutex> g_cnts( my_impl->
connections_mtx );
2799 if(check.get() ==
this)
2801 std::unique_lock<std::mutex> g_check_conn( check->conn_mtx );
2803 (
"c", check->connected())(
"l", check->last_handshake_recv.node_id)(
"r", msg.
node_id) );
2804 if(check->connected() && check->last_handshake_recv.node_id == msg.
node_id) {
2811 auto check_time = check->last_handshake_sent.time + check->last_handshake_recv.time;
2812 g_check_conn.unlock();
2813 if (msg.
time + c_time <= check_time)
2817 fc_dlog(
logger,
"my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
2824 fc_dlog(
logger,
"not duplicate, my_impl->node_id '${lhs}' < msg.node_id '${rhs}'",
2832 peer_dlog(
this,
"sending go_away duplicate, msg.p2p_address: ${add}", (
"add", msg.
p2p_address) );
2841 peer_dlog(
this,
"skipping duplicate check, addr == ${pa}, id = ${ni}",
2847 peer_elog(
this,
"Peer on a different chain. Closing connection" );
2854 peer_ilog(
this,
"Local network version different: ${nv} Remote version: ${mnv}",
2864 peer_elog(
this,
"Peer not authenticated. Closing connection." );
2874 connection_ptr c = weak.lock();
2876 controller& cc = chain_plug->chain();
2877 uint32_t lib_num = cc.last_irreversible_block_num();
2879 fc_dlog( logger,
"handshake check for fork lib_num = ${ln}, peer_lib = ${pl}, connection ${cid}",
2880 (
"ln", lib_num)(
"pl", peer_lib)(
"cid", c->connection_id) );
2882 if( peer_lib <= lib_num && peer_lib > 0 ) {
2883 bool on_fork = false;
2885 block_id_type peer_lib_id = cc.get_block_id_for_num( peer_lib );
2886 on_fork = (msg_lib_id != peer_lib_id);
2887 } catch( const unknown_block_exception& ) {
2889 fc_dlog( logger,
"peer last irreversible block ${pl} is unknown, connection ${cid}",
2890 (
"pl", peer_lib)(
"cid", c->connection_id) );
2892 fc_wlog( logger,
"caught an exception getting block id for ${pl}, connection ${cid}",
2893 (
"pl", peer_lib)(
"cid", c->connection_id) );
2897 c->strand.post( [c]() {
2898 peer_elog( c,
"Peer chain is forked, sending: forked go away" );
2899 c->no_retry = go_away_reason::forked;
2900 c->enqueue( go_away_message( go_away_reason::forked ) );
2919 my_impl->
sync_master->recv_handshake( shared_from_this(), msg );
2934 if ( retry )
peer_dlog(
this,
"received benign_other reason, retrying to connect");
2945 peer_ilog(
this,
"received time_message" );
2963 if( msg.
org == 0 ) {
2968 double offset = (double(
rec -
org) + double(msg.
xmt -
dst)) / 2;
2969 double NsecPerUsec{1000};
2973 (
"o", offset)(
"us", offset / NsecPerUsec ) ) );
2977 std::unique_lock<std::mutex> g_conn(
conn_mtx );
2988 peer_dlog(
this,
"received notice_message" );
2991 peer_elog(
this,
"Invalid notice_message, known_blocks.ids.size ${s}, closing connection",
2999 peer_dlog(
this,
"this is a ${m} notice with ${n} pending blocks: ${num} ${id}...",
3008 std::unique_lock<std::mutex> g_conn(
conn_mtx );
3017 my_impl->
dispatcher->recv_notice( shared_from_this(), msg,
false );
3022 peer_dlog(
this,
"this is a ${m} notice with ${n} blocks",
3031 my_impl->
sync_master->sync_recv_notice( shared_from_this(), msg );
3035 my_impl->
dispatcher->recv_notice( shared_from_this(), msg,
false );
3039 peer_elog(
this,
"bad notice_message : invalid known_blocks.mode ${m}",
3047 peer_elog(
this,
"Invalid request_message, req_blocks.ids.size ${s}, closing",
3055 peer_dlog(
this,
"received request_message:catch_up" );
3059 peer_dlog(
this,
"received request_message:normal" );
3078 peer_elog(
this,
"Invalid request_message, req_trx.ids.size ${s}", (
"s", msg.
req_trx.
ids.size()) );
3090 peer_requested.reset();
3093 if (peer_requested) {
3097 peer_requested->end_block = std::max(msg.
end_block, peer_requested->end_block);
3107 return trx->get_estimated_size();
3111 const auto& tid = trx->id();
3112 peer_dlog(
this,
"received packed_transaction ${id}", (
"id", tid) );
3116 [weak = weak_from_this(), trx](
const std::variant<fc::exception_ptr, transaction_trace_ptr>& result)
mutable {
3118 if (std::holds_alternative<fc::exception_ptr>(result)) {
3119 fc_dlog(
logger,
"bad packed_transaction : ${m}", (
"m", std::get<fc::exception_ptr>(result)->what()) );
3122 if( !trace->except ) {
3123 fc_dlog(
logger,
"chain accepted transaction, bcast ${id}", (
"id", trace->id) );
3125 fc_elog(
logger,
"bad packed_transaction : ${m}", (
"m", trace->except->what()));
3137 peer_dlog(
this,
"received signed_block ${num}, id ${id}", (
"num", ptr->block_num())(
"id",
id) );
3145 my_impl->
sync_master->sync_recv_block( shared_from_this(),
id, ptr->block_num(),
false );
3152 peer_elog(
this,
"bad block exception: #${n} ${id}...: ${m}",
3153 (
"n", ptr->block_num())(
"id",
id.str().substr(8,16))(
"m",ex.
to_string()));
3156 peer_elog(
this,
"bad block: #${n} ${id}...: unknown exception",
3157 (
"n", ptr->block_num())(
"id",
id.str().substr(8,16)));
3160 my_impl->
sync_master->rejected_block( shared_from_this(), ptr->block_num() );
3165 bool signal_producer = !!bsp;
3167 c->process_signed_block(
id, std::move(ptr), std::move(bsp) );
3170 if( signal_producer )
3177 uint32_t blk_num = msg->block_num();
3182 if( !c->socket_is_open() )
3187 c->strand.post( [sync_master = my_impl->
sync_master.get(),
3188 dispatcher = my_impl->
dispatcher.get(), c, blk_id, blk_num]() {
3189 dispatcher->add_peer_block( blk_id, c->connection_id );
3190 sync_master->sync_recv_block( c, blk_id, blk_num, false );
3196 fc_elog(
logger,
"Caught an unknown exception trying to recall block ID" );
3200 fc_dlog(
logger,
"received signed_block: #${n} block age in secs = ${age}, connection ${cid}",
3201 (
"n", blk_num)(
"age", age.
to_seconds())(
"cid", c->connection_id) );
3207 if( !accepted )
return;
3209 }
catch(
const unlinkable_block_exception &ex) {
3210 fc_elog(
logger,
"unlinkable_block_exception connection ${cid}: #${n} ${id}...: ${m}",
3211 (
"cid", c->connection_id)(
"n", blk_num)(
"id", blk_id.
str().substr(8,16))(
"m",ex.to_string()));
3214 fc_elog(
logger,
"block_validate_exception connection ${cid}: #${n} ${id}...: ${m}",
3215 (
"cid", c->connection_id)(
"n", blk_num)(
"id", blk_id.
str().substr(8,16))(
"m",ex.to_string()));
3217 }
catch(
const assert_exception &ex ) {
3218 fc_elog(
logger,
"block assert_exception connection ${cid}: #${n} ${id}...: ${m}",
3219 (
"cid", c->connection_id)(
"n", blk_num)(
"id", blk_id.
str().substr(8,16))(
"m",ex.to_string()));
3221 fc_elog(
logger,
"bad block exception connection ${cid}: #${n} ${id}...: ${m}",
3222 (
"cid", c->connection_id)(
"n", blk_num)(
"id", blk_id.
str().substr(8,16))(
"m",ex.to_string()));
3224 fc_elog(
logger,
"bad block connection ${cid}: #${n} ${id}...: unknown exception",
3225 (
"cid", c->connection_id)(
"n", blk_num)(
"id", blk_id.
str().substr(8,16)));
3229 boost::asio::post( my_impl->
thread_pool->get_executor(), [dispatcher = my_impl->
dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
3230 fc_dlog( logger,
"accepted signed_block : #${n} ${id}...", (
"n", msg->block_num())(
"id", blk_id.str().substr(8,16)) );
3231 dispatcher->add_peer_block( blk_id, cid );
3233 c->strand.post( [sync_master = my_impl->
sync_master.get(), dispatcher = my_impl->
dispatcher.get(), c, blk_id, blk_num]() {
3234 dispatcher->recv_block( c, blk_id, blk_num );
3235 sync_master->sync_recv_block( c, blk_id, blk_num, true );
3238 c->strand.post( [sync_master = my_impl->
sync_master.get(), dispatcher = my_impl->
dispatcher.get(), c, blk_id, blk_num]() {
3239 sync_master->rejected_block( c, blk_num );
3240 dispatcher->rejected_block( blk_id );
3251 connector_check_timer->async_wait( [my = shared_from_this(), from_connection](boost::system::error_code ec) {
3252 std::unique_lock<std::mutex> g( my->connector_check_timer_mtx );
3253 int num_in_flight = --my->connector_checks_in_flight;
3256 my->connection_monitor(from_connection, num_in_flight == 0 );
3258 if( num_in_flight == 0 ) {
3259 if( my->in_shutdown )
return;
3260 fc_elog(
logger,
"Error from connection check monitor: ${m}", (
"m", ec.message()));
3261 my->start_conn_timer( my->connector_period, std::weak_ptr<connection>() );
3272 expire_timer->async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
3276 if( my->in_shutdown )
return;
3277 fc_elog(
logger,
"Error from transaction check monitor: ${m}", (
"m", ec.message()) );
3278 my->start_expire_timer();
3288 keepalive_timer->async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
3291 if( my->in_shutdown )
return;
3292 fc_wlog(
logger,
"Peer keepalive ticked sooner than expected: ${m}", (
"m", ec.message()) );
3297 if( c->socket_is_open() ) {
3298 c->strand.post([c, current_time]() {
3299 c->check_heartbeat(current_time);
3323 std::tie( lib, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) =
get_chain_info();
3335 auto from = from_connection.lock();
3339 size_t num_rm = 0, num_clients = 0, num_peers = 0;
3345 fc_ilog(
logger,
"p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}",
3352 (*it)->peer_address().empty() ? ++num_clients : ++num_peers;
3353 if( !(*it)->socket_is_open() && !(*it)->connecting) {
3354 if( !(*it)->peer_address().empty() ) {
3355 if( !(*it)->resolve_and_connect() ) {
3357 --num_peers; ++num_rm;
3361 --num_clients; ++num_rm;
3369 if( num_clients > 0 || num_peers > 0 )
3370 fc_ilog(
logger,
"p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}",
3372 fc_dlog(
logger,
"connection monitor, removed ${n} connections", (
"n", num_rm) );
3382 fc_dlog(
logger,
"signaled accepted_block, blk num = ${num}, id = ${id}", (
"num", bs->block_num)(
"id", bs->id) );
3387 fc_add_tag( blk_span,
"block_num", bs->block_num );
3388 fc_add_tag( blk_span,
"block_time", bs->block->timestamp.to_time_point() );
3390 dispatcher->bcast_block( bs->block, bs->id );
3400 auto id = block->calculate_id();
3401 fc_dlog(
logger,
"signaled pre_accepted_block, blk num = ${num}, id = ${id}", (
"num", block->block_num())(
"id",
id) );
3406 fc_add_tag(blk_span,
"block_num", block->block_num());
3407 fc_add_tag(blk_span,
"block_time", block->timestamp.to_time_point());
3408 fc_add_tag(blk_span,
"producer", block->producer.to_string());
3417 fc_dlog(
logger,
"on_irreversible_block, blk num = ${num}, id = ${id}", (
"num", block->block_num)(
"id", block->id) );
3424 const auto&
id =
results.second->id();
3426 fc_dlog(
logger,
"signaled NACK, trx-id = ${id} : ${why}", (
"id",
id)(
"why",
results.first->to_detail_string() ) );
3429 std::tie( std::ignore, head_blk_num, std::ignore, std::ignore, std::ignore, std::ignore ) =
get_chain_info();
3432 fc_dlog(
logger,
"signaled ACK, trx-id = ${id}", (
"id",
id) );
3448 bool found_producer_key =
false;
3452 fc_elog(
logger,
"Peer ${peer} sent a handshake with an unauthorized key: ${key}.",
3460 if(hash != msg.
token) {
3468 catch (
const std::exception& ) {
3478 fc_dlog(
logger,
"Peer sent a handshake with blank signature and token, but this node accepts only authenticated connections." );
3497 return private_key_itr->second.sign(
digest);
3505 namespace sc = std::chrono;
3508 std::tie( lib, std::ignore, head,
3515 hello.
time = sc::duration_cast<sc::nanoseconds>(sc::system_clock::now().time_since_epoch()).count();
3525#if defined( __APPLE__ )
3527#elif defined( __linux__ )
3529#elif defined( _WIN32 )
3550 (
"p2p-listen-endpoint", bpo::value<string>()->default_value(
"0.0.0.0:9876" ),
"The actual host:port used to listen for incoming p2p connections.")
3551 (
"p2p-server-address", bpo::value<string>(),
"An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint.")
3552 (
"p2p-peer-address", bpo::value< vector<string> >()->composing(),
3553 "The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n"
3554 " Syntax: host:port[:<trx>|<blk>]\n"
3555 " The optional 'trx' and 'blk' indicates to node that only transactions 'trx' or blocks 'blk' should be sent."
3557 " p2p.eos.io:9876\n"
3558 " p2p.trx.eos.io:9876:trx\n"
3559 " p2p.blk.eos.io:9876:blk\n")
3560 (
"p2p-max-nodes-per-host", bpo::value<int>()->default_value(
def_max_nodes_per_host),
"Maximum number of client nodes from any single IP address")
3561 (
"p2p-accept-transactions", bpo::value<bool>()->default_value(
true),
"Allow transactions received over p2p network to be evaluated and relayed if valid.")
3562 (
"agent-name", bpo::value<string>()->default_value(
"SYS Test Agent"),
"The name supplied to identify this node amongst the peers.")
3563 (
"allowed-connection", bpo::value<
vector<string>>()->multitoken()->default_value({
"any"},
"any"),
"Can be 'any' or 'producers' or 'specified' or 'none'. If 'specified', peer-key must be specified at least once. If only 'producers', peer-key is not required. 'producers' and 'specified' may be combined.")
3564 (
"peer-key", bpo::value<
vector<string>>()->composing()->multitoken(),
"Optional public key of peer allowed to connect. May be used multiple times.")
3565 (
"peer-private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(),
3566 "Tuple of [PublicKey, WIF private key] (may specify multiple times)")
3567 (
"max-clients", bpo::value<int>()->default_value(
def_max_clients),
"Maximum number of clients from which connections are accepted, use 0 for no limit")
3568 (
"connection-cleanup-period", bpo::value<int>()->default_value(
def_conn_retry_wait),
"number of seconds to wait before cleaning up dead connections")
3569 (
"max-cleanup-time-msec", bpo::value<int>()->default_value(10),
"max connection cleanup time per cleanup call in milliseconds")
3570 (
"p2p-dedup-cache-expire-time-sec", bpo::value<uint32_t>()->default_value(10),
"Maximum time to track transaction for duplicate optimization")
3571 (
"net-threads", bpo::value<uint16_t>()->default_value(my->thread_pool_size),
3572 "Number of worker threads in net_plugin thread pool" )
3573 (
"sync-fetch-span", bpo::value<uint32_t>()->default_value(
def_sync_fetch_span),
"number of blocks to retrieve in a chunk from any individual peer during synchronization")
3574 (
"use-socket-read-watermark", bpo::value<bool>()->default_value(
false),
"Enable experimental socket read watermark optimization")
3575 (
"peer-log-format", bpo::value<string>()->default_value(
"[\"${_name}\" - ${_cid} ${_ip}:${_port}] " ),
3576 "The string used to format peers when logging messages about them. Variables are escaped with ${<variable name>}.\n"
3577 "Available Variables:\n"
3578 " _name \tself-reported name\n\n"
3579 " _cid \tassigned connection id\n\n"
3580 " _id \tself-reported ID (64 hex characters)\n\n"
3581 " _sid \tfirst 8 characters of _peer.id\n\n"
3582 " _ip \tremote IP address of peer\n\n"
3583 " _port \tremote port number of peer\n\n"
3584 " _lip \tlocal IP address connected to peer\n\n"
3585 " _lport \tlocal port number connected to peer\n\n")
3586 (
"p2p-keepalive-interval-ms", bpo::value<int>()->default_value(
def_keepalive_interval),
"peer heartbeat keepalive message interval in milliseconds")
3591 template<
typename T>
3603 my->connector_period = std::chrono::seconds( options.at(
"connection-cleanup-period" ).as<
int>());
3604 my->max_cleanup_time_ms = options.at(
"max-cleanup-time-msec").as<
int>();
3606 my->p2p_dedup_cache_expire_time_us =
fc::seconds( options.at(
"p2p-dedup-cache-expire-time-sec" ).as<
uint32_t>() );
3608 my->max_client_count = options.at(
"max-clients" ).as<
int>();
3609 my->max_nodes_per_host = options.at(
"p2p-max-nodes-per-host" ).as<
int>();
3610 my->p2p_accept_transactions = options.at(
"p2p-accept-transactions" ).as<
bool>();
3612 my->use_socket_read_watermark = options.at(
"use-socket-read-watermark" ).as<
bool>();
3613 my->keepalive_interval = std::chrono::milliseconds( options.at(
"p2p-keepalive-interval-ms" ).as<
int>() );
3614 SYS_ASSERT( my->keepalive_interval.count() > 0, chain::plugin_config_exception,
3615 "p2p-keepalive_interval-ms must be greater than 0" );
3617 if( options.count(
"p2p-keepalive-interval-ms" )) {
3618 my->heartbeat_timeout = std::chrono::milliseconds( options.at(
"p2p-keepalive-interval-ms" ).as<
int>() * 2 );
3621 if( options.count(
"p2p-listen-endpoint" ) && options.at(
"p2p-listen-endpoint").as<
string>().length()) {
3622 my->p2p_address = options.at(
"p2p-listen-endpoint" ).as<
string>();
3626 if( options.count(
"p2p-server-address" ) ) {
3627 my->p2p_server_address = options.at(
"p2p-server-address" ).as<
string>();
3632 my->thread_pool_size = options.at(
"net-threads" ).as<
uint16_t>();
3633 SYS_ASSERT( my->thread_pool_size > 0, chain::plugin_config_exception,
3634 "net-threads ${num} must be greater than 0", (
"num", my->thread_pool_size) );
3636 if( options.count(
"p2p-peer-address" )) {
3637 my->supplied_peers = options.at(
"p2p-peer-address" ).as<
vector<string> >();
3639 if( options.count(
"agent-name" )) {
3640 my->user_agent_name = options.at(
"agent-name" ).as<
string>();
3645 if( options.count(
"allowed-connection" )) {
3646 const std::vector<std::string> allowed_remotes = options[
"allowed-connection"].as<std::vector<std::string>>();
3647 for(
const std::string& allowed_remote : allowed_remotes ) {
3648 if( allowed_remote ==
"any" )
3650 else if( allowed_remote ==
"producers" )
3652 else if( allowed_remote ==
"specified" )
3654 else if( allowed_remote ==
"none" )
3661 plugin_config_exception,
3662 "At least one peer-key must accompany 'allowed-connection=specified'" );
3664 if( options.count(
"peer-key" )) {
3665 const std::vector<std::string> key_strings = options[
"peer-key"].as<std::vector<std::string>>();
3666 for(
const std::string& key_string : key_strings ) {
3671 if( options.count(
"peer-private-key" )) {
3672 const std::vector<std::string> key_id_to_wif_pair_strings = options[
"peer-private-key"].as<std::vector<std::string>>();
3673 for(
const std::string& key_id_to_wif_pair_string : key_id_to_wif_pair_strings ) {
3675 key_id_to_wif_pair_string );
3681 SYS_ASSERT( my->chain_plug, chain::missing_chain_plugin_exception,
"" );
3682 my->chain_id = my->chain_plug->get_chain_id();
3684 const controller& cc = my->chain_plug->chain();
3687 if( my->p2p_accept_transactions ) {
3688 my->p2p_accept_transactions =
false;
3689 string m = cc.
get_read_mode() == db_read_mode::IRREVERSIBLE ?
"irreversible" :
"read-only";
3690 wlog(
"p2p-accept-transactions set to false due to read-mode: ${m}", (
"m", m) );
3693 if( my->p2p_accept_transactions ) {
3694 my->chain_plug->enable_accept_transactions();
3704 fc_ilog(
logger,
"my node_id is ${id}", (
"id", my->node_id ));
3708 my->thread_pool.emplace(
"net", my->thread_pool_size );
3712 if( !my->p2p_accept_transactions && my->p2p_address.size() ) {
3714 "***********************************\n"
3715 "* p2p-accept-transactions = false *\n"
3716 "* Transactions not forwarded *\n"
3717 "***********************************\n" );
3720 tcp::endpoint listen_endpoint;
3721 if( my->p2p_address.size() > 0 ) {
3722 auto host = my->p2p_address.substr( 0, my->p2p_address.find(
':' ));
3723 auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size());
3724 tcp::resolver resolver( my->thread_pool->get_executor() );
3726 listen_endpoint = *resolver.resolve( tcp::v4(), host, port );
3728 my->acceptor.reset(
new tcp::acceptor( my_impl->
thread_pool->get_executor() ) );
3730 if( !my->p2p_server_address.empty() ) {
3731 my->p2p_address = my->p2p_server_address;
3733 if( listen_endpoint.address().to_v4() == address_v4::any()) {
3734 boost::system::error_code ec;
3735 auto host = host_name( ec );
3736 if( ec.value() != boost::system::errc::success ) {
3739 "Unable to retrieve host_name. ${msg}", (
"msg", ec.message()));
3742 auto port = my->p2p_address.substr( my->p2p_address.find(
':' ), my->p2p_address.size());
3743 my->p2p_address = host + port;
3751 my->on_accepted_block(
s );
3754 my->on_pre_accepted_block(
s );
3757 my->on_irreversible_block(
s );
3762 std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
3763 my->keepalive_timer.reset(
new boost::asio::steady_timer( my->thread_pool->get_executor() ) );
3770 if( my->acceptor ) {
3772 my->acceptor->open(listen_endpoint.protocol());
3773 my->acceptor->set_option(tcp::acceptor::reuse_address(
true));
3774 my->acceptor->bind(listen_endpoint);
3775 my->acceptor->listen();
3776 }
catch (
const std::exception& e) {
3777 elog(
"net_plugin::plugin_startup failed to bind to port ${port}, ${what}",
3778 (
"port", listen_endpoint.port())(
"what", e.what()) );
3782 fc_ilog(
logger,
"starting listener, max clients is ${mc}",(
"mc",my->max_client_count) );
3783 my->start_listen_loop();
3787 my->start_monitors();
3788 my->update_chain_info();
3789 for(
const auto& seed_node : my->supplied_peers ) {
3790 my->connect( seed_node );
3808 my->in_shutdown =
true;
3810 std::lock_guard<std::mutex> g( my->connector_check_timer_mtx );
3811 if( my->connector_check_timer )
3812 my->connector_check_timer->cancel();
3814 std::lock_guard<std::mutex> g( my->expire_timer_mtx );
3815 if( my->expire_timer )
3816 my->expire_timer->cancel();
3818 std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
3819 if( my->keepalive_timer )
3820 my->keepalive_timer->cancel();
3824 fc_ilog(
logger,
"close ${s} connections", (
"s", my->connections.size()) );
3825 std::lock_guard<std::shared_mutex> g( my->connections_mtx );
3826 for(
auto& con : my->connections ) {
3827 fc_dlog(
logger,
"close: ${cid}", (
"cid", con->connection_id) );
3828 con->close(
false,
true );
3830 my->connections.clear();
3833 if( my->thread_pool ) {
3834 my->thread_pool->stop();
3837 if( my->acceptor ) {
3838 boost::system::error_code ec;
3839 my->acceptor->cancel( ec );
3840 my->acceptor->close( ec );
3843 app().
post( 0, [me = my](){} );
3853 return my->connect( host );
3859 return "already connected";
3862 fc_dlog(
logger,
"calling active connector: ${h}", (
"h", host) );
3863 if( c->resolve_and_connect() ) {
3864 fc_dlog(
logger,
"adding new connection to the list: ${host} ${cid}", (
"host", host)(
"cid", c->connection_id) );
3868 return "added connection";
3872 std::lock_guard<std::shared_mutex> g( my->connections_mtx );
3873 for(
auto itr = my->connections.begin(); itr != my->connections.end(); ++itr ) {
3874 if( (*itr)->peer_address() == host ) {
3875 fc_ilog(
logger,
"disconnecting: ${cid}", (
"cid", (*itr)->connection_id) );
3877 my->connections.erase(itr);
3878 return "connection removed";
3881 return "no known connection for host";
3885 std::shared_lock<std::shared_mutex> g( my->connections_mtx );
3886 auto con = my->find_connection( host );
3888 return con->get_status();
3889 return std::optional<connection_status>();
3894 std::shared_lock<std::shared_mutex> g( my->connections_mtx );
3895 result.reserve( my->connections.size() );
3896 for(
const auto& c : my->connections ) {
3897 result.push_back( c->get_status() );
3905 if( c->peer_address() == host )
return c;
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
@ started
the plugin is actively running
abstract_plugin * find_plugin(const string &name) const
auto post(int priority, Func &&func)
auto get_channel() -> std::enable_if_t< is_channel_decl< ChannelDecl >::value, typename ChannelDecl::channel_type & >
virtual state get_state() const override
Used to generate a useful error report when an exception is thrown.
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
std::string to_string(log_level ll=log_level::info) const
static variant from_string(const string &utf8_str, const parse_type ptype=parse_type::legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
static void update(const fc::string &name, logger &log)
bool is_enabled(log_level e) const
abstraction for a message buffer that spans a chain of physical buffers
mb_datastream< buffer_len > create_datastream()
std::vector< boost::asio::mutable_buffer > get_buffer_sequence_for_boost_async_read()
mb_peek_datastream< buffer_len > create_peek_datastream()
void advance_read_ptr(uint32_t bytes)
uint32_t bytes_to_write() const
constexpr int64_t to_seconds() const
An order-preserving dictionary of variants.
static sha256 hash(const char *d, uint32_t dlen)
An order-preserving dictionary of variants.
auto events() const
returns number of consecutive rbws
void accepted()
called when a block is accepted (sync_recv_block)
block_status_monitor & operator=(block_status_monitor &&)=delete
block_status_monitor(fc::microseconds window_size=fc::microseconds(2 *1000), uint32_t max_rejected_windows=13)
block_status_monitor(const block_status_monitor &)=delete
block_status_monitor & operator=(const block_status_monitor &)=delete
assignment not allowed
void reset()
reset to initial state
bool max_events_violated() const
indicates if the max number of consecutive rbws has been reached or exceeded
~block_status_monitor()=default
void rejected()
called when a block is rejected
block_status_monitor(block_status_monitor &&)=delete
bool is_trusted_producer(const account_name &producer) const
signed_block_ptr fetch_block_by_number(uint32_t block_num) const
block_id_type fork_db_pending_head_block_id() const
signal< void(const block_state_ptr &)> accepted_block
signal< void(const block_state_ptr &)> irreversible_block
db_read_mode get_read_mode() const
block_state_ptr create_block_state(const block_id_type &id, const signed_block_ptr &b) const
uint32_t head_block_num() const
block_id_type head_block_id() const
signal< void(const signed_block_ptr &)> pre_accepted_block
uint32_t last_irreversible_block_num() const
uint32_t fork_db_pending_head_block_num() const
signed_block_ptr fetch_block_by_id(block_id_type id) const
block_id_type last_irreversible_block_id() const
block_id_type get_block_id_for_num(uint32_t block_num) const
block_state_ptr fetch_block_state_by_id(block_id_type id) const
bool accept_block(const chain::signed_block_ptr &block, const chain::block_id_type &id, const chain::block_state_ptr &bsp)
void accept_transaction(const chain::packed_transaction_ptr &trx, chain::plugin_interface::next_function< chain::transaction_trace_ptr > next)
void set_heartbeat_timeout(std::chrono::milliseconds msec)
void request_sync_blocks(uint32_t start, uint32_t end)
void cancel_sync(go_away_reason)
bool enqueue_sync_block()
connection_status get_status() const
tstamp rec
receive timestamp
void enqueue_buffer(const std::shared_ptr< std::vector< char > > &send_buffer, go_away_reason close_after_send, bool to_sync_queue=false)
bool resolve_and_connect()
handshake_message last_handshake_recv
fc::time_point last_close
void send_time()
Populate and queue time_message.
tstamp xmt
transmit timestamp
void set_connection_type(const string &peer_addr)
const uint32_t connection_id
boost::asio::steady_timer response_expected_timer
void blk_send(const block_id_type &blkid)
bool process_next_message(uint32_t message_length)
Process the next message from the pending message buffer.
std::atomic< bool > connecting
void handle_message(const signed_block &msg)=delete
int16_t sent_handshake_count
std::atomic< std::size_t > outstanding_read_bytes
fc::variant_object get_logger_variant() const
void connect(const std::shared_ptr< tcp::resolver > &resolver, tcp::resolver::results_type endpoints)
fc::time_point last_dropped_trx_msg_time
void start_read_message()
void blk_send_branch(const block_id_type &msg_head_id)
string remote_endpoint_ip
void handle_message(const handshake_message &msg)
std::atomic< go_away_reason > no_retry
tstamp org
originate timestamp
void process_signed_block(const block_id_type &id, signed_block_ptr msg, block_state_ptr bsp)
std::mutex response_expected_timer_mtx
std::atomic< uint16_t > consecutive_immediate_connection_close
bool is_blocks_only_connection() const
void close(bool reconnect=true, bool shutdown=false)
queued_buffer buffer_queue
string short_conn_node_id
block_status_monitor block_status_monitor_
boost::asio::io_context::strand strand
void blk_send_branch_impl(uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num)
string log_remote_endpoint_port
void handle_message(const packed_transaction &msg)=delete
bool is_transactions_only_connection() const
static tstamp get_time()
Read system time and convert to a 64 bit integer.
handshake_message last_handshake_sent
std::atomic< bool > syncing
std::optional< request_message > last_req
string log_remote_endpoint_ip
bool is_valid(const handshake_message &msg) const
const string & peer_address() const
void check_heartbeat(tstamp current_time)
Check heartbeat time and send Time_message.
void sync_timeout(boost::system::error_code ec)
std::atomic< uint16_t > protocol_version
std::atomic< uint32_t > trx_in_progress_size
void enqueue(const net_message &msg)
std::shared_ptr< tcp::socket > socket
void fetch_timeout(boost::system::error_code ec)
bool socket_is_open() const
void queue_write(const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue=false)
bool populate_handshake(handshake_message &hello)
fc::message_buffer< 1024 *1024 > pending_message_buffer
tstamp dst
destination timestamp
string local_endpoint_port
void enqueue_block(const signed_block_ptr &sb, bool to_sync_queue=false)
bool have_txn(const transaction_id_type &tid) const
void rejected_transaction(const packed_transaction_ptr &trx, uint32_t head_blk_num)
boost::asio::io_context::strand strand
void expire_blocks(uint32_t bnum)
void bcast_block(const signed_block_ptr &b, const block_id_type &id)
bool peer_has_block(const block_id_type &blkid, uint32_t connection_id) const
void rejected_block(const block_id_type &id)
dispatch_manager(boost::asio::io_context &io_context)
void recv_notice(const connection_ptr &conn, const notice_message &msg, bool generated)
void recv_block(const connection_ptr &conn, const block_id_type &msg, uint32_t bnum)
bool have_block(const block_id_type &blkid) const
bool add_peer_block(const block_id_type &blkid, uint32_t connection_id)
void bcast_transaction(const packed_transaction_ptr &trx)
void retry_fetch(const connection_ptr &conn)
bool add_peer_txn(const transaction_id_type id, const time_point_sec &trx_expires, uint32_t connection_id, const time_point_sec &now=time_point::now())
unique_ptr< boost::asio::steady_timer > expire_timer
uint32_t max_client_count
uint16_t thread_pool_size
std::mutex keepalive_timer_mtx
const std::chrono::system_clock::duration peer_authentication_interval
Peer clock may be no more than 1 second skewed from our clock, including network latency.
std::map< chain::public_key_type, chain::private_key_type > private_keys
overlapping with producer keys, also authenticating non-producing nodes
void ticker()
Peer heartbeat ticker.
string p2p_server_address
unique_ptr< sync_manager > sync_master
chain::signature_type sign_compact(const chain::public_key_type &signer, const fc::sha256 &digest) const
Returns a signature of the digest using the corresponding private key of the signer.
connection_ptr find_connection(const string &host) const
boost::asio::steady_timer::duration connector_period
std::chrono::milliseconds keepalive_interval
void start_expire_timer()
bool authenticate_peer(const handshake_message &msg) const
Determine if a peer is allowed to connect.
fc::microseconds p2p_dedup_cache_expire_time_us
unique_ptr< dispatch_manager > dispatcher
void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr< connection > from_connection)
void on_pre_accepted_block(const signed_block_ptr &bs)
string connect(const string &host)
std::mutex expire_timer_mtx
bool use_socket_read_watermark
std::atomic< uint32_t > current_connection_id
unique_ptr< boost::asio::steady_timer > keepalive_timer
possible_connections allowed_connections
bool p2p_accept_transactions
producer_plugin * producer_plug
std::shared_mutex connections_mtx
void on_irreversible_block(const block_state_ptr &blk)
unique_ptr< tcp::acceptor > acceptor
boost::asio::steady_timer::duration resp_expected_period
chain_plugin * chain_plug
uint32_t max_nodes_per_host
unique_ptr< boost::asio::steady_timer > connector_check_timer
void on_accepted_block(const block_state_ptr &bs)
std::atomic< bool > in_shutdown
std::tuple< uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type > get_chain_info() const
compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription
std::mutex connector_check_timer_mtx
vector< chain::public_key_type > allowed_peers
peer keys allowed to connect
vector< string > supplied_peers
void connection_monitor(std::weak_ptr< connection > from_connection, bool reschedule)
std::chrono::milliseconds heartbeat_timeout
std::optional< sysio::chain::named_thread_pool > thread_pool
static constexpr uint16_t to_protocol_version(uint16_t v)
std::set< connection_ptr > connections
chain::public_key_type get_authentication_key() const
Retrieve public key used to authenticate with peers.
int connector_checks_in_flight
void transaction_ack(const std::pair< fc::exception_ptr, packed_transaction_ptr > &)
boost::asio::steady_timer::duration txn_exp_period
vector< connection_status > connections() const
void plugin_initialize(const variables_map &options)
string disconnect(const string &endpoint)
std::optional< connection_status > status(const string &endpoint) const
void handle_sighup() override
string connect(const string &endpoint)
virtual void set_program_options(options_description &cli, options_description &cfg) override
chain::signature_type sign_compact(const chain::public_key_type &key, const fc::sha256 &digest) const
bool is_producer_key(const chain::public_key_type &key) const
void log_failed_transaction(const transaction_id_type &trx_id, const chain::packed_transaction_ptr &packed_trx_ptr, const char *reason) const
bool ready_to_send() const
void fill_out_buffer(std::vector< boost::asio::const_buffer > &bufs)
bool add_write_queue(const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue)
void out_callback(boost::system::error_code ec, std::size_t w)
uint32_t write_queue_size() const
bool is_out_queue_empty() const
sync_manager(uint32_t span)
void sync_recv_notice(const connection_ptr &c, const notice_message &msg)
void recv_handshake(const connection_ptr &c, const handshake_message &msg)
void rejected_block(const connection_ptr &c, uint32_t blk_num)
std::unique_lock< std::mutex > locked_sync_mutex()
void reset_last_requested_num(const std::unique_lock< std::mutex > &lock)
void sync_reassign_fetch(const connection_ptr &c, go_away_reason reason)
void sync_reset_lib_num(const connection_ptr &conn, bool closing)
bool syncing_with_peer() const
void sync_update_expected(const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
void sync_recv_block(const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
static void send_handshakes()
client::connection_ptr connection_ptr
Defines exceptions used by fc.
#define FC_CAPTURE_AND_RETHROW(...)
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_LOG_AND_RETHROW()
#define FC_LOG_AND_DROP(...)
void close(T *e, websocketpp::connection_hdl hdl)
#define fc_create_span(TRACE_VARNAME, SPAN_STR)
#define fc_add_tag(SPAN_VARNAME, TAG_KEY_STR, TAG_VALUE_STR)
#define fc_create_trace_with_id(TRACE_STR, TRACE_ID)
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
#define fc_ilog(LOGGER, FORMAT,...)
#define fc_elog(LOGGER, FORMAT,...)
#define fc_wlog(LOGGER, FORMAT,...)
#define fc_dlog(LOGGER, FORMAT,...)
static const Segment ds(Segment::ds)
static const Reg8 bh(Operand::BH)
void unpack(Stream &s, std::deque< T > &value)
void pack(Stream &s, const std::deque< T > &value)
size_t pack_size(const T &v)
fc::sha256 digest(const T &value)
constexpr microseconds milliseconds(int64_t s)
void rand_pseudo_bytes(char *buf, int count)
constexpr microseconds seconds(int64_t s)
constexpr std::size_t get_index()
fc::crypto::public_key public_key_type
std::shared_ptr< transaction_trace > transaction_trace_ptr
std::shared_ptr< const packed_transaction > packed_transaction_ptr
key Invalid authority Invalid transaction Invalid block ID Invalid packed transaction Invalid chain ID Invalid symbol Signature type is not a currently activated type Block can not be found block_validate_exception
std::shared_ptr< signed_block > signed_block_ptr
std::shared_ptr< block_state > block_state_ptr
fc::crypto::signature signature_type
constexpr auto message_header_size
void for_each_block_connection(Function f)
constexpr uint16_t net_version_max
enum_type & operator|=(enum_type &lhs, const enum_type &rhs)
constexpr uint16_t proto_explicit_sync
void verify_strand_in_this_thread(const Strand &strand, const char *func, int line)
size_t calc_trx_size(const packed_transaction_ptr &trx)
constexpr size_t max_handshake_str_length
constexpr uint16_t proto_dup_goaway_resolution
constexpr uint16_t proto_base
constexpr uint16_t proto_wire_sysio_initial
std::chrono::system_clock::duration::rep tstamp
constexpr auto modes_str(id_list_modes m)
std::shared_ptr< std::vector< char > > send_buffer_type
constexpr auto def_max_clients
std::string peer_log_format
T dejsonify(const string &s)
@ duplicate
the connection is redundant
@ self
the connection is to itself
@ no_reason
no reason to go away
@ wrong_chain
the peer's chain id doesn't match
@ wrong_version
the peer's network version doesn't match
@ unlinkable
the peer sent a block we couldn't use
@ fatal_other
a catch-all for errors we don't have discriminated
@ authentication
peer failed authentication
@ validation
the peer sent a block that failed validation
@ benign_other
reasons such as a timeout; not fatal but warrant resetting
multi_index_container< node_transaction_state, indexed_by< ordered_unique< tag< by_id >, composite_key< node_transaction_state, member< node_transaction_state, transaction_id_type, &node_transaction_state::id >, member< node_transaction_state, uint32_t, &node_transaction_state::connection_id > >, composite_key_compare< sha256_less, std::less< uint32_t > > >, ordered_non_unique< tag< by_expiry >, member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > > > > node_transaction_index
constexpr uint32_t signed_block_which
constexpr uint16_t proto_block_id_notify
multi_index_container< sysio::peer_block_state, indexed_by< ordered_unique< tag< by_id >, composite_key< peer_block_state, member< peer_block_state, uint32_t, &sysio::peer_block_state::connection_id >, member< peer_block_state, block_id_type, &sysio::peer_block_state::id > >, composite_key_compare< std::less< uint32_t >, sha256_less > >, ordered_non_unique< tag< by_peer_block_id >, composite_key< peer_block_state, member< peer_block_state, block_id_type, &sysio::peer_block_state::id >, member< peer_block_state, bool, &sysio::peer_block_state::have_block > >, composite_key_compare< sha256_less, std::greater< bool > > >, ordered_non_unique< tag< by_block_num >, member< sysio::peer_block_state, uint32_t, &sysio::peer_block_state::block_num > > > > peer_block_state_index
constexpr uint16_t proto_pruned_types
std::weak_ptr< connection > connection_wptr
constexpr auto def_send_buffer_size_mb
constexpr uint16_t proto_heartbeat_interval
std::variant< handshake_message, chain_size_message, go_away_message, time_message, notice_message, request_message, sync_request_message, signed_block, packed_transaction > net_message
constexpr uint32_t packed_transaction_which
constexpr auto def_send_buffer_size
constexpr auto def_sync_fetch_span
constexpr auto def_max_trx_in_progress_size
constexpr auto reason_str(go_away_reason rsn)
constexpr auto def_max_nodes_per_host
constexpr uint16_t net_version_range
constexpr size_t max_p2p_address_length
constexpr uint16_t net_version_base
constexpr auto def_resp_expected_wait
constexpr auto def_max_write_queue_size
constexpr auto def_keepalive_interval
constexpr auto def_txn_expire_wait
constexpr auto def_conn_retry_wait
const fc::string logger_name("net_plugin_impl")
constexpr uint16_t proto_dup_node_id_goaway
std::shared_ptr< connection > connection_ptr
constexpr auto def_max_consecutive_immediate_connection_close
void for_each_connection(Function f)
#define peer_elog(PEER, FORMAT,...)
#define peer_dlog(PEER, FORMAT,...)
#define peer_ilog(PEER, FORMAT,...)
#define peer_wlog(PEER, FORMAT,...)
#define T(meth, val, expected)
static constexpr int medium
static constexpr int highest
const send_buffer_type & get_send_buffer(const signed_block_ptr &sb)
caches result for subsequent calls, only provide same signed_block_ptr instance for each invocation.
static send_buffer_type create_send_buffer(uint32_t which, const T &v)
const send_buffer_type & get_send_buffer(const net_message &m)
caches result for subsequent calls, only provide same net_message instance for each invocation
static send_buffer_type create_send_buffer(const net_message &m)
send_buffer_type send_buffer
static constexpr uint16_t extension_id()
handshake_message last_handshake
fc::sha256 node_id
for duplicate notification
chain_id_type chain_id
used to identify chain
uint16_t network_version
incremental value above a computed base
uint32_t last_irreversible_block_num
fc::sha256 node_id
used to identify peers and prevent self-connect
fc::sha256 token
digest of time to prove we own the private key of the key above
chain::public_key_type key
authentication key; may be a producer or peer key, or empty
chain::signature_type sig
signature for the digest
int64_t time
time message created in nanoseconds from epoch
block_id_type last_irreversible_block_id
void operator()(const handshake_message &msg) const
void operator()(const T &) const
void operator()(const request_message &msg) const
void operator()(const chain_size_message &msg) const
msg_handler(const connection_ptr &conn)
void operator()(const go_away_message &msg) const
void operator()(const sync_request_message &msg) const
void operator()(const time_message &msg) const
void operator()(const notice_message &msg) const
uint32_t connection_id
time after which this may be purged.
ordered_blk_ids known_blocks
ordered_txn_ids known_trx
peer_sync_state(uint32_t start=0, uint32_t end=0, uint32_t last_acted=0)
time_point start_time
time request made or received
uint32_t last
last sent or received
ordered_blk_ids req_blocks
tstamp xmt
transmit timestamp
tstamp dst
destination timestamp
tstamp rec
receive timestamp
tstamp org
origin timestamp
const send_buffer_type & get_send_buffer(const packed_transaction_ptr &trx)
caches result for subsequent calls, only provide same packed_transaction_ptr instance for each invocation