Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
sysio::sync_manager Class Reference

Public Member Functions

 sync_manager (uint32_t span)
 
bool syncing_with_peer () const
 
void sync_reset_lib_num (const connection_ptr &conn, bool closing)
 
void sync_reassign_fetch (const connection_ptr &c, go_away_reason reason)
 
void rejected_block (const connection_ptr &c, uint32_t blk_num)
 
void sync_recv_block (const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
 
void sync_update_expected (const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
 
void recv_handshake (const connection_ptr &c, const handshake_message &msg)
 
void sync_recv_notice (const connection_ptr &c, const notice_message &msg)
 
std::unique_lock< std::mutex > locked_sync_mutex ()
 
void reset_last_requested_num (const std::unique_lock< std::mutex > &lock)
 

Static Public Member Functions

static void send_handshakes ()
 

Detailed Description

Definition at line 119 of file net_plugin.cpp.

Constructor & Destructor Documentation

◆ sync_manager()

sysio::sync_manager::sync_manager ( uint32_t span)
explicit

Definition at line 1537 of file net_plugin.cpp.

1538 :sync_known_lib_num( 0 )
1539 ,sync_last_requested_num( 0 )
1540 ,sync_next_expected_num( 1 )
1541 ,sync_req_span( req_span )
1542 ,sync_source()
1543 ,sync_state(in_sync)
1544 {
1545 }

Member Function Documentation

◆ locked_sync_mutex()

std::unique_lock< std::mutex > sysio::sync_manager::locked_sync_mutex ( )
inline

Definition at line 157 of file net_plugin.cpp.

157 {
158 return std::unique_lock<std::mutex>(sync_mtx);
159 }

◆ recv_handshake()

void sysio::sync_manager::recv_handshake ( const connection_ptr & c,
const handshake_message & msg )

Definition at line 1774 of file net_plugin.cpp.

1774 {
1775
1776 if( c->is_transactions_only_connection() ) return;
1777
1778 uint32_t lib_num = 0;
1779 uint32_t peer_lib = msg.last_irreversible_block_num;
1780 uint32_t head = 0;
1781 block_id_type head_id;
1782 std::tie( lib_num, std::ignore, head,
1783 std::ignore, std::ignore, head_id ) = my_impl->get_chain_info();
1784
1785 sync_reset_lib_num(c, false);
1786
1787 auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
1788 int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds
1789 if( network_latency_ns < 0 ) {
1790 peer_wlog(c, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
1791 network_latency_ns = 0;
1792 }
1793 // number of blocks syncing node is behind from a peer node
1794 uint32_t nblk_behind_by_net_latency = static_cast<uint32_t>(network_latency_ns / block_interval_ns);
1795 // 2x for time it takes for message to reach back to peer node, +1 to compensate for integer division truncation
1796 uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency + 1;
1797 // message in the log below is used in p2p_high_latency_test.py test
1798 peer_dlog(c, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
1799 ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));
1800
1801 //--------------------------------
1802 // sync need checks; (lib == last irreversible block)
1803 //
1804 // 0. my head block id == peer head id means we are all caught up block wise
1805 // 1. my head block num < peer lib - start sync locally
1806 // 2. my lib > peer head num + nblk_combined_latency - send last_irr_catch_up notice if not the first generation
1807 //
1808 // 3 my head block num + nblk_combined_latency < peer head block num - update sync state and send a catchup request
1809 // 4 my head block num >= peer block num + nblk_combined_latency send a notice catchup if this is not the first generation
1810 // 4.1 if peer appears to be on a different fork ( our_id_for( msg.head_num ) != msg.head_id )
1811 // then request peer's blocks
1812 //
1813 //-----------------------------
1814
1815 if (head_id == msg.head_id) {
1816 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0",
1817 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1818 c->syncing = false;
1819 notice_message note;
1820 note.known_blocks.mode = none;
1821 note.known_trx.mode = catch_up;
1822 note.known_trx.pending = 0;
1823 c->enqueue( note );
1824 return;
1825 }
1826 if (head < peer_lib) {
1827 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1",
1828 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1829 c->syncing = false;
1830 if (c->sent_handshake_count > 0) {
1831 c->send_handshake();
1832 }
1833 return;
1834 }
1835 if (lib_num > msg.head_num + nblk_combined_latency) {
1836 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 2",
1837 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1838 if (msg.generation > 1 || c->protocol_version > proto_base) {
1839 notice_message note;
1840 note.known_trx.pending = lib_num;
1841 note.known_trx.mode = last_irr_catch_up;
1842 note.known_blocks.mode = last_irr_catch_up;
1843 note.known_blocks.pending = head;
1844 c->enqueue( note );
1845 }
1846 c->syncing = true;
1847 return;
1848 }
1849
1850 if (head + nblk_combined_latency < msg.head_num ) {
1851 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 3",
1852 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1853 c->syncing = false;
1854 verify_catchup(c, msg.head_num, msg.head_id);
1855 return;
1856 } else if(head >= msg.head_num + nblk_combined_latency) {
1857 peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 4",
1858 ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
1859 if (msg.generation > 1 || c->protocol_version > proto_base) {
1860 notice_message note;
1861 note.known_trx.mode = none;
1862 note.known_blocks.mode = catch_up;
1863 note.known_blocks.pending = head;
1864 note.known_blocks.ids.push_back(head_id);
1865 c->enqueue( note );
1866 }
1867 c->syncing = false;
1868 app().post( priority::medium, [chain_plug = my_impl->chain_plug, c,
1869 msg_head_num = msg.head_num, msg_head_id = msg.head_id]() {
1870 bool on_fork = true;
1871 try {
1872 controller& cc = chain_plug->chain();
1873 on_fork = cc.get_block_id_for_num( msg_head_num ) != msg_head_id;
1874 } catch( ... ) {}
1875 if( on_fork ) {
1876 c->strand.post( [c]() {
1877 request_message req;
1878 req.req_blocks.mode = catch_up;
1879 req.req_trx.mode = none;
1880 c->enqueue( req );
1881 } );
1882 }
1883 } );
1884 return;
1885 } else {
1886 peer_dlog( c, "Block discrepancy is within network latency range.");
1887 }
1888 }
auto post(int priority, Func &&func)
chain_plugin * chain_plug
std::tuple< uint32_t, uint32_t, uint32_t, block_id_type, block_id_type, block_id_type > get_chain_info() const
void sync_reset_lib_num(const connection_ptr &conn, bool closing)
application & app()
constexpr uint16_t proto_base
@ last_irr_catch_up
Definition protocol.hpp:95
@ catch_up
Definition protocol.hpp:94
#define peer_dlog(PEER, FORMAT,...)
#define peer_ilog(PEER, FORMAT,...)
#define peer_wlog(PEER, FORMAT,...)
signed __int64 int64_t
Definition stdint.h:135
unsigned int uint32_t
Definition stdint.h:126
static constexpr int medium
Here is the call graph for this function:

◆ rejected_block()

void sysio::sync_manager::rejected_block ( const connection_ptr & c,
uint32_t blk_num )

Definition at line 1966 of file net_plugin.cpp.

1966 {
1967 c->block_status_monitor_.rejected();
1968 std::unique_lock<std::mutex> g( sync_mtx );
1970 if( c->block_status_monitor_.max_events_violated()) {
1971 peer_wlog( c, "block ${bn} not accepted, closing connection", ("bn", blk_num) );
1972 sync_source.reset();
1973 g.unlock();
1974 c->close();
1975 } else {
1976 g.unlock();
1977 c->send_handshake();
1978 }
1979 }
void reset_last_requested_num(const std::unique_lock< std::mutex > &lock)
Here is the call graph for this function:

◆ reset_last_requested_num()

void sysio::sync_manager::reset_last_requested_num ( const std::unique_lock< std::mutex > & lock)
inline

Definition at line 160 of file net_plugin.cpp.

160 {
161 sync_last_requested_num = 0;
162 }
Here is the caller graph for this function:

◆ send_handshakes()

void sysio::sync_manager::send_handshakes ( )
static

Definition at line 1711 of file net_plugin.cpp.

1711 {
1712 for_each_connection( []( auto& ci ) {
1713 if( ci->current() ) {
1714 ci->send_handshake();
1715 }
1716 return true;
1717 } );
1718 }
void for_each_connection(Function f)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ sync_reassign_fetch()

void sysio::sync_manager::sync_reassign_fetch ( const connection_ptr & c,
go_away_reason reason )

Definition at line 1761 of file net_plugin.cpp.

1761 {
1762 std::unique_lock<std::mutex> g( sync_mtx );
1763 peer_ilog( c, "reassign_fetch, our last req is ${cc}, next expected is ${ne}",
1764 ("cc", sync_last_requested_num)("ne", sync_next_expected_num) );
1765
1766 if( c == sync_source ) {
1767 c->cancel_sync(reason);
1769 request_next_chunk( std::move(g) );
1770 }
1771 }
Here is the call graph for this function:

◆ sync_recv_block()

void sysio::sync_manager::sync_recv_block ( const connection_ptr & c,
const block_id_type & blk_id,
uint32_t blk_num,
bool blk_applied )

Definition at line 1998 of file net_plugin.cpp.

1998 {
1999 peer_dlog( c, "got block ${bn}", ("bn", blk_num) );
2000 if( app().is_quiting() ) {
2001 c->close( false, true );
2002 return;
2003 }
2004 c->block_status_monitor_.accepted();
2005 sync_update_expected( c, blk_id, blk_num, blk_applied );
2006 std::unique_lock<std::mutex> g_sync( sync_mtx );
2007 stages state = sync_state;
2008 peer_dlog( c, "state ${s}", ("s", stage_str( state )) );
2009 if( state == head_catchup ) {
2010 peer_dlog( c, "sync_manager in head_catchup state" );
2011 sync_source.reset();
2012 g_sync.unlock();
2013
2014 block_id_type null_id;
2015 bool set_state_to_head_catchup = false;
2016 for_each_block_connection( [&null_id, blk_num, &blk_id, &c, &set_state_to_head_catchup]( const auto& cp ) {
2017 std::unique_lock<std::mutex> g_cp_conn( cp->conn_mtx );
2018 uint32_t fork_head_num = cp->fork_head_num;
2019 block_id_type fork_head_id = cp->fork_head;
2020 g_cp_conn.unlock();
2021 if( fork_head_id == null_id ) {
2022 // continue
2023 } else if( fork_head_num < blk_num || fork_head_id == blk_id ) {
2024 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
2025 c->fork_head = null_id;
2026 c->fork_head_num = 0;
2027 } else {
2028 set_state_to_head_catchup = true;
2029 }
2030 return true;
2031 } );
2032
2033 if( set_state_to_head_catchup ) {
2034 if( set_state( head_catchup ) ) {
2036 }
2037 } else {
2038 set_state( in_sync );
2040 }
2041 } else if( state == lib_catchup ) {
2042 if( blk_num >= sync_known_lib_num ) {
2043 peer_dlog( c, "All caught up with last known last irreversible block resending handshake" );
2044 set_state( in_sync );
2045 g_sync.unlock();
2047 } else if( blk_num >= sync_last_requested_num ) {
2048 request_next_chunk( std::move( g_sync) );
2049 } else {
2050 g_sync.unlock();
2051 peer_dlog( c, "calling sync_wait" );
2052 c->sync_wait();
2053 }
2054 }
2055 }
void sync_update_expected(const connection_ptr &c, const block_id_type &blk_id, uint32_t blk_num, bool blk_applied)
static void send_handshakes()
void for_each_block_connection(Function f)
Here is the call graph for this function:

◆ sync_recv_notice()

void sysio::sync_manager::sync_recv_notice ( const connection_ptr & c,
const notice_message & msg )

Definition at line 1937 of file net_plugin.cpp.

1937 {
1938 peer_dlog( c, "sync_manager got ${m} block notice", ("m", modes_str( msg.known_blocks.mode )) );
1939 SYS_ASSERT( msg.known_blocks.mode == catch_up || msg.known_blocks.mode == last_irr_catch_up, plugin_exception,
1940 "sync_recv_notice only called on catch_up" );
1941 if (msg.known_blocks.mode == catch_up) {
1942 if (msg.known_blocks.ids.size() == 0) {
1943 peer_elog( c, "got a catch up with ids size = 0" );
1944 } else {
1945 const block_id_type& id = msg.known_blocks.ids.back();
1946 peer_ilog( c, "notice_message, pending ${p}, blk_num ${n}, id ${id}...",
1947 ("p", msg.known_blocks.pending)("n", block_header::num_from_id(id))("id",id.str().substr(8,16)) );
1948 if( !my_impl->dispatcher->have_block( id ) ) {
1949 verify_catchup( c, msg.known_blocks.pending, id );
1950 } else {
1951 // we already have the block, so update peer with our view of the world
1952 c->send_handshake();
1953 }
1954 }
1955 } else if (msg.known_blocks.mode == last_irr_catch_up) {
1956 {
1957 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1958 c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
1959 }
1960 sync_reset_lib_num(c, false);
1961 start_sync(c, msg.known_trx.pending);
1962 }
1963 }
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
unique_ptr< dispatch_manager > dispatcher
constexpr auto modes_str(id_list_modes m)
Definition protocol.hpp:99
#define peer_elog(PEER, FORMAT,...)
static uint32_t num_from_id(const block_id_type &id)
Here is the call graph for this function:

◆ sync_reset_lib_num()

void sysio::sync_manager::sync_reset_lib_num ( const connection_ptr & conn,
bool closing )

Definition at line 1566 of file net_plugin.cpp.

1566 {
1567 std::unique_lock<std::mutex> g( sync_mtx );
1568 if( sync_state == in_sync ) {
1569 sync_source.reset();
1570 }
1571 if( !c ) return;
1572 if( !closing ) {
1573 std::lock_guard<std::mutex> g_conn( c->conn_mtx );
1574 if( c->last_handshake_recv.last_irreversible_block_num > sync_known_lib_num ) {
1575 sync_known_lib_num = c->last_handshake_recv.last_irreversible_block_num;
1576 }
1577 } else {
1578 // Closing connection, therefore its view of LIB can no longer be considered as we will no longer be connected.
1579 // Determine current LIB of remaining peers as our sync_known_lib_num.
1580 uint32_t highest_lib_num = 0;
1581 for_each_block_connection( [&highest_lib_num]( const auto& cc ) {
1582 std::lock_guard<std::mutex> g_conn( cc->conn_mtx );
1583 if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) {
1584 highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num;
1585 }
1586 return true;
1587 } );
1588 sync_known_lib_num = highest_lib_num;
1589
1590 // if closing the connection we are currently syncing from, then reset our last requested and next expected.
1591 if( c == sync_source ) {
1593 // if starting to sync need to always start from lib as we might be on our own fork
1594 uint32_t lib_num = 0;
1595 std::tie( lib_num, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore ) = my_impl->get_chain_info();
1596 sync_next_expected_num = lib_num + 1;
1597 request_next_chunk( std::move(g) );
1598 }
1599 }
1600 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ sync_update_expected()

void sysio::sync_manager::sync_update_expected ( const connection_ptr & c,
const block_id_type & blk_id,
uint32_t blk_num,
bool blk_applied )

Definition at line 1982 of file net_plugin.cpp.

1982 {
1983 std::unique_lock<std::mutex> g_sync( sync_mtx );
1984 if( blk_num <= sync_last_requested_num ) {
1985 peer_dlog( c, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
1986 ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) );
1987 if (blk_num != sync_next_expected_num && !blk_applied) {
1988 auto sync_next_expected = sync_next_expected_num;
1989 g_sync.unlock();
1990 peer_dlog( c, "expected block ${ne} but got ${bn}", ("ne", sync_next_expected)("bn", blk_num) );
1991 return;
1992 }
1993 sync_next_expected_num = blk_num + 1;
1994 }
1995 }
Here is the caller graph for this function:

◆ syncing_with_peer()

bool sysio::sync_manager::syncing_with_peer ( ) const
inline

Definition at line 149 of file net_plugin.cpp.

149{ return sync_state == lib_catchup; }

The documentation for this class was generated from the following file: