Wire Sysio 1.0.0
Loading...
Searching...
No Matches
net_plugin.cpp
Go to the documentation of this file.
2
12
14#include <fc/network/ip.hpp>
15#include <fc/io/json.hpp>
16#include <fc/io/raw.hpp>
17#include <fc/log/appender.hpp>
19#include <fc/log/trace.hpp>
21#include <fc/crypto/rand.hpp>
23
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/ip/host_name.hpp>
26#include <boost/asio/steady_timer.hpp>
27
28#include <atomic>
29#include <shared_mutex>
30
31using namespace sysio::chain::plugin_interface;
32
33namespace sysio {
34 static appbase::abstract_plugin& _net_plugin = app().register_plugin<net_plugin>();
35
36 using std::vector;
37
38 using boost::asio::ip::tcp;
39 using boost::asio::ip::address_v4;
40 using boost::asio::ip::host_name;
41 using boost::multi_index_container;
42
43 using fc::time_point;
47
48 class connection;
49
50 using connection_ptr = std::shared_ptr<connection>;
51 using connection_wptr = std::weak_ptr<connection>;
52
53 template <typename Strand>
54 void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) {
55 if( !strand.running_in_this_thread() ) {
56 elog( "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) );
57 app().quit();
58 }
59 }
60
66
67 struct by_expiry;
68
69 typedef multi_index_container<
71 indexed_by<
72 ordered_unique<
73 tag<by_id>,
74 composite_key< node_transaction_state,
75 member<node_transaction_state, transaction_id_type, &node_transaction_state::id>,
76 member<node_transaction_state, uint32_t, &node_transaction_state::connection_id>
77 >,
78 composite_key_compare< sha256_less, std::less<uint32_t> >
79 >,
80 ordered_non_unique<
81 tag< by_expiry >,
82 member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >
83 >
84 >
86
91 bool have_block = false; // true if we have received the block, false if only received id notification
92 };
93
94 struct by_peer_block_id;
95 struct by_block_num;
96
97 typedef multi_index_container<
99 indexed_by<
100 ordered_unique< tag<by_id>,
101 composite_key< peer_block_state,
102 member<peer_block_state, uint32_t, &sysio::peer_block_state::connection_id>,
103 member<peer_block_state, block_id_type, &sysio::peer_block_state::id>
104 >,
105 composite_key_compare< std::less<uint32_t>, sha256_less >
106 >,
107 ordered_non_unique< tag<by_peer_block_id>,
108 composite_key< peer_block_state,
109 member<peer_block_state, block_id_type, &sysio::peer_block_state::id>,
110 member<peer_block_state, bool, &sysio::peer_block_state::have_block>
111 >,
112 composite_key_compare< sha256_less, std::greater<bool> >
113 >,
114 ordered_non_unique< tag<by_block_num>, member<sysio::peer_block_state, uint32_t, &sysio::peer_block_state::block_num > >
115 >
117
118
120 private:
121 enum stages {
122 lib_catchup,
123 head_catchup,
124 in_sync
125 };
126
127 static constexpr int64_t block_interval_ns =
128 std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();
129
130 mutable std::mutex sync_mtx;
131 uint32_t sync_known_lib_num{0};
132 uint32_t sync_last_requested_num{0};
133 uint32_t sync_next_expected_num{0};
134 uint32_t sync_req_span{0};
135 connection_ptr sync_source;
136 std::atomic<stages> sync_state{in_sync};
137
138 private:
139 constexpr static auto stage_str( stages s );
140 bool set_state( stages s );
141 bool is_sync_required( uint32_t fork_head_block_num );
142 void request_next_chunk( std::unique_lock<std::mutex> g_sync, const connection_ptr& conn = connection_ptr() );
143 void start_sync( const connection_ptr& c, uint32_t target );
144 bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id );
145
146 public:
147 explicit sync_manager( uint32_t span );
148 static void send_handshakes();
149 bool syncing_with_peer() const { return sync_state == lib_catchup; }
150 void sync_reset_lib_num( const connection_ptr& conn, bool closing );
151 void sync_reassign_fetch( const connection_ptr& c, go_away_reason reason );
152 void rejected_block( const connection_ptr& c, uint32_t blk_num );
153 void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied );
154 void sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied );
155 void recv_handshake( const connection_ptr& c, const handshake_message& msg );
156 void sync_recv_notice( const connection_ptr& c, const notice_message& msg );
      /// Acquire sync_mtx and hand ownership of the lock to the caller.
      inline std::unique_lock<std::mutex> locked_sync_mutex() {
         return std::unique_lock<std::mutex>(sync_mtx);
      }
      /// Clear the last-requested block number. The lock parameter is unused at
      /// runtime; requiring it documents that the caller must already hold
      /// sync_mtx (obtained via locked_sync_mutex()).
      inline void reset_last_requested_num(const std::unique_lock<std::mutex>& lock) {
         sync_last_requested_num = 0;
      }
163 };
164
166 mutable std::mutex blk_state_mtx;
167 peer_block_state_index blk_state;
168 mutable std::mutex local_txns_mtx;
169 node_transaction_index local_txns;
170
171 public:
172 boost::asio::io_context::strand strand;
173
174 explicit dispatch_manager(boost::asio::io_context& io_context)
175 : strand( io_context ) {}
176
178 void rejected_transaction(const packed_transaction_ptr& trx, uint32_t head_blk_num);
179 void bcast_block( const signed_block_ptr& b, const block_id_type& id );
180 void rejected_block(const block_id_type& id);
181
182 void recv_block(const connection_ptr& conn, const block_id_type& msg, uint32_t bnum);
183 void expire_blocks( uint32_t bnum );
184 void recv_notice(const connection_ptr& conn, const notice_message& msg, bool generated);
185
186 void retry_fetch(const connection_ptr& conn);
187
188 bool add_peer_block( const block_id_type& blkid, uint32_t connection_id );
189 bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const;
190 bool have_block(const block_id_type& blkid) const;
191
192 bool add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires, uint32_t connection_id,
193 const time_point_sec& now = time_point::now() );
194 bool have_txn( const transaction_id_type& tid ) const;
195 void expire_txns();
196 };
197
201 constexpr auto def_send_buffer_size_mb = 4;
204 constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB
205 constexpr auto def_max_consecutive_immediate_connection_close = 9; // back off if client keeps closing
206 constexpr auto def_max_clients = 25; // 0 for unlimited clients
207 constexpr auto def_max_nodes_per_host = 1;
208 constexpr auto def_conn_retry_wait = 30;
209 constexpr auto def_txn_expire_wait = std::chrono::seconds(3);
210 constexpr auto def_resp_expected_wait = std::chrono::seconds(5);
211 constexpr auto def_sync_fetch_span = 100;
212 constexpr auto def_keepalive_interval = 10000;
213
214 constexpr auto message_header_size = sizeof(uint32_t);
217
218 class net_plugin_impl : public std::enable_shared_from_this<net_plugin_impl> {
219 public:
221 std::atomic<uint32_t> current_connection_id{0};
222
225
232
235 std::map<chain::public_key_type,
238 None = 0,
239 Producers = 1 << 0,
240 Specified = 1 << 1,
241 Any = 1 << 2
242 };
244
245 boost::asio::steady_timer::duration connector_period{0};
246 boost::asio::steady_timer::duration txn_exp_period{0};
247 boost::asio::steady_timer::duration resp_expected_period{0};
248 std::chrono::milliseconds keepalive_interval{std::chrono::milliseconds{def_keepalive_interval}};
249 std::chrono::milliseconds heartbeat_timeout{keepalive_interval * 2};
250
256
258 const std::chrono::system_clock::duration peer_authentication_interval{std::chrono::seconds{1}};
259
263
269 mutable std::shared_mutex connections_mtx;
270 std::set< connection_ptr > connections; // todo: switch to a thread safe container to avoid big mutex over complete collection
271
275
278
281
282 std::atomic<bool> in_shutdown{false};
283
284 compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;
285
287 std::optional<sysio::chain::named_thread_pool> thread_pool;
288
289 private:
290 mutable std::mutex chain_info_mtx; // protects chain_*
291 uint32_t chain_lib_num{0};
292 uint32_t chain_head_blk_num{0};
293 uint32_t chain_fork_head_blk_num{0};
294 block_id_type chain_lib_id;
295 block_id_type chain_head_blk_id;
296 block_id_type chain_fork_head_blk_id;
297
298 public:
299 void update_chain_info();
300 // lib_num, head_block_num, fork_head_blk_num, lib_id, head_blk_id, fork_head_blk_id
301 std::tuple<uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type> get_chain_info() const;
302
303 void start_listen_loop();
304
305 void on_accepted_block( const block_state_ptr& bs );
306 void on_pre_accepted_block( const signed_block_ptr& bs );
307 void transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>&);
308 void on_irreversible_block( const block_state_ptr& blk );
309
310 void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
311 void start_expire_timer();
312 void start_monitors();
313
314 void expire();
315 void connection_monitor(std::weak_ptr<connection> from_connection, bool reschedule);
322 void ticker();
330 bool authenticate_peer(const handshake_message& msg) const;
347
348 constexpr static uint16_t to_protocol_version(uint16_t v);
349
350 connection_ptr find_connection(const string& host)const; // must call with held mutex
351 string connect( const string& host );
352 };
353
354 const fc::string logger_name("net_plugin_impl");
356 std::string peer_log_format;
357
358 // peer_[x]log must be called from thread in connection strand
359#define peer_dlog( PEER, FORMAT, ... ) \
360 FC_MULTILINE_MACRO_BEGIN \
361 if( logger.is_enabled( fc::log_level::debug ) ) { \
362 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
363 logger.log( FC_LOG_MESSAGE( debug, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
364 } \
365 FC_MULTILINE_MACRO_END
366
367#define peer_ilog( PEER, FORMAT, ... ) \
368 FC_MULTILINE_MACRO_BEGIN \
369 if( logger.is_enabled( fc::log_level::info ) ) { \
370 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
371 logger.log( FC_LOG_MESSAGE( info, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
372 } \
373 FC_MULTILINE_MACRO_END
374
375#define peer_wlog( PEER, FORMAT, ... ) \
376 FC_MULTILINE_MACRO_BEGIN \
377 if( logger.is_enabled( fc::log_level::warn ) ) { \
378 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
379 logger.log( FC_LOG_MESSAGE( warn, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
380 } \
381 FC_MULTILINE_MACRO_END
382
383#define peer_elog( PEER, FORMAT, ... ) \
384 FC_MULTILINE_MACRO_BEGIN \
385 if( logger.is_enabled( fc::log_level::error ) ) { \
386 verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
387 logger.log( FC_LOG_MESSAGE( error, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
388 } \
389 FC_MULTILINE_MACRO_END
390
391
392 template<class enum_type, class=typename std::enable_if<std::is_enum<enum_type>::value>::type>
393 inline enum_type& operator|=(enum_type& lhs, const enum_type& rhs)
394 {
395 using T = std::underlying_type_t <enum_type>;
396 return lhs = static_cast<enum_type>(static_cast<T>(lhs) | static_cast<T>(rhs));
397 }
398
399 static net_plugin_impl *my_impl;
400
413 constexpr uint16_t net_version_base = 0x04b5;
419#pragma GCC diagnostic push
420#pragma GCC diagnostic ignored "-Wunused-variable"
421 constexpr uint16_t proto_base = 0;
422 constexpr uint16_t proto_explicit_sync = 1; // version at time of sysio 1.0
423 constexpr uint16_t proto_block_id_notify = 2; // reserved. feature was removed. next net_version should be 3
424 constexpr uint16_t proto_pruned_types = 3; // sysio 2.1: supports new signed_block & packed_transaction types
425 constexpr uint16_t proto_heartbeat_interval = 4; // sysio 2.1: supports configurable heartbeat interval
426 constexpr uint16_t proto_dup_goaway_resolution = 5; // sysio 2.1: support peer address based duplicate connection resolution
427 constexpr uint16_t proto_dup_node_id_goaway = 6; // sysio 2.1: support peer node_id based duplicate connection resolution
428 constexpr uint16_t proto_wire_sysio_initial = 7; // wire_sysio client, needed because none of the 2.1 versions are supported
429#pragma GCC diagnostic pop
430
432
437 explicit peer_sync_state(uint32_t start = 0, uint32_t end = 0, uint32_t last_acted = 0)
438 :start_block( start ), end_block( end ), last( last_acted ),
439 start_time(time_point::now())
440 {}
445 };
446
447 // thread safe
448 class queued_buffer : boost::noncopyable {
449 public:
451 std::lock_guard<std::mutex> g( _mtx );
452 _write_queue.clear();
453 _sync_write_queue.clear();
454 _write_queue_size = 0;
455 }
456
458 std::lock_guard<std::mutex> g( _mtx );
459 while ( _out_queue.size() > 0 ) {
460 _out_queue.pop_front();
461 }
462 }
463
465 std::lock_guard<std::mutex> g( _mtx );
466 return _write_queue_size;
467 }
468
469 bool is_out_queue_empty() const {
470 std::lock_guard<std::mutex> g( _mtx );
471 return _out_queue.empty();
472 }
473
474 bool ready_to_send() const {
475 std::lock_guard<std::mutex> g( _mtx );
476 // if out_queue is not empty then async_write is in progress
477 return ((!_sync_write_queue.empty() || !_write_queue.empty()) && _out_queue.empty());
478 }
479
480 // @param callback must not callback into queued_buffer
481 bool add_write_queue( const std::shared_ptr<vector<char>>& buff,
482 std::function<void( boost::system::error_code, std::size_t )> callback,
483 bool to_sync_queue ) {
484 std::lock_guard<std::mutex> g( _mtx );
485 if( to_sync_queue ) {
486 _sync_write_queue.push_back( {buff, callback} );
487 } else {
488 _write_queue.push_back( {buff, callback} );
489 }
490 _write_queue_size += buff->size();
491 if( _write_queue_size > 2 * def_max_write_queue_size ) {
492 return false;
493 }
494 return true;
495 }
496
497 void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
498 std::lock_guard<std::mutex> g( _mtx );
499 if( _sync_write_queue.size() > 0 ) { // always send msgs from sync_write_queue first
500 fill_out_buffer( bufs, _sync_write_queue );
501 } else { // postpone real_time write_queue if sync queue is not empty
502 fill_out_buffer( bufs, _write_queue );
503 SYS_ASSERT( _write_queue_size == 0, plugin_exception, "write queue size expected to be zero" );
504 }
505 }
506
507 void out_callback( boost::system::error_code ec, std::size_t w ) {
508 std::lock_guard<std::mutex> g( _mtx );
509 for( auto& m : _out_queue ) {
510 m.callback( ec, w );
511 }
512 }
513
514 private:
515 struct queued_write;
516 void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs,
517 deque<queued_write>& w_queue ) {
518 while ( w_queue.size() > 0 ) {
519 auto& m = w_queue.front();
520 bufs.push_back( boost::asio::buffer( *m.buff ));
521 _write_queue_size -= m.buff->size();
522 _out_queue.emplace_back( m );
523 w_queue.pop_front();
524 }
525 }
526
527 private:
528 struct queued_write {
529 std::shared_ptr<vector<char>> buff;
530 std::function<void( boost::system::error_code, std::size_t )> callback;
531 };
532
533 mutable std::mutex _mtx;
534 uint32_t _write_queue_size{0};
535 deque<queued_write> _write_queue;
536 deque<queued_write> _sync_write_queue; // sync_write_queue will be sent first
537 deque<queued_write> _out_queue;
538
539 }; // queued_buffer
540
541
547 private:
548 bool in_accepted_state_ {true};
549 fc::microseconds window_size_{2*1000};
550 fc::time_point window_start_;
551 uint32_t events_{0};
552 const uint32_t max_consecutive_rejected_windows_{13};
553
554 public:
561 uint32_t max_rejected_windows = 13) :
562 window_size_(window_size) {}
567 void reset();
569 void accepted() { reset(); }
571 void rejected();
573 auto events() const { return events_; }
575 bool max_events_violated() const { return events_ >= max_consecutive_rejected_windows_; }
579 };
580
581 class connection : public std::enable_shared_from_this<connection> {
582 public:
583 explicit connection( const string& endpoint );
584 connection();
585
586 ~connection() = default;
587
588 bool start_session();
589
590 bool socket_is_open() const { return socket_open.load(); } // thread safe, atomic
591 const string& peer_address() const { return peer_addr; } // thread safe, const
592
593 void set_connection_type( const string& peer_addr );
594 bool is_transactions_only_connection()const { return connection_type == transactions_only; }
595 bool is_blocks_only_connection()const { return connection_type == blocks_only; }
596 void set_heartbeat_timeout(std::chrono::milliseconds msec) {
597 std::chrono::system_clock::duration dur = msec;
598 hb_timeout = dur.count();
599 }
600
601 private:
602 static const string unknown;
603
604 void update_endpoints();
605
606 std::optional<peer_sync_state> peer_requested; // this peer is requesting info from us
607
608 std::atomic<bool> socket_open{false};
609
610 const string peer_addr;
611 enum connection_types : char {
612 both,
613 transactions_only,
614 blocks_only
615 };
616
617 std::atomic<connection_types> connection_type{both};
618
619 public:
620 boost::asio::io_context::strand strand;
621 std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction
622
624 std::atomic<std::size_t> outstanding_read_bytes{0}; // accessed only from strand threads
625
627
635
636 std::atomic<uint32_t> trx_in_progress_size{0};
640 std::atomic<bool> connecting{true};
641 std::atomic<bool> syncing{false};
642
643 std::atomic<uint16_t> protocol_version = 0;
646 std::atomic<uint16_t> consecutive_immediate_connection_close = 0;
647
649 boost::asio::steady_timer response_expected_timer;
650
651 std::atomic<go_away_reason> no_retry{no_reason};
652
653 mutable std::mutex conn_mtx; //< mtx for last_req .. remote_endpoint_ip
654 std::optional<request_message> last_req;
661
663
668 // Members set from network data
674 // timestamp for the lastest message
676 tstamp hb_timeout{std::chrono::milliseconds{def_keepalive_interval}.count()};
678
679 bool connected();
680 bool current();
681
684 void close( bool reconnect = true, bool shutdown = false );
685 private:
686 static void _close( connection* self, bool reconnect, bool shutdown ); // for easy capture
687
688 bool process_next_block_message(uint32_t message_length);
689 bool process_next_trx_message(uint32_t message_length);
690 public:
691
693
694 bool resolve_and_connect();
695 void connect( const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints );
696 void start_read_message();
697
706 bool process_next_message(uint32_t message_length);
707
708 void send_handshake();
709
715 void check_heartbeat( tstamp current_time );
718 void send_time();
721 void send_time(const time_message& msg);
729 static tstamp get_time() {
730 return std::chrono::system_clock::now().time_since_epoch().count();
731 }
734 void blk_send_branch( const block_id_type& msg_head_id );
735 void blk_send_branch_impl( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num );
736 void blk_send(const block_id_type& blkid);
737 void stop_send();
738
739 void enqueue( const net_message &msg );
740 void enqueue_block( const signed_block_ptr& sb, bool to_sync_queue = false);
741 void enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
742 go_away_reason close_after_send,
743 bool to_sync_queue = false);
745 void flush_queues();
746 bool enqueue_sync_block();
747 void request_sync_blocks(uint32_t start, uint32_t end);
748
749 void cancel_wait();
750 void sync_wait();
751 void fetch_wait();
752 void sync_timeout(boost::system::error_code ec);
753 void fetch_timeout(boost::system::error_code ec);
754
755 void queue_write(const std::shared_ptr<vector<char>>& buff,
756 std::function<void(boost::system::error_code, std::size_t)> callback,
757 bool to_sync_queue = false);
758 void do_queue_write();
759
760 bool is_valid( const handshake_message& msg ) const;
761
762 void handle_message( const handshake_message& msg );
763 void handle_message( const chain_size_message& msg );
764 void handle_message( const go_away_message& msg );
778 void handle_message( const time_message& msg );
780 void handle_message( const notice_message& msg );
781 void handle_message( const request_message& msg );
782 void handle_message( const sync_request_message& msg );
783 void handle_message( const signed_block& msg ) = delete; // signed_block_ptr overload used instead
784 void handle_message( const block_id_type& id, signed_block_ptr msg );
785 void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead
787
789
792 mvo( "_name", log_p2p_address)
793 ( "_cid", connection_id )
794 ( "_id", conn_node_id )
795 ( "_sid", short_conn_node_id )
796 ( "_ip", log_remote_endpoint_ip )
797 ( "_port", log_remote_endpoint_port )
798 ( "_lip", local_endpoint_ip )
799 ( "_lport", local_endpoint_port );
800 return mvo;
801 }
802 };
803
804 const string connection::unknown = "<unknown>";
805
   // called from connection strand
   /// fc::visitor that routes a decoded net_message variant to the matching
   /// connection::handle_message overload; each operator() runs on the
   /// connection's strand.
   struct msg_handler : public fc::visitor<void> {
      // c: the connection being dispatched to (member declaration not visible in this excerpt)
      explicit msg_handler( const connection_ptr& conn) : c(conn) {}

      /// Catch-all: message types without an overload below (e.g. signed_block,
      /// packed_transaction, whose variant overloads are deleted) must be
      /// dispatched directly, never through the visitor.
      template<typename T>
      void operator()( const T& ) const {
         SYS_ASSERT( false, plugin_config_exception, "Not implemented, call handle_message directly instead" );
      }

      void operator()( const handshake_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle handshake_message" );
         c->handle_message( msg );
      }

      void operator()( const chain_size_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle chain_size_message" );
         c->handle_message( msg );
      }

      void operator()( const go_away_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle go_away_message" );
         c->handle_message( msg );
      }

      void operator()( const time_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle time_message" );
         c->handle_message( msg );
      }

      void operator()( const notice_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle notice_message" );
         c->handle_message( msg );
      }

      void operator()( const request_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle request_message" );
         c->handle_message( msg );
      }

      void operator()( const sync_request_message& msg ) const {
         // continue call to handle_message on connection strand
         peer_dlog( c, "handle sync_request_message" );
         c->handle_message( msg );
      }
   };
858
859 template<typename Function>
860 void for_each_connection( Function f ) {
861 std::shared_lock<std::shared_mutex> g( my_impl->connections_mtx );
862 for( auto& c : my_impl->connections ) {
863 if( !f( c ) ) return;
864 }
865 }
866
867 template<typename Function>
868 void for_each_block_connection( Function f ) {
869 std::shared_lock<std::shared_mutex> g( my_impl->connections_mtx );
870 for( auto& c : my_impl->connections ) {
871 if( c->is_transactions_only_connection() ) continue;
872 if( !f( c ) ) return;
873 }
874 }
875
876 //---------------------------------------------------------------------------
877
   /// Construct an outbound connection for the configured peer endpoint
   /// ("host:port[:trx|:blk]"). All asio objects (strand, socket, timer) are
   /// bound to the net plugin's thread pool executor.
   connection::connection( const string& endpoint )
      : peer_addr( endpoint ),
        strand( my_impl->thread_pool->get_executor() ),
        socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
        log_p2p_address( endpoint ),
        connection_id( ++my_impl->current_connection_id ), // unique id from the global atomic counter
        response_expected_timer( my_impl->thread_pool->get_executor() ),
        last_handshake_recv(),
        last_handshake_sent()
   {
      fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
   }
890
892 : peer_addr(),
893 strand( my_impl->thread_pool->get_executor() ),
894 socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
895 connection_id( ++my_impl->current_connection_id ),
896 response_expected_timer( my_impl->thread_pool->get_executor() ),
897 last_handshake_recv(),
898 last_handshake_sent()
899 {
900 fc_dlog( logger, "new connection object created" );
901 }
902
903 // called from connection strand
904 void connection::update_endpoints() {
905 boost::system::error_code ec;
906 boost::system::error_code ec2;
907 auto rep = socket->remote_endpoint(ec);
908 auto lep = socket->local_endpoint(ec2);
909 log_remote_endpoint_ip = ec ? unknown : rep.address().to_string();
910 log_remote_endpoint_port = ec ? unknown : std::to_string(rep.port());
911 local_endpoint_ip = ec2 ? unknown : lep.address().to_string();
912 local_endpoint_port = ec2 ? unknown : std::to_string(lep.port());
913 std::lock_guard<std::mutex> g_conn( conn_mtx );
915 }
916
   // called from connection strand
   /// Parse the optional ":trx"/":blk" suffix of a configured peer address
   /// ("host:port[:trx|:blk]") and set connection_type accordingly; any other
   /// suffix is logged as unknown and the current type is left unchanged.
   void connection::set_connection_type( const string& peer_add ) {
      // host:port:[<trx>|<blk>]
      string::size_type colon = peer_add.find(':');
      string::size_type colon2 = peer_add.find(':', colon + 1);
      // the type token ends at the first separator symbol after the second colon
      string::size_type end = colon2 == string::npos
         ? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex
      string host = peer_add.substr( 0, colon );
      string port = peer_add.substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
      string type = colon2 == string::npos ? "" : end == string::npos ?
         peer_add.substr( colon2 + 1 ) : peer_add.substr( colon2 + 1, end - (colon2 + 1) );

      if( type.empty() ) {
         fc_dlog( logger, "Setting connection ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) );
         connection_type = both;
      } else if( type == "trx" ) {
         fc_dlog( logger, "Setting connection ${c} type for: ${peer} to transactions only", ("c", connection_id)("peer", peer_add) );
         connection_type = transactions_only;
      } else if( type == "blk" ) {
         fc_dlog( logger, "Setting connection ${c} type for: ${peer} to blocks only", ("c", connection_id)("peer", peer_add) );
         connection_type = blocks_only;
      } else {
         fc_wlog( logger, "Unknown connection ${c} type: ${t}, for ${peer}", ("c", connection_id)("t", type)("peer", peer_add) );
      }
   }
942
945 stat.peer = peer_addr;
946 stat.connecting = connecting;
947 stat.syncing = syncing;
948 std::lock_guard<std::mutex> g( conn_mtx );
950 return stat;
951 }
952
953 // called from connection stand
956
957 update_endpoints();
958 boost::asio::ip::tcp::no_delay nodelay( true );
959 boost::system::error_code ec;
960 socket->set_option( nodelay, ec );
961 if( ec ) {
962 peer_elog( this, "connection failed (set_option): ${e1}", ( "e1", ec.message() ) );
963 close();
964 return false;
965 } else {
966 peer_dlog( this, "connected" );
967 socket_open = true;
969 return true;
970 }
971 }
972
974 return socket_is_open() && !connecting;
975 }
976
978 return (connected() && !syncing);
979 }
980
984
   /// Close the connection. All teardown is posted to the connection's strand
   /// (see _close); shared_from_this keeps the object alive until it runs.
   /// @param reconnect schedule a reconnect attempt after closing
   /// @param shutdown  true when the whole node is shutting down (suppresses retries)
   void connection::close( bool reconnect, bool shutdown ) {
      strand.post( [self = shared_from_this(), reconnect, shutdown]() {
         connection::_close( self.get(), reconnect, shutdown );
      });
   }
990
   // called from connection strand
   /// Tear down connection state: close and replace the socket, flush queued
   /// writes, reset handshake/sync bookkeeping, hand any outstanding fetch
   /// request to another peer, and optionally schedule a reconnect.
   void connection::_close( connection* self, bool reconnect, bool shutdown ) {
      self->socket_open = false;
      boost::system::error_code ec;
      if( self->socket->is_open() ) {
         self->socket->shutdown( tcp::socket::shutdown_both, ec );
         self->socket->close( ec );
      }
      // replace the socket so a later reconnect starts from a clean object
      self->socket.reset( new tcp::socket( my_impl->thread_pool->get_executor() ) );
      self->flush_queues();
      self->connecting = false;
      self->syncing = false;
      self->block_status_monitor_.reset();
      ++self->consecutive_immediate_connection_close;
      bool has_last_req = false;
      {
         // conn_mtx protects last_req and the handshake/close bookkeeping below
         std::lock_guard<std::mutex> g_conn( self->conn_mtx );
         has_last_req = self->last_req.has_value();
         self->last_handshake_recv = handshake_message();
         self->last_handshake_sent = handshake_message();
         self->last_close = fc::time_point::now();
         self->conn_node_id = fc::sha256();
      }
      if( has_last_req && !shutdown ) {
         // give another peer a chance to satisfy the request this peer never answered
         my_impl->dispatcher->retry_fetch( self->shared_from_this() );
      }
      self->peer_requested.reset();
      self->sent_handshake_count = 0;
      if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true );
      peer_ilog( self, "closing" );
      self->cancel_wait();

      if( reconnect && !shutdown ) {
         my_impl->start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() );
      }
   }
1027
1028 // called from connection strand
1029 void connection::blk_send_branch( const block_id_type& msg_head_id ) {
1030 uint32_t head_num = 0;
1031 std::tie( std::ignore, std::ignore, head_num,
1032 std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
1033
1034 peer_dlog(this, "head_num = ${h}",("h",head_num));
1035 if(head_num == 0) {
1036 notice_message note;
1037 note.known_blocks.mode = normal;
1038 note.known_blocks.pending = 0;
1039 enqueue(note);
1040 return;
1041 }
1042 std::unique_lock<std::mutex> g_conn( conn_mtx );
1043 if( last_handshake_recv.generation >= 1 ) {
1044 peer_dlog( this, "maybe truncating branch at = ${h}:${id}",
1046 }
1047
1049 g_conn.unlock();
1050 const auto lib_num = block_header::num_from_id(lib_id);
1051 if( lib_num == 0 ) return; // if last_irreversible_block_id is null (we have not received handshake or reset)
1052
1053 app().post( priority::medium, [chain_plug = my_impl->chain_plug, c = shared_from_this(),
1054 lib_num, head_num, msg_head_id]() {
1055 auto msg_head_num = block_header::num_from_id(msg_head_id);
1056 bool on_fork = msg_head_num == 0;
1057 bool unknown_block = false;
1058 if( !on_fork ) {
1059 try {
1060 const controller& cc = chain_plug->chain();
1061 block_id_type my_id = cc.get_block_id_for_num( msg_head_num );
1062 on_fork = my_id != msg_head_id;
1063 } catch( const unknown_block_exception& ) {
1064 unknown_block = true;
1065 } catch( ... ) {
1066 on_fork = true;
1067 }
1068 }
1069 if( unknown_block ) {
1070 c->strand.post( [msg_head_num, c]() {
1071 peer_ilog( c, "Peer asked for unknown block ${mn}, sending: benign_other go away", ("mn", msg_head_num) );
1072 c->no_retry = benign_other;
1073 c->enqueue( go_away_message( benign_other ) );
1074 } );
1075 } else {
1076 if( on_fork ) msg_head_num = 0;
1077 // if peer on fork, start at their last lib, otherwise we can start at msg_head+1
1078 c->strand.post( [c, msg_head_num, lib_num, head_num]() {
1079 c->blk_send_branch_impl( msg_head_num, lib_num, head_num );
1080 } );
1081 }
1082 } );
1083 }
1084
1085 // called from connection strand
1086 void connection::blk_send_branch_impl( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ) {
1087 if( !peer_requested ) {
1088 auto last = msg_head_num != 0 ? msg_head_num : lib_num;
1089 peer_requested = peer_sync_state( last+1, head_num, last );
1090 } else {
1091 auto last = msg_head_num != 0 ? msg_head_num : std::min( peer_requested->last, lib_num );
1092 uint32_t end = std::max( peer_requested->end_block, head_num );
1093 peer_requested = peer_sync_state( last+1, end, last );
1094 }
1095 if( peer_requested->start_block <= peer_requested->end_block ) {
1096 peer_ilog( this, "enqueue ${s} - ${e}", ("s", peer_requested->start_block)("e", peer_requested->end_block) );
1098 } else {
1099 peer_ilog( this, "nothing to enqueue" );
1100 peer_requested.reset();
1101 }
1102 }
1103
// NOTE(review): the enclosing signature (original line 1104) is missing from
// this extraction. The body fetches a single block by id on the main thread
// (app() queue) and, if found, posts it to this connection's strand to send.
1105 connection_wptr weak = shared_from_this();
1106 app().post( priority::medium, [blkid, weak{std::move(weak)}]() {
1107 connection_ptr c = weak.lock();
1108 if( !c ) return;
1109 try {
1110 controller& cc = my_impl->chain_plug->chain();
1111 signed_block_ptr b = cc.fetch_block_by_id( blkid );
1112 if( b ) {
1113 fc_dlog( logger, "fetch_block_by_id num ${n}, connection ${cid}",
1114 ("n", b->block_num())("cid", c->connection_id) );
// record that this peer now has the block so we do not echo it back later
1115 my_impl->dispatcher->add_peer_block( blkid, c->connection_id );
1116 c->strand.post( [c, b{std::move(b)}]() {
1117 c->enqueue_block( b );
1118 } );
1119 } else {
1120 fc_ilog( logger, "fetch block by id returned null, id ${id}, connection ${cid}",
1121 ("id", blkid)("cid", c->connection_id) );
1122 }
1123 } catch( const assert_exception& ex ) {
1124 fc_elog( logger, "caught assert on fetch_block_by_id, ${ex}, id ${id}, connection ${cid}",
1125 ("ex", ex.to_string())("id", blkid)("cid", c->connection_id) );
1126 } catch( ... ) {
1127 fc_elog( logger, "caught other exception fetching block id ${id}, connection ${cid}",
1128 ("id", blkid)("cid", c->connection_id) );
1129 }
1130 });
1131 }
1132
// NOTE(review): the enclosing signature (original line 1133) is missing from
// this extraction; the visible body only clears the 'syncing' flag.
1134 syncing = false;
1135 }
1136
// NOTE(review): the signature line (original 1137, presumably
// connection::send_handshake) is missing from this extraction. Builds and
// sends a fresh handshake on the connection's strand; conn_mtx guards
// last_handshake_sent while it is populated.
1138 strand.post( [c = shared_from_this()]() {
1139 std::unique_lock<std::mutex> g_conn( c->conn_mtx );
1140 if( c->populate_handshake( c->last_handshake_sent ) ) {
1141 static_assert( std::is_same_v<decltype( c->sent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" );
1142 if( c->sent_handshake_count == INT16_MAX ) c->sent_handshake_count = 1; // do not wrap
1143 c->last_handshake_sent.generation = ++c->sent_handshake_count;
// copy under lock, then release before logging/enqueueing
1144 auto last_handshake_sent = c->last_handshake_sent;
1145 g_conn.unlock();
1146 peer_ilog( c, "Sending handshake generation ${g}, lib ${lib}, head ${head}, id ${id}",
// NOTE(review): original lines 1147-1148 (the first log arguments) are
// missing from this extraction.
1149 ("head", last_handshake_sent.head_num)("id", last_handshake_sent.head_id.str().substr(8,16)) );
1150 c->enqueue( last_handshake_sent );
1151 }
1152 });
1153 }
1154
1155 // called from connection strand
// NOTE(review): the signature line (original 1156) is missing from this
// extraction; by its uses this is the heartbeat check, invoked periodically
// with the current time. A silent peer is closed; an idle-but-alive link
// gets a keepalive time message.
1157 if( latest_msg_time > 0 ) {
1158 if( current_time > latest_msg_time + hb_timeout ) {
// NOTE(review): original line 1159 is missing from this extraction.
1160 if( !peer_address().empty() ) {
1161 peer_wlog(this, "heartbeat timed out for peer address");
1162 close(true);
1163 } else {
1164 peer_wlog(this, "heartbeat timed out");
1165 close(false);
1166 }
1167 return;
1168 } else {
// no block traffic for at least half the heartbeat timeout (and at least
// two block intervals): bail out early below
1169 const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count());
1170 if ( current_time > latest_blk_time + timeout ) {
// NOTE(review): original line 1171 is missing from this extraction.
1172 return;
1173 }
1174 }
1175 }
1176
1177 send_time();
1178 }
1179
1180 // called from connection strand
// NOTE(review): the signature line (original 1181) is missing from this
// extraction. Sends an NTP-style time_message, echoing the previously
// received ('rec') and destination ('dst') stamps and recording our transmit
// time in 'org' for round-trip measurement.
1182 time_message xpkt;
1183 xpkt.org = rec;
1184 xpkt.rec = dst;
1185 xpkt.xmt = get_time();
1186 org = xpkt.xmt;
1187 enqueue(xpkt);
1188 }
1189
1190 // called from connection strand
// NOTE(review): the signature line (original 1191) is missing from this
// extraction. Replies to a peer's time_message: their transmit stamp becomes
// our origin, their destination stamp our receive, per the NTP-style scheme.
1192 time_message xpkt;
1193 xpkt.org = msg.xmt;
1194 xpkt.rec = msg.dst;
1195 xpkt.xmt = get_time();
1196 enqueue(xpkt);
1197 }
1198
1199 // called from connection strand
// Append a serialized buffer (plus completion callback) to the write queue.
// A queue over its size limit drops the connection instead of buffering
// without bound.
1200 void connection::queue_write(const std::shared_ptr<vector<char>>& buff,
1201 std::function<void(boost::system::error_code, std::size_t)> callback,
1202 bool to_sync_queue) {
1203 if( !buffer_queue.add_write_queue( buff, callback, to_sync_queue )) {
1204 peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) );
1205 close();
1206 return;
1207 }
// NOTE(review): original line 1208 is missing from this extraction
// (presumably the call that kicks off the async write).
1209 }
1210
1211 // called from connection strand
// NOTE(review): original lines 1212-1213 and 1218 are missing from this
// extraction (the do_queue_write signature, its ready-to-send guard, and the
// call that fills 'bufs' from the write queue). Issues one async_write over
// the gathered buffers and, on completion, drains more queued data.
1214 return;
1215 connection_ptr c(shared_from_this());
1216
1217 std::vector<boost::asio::const_buffer> bufs;
1219
1220 strand.post( [c{std::move(c)}, bufs{std::move(bufs)}]() {
1221 boost::asio::async_write( *c->socket, bufs,
1222 boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
1223 try {
1224 c->buffer_queue.clear_out_queue();
1225 // May have closed connection and cleared buffer_queue
1226 if( !c->socket_is_open() || socket != c->socket ) {
1227 peer_ilog( c, "async write socket ${r} before callback", ("r", c->socket_is_open() ? "changed" : "closed") );
1228 c->close();
1229 return;
1230 }
1231
1232 if( ec ) {
// eof is an orderly remote close; anything else is a real send error
1233 if( ec.value() != boost::asio::error::eof ) {
1234 peer_elog( c, "Error sending to peer: ${i}", ( "i", ec.message() ) );
1235 } else {
1236 peer_wlog( c, "connection closure detected on write" );
1237 }
1238 c->close();
1239 return;
1240 }
1241
// report bytes written, then continue draining: sync blocks first, then queue
1242 c->buffer_queue.out_callback( ec, w );
1243
1244 c->enqueue_sync_block();
1245 c->do_queue_write();
1246 } catch ( const std::bad_alloc& ) {
1247 throw;
1248 } catch ( const boost::interprocess::bad_alloc& ) {
1249 throw;
1250 } catch( const fc::exception& ex ) {
1251 peer_elog( c, "fc::exception in do_queue_write: ${s}", ("s", ex.to_string()) );
1252 } catch( const std::exception& ex ) {
1253 peer_elog( c, "std::exception in do_queue_write: ${s}", ("s", ex.what()) );
1254 } catch( ... ) {
1255 peer_elog( c, "Unknown exception in do_queue_write" );
1256 }
1257 }));
1258 });
1259 }
1260
1261 // called from connection strand
1262 void connection::cancel_sync(go_away_reason reason) {
1263 peer_dlog( this, "cancel sync reason = ${m}, write queue size ${o} bytes",
1264 ("m", reason_str( reason ))("o", buffer_queue.write_queue_size()) );
1265 cancel_wait();
1266 flush_queues();
1267 switch (reason) {
1268 case validation :
1269 case fatal_other : {
1270 no_retry = reason;
1271 enqueue( go_away_message( reason ));
1272 break;
1273 }
1274 default:
1275 peer_ilog(this, "sending empty request but not calling sync wait");
1276 enqueue( ( sync_request_message ) {0,0} );
1277 }
1278 }
1279
1280 // called from connection strand
// Send the next block of the range the peer requested (peer_requested).
// Returns false when no peer sync is in progress. Fetch happens on the main
// thread; the send hops back to this connection's strand.
1281 bool connection::enqueue_sync_block() {
1282 if( !peer_requested ) {
1283 return false;
1284 } else {
1285 peer_dlog( this, "enqueue sync block ${num}", ("num", peer_requested->last + 1) );
1286 }
1287 uint32_t num = ++peer_requested->last;
1288 if(num == peer_requested->end_block) {
1289 peer_requested.reset();
1290 peer_dlog( this, "completing enqueue_sync_block ${num}", ("num", num) );
1291 }
1292 connection_wptr weak = shared_from_this();
1293 app().post( priority::medium, [num, weak{std::move(weak)}]() {
1294 connection_ptr c = weak.lock();
1295 if( !c ) return;
1296 controller& cc = my_impl->chain_plug->chain();
// NOTE(review): original line 1297 is missing from this extraction
// (presumably the declaration of 'sb', the fetched signed_block_ptr).
1298 try {
1299 sb = cc.fetch_block_by_number( num );
1300 } FC_LOG_AND_DROP();
1301 if( sb ) {
1302 c->strand.post( [c, sb{std::move(sb)}]() {
1303 c->enqueue_block( sb, true );
1304 });
1305 } else {
// we cannot supply a block we promised: give up on the sync politely
1306 c->strand.post( [c, num]() {
1307 peer_ilog( c, "enqueue sync, unable to fetch block ${num}, sending benign_other go away", ("num", num) );
1308 c->peer_requested.reset(); // unable to provide requested blocks
1309 c->no_retry = benign_other;
1310 c->enqueue( go_away_message( benign_other ) );
1311 });
1312 }
1313 });
1314
1315 return true;
1316 }
1317
1318 //------------------------------------------------------------------------
1319
1320 using send_buffer_type = std::shared_ptr<std::vector<char>>;
1321
// NOTE(review): the struct header (original line 1322) and the
// get_send_buffer signature (lines 1324-1325) are missing from this
// extraction. This is the base factory that lazily packs a net_message into
// a length-prefixed buffer, caching the result so repeated sends serialize
// at most once.
1323
1326 if( !send_buffer ) {
1327 send_buffer = create_send_buffer( m );
1328 }
1329 return send_buffer;
1330 }
1331
1332 protected:
// NOTE(review): original line 1333 is missing from this extraction
// (presumably the cached send_buffer member declaration).
1334
1335 protected:
// NOTE(review): the create_send_buffer( const net_message& ) signature
// (original line 1336) is missing from this extraction.
1337 const uint32_t payload_size = fc::raw::pack_size( m );
1338
1339 const char* const header = reinterpret_cast<const char* const>(&payload_size); // avoid variable size encoding of uint32_t
1340 const size_t buffer_size = message_header_size + payload_size;
1341
1342 auto send_buffer = std::make_shared<vector<char>>(buffer_size);
1343 fc::datastream<char*> ds( send_buffer->data(), buffer_size);
1344 ds.write( header, message_header_size );
1345 fc::raw::pack( ds, m );
1346
1347 return send_buffer;
1348 }
1349
// Pack a variant alternative directly (which index + payload) without first
// constructing a full net_message; layout must match fc's static_variant pack.
1350 template< typename T>
1351 static send_buffer_type create_send_buffer( uint32_t which, const T& v ) {
1352 // match net_message static_variant pack
1353 const uint32_t which_size = fc::raw::pack_size( unsigned_int( which ) );
1354 const uint32_t payload_size = which_size + fc::raw::pack_size( v );
1355
1356 const char* const header = reinterpret_cast<const char* const>(&payload_size); // avoid variable size encoding of uint32_t
1357 const size_t buffer_size = message_header_size + payload_size;
1358
1359 auto send_buffer = std::make_shared<vector<char>>( buffer_size );
1360 fc::datastream<char*> ds( send_buffer->data(), buffer_size );
1361 ds.write( header, message_header_size );
1362 fc::raw::pack( ds, unsigned_int( which ) );
1363 fc::raw::pack( ds, v );
1364
1365 return send_buffer;
1366 }
1367
1368 };
1369
// NOTE(review): the struct header (original line 1370) and the
// get_send_buffer signature (lines 1372-1373) are missing from this
// extraction. Caches the serialized form of a signed_block so it is packed
// at most once per factory instance.
1371
1374 if( !send_buffer ) {
1375 send_buffer = create_send_buffer( sb );
1376 }
1377 return send_buffer;
1378 }
1379
1380 private:
1381
1382 static std::shared_ptr<std::vector<char>> create_send_buffer( const signed_block_ptr& sb ) {
// NOTE(review): original line 1383 is missing from this extraction.
1384 // this implementation is to avoid copy of signed_block to net_message
1385 // matches which of net_message for signed_block
1386 fc_dlog( logger, "sending block ${bn}", ("bn", sb->block_num()) );
1387 return buffer_factory::create_send_buffer( signed_block_which, *sb );
1388 }
1389 };
1390
// NOTE(review): the struct header (original line 1391) and the
// get_send_buffer signature (lines 1393-1394) are missing from this
// extraction. Caches the serialized form of a packed_transaction so one
// serialization can be reused across all peers.
1392
1395 if( !send_buffer ) {
1396 send_buffer = create_send_buffer( trx );
1397 }
1398 return send_buffer;
1399 }
1400
1401 private:
1402
1403 static std::shared_ptr<std::vector<char>> create_send_buffer( const packed_transaction_ptr& trx ) {
// NOTE(review): original line 1404 is missing from this extraction.
1405 // this implementation is to avoid copy of packed_transaction to net_message
1406 // matches which of net_message for packed_transaction
1407 return buffer_factory::create_send_buffer( packed_transaction_which, *trx );
1408 }
1409 };
1410
1411 //------------------------------------------------------------------------
1412
1413 // called from connection strand
1414 void connection::enqueue( const net_message& m ) {
1415 verify_strand_in_this_thread( strand, __func__, __LINE__ );
1416 go_away_reason close_after_send = no_reason;
1417 if (std::holds_alternative<go_away_message>(m)) {
1418 close_after_send = std::get<go_away_message>(m).reason;
1419 }
1420
1421 buffer_factory buff_factory;
1422 auto send_buffer = buff_factory.get_send_buffer( m );
1423 enqueue_buffer( send_buffer, close_after_send );
1424 }
1425
1426 // called from connection strand
1427 void connection::enqueue_block( const signed_block_ptr& b, bool to_sync_queue) {
1428 peer_dlog( this, "enqueue block ${num}", ("num", b->block_num()) );
1429 verify_strand_in_this_thread( strand, __func__, __LINE__ );
1430
1431 block_buffer_factory buff_factory;
1432 auto sb = buff_factory.get_send_buffer( b );
1433 latest_blk_time = get_time();
1434 enqueue_buffer( sb, no_reason, to_sync_queue);
1435 }
1436
1437 // called from connection strand
1438 void connection::enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
1439 go_away_reason close_after_send,
1440 bool to_sync_queue)
1441 {
1442 connection_ptr self = shared_from_this();
1443 queue_write(send_buffer,
1444 [conn{std::move(self)}, close_after_send](boost::system::error_code ec, std::size_t ) {
1445 if (ec) return;
1446 if (close_after_send != no_reason) {
1447 fc_ilog( logger, "sent a go away message: ${r}, closing connection ${cid}",
1448 ("r", reason_str(close_after_send))("cid", conn->connection_id) );
1449 conn->close();
1450 return;
1451 }
1452 },
1453 to_sync_queue);
1454 }
1455
1456 // thread safe
1457 void connection::cancel_wait() {
1458 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
1459 response_expected_timer.cancel();
1460 }
1461
1462 // thread safe
1463 void connection::sync_wait() {
1464 connection_ptr c(shared_from_this());
1465 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
1466 response_expected_timer.expires_from_now( my_impl->resp_expected_period );
1467 response_expected_timer.async_wait(
1468 boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) {
1469 c->sync_timeout( ec );
1470 } ) );
1471 }
1472
1473 // thread safe
1474 void connection::fetch_wait() {
1475 connection_ptr c( shared_from_this() );
1476 std::lock_guard<std::mutex> g( response_expected_timer_mtx );
1477 response_expected_timer.expires_from_now( my_impl->resp_expected_period );
1478 response_expected_timer.async_wait(
1479 boost::asio::bind_executor( c->strand, [c]( boost::system::error_code ec ) {
1480 c->fetch_timeout(ec);
1481 } ) );
1482 }
1483
1484 // called from connection strand
1485 void connection::sync_timeout( boost::system::error_code ec ) {
1486 if( !ec ) {
1487 my_impl->sync_master->sync_reassign_fetch( shared_from_this(), benign_other );
1488 close(true);
1489 } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
1490 peer_elog( this, "setting timer for sync request got error ${ec}", ("ec", ec.message()) );
1491 }
1492 }
1493
1494 // called from connection strand
1495 void connection::fetch_timeout( boost::system::error_code ec ) {
1496 if( !ec ) {
1497 my_impl->dispatcher->retry_fetch( shared_from_this() );
1498 } else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
1499 peer_elog( this, "setting timer for fetch request got error ${ec}", ("ec", ec.message() ) );
1500 }
1501 }
1502
1503 // called from connection strand
1504 void connection::request_sync_blocks(uint32_t start, uint32_t end) {
1505 sync_request_message srm = {start,end};
1506 enqueue( net_message(srm) );
1507 sync_wait();
1508 }
1509
1510 //-----------------------------------------------------------
1511 void block_status_monitor::reset() {
1512 in_accepted_state_ = true;
1513 events_ = 0;
1514 }
1515
1516 void block_status_monitor::rejected() {
1517 const auto now = fc::time_point::now();
1518
1519 // in rejected state
1520 if(!in_accepted_state_) {
1521 const auto elapsed = now - window_start_;
1522 if( elapsed < window_size_ ) {
1523 return;
1524 }
1525 ++events_;
1526 window_start_ = now;
1527 return;
1528 }
1529
1530 // switching to rejected state
1531 in_accepted_state_ = false;
1532 window_start_ = now;
1533 events_ = 0;
1534 }
1535 //-----------------------------------------------------------
1536
1537 sync_manager::sync_manager( uint32_t req_span )
// start with no known lib and nothing requested; the first block we expect
// is 1, no sync source is selected, and the state machine begins in_sync
1538 :sync_known_lib_num( 0 )
1539 ,sync_last_requested_num( 0 )
1540 ,sync_next_expected_num( 1 )
1541 ,sync_req_span( req_span )
1542 ,sync_source()
1543 ,sync_state(in_sync)
1544 {
1545 }
1546
1547 constexpr auto sync_manager::stage_str(stages s) {
1548 switch (s) {
1549 case in_sync : return "in sync";
1550 case lib_catchup: return "lib catchup";
1551 case head_catchup : return "head catchup";
1552 default : return "unkown";
1553 }
1554 }
1555
1556 bool sync_manager::set_state(stages newstate) {
1557 if( sync_state == newstate ) {
1558 return false;
1559 }
1560 fc_ilog( logger, "old state ${os} becoming ${ns}", ("os", stage_str( sync_state ))( "ns", stage_str( newstate ) ) );
1561 sync_state = newstate;
1562 return true;
1563 }
1564
1565 // called from c's connection strand
// NOTE(review): the signature line (original 1566) is missing from this
// extraction; by its uses, it takes a connection_ptr 'c' and a bool
// 'closing'. Re-evaluates sync_known_lib_num from peers' handshakes.
1567 std::unique_lock<std::mutex> g( sync_mtx );
1568 if( sync_state == in_sync ) {
1569 sync_source.reset();
1570 }
1571 if( !c ) return;
1572 if( !closing ) {
1573 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1574 if( c->last_handshake_recv.last_irreversible_block_num > sync_known_lib_num ) {
1575 sync_known_lib_num = c->last_handshake_recv.last_irreversible_block_num;
1576 }
1577 } else {
1578 // Closing connection, therefore its view of LIB can no longer be considered as we will no longer be connected.
1579 // Determine current LIB of remaining peers as our sync_known_lib_num.
1580 uint32_t highest_lib_num = 0;
1581 for_each_block_connection( [&highest_lib_num]( const auto& cc ) {
1582 std::lock_guard<std::mutex> g_conn( cc->conn_mtx );
1583 if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) {
1584 highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num;
1585 }
1586 return true;
1587 } );
1588 sync_known_lib_num = highest_lib_num;
1589
1590 // if closing the connection we are currently syncing from, then reset our last requested and next expected.
1591 if( c == sync_source ) {
// NOTE(review): original line 1592 is missing from this extraction
// (presumably resetting sync_last_requested_num).
1593 // if starting to sync need to always start from lib as we might be on our own fork
1594 uint32_t lib_num = 0;
1595 std::tie( lib_num, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
1596 sync_next_expected_num = lib_num + 1;
1597 request_next_chunk( std::move(g) );
1598 }
1599 }
1600 }
1601
1602 // call with g_sync locked, called from conn's connection strand
// Select a sync source (preferring 'conn', otherwise round-robin over the
// connection list) and request the next span of blocks from it. Takes
// ownership of the sync mutex lock and releases it before posting work.
1603 void sync_manager::request_next_chunk( std::unique_lock<std::mutex> g_sync, const connection_ptr& conn ) {
1604 uint32_t fork_head_block_num = 0;
1605 uint32_t lib_block_num = 0;
1606 std::tie( lib_block_num, std::ignore, fork_head_block_num,
1607 std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
1608
1609 fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
1610 ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) );
1611
// still working through an earlier request from a live source: do nothing
1612 if( fork_head_block_num < sync_last_requested_num && sync_source && sync_source->current() ) {
1613 fc_ilog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}",
1614 ("h", fork_head_block_num)("r", sync_last_requested_num)("e", sync_next_expected_num)
1615 ("k", sync_known_lib_num)("s", sync_req_span)("c", sync_source->connection_id) );
1616 return;
1617 }
1618
1619 /* ----------
1620 * next chunk provider selection criteria
1621 * a provider is supplied and able to be used, use it.
1622 * otherwise select the next available from the list, round-robin style.
1623 */
1624
1625 connection_ptr new_sync_source = sync_source;
1626 if (conn && conn->current() ) {
1627 new_sync_source = conn;
1628 } else {
1629 std::shared_lock<std::shared_mutex> g( my_impl->connections_mtx );
1630 if( my_impl->connections.size() == 0 ) {
1631 new_sync_source.reset();
1632 } else if( my_impl->connections.size() == 1 ) {
1633 if (!new_sync_source) {
1634 new_sync_source = *my_impl->connections.begin();
1635 }
1636 } else {
1637 // init to a linear array search
1638 auto cptr = my_impl->connections.begin();
1639 auto cend = my_impl->connections.end();
1640 // do we remember the previous source?
1641 if (new_sync_source) {
1642 //try to find it in the list
1643 cptr = my_impl->connections.find( new_sync_source );
1644 cend = cptr;
1645 if( cptr == my_impl->connections.end() ) {
1646 //not there - must have been closed! cend is now connections.end, so just flatten the ring.
1647 new_sync_source.reset();
1648 cptr = my_impl->connections.begin();
1649 } else {
1650 //was found - advance the start to the next. cend is the old source.
1651 if( ++cptr == my_impl->connections.end() && cend != my_impl->connections.end() ) {
1652 cptr = my_impl->connections.begin();
1653 }
1654 }
1655 }
1656
1657 //scan the list of peers looking for another able to provide sync blocks.
1658 if( cptr != my_impl->connections.end() ) {
1659 auto cstart_it = cptr;
1660 do {
1661 //select the first one which is current and has valid lib and break out.
1662 if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) {
1663 std::lock_guard<std::mutex> g_conn( (*cptr)->conn_mtx );
1664 if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) {
1665 new_sync_source = *cptr;
1666 break;
1667 }
1668 }
1669 if( ++cptr == my_impl->connections.end() )
1670 cptr = my_impl->connections.begin();
1671 } while( cptr != cstart_it );
1672 }
1673 // no need to check the result, either source advanced or the whole list was checked and the old source is reused.
1674 }
1675 }
1676
1677 // verify there is an available source
1678 if( !new_sync_source || !new_sync_source->current() || new_sync_source->is_transactions_only_connection() ) {
1679 fc_elog( logger, "Unable to continue syncing at this time");
1680 if( !new_sync_source ) sync_source.reset();
1681 sync_known_lib_num = lib_block_num;
// NOTE(review): original line 1682 is missing from this extraction
// (presumably resetting sync_last_requested_num).
1683 set_state( in_sync ); // probably not, but we can't do anything else
1684 return;
1685 }
1686
1687 bool request_sent = false;
1688 if( sync_last_requested_num != sync_known_lib_num ) {
1689 uint32_t start = sync_next_expected_num;
1690 uint32_t end = start + sync_req_span - 1;
1691 if( end > sync_known_lib_num )
1692 end = sync_known_lib_num;
1693 if( end > 0 && end >= start ) {
1694 sync_last_requested_num = end;
1695 sync_source = new_sync_source;
// release the sync mutex before posting to the source's strand
1696 g_sync.unlock();
1697 request_sent = true;
1698 new_sync_source->strand.post( [new_sync_source, start, end]() {
1699 peer_ilog( new_sync_source, "requesting range ${s} to ${e}", ("s", start)("e", end) );
1700 new_sync_source->request_sync_blocks( start, end );
1701 } );
1702 }
1703 }
1704 if( !request_sent ) {
1705 g_sync.unlock();
// NOTE(review): original line 1706 is missing from this extraction
// (presumably a send_handshakes() fallback).
1707 }
1708 }
1709
1710 // static, thread safe
// NOTE(review): the signature line (original 1711) is missing from this
// extraction; broadcasts a fresh handshake to every live connection.
1712 for_each_connection( []( auto& ci ) {
1713 if( ci->current() ) {
1714 ci->send_handshake();
1715 }
1716 return true;
1717 } );
1718 }
1719
1720 bool sync_manager::is_sync_required( uint32_t fork_head_block_num ) {
1721 fc_dlog( logger, "last req = ${req}, last recv = ${recv} known = ${known} our head = ${head}",
1722 ("req", sync_last_requested_num)( "recv", sync_next_expected_num )( "known", sync_known_lib_num )
1723 ("head", fork_head_block_num ) );
1724
1725 return( sync_last_requested_num < sync_known_lib_num ||
1726 fork_head_block_num < sync_last_requested_num );
1727 }
1728
1729 // called from c's connection strand
1730 void sync_manager::start_sync(const connection_ptr& c, uint32_t target) {
1731 std::unique_lock<std::mutex> g_sync( sync_mtx );
1732 if( target > sync_known_lib_num) {
1733 sync_known_lib_num = target;
1734 }
1735
1736 uint32_t lib_num = 0;
1737 uint32_t fork_head_block_num = 0;
1738 std::tie( lib_num, std::ignore, fork_head_block_num,
1739 std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
1740
1741 if( !is_sync_required( fork_head_block_num ) || target <= lib_num ) {
1742 peer_dlog( c, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
1743 ("b", lib_num)( "h", fork_head_block_num )( "t", target ) );
1744 c->send_handshake();
1745 return;
1746 }
1747
1748 if( sync_state == in_sync ) {
1749 set_state( lib_catchup );
1750 }
1751 sync_next_expected_num = std::max( lib_num + 1, sync_next_expected_num );
1752
1753 // p2p_high_latency_test.py test depends on this exact log statement.
1754 peer_ilog( c, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}",
1755 ("cc", sync_last_requested_num)("t", target)("n", sync_next_expected_num) );
1756
1757 request_next_chunk( std::move( g_sync ), c );
1758 }
1759
1760 // called from connection strand
// NOTE(review): the signature line (original 1761) is missing from this
// extraction; by its uses it takes the connection 'c' and a go_away_reason.
// If 'c' is the current sync source, cancel its sync and pick a new source.
1762 std::unique_lock<std::mutex> g( sync_mtx );
1763 peer_ilog( c, "reassign_fetch, our last req is ${cc}, next expected is ${ne}",
1764 ("cc", sync_last_requested_num)("ne", sync_next_expected_num) );
1765
1766 if( c == sync_source ) {
1767 c->cancel_sync(reason);
// NOTE(review): original line 1768 is missing from this extraction
// (presumably resetting sync_last_requested_num before re-requesting).
1769 request_next_chunk( std::move(g) );
1770 }
1771 }
1772
1773 // called from c's connection strand
// NOTE(review): the signature line (original 1774) is missing from this
// extraction; by its uses it takes the connection 'c' and the peer's
// handshake_message 'msg'. Decides, from our chain state vs the peer's
// claimed lib/head (latency-compensated), whether to sync, notify, or idle.
1775
1776 if( c->is_transactions_only_connection() ) return;
1777
1778 uint32_t lib_num = 0;
// NOTE(review): original line 1779 is missing from this extraction
// (presumably the declaration of 'peer_lib', used in the sync 1 check below).
1780 uint32_t head = 0;
1781 block_id_type head_id;
1782 std::tie( lib_num, std::ignore, head,
1783 std::ignore, std::ignore, head_id ) = my_impl->get_chain_info();
1784
1785 sync_reset_lib_num(c, false);
1786
1787 auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
1788 int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds
1789 if( network_latency_ns < 0 ) {
1790 peer_wlog(c, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
1791 network_latency_ns = 0;
1792 }
1793 // number of blocks syncing node is behind from a peer node
1794 uint32_t nblk_behind_by_net_latency = static_cast<uint32_t>(network_latency_ns / block_interval_ns);
1795 // 2x for time it takes for message to reach back to peer node, +1 to compensate for integer division truncation
1796 uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency + 1;
1797 // message in the log below is used in p2p_high_latency_test.py test
1798 peer_dlog(c, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
1799 ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));
1800
1801 //--------------------------------
1802 // sync need checks; (lib == last irreversible block)
1803 //
1804 // 0. my head block id == peer head id means we are all caught up block wise
1805 // 1. my head block num < peer lib - start sync locally
1806 // 2. my lib > peer head num + nblk_combined_latency - send last_irr_catch_up notice if not the first generation
1807 //
1808 // 3 my head block num + nblk_combined_latency < peer head block num - update sync state and send a catchup request
1809 // 4 my head block num >= peer block num + nblk_combined_latency send a notice catchup if this is not the first generation
1810 // 4.1 if peer appears to be on a different fork ( our_id_for( msg.head_num ) != msg.head_id )
1811 // then request peer's blocks
1812 //
1813 //-----------------------------
1814
1815 if (head_id == msg.head_id) {
1816 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0",
1817 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1818 c->syncing = false;
1819 notice_message note;
1820 note.known_blocks.mode = none;
1821 note.known_trx.mode = catch_up;
1822 note.known_trx.pending = 0;
1823 c->enqueue( note );
1824 return;
1825 }
1826 if (head < peer_lib) {
1827 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1",
1828 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1829 c->syncing = false;
1830 if (c->sent_handshake_count > 0) {
1831 c->send_handshake();
1832 }
1833 return;
1834 }
1835 if (lib_num > msg.head_num + nblk_combined_latency) {
1836 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 2",
1837 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1838 if (msg.generation > 1 || c->protocol_version > proto_base) {
1839 notice_message note;
1840 note.known_trx.pending = lib_num;
// NOTE(review): original lines 1841-1842 are missing from this extraction
// (presumably setting the note's known_trx/known_blocks modes).
1843 note.known_blocks.pending = head;
1844 c->enqueue( note );
1845 }
1846 c->syncing = true;
1847 return;
1848 }
1849
1850 if (head + nblk_combined_latency < msg.head_num ) {
1851 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 3",
1852 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1853 c->syncing = false;
1854 verify_catchup(c, msg.head_num, msg.head_id);
1855 return;
1856 } else if(head >= msg.head_num + nblk_combined_latency) {
1857 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 4",
1858 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1859 if (msg.generation > 1 || c->protocol_version > proto_base) {
1860 notice_message note;
1861 note.known_trx.mode = none;
1862 note.known_blocks.mode = catch_up;
1863 note.known_blocks.pending = head;
1864 note.known_blocks.ids.push_back(head_id);
1865 c->enqueue( note );
1866 }
1867 c->syncing = false;
// check on the main thread whether the peer's head is on our chain; if not,
// ask the peer for its fork blocks
1868 app().post( priority::medium, [chain_plug = my_impl->chain_plug, c,
1869 msg_head_num = msg.head_num, msg_head_id = msg.head_id]() {
1870 bool on_fork = true;
1871 try {
1872 controller& cc = chain_plug->chain();
1873 on_fork = cc.get_block_id_for_num( msg_head_num ) != msg_head_id;
1874 } catch( ... ) {}
1875 if( on_fork ) {
1876 c->strand.post( [c]() {
1877 request_message req;
1878 req.req_blocks.mode = catch_up;
1879 req.req_trx.mode = none;
1880 c->enqueue( req );
1881 } );
1882 }
1883 } );
1884 return;
1885 } else {
1886 peer_dlog( c, "Block discrepancy is within network latency range.");
1887 }
1888 }
1889
1890 // called from c's connection strand
1891 bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) {
1892 request_message req;
1893 req.req_blocks.mode = catch_up;
1894 for_each_block_connection( [num, &id, &req]( const auto& cc ) {
1895 std::lock_guard<std::mutex> g_conn( cc->conn_mtx );
1896 if( cc->fork_head_num > num || cc->fork_head == id ) {
1897 req.req_blocks.mode = none;
1898 return false;
1899 }
1900 return true;
1901 } );
1902 if( req.req_blocks.mode == catch_up ) {
1903 {
1904 std::lock_guard<std::mutex> g( sync_mtx );
1905 peer_ilog( c, "catch_up while in ${s}, fork head num = ${fhn} "
1906 "target LIB = ${lib} next_expected = ${ne}, id ${id}...",
1907 ("s", stage_str( sync_state ))("fhn", num)("lib", sync_known_lib_num)
1908 ("ne", sync_next_expected_num)("id", id.str().substr( 8, 16 )) );
1909 }
1910 uint32_t lib;
1911 block_id_type head_id;
1912 std::tie( lib, std::ignore, std::ignore,
1913 std::ignore, std::ignore, head_id ) = my_impl->get_chain_info();
1914 if( sync_state == lib_catchup || num < lib )
1915 return false;
1916 set_state( head_catchup );
1917 {
1918 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1919 c->fork_head = id;
1920 c->fork_head_num = num;
1921 }
1922
1923 req.req_blocks.ids.emplace_back( head_id );
1924 } else {
1925 peer_ilog( c, "none notice while in ${s}, fork head num = ${fhn}, id ${id}...",
1926 ("s", stage_str( sync_state ))("fhn", num)("id", id.str().substr(8,16)) );
1927 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1928 c->fork_head = block_id_type();
1929 c->fork_head_num = 0;
1930 }
1931 req.req_trx.mode = none;
1932 c->enqueue( req );
1933 return true;
1934 }
1935
1936 // called from c's connection strand
// NOTE(review): the signature line (original 1937) is missing from this
// extraction; by its uses it takes the connection 'c' and a notice_message.
// Handles catch_up (peer announced a fork head) and last_irr_catch_up
// (peer announced a newer lib, triggering a sync).
1938 peer_dlog( c, "sync_manager got ${m} block notice", ("m", modes_str( msg.known_blocks.mode )) );
1939 SYS_ASSERT( msg.known_blocks.mode == catch_up || msg.known_blocks.mode == last_irr_catch_up, plugin_exception,
1940 "sync_recv_notice only called on catch_up" );
1941 if (msg.known_blocks.mode == catch_up) {
1942 if (msg.known_blocks.ids.size() == 0) {
1943 peer_elog( c, "got a catch up with ids size = 0" );
1944 } else {
1945 const block_id_type& id = msg.known_blocks.ids.back();
1946 peer_ilog( c, "notice_message, pending ${p}, blk_num ${n}, id ${id}...",
1947 ("p", msg.known_blocks.pending)("n", block_header::num_from_id(id))("id",id.str().substr(8,16)) );
1948 if( !my_impl->dispatcher->have_block( id ) ) {
1949 verify_catchup( c, msg.known_blocks.pending, id );
1950 } else {
1951 // we already have the block, so update peer with our view of the world
1952 c->send_handshake();
1953 }
1954 }
1955 } else if (msg.known_blocks.mode == last_irr_catch_up) {
1956 {
1957 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1958 c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
1959 }
1960 sync_reset_lib_num(c, false);
1961 start_sync(c, msg.known_trx.pending);
1962 }
1963 }
1964
1965 // called from connection strand
// NOTE(review): the signature line (original 1966) is missing from this
// extraction; by its uses it takes the connection 'c' (and, presumably, the
// rejected block number logged below). Records the rejection and closes the
// peer once it has exceeded the allowed rejection events.
1967 c->block_status_monitor_.rejected();
1968 std::unique_lock<std::mutex> g( sync_mtx );
// NOTE(review): original line 1969 is missing from this extraction.
1970 if( c->block_status_monitor_.max_events_violated()) {
1971 peer_wlog( c, "block ${bn} not accepted, closing connection", ("bn", blk_num) );
1972 sync_source.reset();
1973 g.unlock();
1974 c->close();
1975 } else {
1976 g.unlock();
1977 c->send_handshake();
1978 }
1979 }
1980
1981 // called from connection strand
1982 void sync_manager::sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied ) {
1983 std::unique_lock<std::mutex> g_sync( sync_mtx );
1984 if( blk_num <= sync_last_requested_num ) {
1985 peer_dlog( c, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
1986 ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) );
1987 if (blk_num != sync_next_expected_num && !blk_applied) {
1988 auto sync_next_expected = sync_next_expected_num;
1989 g_sync.unlock();
1990 peer_dlog( c, "expected block ${ne} but got ${bn}", ("ne", sync_next_expected)("bn", blk_num) );
1991 return;
1992 }
1993 sync_next_expected_num = blk_num + 1;
1994 }
1995 }
1996
1997 // called from c's connection strand
// Process a received block with respect to the sync state machine: update
// expectations, resolve head_catchup fork tracking, and drive lib_catchup
// chunk requests forward.
1998 void sync_manager::sync_recv_block(const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied) {
1999 peer_dlog( c, "got block ${bn}", ("bn", blk_num) );
2000 if( app().is_quiting() ) {
2001 c->close( false, true );
2002 return;
2003 }
2004 c->block_status_monitor_.accepted();
2005 sync_update_expected( c, blk_id, blk_num, blk_applied );
2006 std::unique_lock<std::mutex> g_sync( sync_mtx );
2007 stages state = sync_state;
2008 peer_dlog( c, "state ${s}", ("s", stage_str( state )) );
2009 if( state == head_catchup ) {
2010 peer_dlog( c, "sync_manager in head_catchup state" );
2011 sync_source.reset();
2012 g_sync.unlock();
2013
2014 block_id_type null_id;
2015 bool set_state_to_head_catchup = false;
// keep head_catchup while any peer still tracks a fork head ahead of this
// block; clear fork tracking for peers this block satisfies
2016 for_each_block_connection( [&null_id, blk_num, &blk_id, &c, &set_state_to_head_catchup]( const auto& cp ) {
2017 std::unique_lock<std::mutex> g_cp_conn( cp->conn_mtx );
2018 uint32_t fork_head_num = cp->fork_head_num;
2019 block_id_type fork_head_id = cp->fork_head;
2020 g_cp_conn.unlock();
2021 if( fork_head_id == null_id ) {
2022 // continue
2023 } else if( fork_head_num < blk_num || fork_head_id == blk_id ) {
2024 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
2025 c->fork_head = null_id;
2026 c->fork_head_num = 0;
2027 } else {
2028 set_state_to_head_catchup = true;
2029 }
2030 return true;
2031 } );
2032
2033 if( set_state_to_head_catchup ) {
2034 if( set_state( head_catchup ) ) {
// NOTE(review): original line 2035 is missing from this extraction
// (presumably a send_handshakes() call).
2036 }
2037 } else {
2038 set_state( in_sync );
// NOTE(review): original line 2039 is missing from this extraction
// (presumably a send_handshakes() call).
2040 }
2041 } else if( state == lib_catchup ) {
2042 if( blk_num >= sync_known_lib_num ) {
2043 peer_dlog( c, "All caught up with last known last irreversible block resending handshake" );
2044 set_state( in_sync );
2045 g_sync.unlock();
// NOTE(review): original line 2046 is missing from this extraction
// (presumably a send_handshakes() call).
2047 } else if( blk_num >= sync_last_requested_num ) {
2048 request_next_chunk( std::move( g_sync) );
2049 } else {
2050 g_sync.unlock();
2051 peer_dlog( c, "calling sync_wait" );
2052 c->sync_wait();
2053 }
2054 }
2055 }
2056
2057 //------------------------------------------------------------------------
2058
2059 // thread safe
2060 bool dispatch_manager::add_peer_block( const block_id_type& blkid, uint32_t connection_id) {
2061 std::lock_guard<std::mutex> g( blk_state_mtx );
2062 auto bptr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
2063 bool added = (bptr == blk_state.end());
2064 if( added ) {
2065 blk_state.insert( {blkid, block_header::num_from_id( blkid ), connection_id, true} );
2066 } else if( !bptr->have_block ) {
2067 blk_state.modify( bptr, []( auto& pb ) {
2068 pb.have_block = true;
2069 });
2070 }
2071 return added;
2072 }
2073
2074 bool dispatch_manager::peer_has_block( const block_id_type& blkid, uint32_t connection_id ) const {
2075 std::lock_guard<std::mutex> g(blk_state_mtx);
2076 const auto blk_itr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
2077 return blk_itr != blk_state.end();
2078 }
2079
2080 bool dispatch_manager::have_block( const block_id_type& blkid ) const {
2081 std::lock_guard<std::mutex> g(blk_state_mtx);
2082 // by_peer_block_id sorts have_block by greater so have_block == true will be the first one found
2083 const auto& index = blk_state.get<by_peer_block_id>();
2084 auto blk_itr = index.find( blkid );
2085 if( blk_itr != index.end() ) {
2086 return blk_itr->have_block;
2087 }
2088 return false;
2089 }
2090
      uint32_t connection_id, const time_point_sec& now ) {
      // thread safe: record that this peer knows transaction `id`, for dedup
      // of rebroadcasts. Returns true only when a new entry was inserted.
      // NOTE(review): the first line of the signature is elided in this copy
      // of the file; parameters before connection_id are presumably the
      // transaction id and its expiration (trx_expires).
      std::lock_guard<std::mutex> g( local_txns_mtx );
      auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( id ), connection_id ) );
      bool added = (tptr == local_txns.end());
      if( added ) {
         // expire at either transaction expiration or configured max expire time whichever is less
         time_point_sec expires = now + my_impl->p2p_dedup_cache_expire_time_us;
         expires = std::min( trx_expires, expires );
         local_txns.insert( node_transaction_state{
            .id = id,
            .expires = expires,
            .connection_id = connection_id} );
      }
      return added;
   }
2107
      // thread safe: true if the transaction id is present in the dedup cache
      // for any connection (by_id composite key matches on id prefix).
      std::lock_guard<std::mutex> g( local_txns_mtx );
      const auto tptr = local_txns.get<by_id>().find( tid );
      return tptr != local_txns.end();
   }
2113
2115 size_t start_size = 0, end_size = 0;
2116
2117 std::unique_lock<std::mutex> g( local_txns_mtx );
2118 start_size = local_txns.size();
2119 auto& old = local_txns.get<by_expiry>();
2120 auto ex_lo = old.lower_bound( fc::time_point_sec( 0 ) );
2121 auto ex_up = old.upper_bound( time_point::now() );
2122 old.erase( ex_lo, ex_up );
2123 g.unlock();
2124
2125 fc_dlog( logger, "expire_local_txns size ${s} removed ${r}", ("s", start_size)( "r", start_size - end_size ) );
2126 }
2127
      // thread safe: drop per-peer block-state entries whose block number is
      // at or below the new LIB — irreversible blocks no longer need
      // dedup/broadcast tracking.
      std::lock_guard<std::mutex> g(blk_state_mtx);
      auto& stale_blk = blk_state.get<by_block_num>();
      stale_blk.erase( stale_blk.lower_bound(1), stale_blk.upper_bound(lib_num) );
   }
2133
   // thread safe
      // Broadcast a freshly accepted block to all current block connections.
      // NOTE(review): the signature line is elided in this copy; from the body
      // this takes the signed block `b` and its id `id`.
      fc_dlog( logger, "bcast block ${b}", ("b", b->block_num()) );

      // While we are syncing from a peer we do not relay blocks.
      if( my_impl->sync_master->syncing_with_peer() ) return;

      block_buffer_factory buff_factory;
      const auto bnum = b->block_num();
      for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) {
         fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}",
                  ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) );
         if( !cp->current() ) return true;
         // buff_factory is shared across iterations — presumably it serializes
         // the block once and reuses the buffer; confirm in its definition.
         send_buffer_type sb = buff_factory.get_send_buffer( b );

         cp->strand.post( [this, cp, id, bnum, sb{std::move(sb)}]() {
            cp->latest_blk_time = cp->get_time();
            std::unique_lock<std::mutex> g_conn( cp->conn_mtx );
            // Peer's reported LIB at/above this block means it must already have it.
            bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum;
            g_conn.unlock();
            if( !has_block ) {
               // add_peer_block false => peer already recorded as having the block
               if( !add_peer_block( id, cp->connection_id ) ) {
                  peer_dlog( cp, "not bcast block ${b}", ("b", bnum) );
                  return;
               }
               peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
               cp->enqueue_buffer( sb, no_reason );
            }
         });
         return true;
      } );
   }
2165
2166 // called from c's connection strand
2168 std::unique_lock<std::mutex> g( c->conn_mtx );
2169 if (c &&
2170 c->last_req &&
2171 c->last_req->req_blocks.mode != none &&
2172 !c->last_req->req_blocks.ids.empty() &&
2173 c->last_req->req_blocks.ids.back() == id) {
2174 peer_dlog( c, "resetting last_req" );
2175 c->last_req.reset();
2176 }
2177 g.unlock();
2178
2179 peer_dlog(c, "canceling wait");
2180 c->cancel_wait();
2181 }
2182
      // Log-only: record the id of a block the controller rejected.
      fc_dlog( logger, "rejected block ${id}", ("id", id) );
   }
2186
      // Relay a transaction to every current transaction-capable peer that
      // has not already seen it (per the dedup cache).
      trx_buffer_factory buff_factory;
      const auto now = fc::time_point::now();
      for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) {
         if( cp->is_blocks_only_connection() || !cp->current() ) {
            return true;
         }
         // add_peer_txn false => this peer already has the trx recorded; skip it
         if( !add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) {
            return true;
         }

         // buff_factory is shared across iterations — presumably it serializes
         // the trx once and reuses the buffer; confirm in its definition.
         send_buffer_type sb = buff_factory.get_send_buffer( trx );
         fc_dlog( logger, "sending trx: ${id}, to connection ${cid}", ("id", trx->id())("cid", cp->connection_id) );
         cp->strand.post( [cp, sb{std::move(sb)}]() {
            cp->enqueue_buffer( sb, no_reason );
         } );
         return true;
      } );
   }
2206
      // Log-only: a transaction the controller rejected is deliberately NOT
      // broadcast, and its dedup-cache entry is retained (see comment below).
      fc_dlog( logger, "not sending rejected transaction ${tid}", ("tid", trx->id()) );
      // keep rejected transaction around for awhile so we don't broadcast it, don't remove from local_txns
   }
2211
2212 // called from c's connection strand
2213 void dispatch_manager::recv_notice(const connection_ptr& c, const notice_message& msg, bool generated) {
2214 if (msg.known_trx.mode == normal) {
2215 } else if (msg.known_trx.mode != none) {
2216 peer_elog( c, "passed a notice_message with something other than a normal on none known_trx" );
2217 return;
2218 }
2219 if (msg.known_blocks.mode == normal) {
2220 // known_blocks.ids is never > 1
2221 if( !msg.known_blocks.ids.empty() ) {
2222 if( msg.known_blocks.pending == 1 ) { // block id notify of 2.0.0, ignore
2223 return;
2224 }
2225 }
2226 } else if (msg.known_blocks.mode != none) {
2227 peer_elog( c, "passed a notice_message with something other than a normal on none known_blocks" );
2228 return;
2229 }
2230 }
2231
2232 // called from c's connection strand
2234 peer_dlog( c, "retry fetch" );
2235 request_message last_req;
2236 block_id_type bid;
2237 {
2238 std::lock_guard<std::mutex> g_c_conn( c->conn_mtx );
2239 if( !c->last_req ) {
2240 return;
2241 }
2242 peer_wlog( c, "failed to fetch from peer" );
2243 if( c->last_req->req_blocks.mode == normal && !c->last_req->req_blocks.ids.empty() ) {
2244 bid = c->last_req->req_blocks.ids.back();
2245 } else {
2246 peer_wlog( c, "no retry, block mpde = ${b} trx mode = ${t}",
2247 ("b", modes_str( c->last_req->req_blocks.mode ))( "t", modes_str( c->last_req->req_trx.mode ) ) );
2248 return;
2249 }
2250 last_req = *c->last_req;
2251 }
2252 for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) {
2253 if( conn == c )
2254 return true;
2255
2256 {
2257 std::lock_guard<std::mutex> guard( conn->conn_mtx );
2258 if( conn->last_req ) {
2259 return true;
2260 }
2261 }
2262
2263 bool sendit = peer_has_block( bid, conn->connection_id );
2264 if( sendit ) {
2265 conn->strand.post( [conn, last_req{std::move(last_req)}]() {
2266 conn->enqueue( last_req );
2267 conn->fetch_wait();
2268 std::lock_guard<std::mutex> g_conn_conn( conn->conn_mtx );
2269 conn->last_req = last_req;
2270 } );
2271 return false;
2272 }
2273 return true;
2274 } );
2275
2276 // at this point no other peer has it, re-request or do nothing?
2277 peer_wlog( c, "no peer has last_req" );
2278 if( c->connected() ) {
2279 c->enqueue( last_req );
2280 c->fetch_wait();
2281 }
2282 }
2283
2284 //------------------------------------------------------------------------
2285
   // called from any thread
      // Decide whether (re)connecting to this configured peer is currently
      // allowed, then kick off an async DNS resolve + connect on the strand.
      // Returns false when the address is invalid or the last go_away reason
      // forbids retrying; true otherwise (including the rate-limited case).
      switch ( no_retry ) {
         case no_reason:
         case wrong_version:
         case benign_other:
         case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
            break;
         default:
            fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
            return false;
      }

      string::size_type colon = peer_address().find(':');
      if (colon == std::string::npos || colon == 0) {
         fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address()) );
         return false;
      }

      connection_ptr c = shared_from_this();

      // Rate-limit reconnect attempts to one per connector_period.
      // NOTE(review): the opening '{' of this scope appears elided in this
      // copy of the file — the '}' below closes it; confirm against upstream.
      auto connector_period_us = std::chrono::duration_cast<std::chrono::microseconds>( my_impl->connector_period );
      std::lock_guard<std::mutex> g( c->conn_mtx );
      if( last_close == fc::time_point() || last_close > fc::time_point::now() - fc::microseconds( connector_period_us.count() ) ) {
         return true; // true so doesn't remove from valid connections
      }
   }

      strand.post([c]() {
         // peer_address() is "host:port[:<blk>|<trx>]" — split out host/port
         string::size_type colon = c->peer_address().find(':');
         string::size_type colon2 = c->peer_address().find(':', colon + 1);
         string host = c->peer_address().substr( 0, colon );
         string port = c->peer_address().substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
         c->set_connection_type( c->peer_address() );

         auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool->get_executor() );
         connection_wptr weak_conn = c; // weak: don't keep the connection alive just for the resolve
         // Note: need to add support for IPv6 too
         resolver->async_resolve( tcp::v4(), host, port, boost::asio::bind_executor( c->strand,
            [resolver, weak_conn, host, port]( const boost::system::error_code& err, tcp::resolver::results_type endpoints ) {
               auto c = weak_conn.lock();
               if( !c ) return;
               if( !err ) {
                  c->connect( resolver, endpoints );
               } else {
                  fc_elog( logger, "Unable to resolve ${host}:${port} ${error}",
                           ("host", host)("port", port)( "error", err.message() ) );
                  c->connecting = false;
                  ++c->consecutive_immediate_connection_close;
               }
         } ) );
      } );
      return true;
   }
2341
   // called from connection strand
   // Asynchronously connect to one of the resolved endpoints; on success start
   // the session and send our handshake, otherwise close (allowing retry).
   // NOTE(review): two lines after `connecting = true;` appear elided in this
   // copy of the file — confirm against upstream.
   void connection::connect( const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints ) {
      connecting = true;
      boost::asio::async_connect( *socket, endpoints,
         boost::asio::bind_executor( strand,
            // capture socket by value so we can detect the connection having
            // been reset (socket != c->socket) while the connect was in flight
            [resolver, c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
            if( !err && socket->is_open() && socket == c->socket ) {
               if( c->start_session() ) {
                  c->send_handshake();
               }
            } else {
               fc_elog( logger, "connection failed to ${host}:${port} ${error}",
                        ("host", endpoint.address().to_string())("port", endpoint.port())( "error", err.message()));
               c->close( false );
            }
         } ) );
   }
2361
      // Accept one inbound connection, enforce per-host and total client
      // limits, then (on recoverable errors) re-arm the accept loop.
      connection_ptr new_connection = std::make_shared<connection>();
      new_connection->connecting = true;
      new_connection->strand.post( [this, new_connection = std::move( new_connection )](){
         acceptor->async_accept( *new_connection->socket,
            boost::asio::bind_executor( new_connection->strand, [new_connection, socket=new_connection->socket, this]( boost::system::error_code ec ) {
            if( !ec ) {
               uint32_t visitors = 0;   // count of open inbound (addressless) connections
               uint32_t from_addr = 0;  // of those, how many from this same remote IP
               boost::system::error_code rec;
               const auto& paddr_add = socket->remote_endpoint( rec ).address();
               string paddr_str;
               if( rec ) {
                  fc_elog( logger, "Error getting remote endpoint: ${m}", ("m", rec.message()));
               } else {
                  paddr_str = paddr_add.to_string();
                  for_each_connection( [&visitors, &from_addr, &paddr_str]( auto& conn ) {
                     if( conn->socket_is_open()) {
                        if( conn->peer_address().empty()) {
                           ++visitors;
                           std::lock_guard<std::mutex> g_conn( conn->conn_mtx );
                           if( paddr_str == conn->remote_endpoint_ip ) {
                              ++from_addr;
                           }
                        }
                     }
                     return true;
                  } );
                  if( from_addr < max_nodes_per_host && (max_client_count == 0 || visitors < max_client_count)) {
                     fc_ilog( logger, "Accepted new connection: " + paddr_str );
                     new_connection->set_heartbeat_timeout( heartbeat_timeout );
                     if( new_connection->start_session()) {
                        std::lock_guard<std::shared_mutex> g_unique( connections_mtx );
                        connections.insert( new_connection );
                     }

                  } else {
                     if( from_addr >= max_nodes_per_host ) {
                        fc_dlog( logger, "Number of connections (${n}) from ${ra} exceeds limit ${l}",
                                 ("n", from_addr + 1)( "ra", paddr_str )( "l", max_nodes_per_host ));
                     } else {
                        fc_dlog( logger, "max_client_count ${m} exceeded", ("m", max_client_count));
                     }
                     // new_connection never added to connections and start_session not called, lifetime will end
                     boost::system::error_code ec;
                     socket->shutdown( tcp::socket::shutdown_both, ec );
                     socket->close( ec );
                  }
               }
            } else {
               fc_elog( logger, "Error accepting connection: ${m}", ("m", ec.message()));
               // For the listed error codes below, recall start_listen_loop()
               switch (ec.value()) {
                  case ECONNABORTED:
                  case EMFILE:
                  case ENFILE:
                  case ENOBUFS:
                  case ENOMEM:
                  case EPROTO:
                     break; // transient — fall through and re-arm the accept
                  default:
                     return; // fatal accept error — stop listening
               }
            }
            // NOTE(review): the re-arm call (presumably start_listen_loop())
            // appears elided at this point in this copy — confirm upstream.
         }));
      } );
   }
2430
   // only called from strand thread
      // Arm an async read for at least the next message header (or however
      // many bytes the previous pass said are outstanding), then parse as many
      // complete length-prefixed messages as the buffer holds.
      try {
         // Consume-and-reset the outstanding byte count left by the last pass.
         std::size_t minimum_read =
               std::atomic_exchange<decltype(outstanding_read_bytes.load())>( &outstanding_read_bytes, 0 );
         minimum_read = minimum_read != 0 ? minimum_read : message_header_size;

         if (my_impl->use_socket_read_watermark) {
            // Ask the kernel not to wake us until a useful amount has arrived.
            const size_t max_socket_read_watermark = 4096;
            std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
            boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
            boost::system::error_code ec;
            socket->set_option( read_watermark_opt, ec );
            if( ec ) {
               peer_elog( this, "unable to set read watermark: ${e1}", ("e1", ec.message()) );
            }
         }

         // Completion condition: keep reading until minimum_read bytes arrive
         // (or an error occurs).
         auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
            if (ec || bytes_transferred >= minimum_read ) {
               return 0;
            } else {
               return minimum_read - bytes_transferred;
            }
         };

         // Back-pressure: if the peer isn't draining our writes, give up.
         uint32_t write_queue_size = buffer_queue.write_queue_size();
         if( write_queue_size > def_max_write_queue_size ) {
            peer_elog( this, "write queue full ${s} bytes, giving up on connection, closing", ("s", write_queue_size) );
            close( false );
            return;
         }

         // NOTE(review): the buffer-sequence argument to async_read appears
         // elided between these two lines in this copy — confirm upstream.
         boost::asio::async_read( *socket,
            boost::asio::bind_executor( strand,
              // capture socket by value to detect a reset connection below
              [conn = shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t bytes_transferred ) {
               // may have closed connection and cleared pending_message_buffer
               if( !conn->socket_is_open() || socket != conn->socket ) return;

               bool close_connection = false;
               try {
                  if( !ec ) {
                     if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
                        peer_elog( conn, "async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
                                   ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()) );
                     }
                     SYS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, "");
                     conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
                     // Drain every complete message currently in the buffer.
                     while (conn->pending_message_buffer.bytes_to_read() > 0) {
                        uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();

                        if (bytes_in_buffer < message_header_size) {
                           // Not even a full header yet; ask for the remainder.
                           conn->outstanding_read_bytes = message_header_size - bytes_in_buffer;
                           break;
                        } else {
                           // Peek the 4-byte length prefix without consuming it.
                           uint32_t message_length;
                           auto index = conn->pending_message_buffer.read_index();
                           conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);
                           if(message_length > def_send_buffer_size*2 || message_length == 0) {
                              peer_elog( conn, "incoming message length unexpected (${i})", ("i", message_length) );
                              close_connection = true;
                              break;
                           }

                           auto total_message_bytes = message_length + message_header_size;

                           if (bytes_in_buffer >= total_message_bytes) {
                              conn->pending_message_buffer.advance_read_ptr(message_header_size);
                              conn->consecutive_immediate_connection_close = 0;
                              if (!conn->process_next_message(message_length)) {
                                 return; // handler closed the connection
                              }
                           } else {
                              // Partial message: grow the buffer if needed and
                              // record how many more bytes we must read.
                              auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
                              auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
                              if (outstanding_message_bytes > available_buffer_bytes) {
                                 conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
                              }

                              conn->outstanding_read_bytes = outstanding_message_bytes;
                              break;
                           }
                        }
                     }
                     if( !close_connection ) conn->start_read_message();
                  } else {
                     if (ec.value() != boost::asio::error::eof) {
                        peer_elog( conn, "Error reading message: ${m}", ( "m", ec.message() ) );
                     } else {
                        peer_ilog( conn, "Peer closed connection" );
                     }
                     close_connection = true;
                  }
               }
               // Out-of-memory conditions must propagate, not be swallowed.
               catch ( const std::bad_alloc& )
               {
                  throw;
               }
               catch ( const boost::interprocess::bad_alloc& )
               {
                  throw;
               }
               catch(const fc::exception &ex)
               {
                  peer_elog( conn, "Exception in handling read data ${s}", ("s",ex.to_string()) );
                  close_connection = true;
               }
               catch(const std::exception &ex) {
                  peer_elog( conn, "Exception in handling read data: ${s}", ("s",ex.what()) );
                  close_connection = true;
               }
               catch (...) {
                  peer_elog( conn, "Undefined exception handling read data" );
                  close_connection = true;
               }

               if( close_connection ) {
                  peer_elog( conn, "Closing connection" );
                  conn->close();
               }
            }));
      } catch (...) {
         peer_elog( this, "Undefined exception in start_read_message, closing connection" );
         close();
      }
   }
2558
   // called from connection strand
      // Dispatch the next complete wire message: blocks and packed
      // transactions get dedicated fast paths; everything else is unpacked
      // into the net_message variant and visited. Returns false when the
      // connection was closed due to a handling error.
      // NOTE(review): the signature line and the datastream (peek_ds / ds)
      // construction lines are elided in this copy — confirm upstream.
      try {

         // if next message is a block we already have, exit early
         unsigned_int which{};
         fc::raw::unpack( peek_ds, which );
         if( which == signed_block_which ) {
            return process_next_block_message( message_length );

         } else if( which == packed_transaction_which ) {
            return process_next_trx_message( message_length );

         } else {
            net_message msg;
            fc::raw::unpack( ds, msg );
            msg_handler m( shared_from_this() );
            std::visit( m, msg );
         }

      } catch( const fc::exception& e ) {
         peer_elog( this, "Exception in handling message: ${s}", ("s", e.to_detail_string()) );
         close();
         return false;
      }
      return true;
   }
2590
   // called from connection strand
   // Handle an incoming signed_block message: dedup against blocks we already
   // have, reject stale (pre-LIB) blocks while not syncing, refuse WebAuthn
   // signed blocks, then hand the block to handle_message.
   // NOTE(review): the peek_ds / ds datastream construction lines are elided
   // in this copy of the file — confirm upstream.
   bool connection::process_next_block_message(uint32_t message_length) {
      unsigned_int which{};
      fc::raw::unpack( peek_ds, which ); // throw away
      block_header bh;
      fc::raw::unpack( peek_ds, bh );

      const block_id_type blk_id = bh.calculate_id();
      const uint32_t blk_num = bh.block_num();
      if( my_impl->dispatcher->have_block( blk_id ) ) {
         // Already received this block from another peer: just update sync
         // bookkeeping and skip past the payload.
         peer_dlog( this, "canceling wait, already received block ${num}, id ${id}...",
                    ("num", blk_num)("id", blk_id.str().substr(8,16)) );
         my_impl->sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num, false );
         cancel_wait();

         pending_message_buffer.advance_read_ptr( message_length );
         return true;
      }
      peer_dlog( this, "received block ${num}, id ${id}..., latency: ${latency}",
                 ("num", bh.block_num())("id", blk_id.str().substr(8,16))
                 ("latency", (fc::time_point::now() - bh.timestamp).count()/1000) );
      if( !my_impl->sync_master->syncing_with_peer() ) { // guard against peer thinking it needs to send us old blocks
         uint32_t lib = 0;
         std::tie( lib, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
         if( blk_num < lib ) {
            std::unique_lock<std::mutex> g( conn_mtx );
            const auto last_sent_lib = last_handshake_sent.last_irreversible_block_num;
            g.unlock();
            peer_ilog( this, "received block ${n} less than ${which}lib ${lib}",
                       ("n", blk_num)("which", blk_num < last_sent_lib ? "sent " : "")
                       ("lib", blk_num < last_sent_lib ? last_sent_lib : lib) );
            // Tell the peer to stop sending old blocks (empty sync request).
            my_impl->sync_master->reset_last_requested_num(my_impl->sync_master->locked_sync_mutex());
            enqueue( (sync_request_message) {0, 0} );
            cancel_wait();

            pending_message_buffer.advance_read_ptr( message_length );
            return true;
         }
      }

      // Full deserialization of the block body.
      fc::raw::unpack( ds, which );
      shared_ptr<signed_block> ptr = std::make_shared<signed_block>();
      fc::raw::unpack( ds, *ptr );

      auto is_webauthn_sig = []( const fc::crypto::signature& s ) {
         // NOTE(review): the predicate body is elided in this copy —
         // presumably it checks the signature variant for the webauthn type.
      };
      bool has_webauthn_sig = is_webauthn_sig( ptr->producer_signature );

      constexpr auto additional_sigs_eid = additional_block_signatures_extension::extension_id();
      auto exts = ptr->validate_and_extract_extensions();
      if( exts.count( additional_sigs_eid ) ) {
         const auto &additional_sigs = std::get<additional_block_signatures_extension>(exts.lower_bound( additional_sigs_eid )->second).signatures;
         has_webauthn_sig |= std::any_of( additional_sigs.begin(), additional_sigs.end(), is_webauthn_sig );
      }

      if( has_webauthn_sig ) {
         peer_dlog( this, "WebAuthn signed block received, closing connection" );
         close();
         return false;
      }

      handle_message( blk_id, std::move( ptr ) );
      return true;
   }
2659
   // called from connection strand
   // Handle an incoming packed_transaction message: honor the
   // p2p-accept-transactions setting, shed load when too many trx are in
   // flight, dedup, then hand the trx to handle_message.
   // NOTE(review): the ds datastream construction line and part of the
   // overload-handling branch are elided in this copy — confirm upstream.
   bool connection::process_next_trx_message(uint32_t message_length) {
      if( !my_impl->p2p_accept_transactions ) {
         peer_dlog( this, "p2p-accept-transaction=false - dropping txn" );
         pending_message_buffer.advance_read_ptr( message_length );
         return true;
      }

      const unsigned long trx_in_progress_sz = this->trx_in_progress_size.load();

      unsigned_int which{};
      fc::raw::unpack( ds, which );
      shared_ptr<packed_transaction> ptr = std::make_shared<packed_transaction>();
      fc::raw::unpack( ds, *ptr );
      if( trx_in_progress_sz > def_max_trx_in_progress_size) {
         // Overloaded: drop the trx and record the failure for diagnostics.
         char reason[72];
         snprintf(reason, 72, "Dropping trx, too many trx in progress %lu bytes", trx_in_progress_sz);
         my_impl->producer_plug->log_failed_transaction(ptr->id(), ptr, reason);
            peer_wlog(this, reason);
         }
         return true;
      }
      bool have_trx = my_impl->dispatcher->have_txn( ptr->id() );
      my_impl->dispatcher->add_peer_txn( ptr->id(), ptr->expiration(), connection_id );

      if( have_trx ) {
         peer_dlog( this, "got a duplicate transaction - dropping" );
         return true;
      }

      handle_message( std::move( ptr ) );
      return true;
   }
2696
   // call only from main application thread
      // Snapshot the controller's LIB / head / fork-head numbers and ids into
      // the mutex-protected cache so net threads can read chain state via
      // get_chain_info() without touching the controller.
      controller& cc = chain_plug->chain();
      std::lock_guard<std::mutex> g( chain_info_mtx );
      chain_lib_num = cc.last_irreversible_block_num();
      chain_lib_id = cc.last_irreversible_block_id();
      chain_head_blk_num = cc.head_block_num();
      chain_head_blk_id = cc.head_block_id();
      chain_fork_head_blk_num = cc.fork_db_pending_head_block_num();
      chain_fork_head_blk_id = cc.fork_db_pending_head_block_id();
      fc_dlog( logger, "updating chain info lib ${lib}, head ${head}, fork ${fork}",
               ("lib", chain_lib_num)("head", chain_head_blk_num)("fork", chain_fork_head_blk_num) );
   }
2710
   // lib_num, head_blk_num, fork_head_blk_num, lib_id, head_blk_id, fork_head_blk_id
   std::tuple<uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type>
      // thread safe: returns the cached snapshot written by update_chain_info()
      std::lock_guard<std::mutex> g( chain_info_mtx );
      return std::make_tuple(
         chain_lib_num, chain_head_blk_num, chain_fork_head_blk_num,
         chain_lib_id, chain_head_blk_id, chain_fork_head_blk_id );
   }
2719
2720 bool connection::is_valid( const handshake_message& msg ) const {
2721 // Do some basic validation of an incoming handshake_message, so things
2722 // that really aren't handshake messages can be quickly discarded without
2723 // affecting state.
2724 bool valid = true;
2725 if (msg.last_irreversible_block_num > msg.head_num) {
2726 peer_wlog( this, "Handshake message validation: last irreversible block (${i}) is greater than head block (${h})",
2727 ("i", msg.last_irreversible_block_num)("h", msg.head_num) );
2728 valid = false;
2729 }
2730 if (msg.p2p_address.empty()) {
2731 peer_wlog( this, "Handshake message validation: p2p_address is null string" );
2732 valid = false;
2733 } else if( msg.p2p_address.length() > max_handshake_str_length ) {
2734 // see max_handshake_str_length comment in protocol.hpp
2735 peer_wlog( this, "Handshake message validation: p2p_address too large: ${p}",
2736 ("p", msg.p2p_address.substr(0, max_handshake_str_length) + "...") );
2737 valid = false;
2738 }
2739 if (msg.os.empty()) {
2740 peer_wlog( this, "Handshake message validation: os field is null string" );
2741 valid = false;
2742 } else if( msg.os.length() > max_handshake_str_length ) {
2743 peer_wlog( this, "Handshake message validation: os field too large: ${p}",
2744 ("p", msg.os.substr(0, max_handshake_str_length) + "...") );
2745 valid = false;
2746 }
2747 if( msg.agent.length() > max_handshake_str_length ) {
2748 peer_wlog( this, "Handshake message validation: agent field too large: ${p}",
2749 ("p", msg.agent.substr(0, max_handshake_str_length) + "...") );
2750 valid = false;
2751 }
2752 if ((msg.sig != chain::signature_type() || msg.token != sha256()) && (msg.token != fc::sha256::hash(msg.time))) {
2753 peer_wlog( this, "Handshake message validation: token field invalid" );
2754 valid = false;
2755 }
2756 return valid;
2757 }
2758
      // Log-only handler; no state is read from chain_size_message here.
      peer_dlog(this, "received chain_size_message");
   }
2762
      // Process a peer handshake: validate, detect self- and duplicate
      // connections, verify chain id / protocol version, authenticate, check
      // the peer's claimed LIB for a fork, then notify the sync manager.
      // NOTE(review): several lines are elided in this copy of the file
      // (e.g. the declarations of `gam` and `peer_lib`, go_away enqueues,
      // close() calls) — confirm against the upstream source.
      peer_dlog( this, "received handshake_message" );
      if( !is_valid( msg ) ) {
         peer_elog( this, "bad handshake message");
         return;
      }
      peer_dlog( this, "received handshake gen ${g}, lib ${lib}, head ${head}",
                 ("g", msg.generation)("lib", msg.last_irreversible_block_num)("head", msg.head_num) );

      std::unique_lock<std::mutex> g_conn( conn_mtx );
      last_handshake_recv = msg;
      g_conn.unlock();

      connecting = false;
      if (msg.generation == 1) {
         // First handshake on this connection: run identity checks.
         if( msg.node_id == my_impl->node_id) {
            peer_elog( this, "Self connection detected node_id ${id}. Closing connection", ("id", msg.node_id) );
            return;
         }

         if( peer_address().empty() ) {
            // inbound connection — body elided in this copy
         }

         std::unique_lock<std::mutex> g_conn( conn_mtx );
         if( peer_address().empty() || last_handshake_recv.node_id == fc::sha256()) {
            auto c_time = last_handshake_sent.time;
            g_conn.unlock();
            peer_dlog( this, "checking for duplicate" );
            std::shared_lock<std::shared_mutex> g_cnts( my_impl->connections_mtx );
            for(const auto& check : my_impl->connections) {
               if(check.get() == this)
                  continue;
               std::unique_lock<std::mutex> g_check_conn( check->conn_mtx );
               fc_dlog( logger, "dup check: connected ${c}, ${l} =? ${r}",
                        ("c", check->connected())("l", check->last_handshake_recv.node_id)("r", msg.node_id) );
               if(check->connected() && check->last_handshake_recv.node_id == msg.node_id) {
                  // It's possible that both peers could arrive here at relatively the same time, so
                  // we need to avoid the case where they would both tell a different connection to go away.
                  // Using the sum of the initial handshake times of the two connections, we will
                  // arbitrarily (but consistently between the two peers) keep one of them.

                  auto check_time = check->last_handshake_sent.time + check->last_handshake_recv.time;
                  g_check_conn.unlock();
                  if (msg.time + c_time <= check_time)
                     continue;
                  if (my_impl->p2p_address < msg.p2p_address) {
                     fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
                              ("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) );
                     // only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate,
                     // so there is no chance for both connections to be closed
                     continue;
                  }
               } else if (my_impl->node_id < msg.node_id) {
                  fc_dlog( logger, "not duplicate, my_impl->node_id '${lhs}' < msg.node_id '${rhs}'",
                           ("lhs", my_impl->node_id)("rhs", msg.node_id) );
                  // only the connection from lower node_id to higher node_id will be considered as a duplicate,
                  // so there is no chance for both connections to be closed
                  continue;
               }

               // Duplicate found: tell the peer to go away.
               // NOTE(review): the declaration of `gam` (a go_away_message
               // with reason duplicate) is elided in this copy.
               g_cnts.unlock();
               peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) );
               gam.node_id = conn_node_id;
               enqueue(gam);
               return;
            }
         } else {
            peer_dlog( this, "skipping duplicate check, addr == ${pa}, id = ${ni}",
                       ("pa", peer_address())( "ni", last_handshake_recv.node_id ) );
            g_conn.unlock();
         }

         if( msg.chain_id != my_impl->chain_id ) {
            peer_elog( this, "Peer on a different chain. Closing connection" );
            return;
         }
         if( protocol_version != net_version ) {
            peer_ilog( this, "Local network version different: ${nv} Remote version: ${mnv}",
                       ("nv", net_version)("mnv", protocol_version.load()) );
         } else {
            peer_ilog( this, "Local network version: ${nv}", ("nv", net_version) );
         }

         conn_node_id = msg.node_id;
         short_conn_node_id = conn_node_id.str().substr( 0, 7 );

         if( !my_impl->authenticate_peer( msg ) ) {
            peer_elog( this, "Peer not authenticated. Closing connection." );
            return;
         }

         // Fork check runs on the main thread (needs controller access);
         // only a weak_ptr is captured so the connection can die meanwhile.
         // NOTE(review): the declaration of `peer_lib` (from
         // msg.last_irreversible_block_num) is elided in this copy.
         connection_wptr weak = shared_from_this();
         app().post( priority::medium, [peer_lib, chain_plug = my_impl->chain_plug, weak{std::move(weak)},
                                        msg_lib_id = msg.last_irreversible_block_id]() {
            connection_ptr c = weak.lock();
            if( !c ) return;
            controller& cc = chain_plug->chain();
            uint32_t lib_num = cc.last_irreversible_block_num();

            fc_dlog( logger, "handshake check for fork lib_num = ${ln}, peer_lib = ${pl}, connection ${cid}",
                     ("ln", lib_num)("pl", peer_lib)("cid", c->connection_id) );

            if( peer_lib <= lib_num && peer_lib > 0 ) {
               bool on_fork = false;
               try {
                  block_id_type peer_lib_id = cc.get_block_id_for_num( peer_lib );
                  on_fork = (msg_lib_id != peer_lib_id);
               } catch( const unknown_block_exception& ) {
                  // allow this for now, will be checked on sync
                  fc_dlog( logger, "peer last irreversible block ${pl} is unknown, connection ${cid}",
                           ("pl", peer_lib)("cid", c->connection_id) );
               } catch( ... ) {
                  fc_wlog( logger, "caught an exception getting block id for ${pl}, connection ${cid}",
                           ("pl", peer_lib)("cid", c->connection_id) );
                  on_fork = true;
               }
               if( on_fork ) {
                  // Back on the connection strand to send the go-away.
                  c->strand.post( [c]() {
                     peer_elog( c, "Peer chain is forked, sending: forked go away" );
                     c->no_retry = go_away_reason::forked;
                     c->enqueue( go_away_message( go_away_reason::forked ) );
                  } );
               }
            }
         });

         // we don't support the 2.1 packed_transaction & signed_block, so tell 2.1 clients we are 2.0
            return;
         }

         if( sent_handshake_count == 0 ) {
            // reply-handshake branch — body elided in this copy
         }
      }

      my_impl->sync_master->recv_handshake( shared_from_this(), msg );
   }
2921
      // Body of the go_away_message handler: record the peer's stated reason,
      // decide whether this connection is worth retrying, then drain queues and close.
      peer_wlog( this, "received go_away_message, reason = ${r}", ("r", reason_str( msg.reason )) );

      // Only eligible for reconnect if no earlier go_away reason was recorded.
      bool retry = no_retry == no_reason; // if no previous go away message
      no_retry = msg.reason;
      if( msg.reason == duplicate ) {
         // remember the remote node id so future duplicate-connection checks can match it
         conn_node_id = msg.node_id;
      }
      if( msg.reason == wrong_version ) {
         if( !retry ) no_retry = fatal_other; // only retry once on wrong version
      }
      else if ( msg.reason == benign_other ) {
         if ( retry ) peer_dlog( this, "received benign_other reason, retrying to connect");
      }
      else {
         // every other reason is treated as terminal: no reconnect attempt
         retry = false;
      }
      // discard any queued outbound messages before closing
      flush_queues();

      close( retry ); // reconnect if wrong_version
   }
2943
      // Body of the time_message handler: NTP-style clock offset exchange with the peer.
      // Fields: org = origin timestamp, rec = receive timestamp, xmt = transmit
      // timestamp, dst = destination timestamp (all as seen by this node / the peer).
      peer_ilog( this, "received time_message" );

      /* We've already lost however many microseconds it took to dispatch
       * the message, but it can't be helped.
       */
      msg.dst = get_time();

      // If the transmit timestamp is zero, the peer is horribly broken.
      if(msg.xmt == 0)
         return; /* invalid timestamp */

      if(msg.xmt == xmt)
         return; /* duplicate packet */

      // record the peer's latest timestamps
      xmt = msg.xmt;
      rec = msg.rec;
      dst = msg.dst;

      if( msg.org == 0 ) {
         // peer initiated the exchange; answer so it can compute its offset
         send_time( msg );
         return; // We don't have enough data to perform the calculation yet.
      }

      // standard NTP offset formula: ((rec - org) + (xmt - dst)) / 2
      double offset = (double(rec - org) + double(msg.xmt - dst)) / 2;
      double NsecPerUsec{1000};

      logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)",
                                  ("o", offset)( "us", offset / NsecPerUsec ) ) );
      // reset the exchange state for the next round
      org = 0;
      rec = 0;

      std::unique_lock<std::mutex> g_conn( conn_mtx );
      if( last_handshake_recv.generation == 0 ) {
         // no handshake received yet on this connection; unlock before acting on it
         g_conn.unlock();
      }
   }
2983
      // Body of the notice_message handler.
      // peer tells us about one or more blocks or txns. When done syncing, forward on
      // notices of previously unknown blocks or txns,
      //
      peer_dlog( this, "received notice_message" );
      connecting = false;
      // at most one block id may be carried in known_blocks
      if( msg.known_blocks.ids.size() > 1 ) {
         peer_elog( this, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection",
                    ("s", msg.known_blocks.ids.size()) );
         close( false );
         return;
      }
      if( msg.known_trx.mode != none ) {
            // log the pending-block notice; empty id list is reported as a default block id
            const block_id_type& blkid = msg.known_blocks.ids.empty() ? block_id_type{} : msg.known_blocks.ids.back();
            peer_dlog( this, "this is a ${m} notice with ${n} pending blocks: ${num} ${id}...",
                       ("m", modes_str( msg.known_blocks.mode ))("n", msg.known_blocks.pending)
                       ("num", block_header::num_from_id( blkid ))("id", blkid.str().substr( 8, 16 )) );
         }
      }
      // dispatch on the transaction portion of the notice
      switch (msg.known_trx.mode) {
      case none:
         break;
      case last_irr_catch_up: {
         std::unique_lock<std::mutex> g_conn( conn_mtx );
         g_conn.unlock();
         break;
      }
      case catch_up : {
         break;
      }
      case normal: {
         // forward previously-unknown txn notices to the dispatcher
         my_impl->dispatcher->recv_notice( shared_from_this(), msg, false );
      }
      }

      if( msg.known_blocks.mode != none ) {
         peer_dlog( this, "this is a ${m} notice with ${n} blocks",
                    ("m", modes_str( msg.known_blocks.mode ))( "n", msg.known_blocks.pending ) );
      }
      // dispatch on the block portion of the notice
      switch (msg.known_blocks.mode) {
      case none : {
         break;
      }
      case last_irr_catch_up:
      case catch_up: {
         // peer signals it is ahead/behind; let the sync manager decide what to fetch
         my_impl->sync_master->sync_recv_notice( shared_from_this(), msg );
         break;
      }
      case normal : {
         my_impl->dispatcher->recv_notice( shared_from_this(), msg, false );
         break;
      }
      default: {
         peer_elog( this, "bad notice_message : invalid known_blocks.mode ${m}",
                    ("m", static_cast<uint32_t>(msg.known_blocks.mode)) );
      }
      }
   }
3044
      // Body of the request_message handler: peer asks for specific blocks and/or txns.
      // at most one block id may be requested at a time
      if( msg.req_blocks.ids.size() > 1 ) {
         peer_elog( this, "Invalid request_message, req_blocks.ids.size ${s}, closing",
                    ("s", msg.req_blocks.ids.size()) );
         close();
         return;
      }

      switch (msg.req_blocks.mode) {
      case catch_up :
         peer_dlog( this, "received request_message:catch_up" );
         // empty id list means "send from our branch head"; otherwise from the given id
         blk_send_branch( msg.req_blocks.ids.empty() ? block_id_type() : msg.req_blocks.ids.back() );
         break;
      case normal :
         peer_dlog( this, "received request_message:normal" );
         if( !msg.req_blocks.ids.empty() ) {
            blk_send( msg.req_blocks.ids.back() );
         }
         break;
      default:;
      }


      switch (msg.req_trx.mode) {
      case catch_up :
         break;
      case none :
         if( msg.req_blocks.mode == none ) {
            // peer requested nothing at all: stop sending it data
            stop_send();
         }
         // no break
      case normal :
         // requesting individual transactions by id is not supported
         if( !msg.req_trx.ids.empty() ) {
            peer_elog( this, "Invalid request_message, req_trx.ids.size ${s}", ("s", msg.req_trx.ids.size()) );
            close();
            return;
         }
         break;
      default:;
      }
   }
3086
      // Body of the sync_request_message handler: peer asks for the block range
      // [start_block, end_block]; end_block == 0 cancels any outstanding request.
      peer_dlog( this, "peer requested ${start} to ${end}", ("start", msg.start_block)("end", msg.end_block) );
      if( msg.end_block == 0 ) {
         peer_requested.reset();
         flush_queues();
      } else {
         if (peer_requested) {
            // This happens when peer already requested some range and sync is still in progress
            // It could be higher in case of peer requested head catchup and current request is lib catchup
            // So to make sure peer will receive all requested blocks we assign end_block to highest value
            peer_requested->end_block = std::max(msg.end_block, peer_requested->end_block);
         }
         else {
            // fresh request: track sync state starting just before start_block
            peer_requested = peer_sync_state( msg.start_block, msg.end_block, msg.start_block-1);
         }
      }
   }
3105
      // Size estimate used to account a txn against the per-connection in-progress budget.
      return trx->get_estimated_size();
   }
3109
      // Body of the packed_transaction handler: hand the txn to the chain plugin for
      // asynchronous validation; the completion lambda runs on the application thread.
      const auto& tid = trx->id();
      peer_dlog( this, "received packed_transaction ${id}", ("id", tid) );

      my_impl->chain_plug->accept_transaction( trx,
         [weak = weak_from_this(), trx](const std::variant<fc::exception_ptr, transaction_trace_ptr>& result) mutable {
         // next (this lambda) called from application thread
         if (std::holds_alternative<fc::exception_ptr>(result)) {
            // submission itself failed
            fc_dlog( logger, "bad packed_transaction : ${m}", ("m", std::get<fc::exception_ptr>(result)->what()) );
         } else {
            const transaction_trace_ptr& trace = std::get<transaction_trace_ptr>(result);
            if( !trace->except ) {
               fc_dlog( logger, "chain accepted transaction, bcast ${id}", ("id", trace->id) );
            } else {
               // txn executed but raised an exception during processing
               fc_elog( logger, "bad packed_transaction : ${m}", ("m", trace->except->what()));
            }
         }
         connection_ptr conn = weak.lock();
         if( conn ) {
            // connection may be gone by now; if alive, release the bytes this
            // txn contributed to the in-progress size budget
            conn->trx_in_progress_size -= calc_trx_size( trx );
         }
      });
   }
3134
   // called from connection strand
   // Body of the signed_block handler: fast-path duplicate detection and block-state
   // construction on the net thread, then posts the block to the application thread.
      peer_dlog( this, "received signed_block ${num}, id ${id}", ("num", ptr->block_num())("id", id) );

      controller& cc = my_impl->chain_plug->chain();
      block_state_ptr bsp;
      bool exception = false;
      try {
         // if we already have the block state, just record the peer as a source and
         // inform the sync manager; no further processing needed
         if( cc.fetch_block_state_by_id( id ) ) {
            my_impl->dispatcher->add_peer_block( id, connection_id );
            my_impl->sync_master->sync_recv_block( shared_from_this(), id, ptr->block_num(), false );
            return;
         }
         // this may return null if block is not immediately ready to be processed
         bsp = cc.create_block_state( id, ptr );
      } catch( const fc::exception& ex ) {
         exception = true;
         peer_elog(this, "bad block exception: #${n} ${id}...: ${m}",
                   ("n", ptr->block_num())("id", id.str().substr(8,16))("m",ex.to_string()));
      } catch( ... ) {
         exception = true;
         peer_elog(this, "bad block: #${n} ${id}...: unknown exception",
                   ("n", ptr->block_num())("id", id.str().substr(8,16)));
      }
      if( exception ) {
         // block-state creation failed: notify sync manager and dispatcher so the
         // block can be re-requested from another peer
         my_impl->sync_master->rejected_block( shared_from_this(), ptr->block_num() );
         my_impl->dispatcher->rejected_block( id );
         return;
      }

      bool signal_producer = !!bsp; // ready to process immediately, so signal producer to interrupt start_block
      app().post(priority::medium, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c = shared_from_this()]() mutable {
         c->process_signed_block( id, std::move(ptr), std::move(bsp) );
      });

      if( signal_producer )
         my_impl->producer_plug->received_block();
   }
3173
   // called from application thread
   // Applies a received signed block to the controller, then routes the accept/reject
   // outcome back onto the connection strand for dispatcher/sync bookkeeping.
      controller& cc = my_impl->chain_plug->chain();
      uint32_t blk_num = msg->block_num();
      // use c in this method instead of this to highlight that all methods called on c-> must be thread safe
      connection_ptr c = shared_from_this();

      // if we have closed connection then stop processing
      if( !c->socket_is_open() )
         return;

      try {
         // duplicate check: if the block is already known, just record the peer as a
         // source and tell the sync manager (on the connection strand)
         if( cc.fetch_block_by_id(blk_id) ) {
            c->strand.post( [sync_master = my_impl->sync_master.get(),
                             dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
               dispatcher->add_peer_block( blk_id, c->connection_id );
               sync_master->sync_recv_block( c, blk_id, blk_num, false );
            });
            return;
         }
      } catch(...) {
         // should this even be caught?
         fc_elog( logger, "Caught an unknown exception trying to recall block ID" );
      }

      fc::microseconds age( fc::time_point::now() - msg->timestamp);
      fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection ${cid}",
               ("n", blk_num)("age", age.to_seconds())("cid", c->connection_id) );

      // default to fatal_other; downgraded to no_reason on success, or to a more
      // specific reason by the matching catch clause below
      go_away_reason reason = fatal_other;
      try {
         bool accepted = my_impl->chain_plug->accept_block(msg, blk_id, bsp);
         my_impl->update_chain_info();
         if( !accepted ) return;
         reason = no_reason;
      } catch( const unlinkable_block_exception &ex) {
         fc_elog(logger, "unlinkable_block_exception connection ${cid}: #${n} ${id}...: ${m}",
                 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
         reason = unlinkable;
      } catch( const block_validate_exception &ex ) {
         fc_elog(logger, "block_validate_exception connection ${cid}: #${n} ${id}...: ${m}",
                 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
         reason = validation;
      } catch( const assert_exception &ex ) {
         fc_elog(logger, "block assert_exception connection ${cid}: #${n} ${id}...: ${m}",
                 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
      } catch( const fc::exception &ex ) {
         fc_elog(logger, "bad block exception connection ${cid}: #${n} ${id}...: ${m}",
                 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
      } catch( ... ) {
         fc_elog(logger, "bad block connection ${cid}: #${n} ${id}...: unknown exception",
                 ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16)));
      }

      if( reason == no_reason ) {
         // accepted: record the peer as a block source on the net thread pool, and
         // advance dispatcher/sync state on the connection strand
         boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
            fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) );
            dispatcher->add_peer_block( blk_id, cid );
         });
         c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
            dispatcher->recv_block( c, blk_id, blk_num );
            sync_master->sync_recv_block( c, blk_id, blk_num, true );
         });
      } else {
         // rejected: let sync manager and dispatcher unwind their expectations
         c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
            sync_master->rejected_block( c, blk_num );
            dispatcher->rejected_block( blk_id );
         });
      }
   }
3244
   // called from any thread
   // Arm the connection-monitor timer; when it fires, run connection_monitor starting
   // at from_connection (or the first connection if that one is gone).
   void net_plugin_impl::start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection) {
      if( in_shutdown ) return;
      std::lock_guard<std::mutex> g( connector_check_timer_mtx );
      connector_check_timer->expires_from_now( du );
      connector_check_timer->async_wait( [my = shared_from_this(), from_connection](boost::system::error_code ec) {
            std::unique_lock<std::mutex> g( my->connector_check_timer_mtx );
            // multiple timer arms may be in flight; only the last one (num_in_flight == 0)
            // is allowed to reschedule, to avoid timer storms
            int num_in_flight = --my->connector_checks_in_flight;
            g.unlock();
            if( !ec ) {
               my->connection_monitor(from_connection, num_in_flight == 0 );
            } else {
               if( num_in_flight == 0 ) {
                  if( my->in_shutdown ) return;
                  fc_elog( logger, "Error from connection check monitor: ${m}", ("m", ec.message()));
                  my->start_conn_timer( my->connector_period, std::weak_ptr<connection>() );
               }
            }
      });
   }
3266
   // thread safe
   // Arm the transaction-expiry timer; on expiry run expire(), on error (outside
   // shutdown) log and re-arm.
      if( in_shutdown ) return;
      std::lock_guard<std::mutex> g( expire_timer_mtx );
      expire_timer->expires_from_now( txn_exp_period);
      expire_timer->async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
         if( !ec ) {
            my->expire();
         } else {
            if( my->in_shutdown ) return;
            fc_elog( logger, "Error from transaction check monitor: ${m}", ("m", ec.message()) );
            my->start_expire_timer();
         }
      } );
   }
3282
   // thread safe
   // Keepalive tick: re-arms the keepalive timer and posts a heartbeat check to every
   // open connection's strand. Note the tick is re-armed even on timer error.
      if( in_shutdown ) return;
      std::lock_guard<std::mutex> g( keepalive_timer_mtx );
      keepalive_timer->expires_from_now(keepalive_interval);
      keepalive_timer->async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
         // re-arm first so a single error cannot stop the keepalive loop
         my->ticker();
         if( ec ) {
            if( my->in_shutdown ) return;
            fc_wlog( logger, "Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()) );
         }

         tstamp current_time = connection::get_time();
         for_each_connection( [current_time]( auto& c ) {
            if( c->socket_is_open() ) {
               // heartbeat check must run on the connection's own strand
               c->strand.post([c, current_time]() {
                  c->check_heartbeat(current_time);
               } );
            }
            return true;
         } );
      } );
   }
3306
3308 {
3309 std::lock_guard<std::mutex> g( connector_check_timer_mtx );
3310 connector_check_timer.reset(new boost::asio::steady_timer( my_impl->thread_pool->get_executor() ));
3311 }
3312 {
3313 std::lock_guard<std::mutex> g( expire_timer_mtx );
3314 expire_timer.reset( new boost::asio::steady_timer( my_impl->thread_pool->get_executor() ) );
3315 }
3316 start_conn_timer(connector_period, std::weak_ptr<connection>());
3318 }
3319
      // Periodic expiry pass: drop tracked blocks at/below LIB and expired txns
      // from the dispatcher's dedup caches.
      auto now = time_point::now();
      uint32_t lib = 0;
      std::tie( lib, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = get_chain_info();
      dispatcher->expire_blocks( lib );
      dispatcher->expire_txns();
      fc_dlog( logger, "expire_txns ${n}us", ("n", time_point::now() - now) );

   }
3330
   // called from any thread
   // Walk the connection set: count clients vs configured peers, reconnect dropped
   // peer connections, and erase dead ones. Bails out early (and reschedules soon)
   // if the pass exceeds its time budget.
   void net_plugin_impl::connection_monitor(std::weak_ptr<connection> from_connection, bool reschedule ) {
      auto max_time = fc::time_point::now();
      // resume from the connection the previous (aborted) pass stopped at, if still present
      auto from = from_connection.lock();
      std::unique_lock<std::shared_mutex> g( connections_mtx );
      auto it = (from ? connections.find(from) : connections.begin());
      if (it == connections.end()) it = connections.begin();
      size_t num_rm = 0, num_clients = 0, num_peers = 0;
      while (it != connections.end()) {
         if (fc::time_point::now() >= max_time) {
            // out of time budget: remember where we stopped and reschedule shortly
            connection_wptr wit = *it;
            g.unlock();
            fc_dlog( logger, "Exiting connection monitor early, ran out of time: ${t}", ("t", max_time - fc::time_point::now()) );
            fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}",
                     ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) );
            if( reschedule ) {
               start_conn_timer( std::chrono::milliseconds( 1 ), wit ); // avoid exhausting
            }
            return;
         }
         // empty peer_address means an inbound client; otherwise a configured peer
         (*it)->peer_address().empty() ? ++num_clients : ++num_peers;
         if( !(*it)->socket_is_open() && !(*it)->connecting) {
            if( !(*it)->peer_address().empty() ) {
               // configured peer with a dead socket: try to reconnect, drop on failure
               if( !(*it)->resolve_and_connect() ) {
                  it = connections.erase(it);
                  --num_peers; ++num_rm;
                  continue;
               }
            } else {
               // dead inbound client: just remove it
               --num_clients; ++num_rm;
               it = connections.erase(it);
               continue;
            }
         }
         ++it;
      }
      g.unlock();
      if( num_clients > 0 || num_peers > 0 )
         fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}",
                  ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) );
      fc_dlog( logger, "connection monitor, removed ${n} connections", ("n", num_rm) );
      if( reschedule ) {
         start_conn_timer( connector_period, std::weak_ptr<connection>());
      }
   }
3377
   // called from application thread
   // Signal handler for accepted blocks: broadcast the block to peers from the
   // dispatcher's strand, with tracing tags attached.
      dispatcher->strand.post( [this, bs]() {
         fc_dlog( logger, "signaled accepted_block, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) );

         auto blk_trace = fc_create_trace_with_id( "Block", bs->id );
         auto blk_span = fc_create_span( blk_trace, "Accepted" );
         fc_add_tag( blk_span, "block_id", bs->id );
         fc_add_tag( blk_span, "block_num", bs->block_num );
         fc_add_tag( blk_span, "block_time", bs->block->timestamp.to_time_point() );

         dispatcher->bcast_block( bs->block, bs->id );
      });
   }
3393
   // called from application thread
   // Signal handler for pre-accepted blocks: only blocks from trusted producers are
   // rebroadcast early (before full acceptance), via the dispatcher's strand.
      controller& cc = chain_plug->chain();
      if( cc.is_trusted_producer(block->producer) ) {
         dispatcher->strand.post( [this, block]() {
            auto id = block->calculate_id();
            fc_dlog( logger, "signaled pre_accepted_block, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id) );

            auto blk_trace = fc_create_trace_with_id("Block", id);
            auto blk_span = fc_create_span(blk_trace, "PreAccepted");
            fc_add_tag(blk_span, "block_id", id);
            fc_add_tag(blk_span, "block_num", block->block_num());
            fc_add_tag(blk_span, "block_time", block->timestamp.to_time_point());
            fc_add_tag(blk_span, "producer", block->producer.to_string());

            dispatcher->bcast_block( block, id );
         });
      }
   }
3414
   // called from application thread
   // Signal handler for irreversible blocks; logs the new LIB.
      fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num)("id", block->id) );
   }
3420
3421 // called from application thread
3422 void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>& results) {
3423 dispatcher->strand.post( [this, results]() {
3424 const auto& id = results.second->id();
3425 if (results.first) {
3426 fc_dlog( logger, "signaled NACK, trx-id = ${id} : ${why}", ("id", id)( "why", results.first->to_detail_string() ) );
3427
3428 uint32_t head_blk_num = 0;
3429 std::tie( std::ignore, head_blk_num, std::ignore, std::ignore, std::ignore, std::ignore ) = get_chain_info();
3430 dispatcher->rejected_transaction(results.second, head_blk_num);
3431 } else {
3432 fc_dlog( logger, "signaled ACK, trx-id = ${id}", ("id", id) );
3433 dispatcher->bcast_transaction(results.second);
3434 }
3435 });
3436 }
3437
         // (excerpt begins mid-function) reject when the early checks above fail
         return false;

         // and accept unconditionally when the node allows any connection
         return true;

         // allowed_connections requires an authorized key: accept if the peer's key is
         // explicitly allowed, is one of our own peer-private-keys, or belongs to a
         // producer managed by the producer plugin
         auto allowed_it = std::find(allowed_peers.begin(), allowed_peers.end(), msg.key);
         auto private_it = private_keys.find(msg.key);
         bool found_producer_key = false;
         if(producer_plug != nullptr)
            found_producer_key = producer_plug->is_producer_key(msg.key);
         if( allowed_it == allowed_peers.end() && private_it == private_keys.end() && !found_producer_key) {
            fc_elog( logger, "Peer ${peer} sent a handshake with an unauthorized key: ${key}.",
                     ("peer", msg.p2p_address)("key", msg.key) );
            return false;
         }
      }

      // verify the handshake signature: token must be the sha256 of msg.time and the
      // signature must recover to the claimed key
      if(msg.sig != chain::signature_type() && msg.token != sha256()) {
         sha256 hash = fc::sha256::hash(msg.time);
         if(hash != msg.token) {
            fc_elog( logger, "Peer ${peer} sent a handshake with an invalid token.", ("peer", msg.p2p_address) );
            return false;
         }
         chain::public_key_type peer_key;
         try {
            peer_key = crypto::public_key(msg.sig, msg.token, true);
         }
         catch (const std::exception& /*e*/) {
            fc_elog( logger, "Peer ${peer} sent a handshake with an unrecoverable key.", ("peer", msg.p2p_address) );
            return false;
         }
         if((allowed_connections & (Producers | Specified)) && peer_key != msg.key) {
            fc_elog( logger, "Peer ${peer} sent a handshake with an unauthenticated key.", ("peer", msg.p2p_address) );
            return false;
         }
      }
      else if(allowed_connections & (Producers | Specified)) {
         // blank signature/token is only acceptable when authentication is not required
         fc_dlog( logger, "Peer sent a handshake with blank signature and token, but this node accepts only authenticated connections." );
         return false;
      }
      return true;
   }
3483
      // Return a key this node can sign handshakes with: the first configured
      // peer-private-key if any, otherwise a default-constructed (empty) key.
      if(!private_keys.empty())
         return private_keys.begin()->first;
      /*producer_plugin* pp = app().find_plugin<producer_plugin>();
      if(pp != nullptr && pp->get_state() == abstract_plugin::started)
         return pp->first_producer_public_key();*/
      return chain::public_key_type();
   }
3492
3494 {
3495 auto private_key_itr = private_keys.find(signer);
3496 if(private_key_itr != private_keys.end())
3497 return private_key_itr->second.sign(digest);
3499 return producer_plug->sign_compact(signer, digest);
3500 return chain::signature_type();
3501 }
3502
   // call from connection strand
   // Fill in a handshake_message with this node's current chain state, identity,
   // authentication token/signature, and advertised p2p address.
      namespace sc = std::chrono;
      uint32_t lib, head;
      std::tie( lib, std::ignore, head,
                hello.last_irreversible_block_id, std::ignore, hello.head_id ) = my_impl->get_chain_info();
      hello.last_irreversible_block_num = lib;
      hello.head_num = head;
      hello.chain_id = my_impl->chain_id;
      hello.node_id = my_impl->node_id;
      hello.key = my_impl->get_authentication_key();
      // token is the hash of the current time; sig proves possession of hello.key
      hello.time = sc::duration_cast<sc::nanoseconds>(sc::system_clock::now().time_since_epoch()).count();
      hello.token = fc::sha256::hash(hello.time);
      hello.sig = my_impl->sign_compact(hello.key, hello.token);
      // If we couldn't sign, don't send a token.
      if(hello.sig == chain::signature_type())
         hello.token = sha256();
      // advertise our address, annotated with connection type and short node id
      hello.p2p_address = my_impl->p2p_address;
      if( is_transactions_only_connection() ) hello.p2p_address += ":trx";
      if( is_blocks_only_connection() ) hello.p2p_address += ":blk";
      hello.p2p_address += " - " + hello.node_id.str().substr(0,7);
#if defined( __APPLE__ )
      hello.os = "osx";
#elif defined( __linux__ )
      hello.os = "linux";
#elif defined( _WIN32 )
      hello.os = "win32";
#else
      hello.os = "other";
#endif
      hello.agent = my_impl->user_agent_name;

      return true;
   }
3538
      // construct the plugin impl and publish it through the file-global my_impl pointer
      :my( new net_plugin_impl ) {
      my_impl = my.get();
   }
3543
3546
   // Declare the net_plugin's configuration options (all in the config section; the
   // cli-only description is unused). Option help text is user-visible and must not change.
   void net_plugin::set_program_options( options_description& /*cli*/, options_description& cfg )
   {
      cfg.add_options()
         ( "p2p-listen-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The actual host:port used to listen for incoming p2p connections.")
         ( "p2p-server-address", bpo::value<string>(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint.")
         ( "p2p-peer-address", bpo::value< vector<string> >()->composing(),
           "The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n"
           "  Syntax: host:port[:<trx>|<blk>]\n"
           "  The optional 'trx' and 'blk' indicates to node that only transactions 'trx' or blocks 'blk' should be sent."
           "  Examples:\n"
           "    p2p.eos.io:9876\n"
           "    p2p.trx.eos.io:9876:trx\n"
           "    p2p.blk.eos.io:9876:blk\n")
         ( "p2p-max-nodes-per-host", bpo::value<int>()->default_value(def_max_nodes_per_host), "Maximum number of client nodes from any single IP address")
         ( "p2p-accept-transactions", bpo::value<bool>()->default_value(true), "Allow transactions received over p2p network to be evaluated and relayed if valid.")
         ( "agent-name", bpo::value<string>()->default_value("SYS Test Agent"), "The name supplied to identify this node amongst the peers.")
         ( "allowed-connection", bpo::value<vector<string>>()->multitoken()->default_value({"any"}, "any"), "Can be 'any' or 'producers' or 'specified' or 'none'. If 'specified', peer-key must be specified at least once. If only 'producers', peer-key is not required. 'producers' and 'specified' may be combined.")
         ( "peer-key", bpo::value<vector<string>>()->composing()->multitoken(), "Optional public key of peer allowed to connect.  May be used multiple times.")
         ( "peer-private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(),
           "Tuple of [PublicKey, WIF private key] (may specify multiple times)")
         ( "max-clients", bpo::value<int>()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit")
         ( "connection-cleanup-period", bpo::value<int>()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections")
         ( "max-cleanup-time-msec", bpo::value<int>()->default_value(10), "max connection cleanup time per cleanup call in milliseconds")
         ( "p2p-dedup-cache-expire-time-sec", bpo::value<uint32_t>()->default_value(10), "Maximum time to track transaction for duplicate optimization")
         ( "net-threads", bpo::value<uint16_t>()->default_value(my->thread_pool_size),
           "Number of worker threads in net_plugin thread pool" )
         ( "sync-fetch-span", bpo::value<uint32_t>()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization")
         ( "use-socket-read-watermark", bpo::value<bool>()->default_value(false), "Enable experimental socket read watermark optimization")
         ( "peer-log-format", bpo::value<string>()->default_value( "[\"${_name}\" - ${_cid} ${_ip}:${_port}] " ),
           "The string used to format peers when logging messages about them.  Variables are escaped with ${<variable name>}.\n"
           "Available Variables:\n"
           "   _name  \tself-reported name\n\n"
           "   _cid   \tassigned connection id\n\n"
           "   _id    \tself-reported ID (64 hex characters)\n\n"
           "   _sid   \tfirst 8 characters of _peer.id\n\n"
           "   _ip    \tremote IP address of peer\n\n"
           "   _port  \tremote port number of peer\n\n"
           "   _lip   \tlocal IP address connected to peer\n\n"
           "   _lport \tlocal port number connected to peer\n\n")
         ( "p2p-keepalive-interval-ms", bpo::value<int>()->default_value(def_keepalive_interval), "peer heartbeat keepalive message interval in milliseconds")

         ;
   }
3590
3591 template<typename T>
3592 T dejsonify(const string& s) {
3593 return fc::json::from_string(s).as<T>();
3594 }
3595
   // Read the plugin's configuration options into net_plugin_impl state, validate
   // them, and wire up chain-plugin-derived settings (chain id, node id, read mode).
   void net_plugin::plugin_initialize( const variables_map& options ) {
      fc_ilog( logger, "Initialize net plugin" );
      try {
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));

         // timer periods and resource limits
         my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as<int>());
         my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as<int>();
         my->txn_exp_period = def_txn_expire_wait;
         my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as<uint32_t>() );
         my->resp_expected_period = def_resp_expected_wait;
         my->max_client_count = options.at( "max-clients" ).as<int>();
         my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as<int>();
         my->p2p_accept_transactions = options.at( "p2p-accept-transactions" ).as<bool>();

         my->use_socket_read_watermark = options.at( "use-socket-read-watermark" ).as<bool>();
         my->keepalive_interval = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as<int>() );
         SYS_ASSERT( my->keepalive_interval.count() > 0, chain::plugin_config_exception,
                     "p2p-keepalive_interval-ms must be greater than 0" );

         if( options.count( "p2p-keepalive-interval-ms" )) {
            // a peer is considered dead after missing two keepalive intervals
            my->heartbeat_timeout = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as<int>() * 2 );
         }

         // listen / advertised addresses, length-checked against handshake limits
         if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as<string>().length()) {
            my->p2p_address = options.at( "p2p-listen-endpoint" ).as<string>();
            SYS_ASSERT( my->p2p_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
                        "p2p-listen-endpoint too long, must be less than ${m}", ("m", max_p2p_address_length) );
         }
         if( options.count( "p2p-server-address" ) ) {
            my->p2p_server_address = options.at( "p2p-server-address" ).as<string>();
            SYS_ASSERT( my->p2p_server_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
                        "p2p_server_address too long, must be less than ${m}", ("m", max_p2p_address_length) );
         }

         my->thread_pool_size = options.at( "net-threads" ).as<uint16_t>();
         SYS_ASSERT( my->thread_pool_size > 0, chain::plugin_config_exception,
                     "net-threads ${num} must be greater than 0", ("num", my->thread_pool_size) );

         if( options.count( "p2p-peer-address" )) {
            my->supplied_peers = options.at( "p2p-peer-address" ).as<vector<string> >();
         }
         if( options.count( "agent-name" )) {
            my->user_agent_name = options.at( "agent-name" ).as<string>();
            SYS_ASSERT( my->user_agent_name.length() <= max_handshake_str_length, chain::plugin_config_exception,
                        "agent-name too long, must be less than ${m}", ("m", max_handshake_str_length) );
         }

         // build the allowed_connections bitmask; note 'none' overwrites (=) while the
         // others accumulate (|=)
         if( options.count( "allowed-connection" )) {
            const std::vector<std::string> allowed_remotes = options["allowed-connection"].as<std::vector<std::string>>();
            for( const std::string& allowed_remote : allowed_remotes ) {
               if( allowed_remote == "any" )
                  my->allowed_connections |= net_plugin_impl::Any;
               else if( allowed_remote == "producers" )
                  my->allowed_connections |= net_plugin_impl::Producers;
               else if( allowed_remote == "specified" )
                  my->allowed_connections |= net_plugin_impl::Specified;
               else if( allowed_remote == "none" )
                  my->allowed_connections = net_plugin_impl::None;
            }
         }

         if( my->allowed_connections & net_plugin_impl::Specified )
            SYS_ASSERT( options.count( "peer-key" ),
                        plugin_config_exception,
                       "At least one peer-key must accompany 'allowed-connection=specified'" );

         if( options.count( "peer-key" )) {
            const std::vector<std::string> key_strings = options["peer-key"].as<std::vector<std::string>>();
            for( const std::string& key_string : key_strings ) {
               my->allowed_peers.push_back( dejsonify<chain::public_key_type>( key_string ));
            }
         }

         if( options.count( "peer-private-key" )) {
            // each entry is a JSON tuple of [PublicKey, WIF private key]
            const std::vector<std::string> key_id_to_wif_pair_strings = options["peer-private-key"].as<std::vector<std::string>>();
            for( const std::string& key_id_to_wif_pair_string : key_id_to_wif_pair_strings ) {
                     key_id_to_wif_pair_string );
               my->private_keys[key_id_to_wif_pair.first] = fc::crypto::private_key( key_id_to_wif_pair.second );
            }
         }

         my->chain_plug = app().find_plugin<chain_plugin>();
         SYS_ASSERT( my->chain_plug, chain::missing_chain_plugin_exception, "" );
         my->chain_id = my->chain_plug->get_chain_id();
         // node id is random per process start
         fc::rand_pseudo_bytes( my->node_id.data(), my->node_id.data_size());
         const controller& cc = my->chain_plug->chain();

         // nodes that cannot speculatively execute must not accept p2p transactions
         if( cc.get_read_mode() == db_read_mode::IRREVERSIBLE || cc.get_read_mode() == db_read_mode::READ_ONLY ) {
            if( my->p2p_accept_transactions ) {
               my->p2p_accept_transactions = false;
               string m = cc.get_read_mode() == db_read_mode::IRREVERSIBLE ? "irreversible" : "read-only";
               wlog( "p2p-accept-transactions set to false due to read-mode: ${m}", ("m", m) );
            }
         }
         if( my->p2p_accept_transactions ) {
            my->chain_plug->enable_accept_transactions();
         }

   }
3701 handle_sighup();
3702 try {
3703
3704 fc_ilog( logger, "my node_id is ${id}", ("id", my->node_id ));
3705
3706 my->producer_plug = app().find_plugin<producer_plugin>();
3707
3708 my->thread_pool.emplace( "net", my->thread_pool_size );
3709
3710 my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool->get_executor() ) );
3711
3712 if( !my->p2p_accept_transactions && my->p2p_address.size() ) {
3713 fc_ilog( logger, "\n"
3714 "***********************************\n"
3715 "* p2p-accept-transactions = false *\n"
3716 "* Transactions not forwarded *\n"
3717 "***********************************\n" );
3718 }
3719
3720 tcp::endpoint listen_endpoint;
3721 if( my->p2p_address.size() > 0 ) {
3722 auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' ));
3723 auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size());
3724 tcp::resolver resolver( my->thread_pool->get_executor() );
3725 // Note: need to add support for IPv6 too?
3726 listen_endpoint = *resolver.resolve( tcp::v4(), host, port );
3727
3728 my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool->get_executor() ) );
3729
3730 if( !my->p2p_server_address.empty() ) {
3731 my->p2p_address = my->p2p_server_address;
3732 } else {
3733 if( listen_endpoint.address().to_v4() == address_v4::any()) {
3734 boost::system::error_code ec;
3735 auto host = host_name( ec );
3736 if( ec.value() != boost::system::errc::success ) {
3737
3738 FC_THROW_EXCEPTION( fc::invalid_arg_exception,
3739 "Unable to retrieve host_name. ${msg}", ("msg", ec.message()));
3740
3741 }
3742 auto port = my->p2p_address.substr( my->p2p_address.find( ':' ), my->p2p_address.size());
3743 my->p2p_address = host + port;
3744 }
3745 }
3746 }
3747
3748 {
3749 chain::controller& cc = my->chain_plug->chain();
3750 cc.accepted_block.connect( [my = my]( const block_state_ptr& s ) {
3751 my->on_accepted_block( s );
3752 } );
3753 cc.pre_accepted_block.connect( [my = my]( const signed_block_ptr& s ) {
3754 my->on_pre_accepted_block( s );
3755 } );
3756 cc.irreversible_block.connect( [my = my]( const block_state_ptr& s ) {
3757 my->on_irreversible_block( s );
3758 } );
3759 }
3760
3761 {
3762 std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
3763 my->keepalive_timer.reset( new boost::asio::steady_timer( my->thread_pool->get_executor() ) );
3764 }
3765
3766 my->incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe(
3767 std::bind(&net_plugin_impl::transaction_ack, my.get(), std::placeholders::_1));
3768
3769 app().post(priority::highest, [my=my, listen_endpoint](){
3770 if( my->acceptor ) {
3771 try {
3772 my->acceptor->open(listen_endpoint.protocol());
3773 my->acceptor->set_option(tcp::acceptor::reuse_address(true));
3774 my->acceptor->bind(listen_endpoint);
3775 my->acceptor->listen();
3776 } catch (const std::exception& e) {
3777 elog( "net_plugin::plugin_startup failed to bind to port ${port}, ${what}",
3778 ("port", listen_endpoint.port())("what", e.what()) );
3779 app().quit();
3780 return;
3781 }
3782 fc_ilog( logger, "starting listener, max clients is ${mc}",("mc",my->max_client_count) );
3783 my->start_listen_loop();
3784 }
3785
3786 my->ticker();
3787 my->start_monitors();
3788 my->update_chain_info();
3789 for( const auto& seed_node : my->supplied_peers ) {
3790 my->connect( seed_node );
3791 }
3792 });
3793
3794 } catch( ... ) {
3795 // always want plugin_shutdown even on exception
3797 throw;
3798 }
3799 }
3800
3804
3806 try {
3807 fc_ilog( logger, "shutdown.." );
3808 my->in_shutdown = true;
3809 {
3810 std::lock_guard<std::mutex> g( my->connector_check_timer_mtx );
3811 if( my->connector_check_timer )
3812 my->connector_check_timer->cancel();
3813 }{
3814 std::lock_guard<std::mutex> g( my->expire_timer_mtx );
3815 if( my->expire_timer )
3816 my->expire_timer->cancel();
3817 }{
3818 std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
3819 if( my->keepalive_timer )
3820 my->keepalive_timer->cancel();
3821 }
3822
3823 {
3824 fc_ilog( logger, "close ${s} connections", ("s", my->connections.size()) );
3825 std::lock_guard<std::shared_mutex> g( my->connections_mtx );
3826 for( auto& con : my->connections ) {
3827 fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) );
3828 con->close( false, true );
3829 }
3830 my->connections.clear();
3831 }
3832
3833 if( my->thread_pool ) {
3834 my->thread_pool->stop();
3835 }
3836
3837 if( my->acceptor ) {
3838 boost::system::error_code ec;
3839 my->acceptor->cancel( ec );
3840 my->acceptor->close( ec );
3841 }
3842
3843 app().post( 0, [me = my](){} ); // keep my pointer alive until queue is drained
3844 fc_ilog( logger, "exit shutdown" );
3845 }
3847 }
3848
3852 string net_plugin::connect( const string& host ) {
3853 return my->connect( host );
3854 }
3855
3856 string net_plugin_impl::connect( const string& host ) {
3857 std::lock_guard<std::shared_mutex> g( connections_mtx );
3858 if( find_connection( host ) )
3859 return "already connected";
3860
3861 connection_ptr c = std::make_shared<connection>( host );
3862 fc_dlog( logger, "calling active connector: ${h}", ("h", host) );
3863 if( c->resolve_and_connect() ) {
3864 fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) );
3865 c->set_heartbeat_timeout( heartbeat_timeout );
3866 connections.insert( c );
3867 }
3868 return "added connection";
3869 }
3870
3871 string net_plugin::disconnect( const string& host ) {
3872 std::lock_guard<std::shared_mutex> g( my->connections_mtx );
3873 for( auto itr = my->connections.begin(); itr != my->connections.end(); ++itr ) {
3874 if( (*itr)->peer_address() == host ) {
3875 fc_ilog( logger, "disconnecting: ${cid}", ("cid", (*itr)->connection_id) );
3876 (*itr)->close();
3877 my->connections.erase(itr);
3878 return "connection removed";
3879 }
3880 }
3881 return "no known connection for host";
3882 }
3883
3884 std::optional<connection_status> net_plugin::status( const string& host )const {
3885 std::shared_lock<std::shared_mutex> g( my->connections_mtx );
3886 auto con = my->find_connection( host );
3887 if( con )
3888 return con->get_status();
3889 return std::optional<connection_status>();
3890 }
3891
3894 std::shared_lock<std::shared_mutex> g( my->connections_mtx );
3895 result.reserve( my->connections.size() );
3896 for( const auto& c : my->connections ) {
3897 result.push_back( c->get_status() );
3898 }
3899 return result;
3900 }
3901
3902 // call with connections_mtx
3904 for( const auto& c : connections )
3905 if( c->peer_address() == host ) return c;
3906 return connection_ptr();
3907 }
3908
3910 if (v >= net_version_base) {
3911 v -= net_version_base;
3912 return (v > net_version_range) ? 0 : v;
3913 }
3914 return 0;
3915 }
3916
3917}
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
@ started
the plugin is actively running
Definition plugin.hpp:33
abstract_plugin * find_plugin(const string &name) const
auto post(int priority, Func &&func)
auto get_channel() -> std::enable_if_t< is_channel_decl< ChannelDecl >::value, typename ChannelDecl::channel_type & >
virtual state get_state() const override
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
std::string to_string(log_level ll=log_level::info) const
static variant from_string(const string &utf8_str, const parse_type ptype=parse_type::legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition json.cpp:442
static void update(const fc::string &name, logger &log)
Definition logger.cpp:92
void log(log_message m)
Definition logger.cpp:62
bool is_enabled(log_level e) const
Definition logger.cpp:58
abstraction for a message buffer that spans a chain of physical buffers
mb_datastream< buffer_len > create_datastream()
std::vector< boost::asio::mutable_buffer > get_buffer_sequence_for_boost_async_read()
mb_peek_datastream< buffer_len > create_peek_datastream()
void advance_read_ptr(uint32_t bytes)
uint32_t bytes_to_write() const
constexpr int64_t to_seconds() const
Definition time.hpp:27
An order-preserving dictionary of variants.
static sha256 hash(const char *d, uint32_t dlen)
Definition sha256.cpp:44
string str() const
Definition sha256.cpp:26
static time_point now()
Definition time.cpp:14
void reset(pointer v)
An order-preserving dictionary of variants.
T as() const
Definition variant.hpp:327
auto events() const
returns number of consecutive rbws
void accepted()
called when a block is accepted (sync_recv_block)
block_status_monitor & operator=(block_status_monitor &&)=delete
block_status_monitor(fc::microseconds window_size=fc::microseconds(2 *1000), uint32_t max_rejected_windows=13)
block_status_monitor(const block_status_monitor &)=delete
block_status_monitor & operator=(const block_status_monitor &)=delete
assignment not allowed
void reset()
reset to initial state
bool max_events_violated() const
indicates if the max number of consecutive rbws has been reached or exceeded
void rejected()
called when a block is rejected
block_status_monitor(block_status_monitor &&)=delete
bool is_trusted_producer(const account_name &producer) const
signed_block_ptr fetch_block_by_number(uint32_t block_num) const
block_id_type fork_db_pending_head_block_id() const
signal< void(const block_state_ptr &)> accepted_block
signal< void(const block_state_ptr &)> irreversible_block
db_read_mode get_read_mode() const
block_state_ptr create_block_state(const block_id_type &id, const signed_block_ptr &b) const
uint32_t head_block_num() const
block_id_type head_block_id() const
signal< void(const signed_block_ptr &)> pre_accepted_block
uint32_t last_irreversible_block_num() const
uint32_t fork_db_pending_head_block_num() const
signed_block_ptr fetch_block_by_id(block_id_type id) const
block_id_type last_irreversible_block_id() const
block_id_type get_block_id_for_num(uint32_t block_num) const
block_state_ptr fetch_block_state_by_id(block_id_type id) const
bool accept_block(const chain::signed_block_ptr &block, const chain::block_id_type &id, const chain::block_state_ptr &bsp)
void accept_transaction(const chain::packed_transaction_ptr &trx, chain::plugin_interface::next_function< chain::transaction_trace_ptr > next)
void set_heartbeat_timeout(std::chrono::milliseconds msec)
void request_sync_blocks(uint32_t start, uint32_t end)
void cancel_sync(go_away_reason)
connection_status get_status() const
tstamp rec
receive timestamp
void enqueue_buffer(const std::shared_ptr< std::vector< char > > &send_buffer, go_away_reason close_after_send, bool to_sync_queue=false)
handshake_message last_handshake_recv
fc::time_point last_close
void send_time()
Populate and queue time_message.
tstamp xmt
transmit timestamp
void set_connection_type(const string &peer_addr)
const uint32_t connection_id
boost::asio::steady_timer response_expected_timer
void blk_send(const block_id_type &blkid)
bool process_next_message(uint32_t message_length)
Process the next message from the pending message buffer.
std::atomic< bool > connecting
void handle_message(const signed_block &msg)=delete
int16_t sent_handshake_count
std::atomic< std::size_t > outstanding_read_bytes
fc::variant_object get_logger_variant() const
void connect(const std::shared_ptr< tcp::resolver > &resolver, tcp::resolver::results_type endpoints)
fc::time_point last_dropped_trx_msg_time
fc::sha256 conn_node_id
void blk_send_branch(const block_id_type &msg_head_id)
std::mutex conn_mtx
void handle_message(const handshake_message &msg)
~connection()=default
std::atomic< go_away_reason > no_retry
tstamp org
originate timestamp
void process_signed_block(const block_id_type &id, signed_block_ptr msg, block_state_ptr bsp)
std::mutex response_expected_timer_mtx
std::atomic< uint16_t > consecutive_immediate_connection_close
bool is_blocks_only_connection() const
void close(bool reconnect=true, bool shutdown=false)
queued_buffer buffer_queue
block_id_type fork_head
block_status_monitor block_status_monitor_
boost::asio::io_context::strand strand
void blk_send_branch_impl(uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num)
string log_remote_endpoint_port
void handle_message(const packed_transaction &msg)=delete
bool is_transactions_only_connection() const
static tstamp get_time()
Read system time and convert to a 64 bit integer.
handshake_message last_handshake_sent
std::atomic< bool > syncing
std::optional< request_message > last_req
string log_remote_endpoint_ip
bool is_valid(const handshake_message &msg) const
const string & peer_address() const
void check_heartbeat(tstamp current_time)
Check heartbeat time and send Time_message.
void sync_timeout(boost::system::error_code ec)
std::atomic< uint16_t > protocol_version
std::atomic< uint32_t > trx_in_progress_size
void enqueue(const net_message &msg)
std::shared_ptr< tcp::socket > socket
void fetch_timeout(boost::system::error_code ec)
bool socket_is_open() const
void queue_write(const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue=false)
bool populate_handshake(handshake_message &hello)
fc::message_buffer< 1024 *1024 > pending_message_buffer
tstamp dst
destination timestamp
void enqueue_block(const signed_block_ptr &sb, bool to_sync_queue=false)
bool have_txn(const transaction_id_type &tid) const
void rejected_transaction(const packed_transaction_ptr &trx, uint32_t head_blk_num)
boost::asio::io_context::strand strand
void expire_blocks(uint32_t bnum)
void bcast_block(const signed_block_ptr &b, const block_id_type &id)
bool peer_has_block(const block_id_type &blkid, uint32_t connection_id) const
void rejected_block(const block_id_type &id)
dispatch_manager(boost::asio::io_context &io_context)
void recv_notice(const connection_ptr &conn, const notice_message &msg, bool generated)
void recv_block(const connection_ptr &conn, const block_id_type &msg, uint32_t bnum)
bool have_block(const block_id_type &blkid) const
bool add_peer_block(const block_id_type &blkid, uint32_t connection_id)
void bcast_transaction(const packed_transaction_ptr &trx)
void retry_fetch(const connection_ptr &conn)
bool add_peer_txn(const transaction_id_type id, const time_point_sec &trx_expires, uint32_t connection_id, const time_point_sec &now=time_point::now())
unique_ptr< boost::asio::steady_timer > expire_timer
std::mutex keepalive_timer_mtx
const std::chrono::system_clock::duration peer_authentication_interval
Peer clock may be no more than 1 second skewed from our clock, including network latency.
std::map< chain::public_key_type, chain::private_key_type > private_keys
overlapping with producer keys, also authenticating non-producing nodes
void ticker()
Peer heartbeat ticker.
unique_ptr< sync_manager > sync_master
chain::signature_type sign_compact(const chain::public_key_type &signer, const fc::sha256 &digest) const
Returns a signature of the digest using the corresponding private key of the signer.
connection_ptr find_connection(const string &host) const
boost::asio::steady_timer::duration connector_period
std::chrono::milliseconds keepalive_interval
bool authenticate_peer(const handshake_message &msg) const
Determine if a peer is allowed to connect.
fc::microseconds p2p_dedup_cache_expire_time_us
unique_ptr< dispatch_manager > dispatcher
void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr< connection > from_connection)
void on_pre_accepted_block(const signed_block_ptr &bs)
string connect(const string &host)
std::atomic< uint32_t > current_connection_id
unique_ptr< boost::asio::steady_timer > keepalive_timer
possible_connections allowed_connections
producer_plugin * producer_plug
std::shared_mutex connections_mtx
void on_irreversible_block(const block_state_ptr &blk)
unique_ptr< tcp::acceptor > acceptor
boost::asio::steady_timer::duration resp_expected_period
chain_plugin * chain_plug
unique_ptr< boost::asio::steady_timer > connector_check_timer
void on_accepted_block(const block_state_ptr &bs)
std::atomic< bool > in_shutdown
std::tuple< uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type > get_chain_info() const
compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription
std::mutex connector_check_timer_mtx
vector< chain::public_key_type > allowed_peers
peer keys allowed to connect
vector< string > supplied_peers
void connection_monitor(std::weak_ptr< connection > from_connection, bool reschedule)
std::chrono::milliseconds heartbeat_timeout
std::optional< sysio::chain::named_thread_pool > thread_pool
static constexpr uint16_t to_protocol_version(uint16_t v)
std::set< connection_ptr > connections
chain::public_key_type get_authentication_key() const
Retrieve public key used to authenticate with peers.
void transaction_ack(const std::pair< fc::exception_ptr, packed_transaction_ptr > &)
boost::asio::steady_timer::duration txn_exp_period
vector< connection_status > connections() const
void plugin_initialize(const variables_map &options)
string disconnect(const string &endpoint)
std::optional< connection_status > status(const string &endpoint) const
void handle_sighup() override
string connect(const string &endpoint)
virtual void set_program_options(options_description &cli, options_description &cfg) override
chain::signature_type sign_compact(const chain::public_key_type &key, const fc::sha256 &digest) const
bool is_producer_key(const chain::public_key_type &key) const
void log_failed_transaction(const transaction_id_type &trx_id, const chain::packed_transaction_ptr &packed_trx_ptr, const char *reason) const
bool ready_to_send() const
void fill_out_buffer(std::vector< boost::asio::const_buffer > &bufs)
bool add_write_queue(const std::shared_ptr< vector< char > > &buff, std::function< void(boost::system::error_code, std::size_t)> callback, bool to_sync_queue)
void out_callback(boost::system::error_code ec, std::size_t w)
uint32_t write_queue_size() const
bool is_out_queue_empty() const
sync_manager(uint32_t span)
void sync_recv_notice(const connection_ptr &c, const notice_message &msg)
void recv_handshake(const connection_ptr &c, const handshake_message &msg)
void rejected_block(const connection_ptr &c, uint32_t blk_num)
std::unique_lock< std::mutex > locked_sync_mutex()
void reset_last_requested_num(const std::unique_lock< std::mutex > &lock)
void sync_reassign_fetch(const connection_ptr &c, go_away_reason reason)
void sync_reset_lib_num(const connection_ptr &conn, bool closing)
bool syncing_with_peer() const
void sync_update_expected(const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
void sync_recv_block(const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
static void send_handshakes()
uint64_t id
Definition code_cache.cpp:0
client::connection_ptr connection_ptr
Defines exception's used by fc.
#define FC_CAPTURE_AND_RETHROW(...)
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_LOG_AND_RETHROW()
#define FC_LOG_AND_DROP(...)
int * count
catch(...)
void close(T *e, websocketpp::connection_hdl hdl)
#define fc_create_span(TRACE_VARNAME, SPAN_STR)
Definition trace.hpp:24
#define fc_add_tag(SPAN_VARNAME, TAG_KEY_STR, TAG_VALUE_STR)
Definition trace.hpp:40
#define fc_create_trace_with_id(TRACE_STR, TRACE_ID)
Definition trace.hpp:16
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
#define __func__
#define fc_ilog(LOGGER, FORMAT,...)
Definition logger.hpp:83
#define wlog(FORMAT,...)
Definition logger.hpp:124
#define elog(FORMAT,...)
Definition logger.hpp:130
#define fc_elog(LOGGER, FORMAT,...)
Definition logger.hpp:95
#define fc_wlog(LOGGER, FORMAT,...)
Definition logger.hpp:89
#define fc_dlog(LOGGER, FORMAT,...)
Definition logger.hpp:77
return str
Definition CLI11.hpp:1359
static const Segment ds(Segment::ds)
static const Reg8 bh(Operand::BH)
application & app()
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
void pack(Stream &s, const std::deque< T > &value)
Definition raw.hpp:531
size_t pack_size(const T &v)
Definition raw.hpp:671
std::string string
Definition string.hpp:10
fc::sha256 digest(const T &value)
Definition digest.hpp:9
constexpr microseconds milliseconds(int64_t s)
Definition time.hpp:33
void rand_pseudo_bytes(char *buf, int count)
Definition rand.cpp:16
constexpr microseconds seconds(int64_t s)
Definition time.hpp:32
constexpr std::size_t get_index()
fc::crypto::public_key public_key_type
Definition types.hpp:76
fc::sha256 block_id_type
Definition types.hpp:231
std::shared_ptr< transaction_trace > transaction_trace_ptr
Definition trace.hpp:20
std::shared_ptr< const packed_transaction > packed_transaction_ptr
key Invalid authority Invalid transaction Invalid block ID Invalid packed transaction Invalid chain ID Invalid symbol Signature type is not a currently activated type Block can not be found block_validate_exception
std::shared_ptr< signed_block > signed_block_ptr
Definition block.hpp:105
std::shared_ptr< block_state > block_state_ptr
fc::crypto::signature signature_type
Definition types.hpp:78
constexpr auto message_header_size
void for_each_block_connection(Function f)
constexpr uint16_t net_version_max
enum_type & operator|=(enum_type &lhs, const enum_type &rhs)
constexpr uint16_t proto_explicit_sync
void verify_strand_in_this_thread(const Strand &strand, const char *func, int line)
size_t calc_trx_size(const packed_transaction_ptr &trx)
constexpr size_t max_handshake_str_length
Definition protocol.hpp:25
constexpr uint16_t proto_dup_goaway_resolution
constexpr uint16_t proto_base
constexpr uint16_t proto_wire_sysio_initial
@ normal
Definition protocol.hpp:96
@ last_irr_catch_up
Definition protocol.hpp:95
@ catch_up
Definition protocol.hpp:94
std::chrono::system_clock::duration::rep tstamp
Definition protocol.hpp:11
constexpr auto modes_str(id_list_modes m)
Definition protocol.hpp:99
std::shared_ptr< std::vector< char > > send_buffer_type
constexpr auto def_max_clients
std::string peer_log_format
T dejsonify(const string &s)
go_away_reason
Definition protocol.hpp:46
@ duplicate
the connection is redundant
Definition protocol.hpp:49
@ self
the connection is to itself
Definition protocol.hpp:48
@ no_reason
no reason to go away
Definition protocol.hpp:47
@ wrong_chain
the peer's chain id doesn't match
Definition protocol.hpp:50
@ wrong_version
the peer's network version doesn't match
Definition protocol.hpp:51
@ unlinkable
the peer sent a block we couldn't use
Definition protocol.hpp:53
@ fatal_other
a catch-all for errors we don't have discriminated
Definition protocol.hpp:57
@ authentication
peer failed authentication
Definition protocol.hpp:58
@ validation
the peer sent a block that failed validation
Definition protocol.hpp:55
@ benign_other
reasons such as a timeout. not fatal but warrant resetting
Definition protocol.hpp:56
multi_index_container< node_transaction_state, indexed_by< ordered_unique< tag< by_id >, composite_key< node_transaction_state, member< node_transaction_state, transaction_id_type, &node_transaction_state::id >, member< node_transaction_state, uint32_t, &node_transaction_state::connection_id > >, composite_key_compare< sha256_less, std::less< uint32_t > > >, ordered_non_unique< tag< by_expiry >, member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > > > > node_transaction_index
constexpr uint32_t signed_block_which
constexpr uint16_t proto_block_id_notify
multi_index_container< sysio::peer_block_state, indexed_by< ordered_unique< tag< by_id >, composite_key< peer_block_state, member< peer_block_state, uint32_t, &sysio::peer_block_state::connection_id >, member< peer_block_state, block_id_type, &sysio::peer_block_state::id > >, composite_key_compare< std::less< uint32_t >, sha256_less > >, ordered_non_unique< tag< by_peer_block_id >, composite_key< peer_block_state, member< peer_block_state, block_id_type, &sysio::peer_block_state::id >, member< peer_block_state, bool, &sysio::peer_block_state::have_block > >, composite_key_compare< sha256_less, std::greater< bool > > >, ordered_non_unique< tag< by_block_num >, member< sysio::peer_block_state, uint32_t, &sysio::peer_block_state::block_num > > > > peer_block_state_index
constexpr uint16_t proto_pruned_types
std::weak_ptr< connection > connection_wptr
fc::logger logger
constexpr auto def_send_buffer_size_mb
constexpr uint16_t proto_heartbeat_interval
std::variant< handshake_message, chain_size_message, go_away_message, time_message, notice_message, request_message, sync_request_message, signed_block, packed_transaction > net_message
Definition protocol.hpp:138
constexpr uint32_t packed_transaction_which
constexpr auto def_send_buffer_size
constexpr auto def_sync_fetch_span
constexpr auto def_max_trx_in_progress_size
constexpr auto reason_str(go_away_reason rsn)
Definition protocol.hpp:61
constexpr auto def_max_nodes_per_host
constexpr uint16_t net_version_range
constexpr size_t max_p2p_address_length
Definition protocol.hpp:24
constexpr uint16_t net_version_base
constexpr auto def_resp_expected_wait
constexpr auto def_max_write_queue_size
constexpr auto def_keepalive_interval
constexpr auto def_txn_expire_wait
constexpr auto def_conn_retry_wait
const fc::string logger_name("net_plugin_impl")
constexpr uint16_t proto_dup_node_id_goaway
std::shared_ptr< connection > connection_ptr
constexpr auto def_max_consecutive_immediate_connection_close
void for_each_connection(Function f)
#define peer_elog(PEER, FORMAT,...)
#define peer_dlog(PEER, FORMAT,...)
#define peer_ilog(PEER, FORMAT,...)
#define peer_wlog(PEER, FORMAT,...)
#define T(meth, val, expected)
signed short int16_t
Definition stdint.h:122
unsigned short uint16_t
Definition stdint.h:125
signed __int64 int64_t
Definition stdint.h:135
unsigned int uint32_t
Definition stdint.h:126
#define INT16_MAX
Definition stdint.h:181
static constexpr int medium
static constexpr int highest
const send_buffer_type & get_send_buffer(const signed_block_ptr &sb)
caches result for subsequent calls, only provide same signed_block_ptr instance for each invocation.
static send_buffer_type create_send_buffer(uint32_t which, const T &v)
const send_buffer_type & get_send_buffer(const net_message &m)
caches result for subsequent calls, only provide same net_message instance for each invocation
static send_buffer_type create_send_buffer(const net_message &m)
send_buffer_type send_buffer
static uint32_t num_from_id(const block_id_type &id)
handshake_message last_handshake
go_away_reason reason
Definition protocol.hpp:81
fc::sha256 node_id
for duplicate notification
Definition protocol.hpp:82
chain_id_type chain_id
used to identify chain
Definition protocol.hpp:29
uint16_t network_version
incremental value above a computed base
Definition protocol.hpp:28
block_id_type head_id
Definition protocol.hpp:39
uint32_t last_irreversible_block_num
Definition protocol.hpp:36
fc::sha256 node_id
used to identify peers and prevent self-connect
Definition protocol.hpp:30
fc::sha256 token
digest of time to prove we own the private key of the key above
Definition protocol.hpp:33
chain::public_key_type key
authentication key; may be a producer or peer key, or empty
Definition protocol.hpp:31
chain::signature_type sig
signature for the digest
Definition protocol.hpp:34
int64_t time
time message created in nanoseconds from epoch
Definition protocol.hpp:32
block_id_type last_irreversible_block_id
Definition protocol.hpp:37
void operator()(const handshake_message &msg) const
void operator()(const T &) const
void operator()(const request_message &msg) const
void operator()(const chain_size_message &msg) const
msg_handler(const connection_ptr &conn)
connection_ptr c
void operator()(const go_away_message &msg) const
void operator()(const sync_request_message &msg) const
void operator()(const time_message &msg) const
void operator()(const notice_message &msg) const
uint32_t connection_id
time after which this may be purged.
ordered_blk_ids known_blocks
Definition protocol.hpp:124
ordered_txn_ids known_trx
Definition protocol.hpp:123
peer_sync_state(uint32_t start=0, uint32_t end=0, uint32_t last_acted=0)
time_point start_time
time request made or received
uint32_t last
last sent or received
ordered_blk_ids req_blocks
Definition protocol.hpp:130
ordered_txn_ids req_trx
Definition protocol.hpp:129
id_list_modes mode
Definition protocol.hpp:112
vector< T > ids
Definition protocol.hpp:114
tstamp xmt
transmit timestamp
Definition protocol.hpp:88
tstamp dst
destination timestamp
Definition protocol.hpp:89
tstamp rec
receive timestamp
Definition protocol.hpp:87
tstamp org
origin timestamp
Definition protocol.hpp:86
const send_buffer_type & get_send_buffer(const packed_transaction_ptr &trx)
caches result for subsequent calls, only provide same packed_transaction_ptr instance for each invoca...
void rep()
char * s