Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
sysio::dispatch_manager Class Reference

Public Member Functions

 dispatch_manager (boost::asio::io_context &io_context)
 
void bcast_transaction (const packed_transaction_ptr &trx)
 
void rejected_transaction (const packed_transaction_ptr &trx, uint32_t head_blk_num)
 
void bcast_block (const signed_block_ptr &b, const block_id_type &id)
 
void rejected_block (const block_id_type &id)
 
void recv_block (const connection_ptr &conn, const block_id_type &msg, uint32_t bnum)
 
void expire_blocks (uint32_t bnum)
 
void recv_notice (const connection_ptr &conn, const notice_message &msg, bool generated)
 
void retry_fetch (const connection_ptr &conn)
 
bool add_peer_block (const block_id_type &blkid, uint32_t connection_id)
 
bool peer_has_block (const block_id_type &blkid, uint32_t connection_id) const
 
bool have_block (const block_id_type &blkid) const
 
bool add_peer_txn (const transaction_id_type id, const time_point_sec &trx_expires, uint32_t connection_id, const time_point_sec &now=time_point::now())
 
bool have_txn (const transaction_id_type &tid) const
 
void expire_txns ()
 

Public Attributes

boost::asio::io_context::strand strand
 

Detailed Description

Definition at line 165 of file net_plugin.cpp.

Constructor & Destructor Documentation

◆ dispatch_manager()

sysio::dispatch_manager::dispatch_manager ( boost::asio::io_context & io_context)
inlineexplicit

Definition at line 174 of file net_plugin.cpp.

175 : strand( io_context ) {}
boost::asio::io_context::strand strand

Member Function Documentation

◆ add_peer_block()

bool sysio::dispatch_manager::add_peer_block ( const block_id_type & blkid,
uint32_t connection_id )

Definition at line 2060 of file net_plugin.cpp.

2060 {
2061 std::lock_guard<std::mutex> g( blk_state_mtx );
2062 auto bptr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
2063 bool added = (bptr == blk_state.end());
2064 if( added ) {
2065 blk_state.insert( {blkid, block_header::num_from_id( blkid ), connection_id, true} );
2066 } else if( !bptr->have_block ) {
2067 blk_state.modify( bptr, []( auto& pb ) {
2068 pb.have_block = true;
2069 });
2070 }
2071 return added;
2072 }
RUNTIME_API Runtime::ObjectInstance * find(const std::string &name, const IR::ObjectType &type)
static uint32_t num_from_id(const block_id_type &id)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ add_peer_txn()

bool sysio::dispatch_manager::add_peer_txn ( const transaction_id_type id,
const time_point_sec & trx_expires,
uint32_t connection_id,
const time_point_sec & now = time_point::now() )

Definition at line 2091 of file net_plugin.cpp.

2092 {
2093 std::lock_guard<std::mutex> g( local_txns_mtx );
2094 auto tptr = local_txns.get<by_id>().find( std::make_tuple( std::ref( id ), connection_id ) );
2095 bool added = (tptr == local_txns.end());
2096 if( added ) {
2097 // expire at either transaction expiration or configured max expire time whichever is less
2098 time_point_sec expires = now + my_impl->p2p_dedup_cache_expire_time_us;
2099 expires = std::min( trx_expires, expires );
2100 local_txns.insert( node_transaction_state{
2101 .id = id,
2102 .expires = expires,
2103 .connection_id = connection_id} );
2104 }
2105 return added;
2106 }
fc::microseconds p2p_dedup_cache_expire_time_us
uint64_t id
Definition code_cache.cpp:0
Here is the caller graph for this function:

◆ bcast_block()

void sysio::dispatch_manager::bcast_block ( const signed_block_ptr & b,
const block_id_type & id )

Definition at line 2135 of file net_plugin.cpp.

2135 {
2136 fc_dlog( logger, "bcast block ${b}", ("b", b->block_num()) );
2137
2138 if( my_impl->sync_master->syncing_with_peer() ) return;
2139
2140 block_buffer_factory buff_factory;
2141 const auto bnum = b->block_num();
2142 for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) {
2143 fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}",
2144 ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) );
2145 if( !cp->current() ) return true;
2146 send_buffer_type sb = buff_factory.get_send_buffer( b );
2147
2148 cp->strand.post( [this, cp, id, bnum, sb{std::move(sb)}]() {
2149 cp->latest_blk_time = cp->get_time();
2150 std::unique_lock<std::mutex> g_conn( cp->conn_mtx );
2151 bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum;
2152 g_conn.unlock();
2153 if( !has_block ) {
2154 if( !add_peer_block( id, cp->connection_id ) ) {
2155 peer_dlog( cp, "not bcast block ${b}", ("b", bnum) );
2156 return;
2157 }
2158 peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
2159 cp->enqueue_buffer( sb, no_reason );
2160 }
2161 });
2162 return true;
2163 } );
2164 }
bool add_peer_block(const block_id_type &blkid, uint32_t connection_id)
unique_ptr< sync_manager > sync_master
#define fc_dlog(LOGGER, FORMAT,...)
Definition logger.hpp:77
void for_each_block_connection(Function f)
std::shared_ptr< std::vector< char > > send_buffer_type
@ no_reason
no reason to go away
Definition protocol.hpp:47
#define peer_dlog(PEER, FORMAT,...)
Here is the call graph for this function:

◆ bcast_transaction()

void sysio::dispatch_manager::bcast_transaction ( const packed_transaction_ptr & trx)

Definition at line 2187 of file net_plugin.cpp.

2187 {
2188 trx_buffer_factory buff_factory;
2189 const auto now = fc::time_point::now();
2190 for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) {
2191 if( cp->is_blocks_only_connection() || !cp->current() ) {
2192 return true;
2193 }
2194 if( !add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) {
2195 return true;
2196 }
2197
2198 send_buffer_type sb = buff_factory.get_send_buffer( trx );
2199 fc_dlog( logger, "sending trx: ${id}, to connection ${cid}", ("id", trx->id())("cid", cp->connection_id) );
2200 cp->strand.post( [cp, sb{std::move(sb)}]() {
2201 cp->enqueue_buffer( sb, no_reason );
2202 } );
2203 return true;
2204 } );
2205 }
static time_point now()
Definition time.cpp:14
bool add_peer_txn(const transaction_id_type id, const time_point_sec &trx_expires, uint32_t connection_id, const time_point_sec &now=time_point::now())
void for_each_connection(Function f)
Here is the call graph for this function:

◆ expire_blocks()

void sysio::dispatch_manager::expire_blocks ( uint32_t bnum)

Definition at line 2128 of file net_plugin.cpp.

2128 {
2129 std::lock_guard<std::mutex> g(blk_state_mtx);
2130 auto& stale_blk = blk_state.get<by_block_num>();
2131 stale_blk.erase( stale_blk.lower_bound(1), stale_blk.upper_bound(lib_num) );
2132 }

◆ expire_txns()

void sysio::dispatch_manager::expire_txns ( )

Definition at line 2114 of file net_plugin.cpp.

2114 {
2115 size_t start_size = 0, end_size = 0;
2116
2117 std::unique_lock<std::mutex> g( local_txns_mtx );
2118 start_size = local_txns.size();
2119 auto& old = local_txns.get<by_expiry>();
2120 auto ex_lo = old.lower_bound( fc::time_point_sec( 0 ) );
2121 auto ex_up = old.upper_bound( time_point::now() );
2122 old.erase( ex_lo, ex_up );
2123 g.unlock();
2124
2125 fc_dlog( logger, "expire_local_txns size ${s} removed ${r}", ("s", start_size)( "r", start_size - end_size ) );
2126 }
Here is the call graph for this function:

◆ have_block()

bool sysio::dispatch_manager::have_block ( const block_id_type & blkid) const

Definition at line 2080 of file net_plugin.cpp.

2080 {
2081 std::lock_guard<std::mutex> g(blk_state_mtx);
2082 // by_peer_block_id sorts have_block by greater so have_block == true will be the first one found
2083 const auto& index = blk_state.get<by_peer_block_id>();
2084 auto blk_itr = index.find( blkid );
2085 if( blk_itr != index.end() ) {
2086 return blk_itr->have_block;
2087 }
2088 return false;
2089 }

◆ have_txn()

bool sysio::dispatch_manager::have_txn ( const transaction_id_type & tid) const

Definition at line 2108 of file net_plugin.cpp.

2108 {
2109 std::lock_guard<std::mutex> g( local_txns_mtx );
2110 const auto tptr = local_txns.get<by_id>().find( tid );
2111 return tptr != local_txns.end();
2112 }

◆ peer_has_block()

bool sysio::dispatch_manager::peer_has_block ( const block_id_type & blkid,
uint32_t connection_id ) const

Definition at line 2074 of file net_plugin.cpp.

2074 {
2075 std::lock_guard<std::mutex> g(blk_state_mtx);
2076 const auto blk_itr = blk_state.get<by_id>().find( std::make_tuple( connection_id, std::ref( blkid )));
2077 return blk_itr != blk_state.end();
2078 }
Here is the caller graph for this function:

◆ recv_block()

void sysio::dispatch_manager::recv_block ( const connection_ptr & conn,
const block_id_type & msg,
uint32_t bnum )

Definition at line 2167 of file net_plugin.cpp.

2167 {
2168 std::unique_lock<std::mutex> g( c->conn_mtx );
2169 if (c &&
2170 c->last_req &&
2171 c->last_req->req_blocks.mode != none &&
2172 !c->last_req->req_blocks.ids.empty() &&
2173 c->last_req->req_blocks.ids.back() == id) {
2174 peer_dlog( c, "resetting last_req" );
2175 c->last_req.reset();
2176 }
2177 g.unlock();
2178
2179 peer_dlog(c, "canceling wait");
2180 c->cancel_wait();
2181 }

◆ recv_notice()

void sysio::dispatch_manager::recv_notice ( const connection_ptr & conn,
const notice_message & msg,
bool generated )

Definition at line 2213 of file net_plugin.cpp.

2213 {
2214 if (msg.known_trx.mode == normal) {
2215 } else if (msg.known_trx.mode != none) {
2216 peer_elog( c, "passed a notice_message with something other than a normal on none known_trx" );
2217 return;
2218 }
2219 if (msg.known_blocks.mode == normal) {
2220 // known_blocks.ids is never > 1
2221 if( !msg.known_blocks.ids.empty() ) {
2222 if( msg.known_blocks.pending == 1 ) { // block id notify of 2.0.0, ignore
2223 return;
2224 }
2225 }
2226 } else if (msg.known_blocks.mode != none) {
2227 peer_elog( c, "passed a notice_message with something other than a normal on none known_blocks" );
2228 return;
2229 }
2230 }
@ normal
Definition protocol.hpp:96
#define peer_elog(PEER, FORMAT,...)

◆ rejected_block()

void sysio::dispatch_manager::rejected_block ( const block_id_type & id)

Definition at line 2183 of file net_plugin.cpp.

2183 {
2184 fc_dlog( logger, "rejected block ${id}", ("id", id) );
2185 }

◆ rejected_transaction()

void sysio::dispatch_manager::rejected_transaction ( const packed_transaction_ptr & trx,
uint32_t head_blk_num )

Definition at line 2207 of file net_plugin.cpp.

2207 {
2208 fc_dlog( logger, "not sending rejected transaction ${tid}", ("tid", trx->id()) );
2209 // keep rejected transaction around for awhile so we don't broadcast it, don't remove from local_txns
2210 }

◆ retry_fetch()

void sysio::dispatch_manager::retry_fetch ( const connection_ptr & conn)

Definition at line 2233 of file net_plugin.cpp.

2233 {
2234 peer_dlog( c, "retry fetch" );
2235 request_message last_req;
2236 block_id_type bid;
2237 {
2238 std::lock_guard<std::mutex> g_c_conn( c->conn_mtx );
2239 if( !c->last_req ) {
2240 return;
2241 }
2242 peer_wlog( c, "failed to fetch from peer" );
2243 if( c->last_req->req_blocks.mode == normal && !c->last_req->req_blocks.ids.empty() ) {
2244 bid = c->last_req->req_blocks.ids.back();
2245 } else {
2246 peer_wlog( c, "no retry, block mpde = ${b} trx mode = ${t}",
2247 ("b", modes_str( c->last_req->req_blocks.mode ))( "t", modes_str( c->last_req->req_trx.mode ) ) );
2248 return;
2249 }
2250 last_req = *c->last_req;
2251 }
2252 for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) {
2253 if( conn == c )
2254 return true;
2255
2256 {
2257 std::lock_guard<std::mutex> guard( conn->conn_mtx );
2258 if( conn->last_req ) {
2259 return true;
2260 }
2261 }
2262
2263 bool sendit = peer_has_block( bid, conn->connection_id );
2264 if( sendit ) {
2265 conn->strand.post( [conn, last_req{std::move(last_req)}]() {
2266 conn->enqueue( last_req );
2267 conn->fetch_wait();
2268 std::lock_guard<std::mutex> g_conn_conn( conn->conn_mtx );
2269 conn->last_req = last_req;
2270 } );
2271 return false;
2272 }
2273 return true;
2274 } );
2275
2276 // at this point no other peer has it, re-request or do nothing?
2277 peer_wlog( c, "no peer has last_req" );
2278 if( c->connected() ) {
2279 c->enqueue( last_req );
2280 c->fetch_wait();
2281 }
2282 }
bool peer_has_block(const block_id_type &blkid, uint32_t connection_id) const
constexpr auto modes_str(id_list_modes m)
Definition protocol.hpp:99
#define peer_wlog(PEER, FORMAT,...)
Here is the call graph for this function:

Member Data Documentation

◆ strand

boost::asio::io_context::strand sysio::dispatch_manager::strand

Definition at line 172 of file net_plugin.cpp.


The documentation for this class was generated from the following file: