Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
sysio::net_plugin Class Reference

#include <net_plugin.hpp>

Inheritance diagram for sysio::net_plugin:
Collaboration diagram for sysio::net_plugin:

Public Member Functions

 net_plugin ()
 
virtual ~net_plugin ()
 
virtual void set_program_options (options_description &cli, options_description &cfg) override
 
void handle_sighup () override
 
void plugin_initialize (const variables_map &options)
 
void plugin_startup ()
 
void plugin_shutdown ()
 
string connect (const string &endpoint)
 
string disconnect (const string &endpoint)
 
std::optional< connection_statusstatus (const string &endpoint) const
 
vector< connection_statusconnections () const
 
- Public Member Functions inherited from appbase::plugin< net_plugin >
 plugin ()
 
virtual ~plugin ()
 
virtual state get_state () const override
 
virtual const std::string & name () const override
 
virtual void register_dependencies ()
 
virtual void initialize (const variables_map &options) override
 
virtual void startup () override
 
virtual void shutdown () override
 
- Public Member Functions inherited from appbase::abstract_plugin
virtual ~abstract_plugin ()
 

Additional Inherited Members

- Public Types inherited from appbase::abstract_plugin
enum  state { registered , initialized , started , stopped }
 
- Protected Member Functions inherited from appbase::plugin< net_plugin >
 plugin (const string &name)
 

Detailed Description

Definition at line 16 of file net_plugin.hpp.

Constructor & Destructor Documentation

◆ net_plugin()

net_plugin::net_plugin ( )

Definition at line 3539 of file net_plugin.cpp.

3540 :my( new net_plugin_impl ) {
3541 my_impl = my.get();
3542 }

◆ ~net_plugin()

net_plugin::~net_plugin ( )
virtual

Definition at line 3544 of file net_plugin.cpp.

3544 {
3545 }

Member Function Documentation

◆ connect()

string sysio::net_plugin::connect ( const string & host)

Used to trigger a new connection from RPC API

Definition at line 3852 of file net_plugin.cpp.

3852 {
3853 return my->connect( host );
3854 }

◆ connections()

vector< connection_status > net_plugin::connections ( ) const

Definition at line 3892 of file net_plugin.cpp.

3892 {
3894 std::shared_lock<std::shared_mutex> g( my->connections_mtx );
3895 result.reserve( my->connections.size() );
3896 for( const auto& c : my->connections ) {
3897 result.push_back( c->get_status() );
3898 }
3899 return result;
3900 }

◆ disconnect()

string net_plugin::disconnect ( const string & endpoint)

Definition at line 3871 of file net_plugin.cpp.

3871 {
3872 std::lock_guard<std::shared_mutex> g( my->connections_mtx );
3873 for( auto itr = my->connections.begin(); itr != my->connections.end(); ++itr ) {
3874 if( (*itr)->peer_address() == host ) {
3875 fc_ilog( logger, "disconnecting: ${cid}", ("cid", (*itr)->connection_id) );
3876 (*itr)->close();
3877 my->connections.erase(itr);
3878 return "connection removed";
3879 }
3880 }
3881 return "no known connection for host";
3882 }
#define fc_ilog(LOGGER, FORMAT,...)
Definition logger.hpp:83

◆ handle_sighup()

void net_plugin::handle_sighup ( )
overridevirtual

Reimplemented from appbase::plugin< net_plugin >.

Definition at line 3801 of file net_plugin.cpp.

3801 {
3803 }
static void update(const fc::string &name, logger &log)
Definition logger.cpp:92
const fc::string logger_name("net_plugin_impl")
Here is the call graph for this function:
Here is the caller graph for this function:

◆ plugin_initialize()

void net_plugin::plugin_initialize ( const variables_map & options)

Definition at line 3596 of file net_plugin.cpp.

3596 {
3597 fc_ilog( logger, "Initialize net plugin" );
3598 try {
3599 peer_log_format = options.at( "peer-log-format" ).as<string>();
3600
3601 my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
3602
3603 my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as<int>());
3604 my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as<int>();
3605 my->txn_exp_period = def_txn_expire_wait;
3606 my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as<uint32_t>() );
3607 my->resp_expected_period = def_resp_expected_wait;
3608 my->max_client_count = options.at( "max-clients" ).as<int>();
3609 my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as<int>();
3610 my->p2p_accept_transactions = options.at( "p2p-accept-transactions" ).as<bool>();
3611
3612 my->use_socket_read_watermark = options.at( "use-socket-read-watermark" ).as<bool>();
3613 my->keepalive_interval = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as<int>() );
3614 SYS_ASSERT( my->keepalive_interval.count() > 0, chain::plugin_config_exception,
3615 "p2p-keepalive_interval-ms must be greater than 0" );
3616
3617 if( options.count( "p2p-keepalive-interval-ms" )) {
3618 my->heartbeat_timeout = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as<int>() * 2 );
3619 }
3620
3621 if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as<string>().length()) {
3622 my->p2p_address = options.at( "p2p-listen-endpoint" ).as<string>();
3623 SYS_ASSERT( my->p2p_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
3624 "p2p-listen-endpoint too long, must be less than ${m}", ("m", max_p2p_address_length) );
3625 }
3626 if( options.count( "p2p-server-address" ) ) {
3627 my->p2p_server_address = options.at( "p2p-server-address" ).as<string>();
3628 SYS_ASSERT( my->p2p_server_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
3629 "p2p_server_address too long, must be less than ${m}", ("m", max_p2p_address_length) );
3630 }
3631
3632 my->thread_pool_size = options.at( "net-threads" ).as<uint16_t>();
3633 SYS_ASSERT( my->thread_pool_size > 0, chain::plugin_config_exception,
3634 "net-threads ${num} must be greater than 0", ("num", my->thread_pool_size) );
3635
3636 if( options.count( "p2p-peer-address" )) {
3637 my->supplied_peers = options.at( "p2p-peer-address" ).as<vector<string> >();
3638 }
3639 if( options.count( "agent-name" )) {
3640 my->user_agent_name = options.at( "agent-name" ).as<string>();
3641 SYS_ASSERT( my->user_agent_name.length() <= max_handshake_str_length, chain::plugin_config_exception,
3642 "agent-name too long, must be less than ${m}", ("m", max_handshake_str_length) );
3643 }
3644
3645 if( options.count( "allowed-connection" )) {
3646 const std::vector<std::string> allowed_remotes = options["allowed-connection"].as<std::vector<std::string>>();
3647 for( const std::string& allowed_remote : allowed_remotes ) {
3648 if( allowed_remote == "any" )
3649 my->allowed_connections |= net_plugin_impl::Any;
3650 else if( allowed_remote == "producers" )
3651 my->allowed_connections |= net_plugin_impl::Producers;
3652 else if( allowed_remote == "specified" )
3653 my->allowed_connections |= net_plugin_impl::Specified;
3654 else if( allowed_remote == "none" )
3655 my->allowed_connections = net_plugin_impl::None;
3656 }
3657 }
3658
3659 if( my->allowed_connections & net_plugin_impl::Specified )
3660 SYS_ASSERT( options.count( "peer-key" ),
3661 plugin_config_exception,
3662 "At least one peer-key must accompany 'allowed-connection=specified'" );
3663
3664 if( options.count( "peer-key" )) {
3665 const std::vector<std::string> key_strings = options["peer-key"].as<std::vector<std::string>>();
3666 for( const std::string& key_string : key_strings ) {
3667 my->allowed_peers.push_back( dejsonify<chain::public_key_type>( key_string ));
3668 }
3669 }
3670
3671 if( options.count( "peer-private-key" )) {
3672 const std::vector<std::string> key_id_to_wif_pair_strings = options["peer-private-key"].as<std::vector<std::string>>();
3673 for( const std::string& key_id_to_wif_pair_string : key_id_to_wif_pair_strings ) {
3675 key_id_to_wif_pair_string );
3676 my->private_keys[key_id_to_wif_pair.first] = fc::crypto::private_key( key_id_to_wif_pair.second );
3677 }
3678 }
3679
3680 my->chain_plug = app().find_plugin<chain_plugin>();
3681 SYS_ASSERT( my->chain_plug, chain::missing_chain_plugin_exception, "" );
3682 my->chain_id = my->chain_plug->get_chain_id();
3683 fc::rand_pseudo_bytes( my->node_id.data(), my->node_id.data_size());
3684 const controller& cc = my->chain_plug->chain();
3685
3686 if( cc.get_read_mode() == db_read_mode::IRREVERSIBLE || cc.get_read_mode() == db_read_mode::READ_ONLY ) {
3687 if( my->p2p_accept_transactions ) {
3688 my->p2p_accept_transactions = false;
3689 string m = cc.get_read_mode() == db_read_mode::IRREVERSIBLE ? "irreversible" : "read-only";
3690 wlog( "p2p-accept-transactions set to false due to read-mode: ${m}", ("m", m) );
3691 }
3692 }
3693 if( my->p2p_accept_transactions ) {
3694 my->chain_plug->enable_accept_transactions();
3695 }
3696
3698 }
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
abstract_plugin * find_plugin(const string &name) const
db_read_mode get_read_mode() const
#define FC_LOG_AND_RETHROW()
#define wlog(FORMAT,...)
Definition logger.hpp:124
application & app()
void rand_pseudo_bytes(char *buf, int count)
Definition rand.cpp:16
constexpr microseconds seconds(int64_t s)
Definition time.hpp:32
constexpr size_t max_handshake_str_length
Definition protocol.hpp:25
std::string peer_log_format
T dejsonify(const string &s)
constexpr size_t max_p2p_address_length
Definition protocol.hpp:24
constexpr auto def_resp_expected_wait
constexpr auto def_txn_expire_wait
unsigned short uint16_t
Definition stdint.h:125
unsigned int uint32_t
Definition stdint.h:126
Here is the call graph for this function:

◆ plugin_shutdown()

void net_plugin::plugin_shutdown ( )

Definition at line 3805 of file net_plugin.cpp.

3805 {
3806 try {
3807 fc_ilog( logger, "shutdown.." );
3808 my->in_shutdown = true;
3809 {
3810 std::lock_guard<std::mutex> g( my->connector_check_timer_mtx );
3811 if( my->connector_check_timer )
3812 my->connector_check_timer->cancel();
3813 }{
3814 std::lock_guard<std::mutex> g( my->expire_timer_mtx );
3815 if( my->expire_timer )
3816 my->expire_timer->cancel();
3817 }{
3818 std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
3819 if( my->keepalive_timer )
3820 my->keepalive_timer->cancel();
3821 }
3822
3823 {
3824 fc_ilog( logger, "close ${s} connections", ("s", my->connections.size()) );
3825 std::lock_guard<std::shared_mutex> g( my->connections_mtx );
3826 for( auto& con : my->connections ) {
3827 fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) );
3828 con->close( false, true );
3829 }
3830 my->connections.clear();
3831 }
3832
3833 if( my->thread_pool ) {
3834 my->thread_pool->stop();
3835 }
3836
3837 if( my->acceptor ) {
3838 boost::system::error_code ec;
3839 my->acceptor->cancel( ec );
3840 my->acceptor->close( ec );
3841 }
3842
3843 app().post( 0, [me = my](){} ); // keep my pointer alive until queue is drained
3844 fc_ilog( logger, "exit shutdown" );
3845 }
3847 }
auto post(int priority, Func &&func)
#define FC_CAPTURE_AND_RETHROW(...)
#define fc_dlog(LOGGER, FORMAT,...)
Definition logger.hpp:77
Here is the call graph for this function:
Here is the caller graph for this function:

◆ plugin_startup()

void net_plugin::plugin_startup ( )

Definition at line 3700 of file net_plugin.cpp.

3700 {
3701 handle_sighup();
3702 try {
3703
3704 fc_ilog( logger, "my node_id is ${id}", ("id", my->node_id ));
3705
3706 my->producer_plug = app().find_plugin<producer_plugin>();
3707
3708 my->thread_pool.emplace( "net", my->thread_pool_size );
3709
3710 my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool->get_executor() ) );
3711
3712 if( !my->p2p_accept_transactions && my->p2p_address.size() ) {
3713 fc_ilog( logger, "\n"
3714 "***********************************\n"
3715 "* p2p-accept-transactions = false *\n"
3716 "* Transactions not forwarded *\n"
3717 "***********************************\n" );
3718 }
3719
3720 tcp::endpoint listen_endpoint;
3721 if( my->p2p_address.size() > 0 ) {
3722 auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' ));
3723 auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size());
3724 tcp::resolver resolver( my->thread_pool->get_executor() );
3725 // Note: need to add support for IPv6 too?
3726 listen_endpoint = *resolver.resolve( tcp::v4(), host, port );
3727
3728 my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool->get_executor() ) );
3729
3730 if( !my->p2p_server_address.empty() ) {
3731 my->p2p_address = my->p2p_server_address;
3732 } else {
3733 if( listen_endpoint.address().to_v4() == address_v4::any()) {
3734 boost::system::error_code ec;
3735 auto host = host_name( ec );
3736 if( ec.value() != boost::system::errc::success ) {
3737
3738 FC_THROW_EXCEPTION( fc::invalid_arg_exception,
3739 "Unable to retrieve host_name. ${msg}", ("msg", ec.message()));
3740
3741 }
3742 auto port = my->p2p_address.substr( my->p2p_address.find( ':' ), my->p2p_address.size());
3743 my->p2p_address = host + port;
3744 }
3745 }
3746 }
3747
3748 {
3749 chain::controller& cc = my->chain_plug->chain();
3750 cc.accepted_block.connect( [my = my]( const block_state_ptr& s ) {
3751 my->on_accepted_block( s );
3752 } );
3753 cc.pre_accepted_block.connect( [my = my]( const signed_block_ptr& s ) {
3754 my->on_pre_accepted_block( s );
3755 } );
3756 cc.irreversible_block.connect( [my = my]( const block_state_ptr& s ) {
3757 my->on_irreversible_block( s );
3758 } );
3759 }
3760
3761 {
3762 std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
3763 my->keepalive_timer.reset( new boost::asio::steady_timer( my->thread_pool->get_executor() ) );
3764 }
3765
3766 my->incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe(
3767 std::bind(&net_plugin_impl::transaction_ack, my.get(), std::placeholders::_1));
3768
3769 app().post(priority::highest, [my=my, listen_endpoint](){
3770 if( my->acceptor ) {
3771 try {
3772 my->acceptor->open(listen_endpoint.protocol());
3773 my->acceptor->set_option(tcp::acceptor::reuse_address(true));
3774 my->acceptor->bind(listen_endpoint);
3775 my->acceptor->listen();
3776 } catch (const std::exception& e) {
3777 elog( "net_plugin::plugin_startup failed to bind to port ${port}, ${what}",
3778 ("port", listen_endpoint.port())("what", e.what()) );
3779 app().quit();
3780 return;
3781 }
3782 fc_ilog( logger, "starting listener, max clients is ${mc}",("mc",my->max_client_count) );
3783 my->start_listen_loop();
3784 }
3785
3786 my->ticker();
3787 my->start_monitors();
3788 my->update_chain_info();
3789 for( const auto& seed_node : my->supplied_peers ) {
3790 my->connect( seed_node );
3791 }
3792 });
3793
3794 } catch( ... ) {
3795 // always want plugin_shutdown even on exception
3797 throw;
3798 }
3799 }
auto get_channel() -> std::enable_if_t< is_channel_decl< ChannelDecl >::value, typename ChannelDecl::channel_type & >
std::optional< sysio::chain::named_thread_pool > thread_pool
void transaction_ack(const std::pair< fc::exception_ptr, packed_transaction_ptr > &)
void handle_sighup() override
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define elog(FORMAT,...)
Definition logger.hpp:130
std::shared_ptr< signed_block > signed_block_ptr
Definition block.hpp:105
std::shared_ptr< block_state > block_state_ptr
static constexpr int highest
char * s
Here is the call graph for this function:

◆ set_program_options()

void net_plugin::set_program_options ( options_description & cli,
options_description & cfg )
overridevirtual

Implements appbase::abstract_plugin.

Definition at line 3547 of file net_plugin.cpp.

3548 {
3549 cfg.add_options()
3550 ( "p2p-listen-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The actual host:port used to listen for incoming p2p connections.")
3551 ( "p2p-server-address", bpo::value<string>(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint.")
3552 ( "p2p-peer-address", bpo::value< vector<string> >()->composing(),
3553 "The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n"
3554 " Syntax: host:port[:<trx>|<blk>]\n"
3555 " The optional 'trx' and 'blk' indicates to node that only transactions 'trx' or blocks 'blk' should be sent."
3556 " Examples:\n"
3557 " p2p.eos.io:9876\n"
3558 " p2p.trx.eos.io:9876:trx\n"
3559 " p2p.blk.eos.io:9876:blk\n")
3560 ( "p2p-max-nodes-per-host", bpo::value<int>()->default_value(def_max_nodes_per_host), "Maximum number of client nodes from any single IP address")
3561 ( "p2p-accept-transactions", bpo::value<bool>()->default_value(true), "Allow transactions received over p2p network to be evaluated and relayed if valid.")
3562 ( "agent-name", bpo::value<string>()->default_value("SYS Test Agent"), "The name supplied to identify this node amongst the peers.")
3563 ( "allowed-connection", bpo::value<vector<string>>()->multitoken()->default_value({"any"}, "any"), "Can be 'any' or 'producers' or 'specified' or 'none'. If 'specified', peer-key must be specified at least once. If only 'producers', peer-key is not required. 'producers' and 'specified' may be combined.")
3564 ( "peer-key", bpo::value<vector<string>>()->composing()->multitoken(), "Optional public key of peer allowed to connect. May be used multiple times.")
3565 ( "peer-private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(),
3566 "Tuple of [PublicKey, WIF private key] (may specify multiple times)")
3567 ( "max-clients", bpo::value<int>()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit")
3568 ( "connection-cleanup-period", bpo::value<int>()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections")
3569 ( "max-cleanup-time-msec", bpo::value<int>()->default_value(10), "max connection cleanup time per cleanup call in milliseconds")
3570 ( "p2p-dedup-cache-expire-time-sec", bpo::value<uint32_t>()->default_value(10), "Maximum time to track transaction for duplicate optimization")
3571 ( "net-threads", bpo::value<uint16_t>()->default_value(my->thread_pool_size),
3572 "Number of worker threads in net_plugin thread pool" )
3573 ( "sync-fetch-span", bpo::value<uint32_t>()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization")
3574 ( "use-socket-read-watermark", bpo::value<bool>()->default_value(false), "Enable experimental socket read watermark optimization")
3575 ( "peer-log-format", bpo::value<string>()->default_value( "[\"${_name}\" - ${_cid} ${_ip}:${_port}] " ),
3576 "The string used to format peers when logging messages about them. Variables are escaped with ${<variable name>}.\n"
3577 "Available Variables:\n"
3578 " _name \tself-reported name\n\n"
3579 " _cid \tassigned connection id\n\n"
3580 " _id \tself-reported ID (64 hex characters)\n\n"
3581 " _sid \tfirst 8 characters of _peer.id\n\n"
3582 " _ip \tremote IP address of peer\n\n"
3583 " _port \tremote port number of peer\n\n"
3584 " _lip \tlocal IP address connected to peer\n\n"
3585 " _lport \tlocal port number connected to peer\n\n")
3586 ( "p2p-keepalive-interval-ms", bpo::value<int>()->default_value(def_keepalive_interval), "peer heartbeat keepalive message interval in milliseconds")
3587
3588 ;
3589 }
constexpr auto def_max_clients
constexpr auto def_sync_fetch_span
constexpr auto def_max_nodes_per_host
constexpr auto def_keepalive_interval
constexpr auto def_conn_retry_wait

◆ status()

std::optional< connection_status > net_plugin::status ( const string & endpoint) const

Definition at line 3884 of file net_plugin.cpp.

3884 {
3885 std::shared_lock<std::shared_mutex> g( my->connections_mtx );
3886 auto con = my->find_connection( host );
3887 if( con )
3888 return con->get_status();
3889 return std::optional<connection_status>();
3890 }

The documentation for this class was generated from the following files: