Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
state_history_plugin.cpp
Go to the documentation of this file.
9
10#include <boost/asio/bind_executor.hpp>
11#include <boost/asio/ip/host_name.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/asio/strand.hpp>
14#include <boost/beast/core.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/signals2/connection.hpp>
17
18using tcp = boost::asio::ip::tcp;
19namespace ws = boost::beast::websocket;
20
21extern const char* const state_history_plugin_abi;
22
23namespace sysio {
24using namespace chain;
25using namespace state_history;
26using boost::signals2::scoped_connection;
27
28static appbase::abstract_plugin& _state_history_plugin = app().register_plugin<state_history_plugin>();
29
30template <typename F>
32 try {
33 return f();
34 } catch (const fc::exception& e) {
35 elog("${e}", ("e", e.to_detail_string()));
36 } catch (const std::exception& e) {
37 elog("${e}", ("e", e.what()));
38 } catch (...) {
39 elog("unknown exception");
40 }
41}
42
43struct state_history_plugin_impl : std::enable_shared_from_this<state_history_plugin_impl> {
45 std::optional<state_history_log> trace_log;
46 std::optional<state_history_log> chain_state_log;
47 bool trace_debug_mode = false;
48 bool stopping = false;
49 std::optional<scoped_connection> applied_transaction_connection;
50 std::optional<scoped_connection> block_start_connection;
51 std::optional<scoped_connection> accepted_block_connection;
52 string endpoint_address = "0.0.0.0";
54 std::unique_ptr<tcp::acceptor> acceptor;
56
57 void get_log_entry(state_history_log& log, uint32_t block_num, std::optional<bytes>& result) {
58 if (block_num < log.begin_block() || block_num >= log.end_block())
59 return;
61 auto& stream = log.get_entry(block_num, header);
62 uint32_t s;
63 // Compressed deltas now exceeds 4GB on one of the public chains. This length prefix
64 // was intended to support adding additional fields in the future after the
65 // packed deltas or packed traces. For now we're going to ignore on read.
66 stream.read((char*)&s, sizeof(s));
67 uint64_t s2 = header.payload_size - sizeof(s);
68 bytes compressed(s2);
69 if (s2)
70 stream.read(compressed.data(), s2);
71 result = state_history::zlib_decompress(compressed);
72 }
73
74 void get_block(uint32_t block_num, const block_state_ptr& block_state, std::optional<bytes>& result) {
76 try {
77 if( block_state && block_num == block_state->block_num ) {
79 } else {
80 p = chain_plug->chain().fetch_block_by_number( block_num );
81 }
82 } catch (...) {
83 return;
84 }
85 if (p)
86 result = fc::raw::pack(*p);
87 }
88
89 std::optional<chain::block_id_type> get_block_id(uint32_t block_num) {
90 if (trace_log && block_num >= trace_log->begin_block() && block_num < trace_log->end_block())
91 return trace_log->get_block_id(block_num);
92 if (chain_state_log && block_num >= chain_state_log->begin_block() && block_num < chain_state_log->end_block())
93 return chain_state_log->get_block_id(block_num);
94 try {
95 return chain_plug->chain().get_block_id_for_num(block_num);
96 } catch (...) {}
97 return {};
98 }
99
100 struct session : std::enable_shared_from_this<session> {
101 std::shared_ptr<state_history_plugin_impl> plugin;
102 std::unique_ptr<ws::stream<tcp::socket>> socket_stream;
103 bool sending = false;
104 bool sent_abi = false;
105 std::vector<std::vector<char>> send_queue;
106 std::optional<get_blocks_request_v0> current_request;
108
109 session(std::shared_ptr<state_history_plugin_impl> plugin)
110 : plugin(std::move(plugin)) {}
111
112 void start(tcp::socket socket) {
113 ilog("incoming connection");
114 socket_stream = std::make_unique<ws::stream<tcp::socket>>(std::move(socket));
115 socket_stream->binary(true);
116 socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(true));
117 socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
118 socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024));
119 socket_stream->async_accept([self = shared_from_this()](boost::system::error_code ec) {
120 self->callback(ec, "async_accept", [self] {
121 self->start_read();
123 });
124 });
125 }
126
127 void start_read() {
128 auto in_buffer = std::make_shared<boost::beast::flat_buffer>();
129 socket_stream->async_read(
130 *in_buffer, [self = shared_from_this(), in_buffer](boost::system::error_code ec, size_t) {
131 self->callback(ec, "async_read", [self, in_buffer] {
132 auto d = boost::asio::buffer_cast<char const*>(boost::beast::buffers_front(in_buffer->data()));
133 auto s = boost::asio::buffer_size(in_buffer->data());
135 state_request req;
136 fc::raw::unpack(ds, req);
137 std::visit(*self, req);
138 self->start_read();
139 });
140 });
141 }
142
143 void send(const char* s) {
144 send_queue.push_back({s, s + strlen(s)});
145 send();
146 }
147
148 template <typename T>
149 void send(T obj) {
150 send_queue.push_back(fc::raw::pack(state_result{std::move(obj)}));
151 send();
152 }
153
154 void send() {
155 if (sending)
156 return;
157 if (send_queue.empty())
158 return send_update();
159 sending = true;
160 socket_stream->binary(sent_abi);
161 sent_abi = true;
162 socket_stream->async_write( //
163 boost::asio::buffer(send_queue[0]), //
164 [self = shared_from_this()](boost::system::error_code ec, size_t) {
165 self->callback(ec, "async_write", [self] {
166 self->send_queue.erase(self->send_queue.begin());
167 self->sending = false;
168 self->send();
169 });
170 });
171 }
172
173 using result_type = void;
175 auto& chain = plugin->chain_plug->chain();
177 result.head = {chain.head_block_num(), chain.head_block_id()};
178 result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
179 result.chain_id = chain.get_chain_id();
180 if (plugin->trace_log) {
181 result.trace_begin_block = plugin->trace_log->begin_block();
182 result.trace_end_block = plugin->trace_log->end_block();
183 }
184 if (plugin->chain_state_log) {
185 result.chain_state_begin_block = plugin->chain_state_log->begin_block();
186 result.chain_state_end_block = plugin->chain_state_log->end_block();
187 }
188 send(std::move(result));
189 }
190
192 for (auto& cp : req.have_positions) {
193 if (req.start_block_num <= cp.block_num)
194 continue;
195 auto id = plugin->get_block_id(cp.block_num);
196 if (!id || *id != cp.block_id)
197 req.start_block_num = std::min(req.start_block_num, cp.block_num);
198 }
199 req.have_positions.clear();
200 current_request = req;
201 send_update(true);
202 }
203
205 if (!current_request)
206 return;
207 current_request->max_messages_in_flight += req.num_messages;
208 send_update();
209 }
210
212 need_to_send_update = true;
213 if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
214 return;
215 auto& chain = plugin->chain_plug->chain();
216 result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
217 uint32_t current =
218 current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;
219 if (current_request->start_block_num <= current &&
220 current_request->start_block_num < current_request->end_block_num) {
221 auto block_id = plugin->get_block_id(current_request->start_block_num);
222 if (block_id) {
223 result.this_block = block_position{current_request->start_block_num, *block_id};
224 auto prev_block_id = plugin->get_block_id(current_request->start_block_num - 1);
225 if (prev_block_id)
226 result.prev_block = block_position{current_request->start_block_num - 1, *prev_block_id};
227 if (current_request->fetch_block) {
228 plugin->get_block( current_request->start_block_num, block_state, result.block );
229 }
230 if (current_request->fetch_traces && plugin->trace_log)
231 plugin->get_log_entry(*plugin->trace_log, current_request->start_block_num, result.traces);
232 if (current_request->fetch_deltas && plugin->chain_state_log)
233 plugin->get_log_entry(*plugin->chain_state_log, current_request->start_block_num, result.deltas);
234 }
235 ++current_request->start_block_num;
236 }
237 send(std::move(result));
238 --current_request->max_messages_in_flight;
239 need_to_send_update = current_request->start_block_num <= current &&
240 current_request->start_block_num < current_request->end_block_num;
241 }
242
244 need_to_send_update = true;
245 if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
246 return;
248 result.head = {block_state->block_num, block_state->id};
249 send_update(std::move(result), block_state);
250 }
251
252 void send_update(bool changed = false) {
253 if (changed)
254 need_to_send_update = true;
255 if (!send_queue.empty() || !need_to_send_update || !current_request ||
256 !current_request->max_messages_in_flight)
257 return;
258 auto& chain = plugin->chain_plug->chain();
260 result.head = {chain.head_block_num(), chain.head_block_id()};
261 send_update(std::move(result), {});
262 }
263
264 template <typename F>
266 try {
267 f();
268 } catch (const fc::exception& e) {
269 elog("${e}", ("e", e.to_detail_string()));
270 close();
271 } catch (const std::exception& e) {
272 elog("${e}", ("e", e.what()));
273 close();
274 } catch (...) {
275 elog("unknown exception");
276 close();
277 }
278 }
279
280 template <typename F>
281 void callback(boost::system::error_code ec, const char* what, F f) {
282 app().post(priority::medium, [=]() {
283 if (plugin->stopping)
284 return;
285 if (ec)
286 return on_fail(ec, what);
288 });
289 }
290
291 void on_fail(boost::system::error_code ec, const char* what) {
292 try {
293 if (ec == boost::asio::error::eof) {
294 dlog("${w}: ${m}", ("w", what)("m", ec.message()));
295 } else {
296 elog("${w}: ${m}", ("w", what)("m", ec.message()));
297 }
298 close();
299 } catch (...) {
300 elog("uncaught exception on close");
301 }
302 }
303
304 void close() {
305 socket_stream->next_layer().close();
306 plugin->sessions.erase(this);
307 }
308 };
309 std::map<session*, std::shared_ptr<session>> sessions;
310
311 void listen() {
312 boost::system::error_code ec;
313
314 auto address = boost::asio::ip::make_address(endpoint_address);
315 auto endpoint = tcp::endpoint{address, endpoint_port};
316 acceptor = std::make_unique<tcp::acceptor>(app().get_io_service());
317
318 auto check_ec = [&](const char* what) {
319 if (!ec)
320 return;
321 elog("${w}: ${m}", ("w", what)("m", ec.message()));
322 SYS_ASSERT(false, plugin_exception, "unable to open listen socket");
323 };
324
325 acceptor->open(endpoint.protocol(), ec);
326 check_ec("open");
327 acceptor->set_option(boost::asio::socket_base::reuse_address(true));
328 acceptor->bind(endpoint, ec);
329 check_ec("bind");
330 acceptor->listen(boost::asio::socket_base::max_listen_connections, ec);
331 check_ec("listen");
332 do_accept();
333 }
334
335 void do_accept() {
336 auto socket = std::make_shared<tcp::socket>(app().get_io_service());
337 acceptor->async_accept(*socket, [self = shared_from_this(), socket, this](const boost::system::error_code& ec) {
338 if (stopping)
339 return;
340 if (ec) {
341 if (ec == boost::system::errc::too_many_files_open)
342 catch_and_log([&] { do_accept(); });
343 return;
344 }
345 catch_and_log([&] {
346 auto s = std::make_shared<session>(self);
347 sessions[s.get()] = s;
348 s->start(std::move(*socket));
349 });
350 catch_and_log([&] { do_accept(); });
351 });
352 }
353
358
360 try {
363 } catch (const fc::exception& e) {
364 elog("fc::exception: ${details}", ("details", e.to_detail_string()));
365 // Both app().quit() and exception throwing are required. Without app().quit(),
366 // the exception would be caught and drop before reaching main(). The exception is
367 // to ensure the block won't be commited.
368 appbase::app().quit();
369 SYS_THROW(
370 chain::state_history_write_exception,
371 "State history encountered an Error which it cannot recover from. Please resolve the error and relaunch "
372 "the process");
373 }
374
375 for (auto& s : sessions) {
376 auto& p = s.second;
377 if (p) {
378 if (p->current_request && block_state->block_num < p->current_request->start_block_num)
379 p->current_request->start_block_num = block_state->block_num;
380 p->send_update(block_state);
381 }
382 }
383 }
384
385 void on_block_start(uint32_t block_num) { clear_caches(); }
386
391
393 if (!trace_log)
394 return;
395 auto traces_bin = state_history::zlib_compress_bytes(
397
398 SYS_ASSERT(traces_bin.size() == (uint32_t)traces_bin.size(), plugin_exception, "traces is too big");
399
400 state_history_log_header header{.magic = ship_magic(ship_current_version, 0),
401 .block_id = block_state->id,
402 .payload_size = sizeof(uint32_t) + traces_bin.size()};
403 trace_log->write_entry(header, block_state->block->previous, [&](auto& stream) {
404 uint32_t s = (uint32_t)traces_bin.size();
405 stream.write((char*)&s, sizeof(s));
406 if (!traces_bin.empty())
407 stream.write(traces_bin.data(), traces_bin.size());
408 });
409 }
410
412 if (!chain_state_log)
413 return;
414 bool fresh = chain_state_log->begin_block() == chain_state_log->end_block();
415 if (fresh)
416 ilog("Placing initial state in block ${n}", ("n", block_state->block->block_num()));
417
418 std::vector<table_delta> deltas = state_history::create_deltas(chain_plug->chain().db(), fresh);
419 auto deltas_bin = state_history::zlib_compress_bytes(fc::raw::pack(deltas));
420 state_history_log_header header{.magic = ship_magic(ship_current_version, 0),
421 .block_id = block_state->id,
422 .payload_size = sizeof(uint32_t) + deltas_bin.size()};
423 chain_state_log->write_entry(header, block_state->block->previous, [&](auto& stream) {
424 // Compressed deltas now exceeds 4GB on one of the public chains. This length prefix
425 // was intended to support adding additional fields in the future after the
426 // packed deltas. For now we're going to ignore on read. The 0 is an attempt to signal
427 // old versions that something's not quite right.
428 uint32_t s = (uint32_t)deltas_bin.size();
429 if (s != deltas_bin.size())
430 s = 0;
431 stream.write((char*)&s, sizeof(s));
432 if (!deltas_bin.empty())
433 stream.write(deltas_bin.data(), deltas_bin.size());
434 });
435 } // store_chain_state
436}; // state_history_plugin_impl
437
440
442
443void state_history_plugin::set_program_options(options_description& cli, options_description& cfg) {
444 auto options = cfg.add_options();
445 options("state-history-dir", bpo::value<bfs::path>()->default_value("state-history"),
446 "the location of the state-history directory (absolute path or relative to application data dir)");
447 cli.add_options()("delete-state-history", bpo::bool_switch()->default_value(false), "clear state history files");
448 options("trace-history", bpo::bool_switch()->default_value(false), "enable trace history");
449 options("chain-state-history", bpo::bool_switch()->default_value(false), "enable chain state history");
450 options("state-history-endpoint", bpo::value<string>()->default_value("127.0.0.1:8080"),
451 "the endpoint upon which to listen for incoming connections. Caution: only expose this port to "
452 "your internal network.");
453 options("trace-history-debug-mode", bpo::bool_switch()->default_value(false), "enable debug mode for trace history");
454
456 options("state-history-log-retain-blocks", bpo::value<uint32_t>(), "if set, periodically prune the state history files to store only configured number of most recent blocks");
457}
458
459void state_history_plugin::plugin_initialize(const variables_map& options) {
460 try {
461 SYS_ASSERT(options.at("disable-replay-opts").as<bool>(), plugin_exception,
462 "state_history_plugin requires --disable-replay-opts");
463
464 my->chain_plug = app().find_plugin<chain_plugin>();
465 SYS_ASSERT(my->chain_plug, chain::missing_chain_plugin_exception, "");
466 auto& chain = my->chain_plug->chain();
467 my->applied_transaction_connection.emplace(
468 chain.applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> t) {
469 my->on_applied_transaction(std::get<0>(t), std::get<1>(t));
470 }));
471 my->accepted_block_connection.emplace(
472 chain.accepted_block.connect([&](const block_state_ptr& p) { my->on_accepted_block(p); }));
473 my->block_start_connection.emplace(
474 chain.block_start.connect([&](uint32_t block_num) { my->on_block_start(block_num); }));
475
476 auto dir_option = options.at("state-history-dir").as<bfs::path>();
477 boost::filesystem::path state_history_dir;
478 if (dir_option.is_relative())
479 state_history_dir = app().data_dir() / dir_option;
480 else
481 state_history_dir = dir_option;
482 if (auto resmon_plugin = app().find_plugin<resource_monitor_plugin>())
483 resmon_plugin->monitor_directory(state_history_dir);
484
485 auto ip_port = options.at("state-history-endpoint").as<string>();
486 auto port = ip_port.substr(ip_port.find(':') + 1, ip_port.size());
487 auto host = ip_port.substr(0, ip_port.find(':'));
488 my->endpoint_address = host;
489 my->endpoint_port = std::stoi(port);
490 idump((ip_port)(host)(port));
491
492 if (options.at("delete-state-history").as<bool>()) {
493 ilog("Deleting state history");
494 boost::filesystem::remove_all(state_history_dir);
495 }
496 boost::filesystem::create_directories(state_history_dir);
497
498 if (options.at("trace-history-debug-mode").as<bool>()) {
499 my->trace_debug_mode = true;
500 }
501
502 std::optional<state_history_log_prune_config> ship_log_prune_conf;
503 if (options.count("state-history-log-retain-blocks")) {
504 ship_log_prune_conf.emplace();
505 ship_log_prune_conf->prune_blocks = options.at("state-history-log-retain-blocks").as<uint32_t>();
506 //the arbitrary limit of 1000 here is mainly so that there is enough buffer for newly applied forks to be delivered to clients
507 // before getting pruned out. ideally pruning would have been smart enough to know not to prune reversible blocks
508 SYS_ASSERT(ship_log_prune_conf->prune_blocks >= 1000, plugin_exception, "state-history-log-retain-blocks must be 1000 blocks or greater");
509 }
510
511 if (options.at("trace-history").as<bool>())
512 my->trace_log.emplace("trace_history", (state_history_dir / "trace_history.log").string(),
513 (state_history_dir / "trace_history.index").string(), ship_log_prune_conf);
514 if (options.at("chain-state-history").as<bool>())
515 my->chain_state_log.emplace("chain_state_history", (state_history_dir / "chain_state_history.log").string(),
516 (state_history_dir / "chain_state_history.index").string(), ship_log_prune_conf);
517 }
519} // state_history_plugin::plugin_initialize
520
522
524 my->applied_transaction_connection.reset();
525 my->accepted_block_connection.reset();
526 my->block_start_connection.reset();
527 while (!my->sessions.empty())
528 my->sessions.begin()->second->close();
529 my->stopping = true;
530}
531
532} // namespace sysio
std::string ws(int const level)
const char *const state_history_plugin_abi
const mie::Vuint & p
Definition bn.cpp:27
#define SYS_THROW(exc_type, FORMAT,...)
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
abstract_plugin * find_plugin(const string &name) const
bfs::path data_dir() const
Get data directory.
auto post(int priority, Func &&func)
static bool supports_hole_punching()
Definition cfile.hpp:182
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
const chainbase::database & db() const
signed_block_ptr fetch_block_by_number(uint32_t block_num) const
block_id_type get_block_id_for_num(uint32_t block_num) const
virtual void set_program_options(options_description &cli, options_description &cfg) override
void plugin_initialize(const variables_map &options)
#define FC_LOG_AND_RETHROW()
boost::asio::ip::tcp tcp
void check_ec(T *c, websocketpp::lib::error_code ec, websocketpp::connection_hdl hdl)
#define dlog(FORMAT,...)
Definition logger.hpp:101
#define ilog(FORMAT,...)
Definition logger.hpp:118
#define elog(FORMAT,...)
Definition logger.hpp:130
#define idump(SEQ)
Definition logger.hpp:158
application & app()
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
void pack(Stream &s, const std::deque< T > &value)
Definition raw.hpp:531
Definition name.hpp:106
std::shared_ptr< transaction_trace > transaction_trace_ptr
Definition trace.hpp:20
std::shared_ptr< const packed_transaction > packed_transaction_ptr
std::shared_ptr< signed_block > signed_block_ptr
Definition block.hpp:105
std::shared_ptr< block_state > block_state_ptr
std::variant< get_status_request_v0, get_blocks_request_v0, get_blocks_ack_request_v0 > state_request
Definition types.hpp:114
std::vector< table_delta > create_deltas(const chainbase::database &db, bool full_snapshot)
bytes zlib_compress_bytes(const bytes &in)
std::variant< get_status_result_v0, get_blocks_result_v0 > state_result
Definition types.hpp:115
bytes zlib_decompress(const bytes &in)
auto catch_and_log(F f)
@ self
the connection is to itself
Definition protocol.hpp:48
uint64_t ship_magic(uint16_t version, uint16_t features=0)
Definition log.hpp:38
#define T(meth, val, expected)
schedule config_dir_name data_dir_name p2p_port http_port file_size name host(p2p_endpoint)) FC_REFLECT(tn_node_def
const char *const state_history_plugin_abi
unsigned short uint16_t
Definition stdint.h:125
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
static constexpr int medium
std::vector< block_position > have_positions
Definition types.hpp:93
bytes pack(const chainbase::database &db, bool trace_debug_mode, const block_state_ptr &block_state)
std::optional< augmented_transaction_trace > onblock_trace
void add_transaction(const transaction_trace_ptr &trace, const chain::packed_transaction_ptr &transaction)
std::map< transaction_id_type, augmented_transaction_trace > cached_traces
std::shared_ptr< state_history_plugin_impl > plugin
void on_fail(boost::system::error_code ec, const char *what)
session(std::shared_ptr< state_history_plugin_impl > plugin)
void operator()(get_blocks_request_v0 &req)
void operator()(get_blocks_ack_request_v0 &req)
void send_update(const block_state_ptr &block_state)
void send_update(get_blocks_result_v0 result, const block_state_ptr &block_state)
void callback(boost::system::error_code ec, const char *what, F f)
std::optional< get_blocks_request_v0 > current_request
std::unique_ptr< ws::stream< tcp::socket > > socket_stream
std::vector< std::vector< char > > send_queue
std::unique_ptr< tcp::acceptor > acceptor
std::optional< state_history_log > trace_log
std::map< session *, std::shared_ptr< session > > sessions
void store_traces(const block_state_ptr &block_state)
std::optional< scoped_connection > accepted_block_connection
state_history::trace_converter trace_converter
void get_log_entry(state_history_log &log, uint32_t block_num, std::optional< bytes > &result)
void on_applied_transaction(const transaction_trace_ptr &p, const packed_transaction_ptr &t)
void get_block(uint32_t block_num, const block_state_ptr &block_state, std::optional< bytes > &result)
std::optional< scoped_connection > applied_transaction_connection
void on_accepted_block(const block_state_ptr &block_state)
std::optional< state_history_log > chain_state_log
std::optional< scoped_connection > block_start_connection
void store_chain_state(const block_state_ptr &block_state)
std::optional< chain::block_id_type > get_block_id(uint32_t block_num)
void cli()
CK_ULONG d
char * s