Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
ntp.cpp
Go to the documentation of this file.
1#include <fc/network/ntp.hpp>
4#include <fc/network/ip.hpp>
5#include <fc/thread/thread.hpp>
6
7#include <stdint.h>
8#include "../byteswap.hpp"
9
10#include <atomic>
11#include <array>
12
13namespace fc
14{
15 namespace detail {
16 using boost::fibers::future;
17
19 {
20 public:
22 fc::thread _ntp_thread;
23 std::vector< std::pair< std::string, uint16_t> > _ntp_hosts;
24 future<void> _read_loop_done;
29
31 std::atomic<int64_t> _last_ntp_delta_microseconds;
32
34
35 std::shared_ptr<scheduled_task_impl<void>> _scheduled_request_time;
36
38 _ntp_thread("ntp"),
39 _request_interval_sec( 60*60 /* 1 hr */),
42 {
44 _ntp_hosts.push_back( std::make_pair( "pool.ntp.org",123 ) );
45 }
46
48 {
49 _sock.close();
52 _scheduled_request_time->get_future().wait();
54 }
55 }
56
58 {
59 uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order);
60 uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff;
61 uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (uint64_t(1) << 31)) >> 32);
62 uint32_t seconds_since_1900 = ntp_timestamp_host >> 32;
63 uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800;
64 return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds);
65 }
66
68 {
69 uint64_t microseconds_since_epoch = (uint64_t)fc_timestamp.time_since_epoch().count();
70 uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000);
71 uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800;
72 uint32_t microseconds = microseconds_since_epoch % 1000000;
73 uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (uint64_t(1) << 31)) / 1000000);
74 uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds;
75 return bswap_64(ntp_timestamp_net_order);
76 }
77
79 {
80 FC_ASSERT(_ntp_thread.is_current());
81 for( auto item : _ntp_hosts )
82 {
83 try
84 {
85 wlog( "resolving... ${r}", ("r", item) );
86 auto eps = resolve( item.first, item.second );
87 for( auto ep : eps )
88 {
89 wlog( "sending request to ${ep}", ("ep",ep) );
90 std::shared_ptr<char> send_buffer(new char[48], [](char* p){ delete[] p; });
91 std::array<unsigned char, 48> packet_to_send { {010,0,0,0,0,0,0,0,0} };
92 memcpy(send_buffer.get(), packet_to_send.data(), packet_to_send.size());
93 uint64_t* send_buf_as_64_array = (uint64_t*)send_buffer.get();
94 send_buf_as_64_array[5] = fc_time_point_to_ntp_timestamp(fc::time_point::now()); // 5 = Transmit Timestamp
95 _sock.send_to(send_buffer, packet_to_send.size(), ep);
96 break;
97 }
98 }
99 catch (const fc::canceled_exception&)
100 {
101 throw;
102 }
103 catch ( const std::bad_alloc& )
104 {
105 throw;
106 }
107 catch ( const boost::interprocess::bad_alloc& )
108 {
109 throw;
110 }
111 // this could fail to resolve but we want to go on to other hosts..
112 catch ( const fc::exception& e )
113 {
114 elog( "${e}", ("e",e.to_detail_string() ) );
115 }
116 catch ( const std::exception& e )
117 {
118 elog( "${e}", ("e",e.what() ) );
119 }
120 }
121 } // request_now
122
123 // started for first time in ntp() constructor, canceled in ~ntp() destructor
124 // this task gets invoked every _retry_failed_request_interval_sec (currently 5 min), and if
125 // _request_interval_sec (currently 1 hour) has passed since the last successful update,
126 // it sends a new request
128 {
129 request_now();
130 } // request_loop
131
133 {
134 _read_loop_done = _ntp_thread.async( [this](){ read_loop(); } );
135 }
136
138 {
139 FC_ASSERT(_ntp_thread.is_current());
140
141 uint32_t receive_buffer_size = sizeof(uint64_t) * 1024;
142 std::shared_ptr<char> receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; });
143 uint64_t* recv_buf = (uint64_t*)receive_buffer.get();
144
145 // if you start the read while loop here, the receive_from call will throw "invalid argument" on win32,
146 // so instead we start the loop after making our first request
147 _sock.open();
148 _request_time_task_done = fc::async( [&](){ request_time_task(); } );
149
150 while( true )
151 {
152 fc::ip::endpoint from;
153 try
154 {
155 _sock.receive_from( receive_buffer, receive_buffer_size, from );
156 // wlog("received ntp reply from ${from}",("from",from) );
157 } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket");
158
159 fc::time_point receive_time = fc::time_point::now();
160 fc::time_point origin_time = ntp_timestamp_to_fc_time_point(recv_buf[3]);
161 fc::time_point server_receive_time = ntp_timestamp_to_fc_time_point(recv_buf[4]);
162 fc::time_point server_transmit_time = ntp_timestamp_to_fc_time_point(recv_buf[5]);
163
164 fc::microseconds offset(((server_receive_time - origin_time) +
165 (server_transmit_time - receive_time)).count() / 2);
166 fc::microseconds round_trip_delay((receive_time - origin_time) -
167 (server_transmit_time - server_receive_time));
168 //wlog("origin_time = ${origin_time}, server_receive_time = ${server_receive_time}, server_transmit_time = ${server_transmit_time}, receive_time = ${receive_time}",
169 // ("origin_time", origin_time)("server_receive_time", server_receive_time)("server_transmit_time", server_transmit_time)("receive_time", receive_time));
170 // wlog("ntp offset: ${offset}, round_trip_delay ${delay}", ("offset", offset)("delay", round_trip_delay));
171
172 //if the measured round-trip delay of the reply we just received exceeds 300 ms (300000 us), treat the reply as stale
173 if( round_trip_delay > fc::microseconds(300000) )
174 {
175 wlog("received stale ntp reply requested at ${request_time}, send a new time request", ("request_time", origin_time));
176 request_now(); //request another reply and ignore this one
177 }
178 else //we think we have a timely reply, process it
179 {
180 if( offset < fc::seconds(60*60*24) && offset > fc::seconds(-60*60*24) )
181 {
186 wlog("ntp_delta_time updated to ${delta_time} us", ("delta_time",ntp_delta_time) );
187 }
188 else
189 elog( "NTP time and local time vary by more than a day! ntp:${ntp_time} local:${local}",
190 ("ntp_time", receive_time + offset)("local", fc::time_point::now()) );
191 }
192 }
193 wlog("exiting ntp read_loop");
194 } //end read_loop()
195
196 void reschedule() {
198 _scheduled_request_time->cancel();
199
201 [&](){
202 request_now();
203 reschedule();
204 },
206 }
207 }; //ntp_impl
208
209 } // namespace detail
210
211
212
213 ntp::ntp()
214 :my( new detail::ntp_impl() )
215 {
216 my->start_read_loop();
217 }
218
219 ntp::~ntp()
220 {
221 ilog( "shutting down ntp" );
222 my->_ntp_thread.async([=](){
223 my->_sock.close();
224 if( my->_scheduled_request_time ) {
225 ilog( "wait cancel scheduled request " );
226 my->_scheduled_request_time->cancel();
227 my->_scheduled_request_time->get_future().wait();
228 my->_scheduled_request_time.reset();
229 }
230 ilog( "wait request time task " );
231 my->_request_time_task_done.wait();
232 ilog( "wait read loop " );
233 my->_read_loop_done.wait();
234 }).wait();
235 my->_ntp_thread.quit();
236 ilog( "joining ntp" );
237 my->_ntp_thread.join();
238 }
239
240
241 void ntp::add_server( const std::string& hostname, uint16_t port)
242 {
243 my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }).wait();
244 }
245
246 void ntp::set_request_interval( uint32_t interval_sec )
247 {
248 my->_request_interval_sec = interval_sec;
249 my->_retry_failed_request_interval_sec = std::min(my->_retry_failed_request_interval_sec, interval_sec);
250 my->reschedule();
251 }
252
253 void ntp::request_now()
254 {
255 my->_ntp_thread.async( [=](){ my->request_now(); } ).get();
256 }
257
258 std::optional<time_point> ntp::get_time()const
259 {
260 if( my->_last_ntp_delta_initialized )
261 return fc::time_point::now() + fc::microseconds(my->_last_ntp_delta_microseconds);
262 return std::optional<time_point>();
263 }
264
265} //namespace fc
const mie::Vuint & p
Definition bn.cpp:27
uint32_t _request_interval_sec
Definition ntp.cpp:26
std::atomic< int64_t > _last_ntp_delta_microseconds
Definition ntp.cpp:31
future< void > _request_time_task_done
Definition ntp.cpp:33
udp_socket _sock
Definition ntp.cpp:25
fc::time_point ntp_timestamp_to_fc_time_point(uint64_t ntp_timestamp_net_order)
Definition ntp.cpp:57
std::vector< std::pair< std::string, uint16_t > > _ntp_hosts
Definition ntp.cpp:23
fc::time_point _last_valid_ntp_reply_received_time
Definition ntp.cpp:28
std::atomic_bool _last_ntp_delta_initialized
Definition ntp.cpp:30
void start_read_loop()
Definition ntp.cpp:132
future< void > _read_loop_done
Definition ntp.cpp:24
uint32_t _retry_failed_request_interval_sec
Definition ntp.cpp:27
void request_now()
Definition ntp.cpp:78
void request_time_task()
Definition ntp.cpp:127
uint64_t fc_time_point_to_ntp_timestamp(const fc::time_point &fc_timestamp)
Definition ntp.cpp:67
fc::thread _ntp_thread
Definition ntp.cpp:22
std::shared_ptr< scheduled_task_impl< void > > _scheduled_request_time
Definition ntp.cpp:35
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
constexpr int64_t count() const
Definition time.hpp:26
static time_point now()
Definition time.cpp:14
constexpr const microseconds & time_since_epoch() const
Definition time.hpp:52
void send_to(const char *b, size_t l, boost::asio::ip::udp::endpoint &to)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
#define FC_RETHROW_EXCEPTIONS(LOG_LEVEL, FORMAT,...)
Catches all exceptions, std::exceptions, and ... and rethrows them after appending the provided log m...
int * count
#define wlog(FORMAT,...)
Definition logger.hpp:124
#define ilog(FORMAT,...)
Definition logger.hpp:118
#define elog(FORMAT,...)
Definition logger.hpp:130
namespace sysio::chain
Definition authority.cpp:3
constexpr microseconds seconds(int64_t s)
Definition time.hpp:32
std::vector< boost::asio::ip::udp::endpoint > resolve(boost::asio::io_service &io_service, const std::string &host, uint16_t port)
Definition resolve.cpp:6
unsigned short uint16_t
Definition stdint.h:125
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
void wait()
memcpy((char *) pInfo->slotDescription, s, l)