10#include <boost/asio/bind_executor.hpp>
11#include <boost/asio/ip/host_name.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/asio/strand.hpp>
14#include <boost/beast/core.hpp>
15#include <boost/beast/websocket.hpp>
16#include <boost/signals2/connection.hpp>
18using tcp = boost::asio::ip::tcp;
19namespace ws = boost::beast::websocket;
25using namespace state_history;
26using boost::signals2::scoped_connection;
36 }
catch (
const std::exception& e) {
39 elog(
"unknown exception");
58 if (block_num < log.begin_block() || block_num >= log.end_block())
61 auto& stream = log.get_entry(block_num, header);
66 stream.read((
char*)&
s,
sizeof(
s));
70 stream.read(compressed.data(), s2);
90 if (
trace_log && block_num >=
trace_log->begin_block() && block_num < trace_log->end_block())
91 return trace_log->get_block_id(block_num);
100 struct session : std::enable_shared_from_this<session> {
101 std::shared_ptr<state_history_plugin_impl>
plugin;
113 ilog(
"incoming connection");
114 socket_stream = std::make_unique<ws::stream<tcp::socket>>(std::move(socket));
116 socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(
true));
117 socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
118 socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024));
119 socket_stream->async_accept([
self = shared_from_this()](boost::system::error_code ec) {
120 self->callback(ec,
"async_accept", [
self] {
128 auto in_buffer = std::make_shared<boost::beast::flat_buffer>();
130 *in_buffer, [
self = shared_from_this(), in_buffer](boost::system::error_code ec,
size_t) {
131 self->callback(ec,
"async_read", [
self, in_buffer] {
132 auto d = boost::asio::buffer_cast<char const*>(boost::beast::buffers_front(in_buffer->data()));
133 auto s = boost::asio::buffer_size(in_buffer->data());
137 std::visit(*
self, req);
148 template <
typename T>
164 [
self = shared_from_this()](boost::system::error_code ec,
size_t) {
165 self->callback(ec,
"async_write", [
self] {
166 self->send_queue.erase(
self->send_queue.begin());
167 self->sending =
false;
175 auto& chain =
plugin->chain_plug->chain();
177 result.head = {chain.head_block_num(), chain.head_block_id()};
178 result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
179 result.chain_id = chain.get_chain_id();
181 result.trace_begin_block =
plugin->trace_log->begin_block();
182 result.trace_end_block =
plugin->trace_log->end_block();
184 if (
plugin->chain_state_log) {
185 result.chain_state_begin_block =
plugin->chain_state_log->begin_block();
186 result.chain_state_end_block =
plugin->chain_state_log->end_block();
188 send(std::move(result));
195 auto id =
plugin->get_block_id(cp.block_num);
196 if (!
id || *
id != cp.block_id)
215 auto& chain =
plugin->chain_plug->chain();
216 result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
218 current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;
237 send(std::move(result));
258 auto& chain =
plugin->chain_plug->chain();
260 result.head = {chain.head_block_num(), chain.head_block_id()};
264 template <
typename F>
271 }
catch (
const std::exception& e) {
275 elog(
"unknown exception");
280 template <
typename F>
281 void callback(boost::system::error_code ec,
const char* what, F
f) {
291 void on_fail(boost::system::error_code ec,
const char* what) {
293 if (ec == boost::asio::error::eof) {
294 dlog(
"${w}: ${m}", (
"w", what)(
"m", ec.message()));
296 elog(
"${w}: ${m}", (
"w", what)(
"m", ec.message()));
300 elog(
"uncaught exception on close");
306 plugin->sessions.erase(
this);
309 std::map<session*, std::shared_ptr<session>>
sessions;
312 boost::system::error_code ec;
316 acceptor = std::make_unique<tcp::acceptor>(
app().get_io_service());
318 auto check_ec = [&](
const char* what) {
321 elog(
"${w}: ${m}", (
"w", what)(
"m", ec.message()));
322 SYS_ASSERT(
false, plugin_exception,
"unable to open listen socket");
325 acceptor->open(endpoint.protocol(), ec);
327 acceptor->set_option(boost::asio::socket_base::reuse_address(
true));
330 acceptor->listen(boost::asio::socket_base::max_listen_connections, ec);
336 auto socket = std::make_shared<tcp::socket>(
app().get_io_service());
337 acceptor->async_accept(*socket, [
self = shared_from_this(), socket,
this](
const boost::system::error_code& ec) {
341 if (ec == boost::system::errc::too_many_files_open)
346 auto s = std::make_shared<session>(
self);
348 s->start(std::move(*socket));
370 chain::state_history_write_exception,
371 "State history encountered an Error which it cannot recover from. Please resolve the error and relaunch "
398 SYS_ASSERT(traces_bin.size() == (
uint32_t)traces_bin.size(), plugin_exception,
"traces is too big");
402 .payload_size =
sizeof(
uint32_t) + traces_bin.size()};
404 uint32_t s = (uint32_t)traces_bin.size();
405 stream.write((char*)&s, sizeof(s));
406 if (!traces_bin.empty())
407 stream.write(traces_bin.data(), traces_bin.size());
422 .payload_size =
sizeof(
uint32_t) + deltas_bin.size()};
428 uint32_t s = (uint32_t)deltas_bin.size();
429 if (s != deltas_bin.size())
431 stream.write((char*)&s, sizeof(s));
432 if (!deltas_bin.empty())
433 stream.write(deltas_bin.data(), deltas_bin.size());
444 auto options = cfg.add_options();
445 options(
"state-history-dir", bpo::value<bfs::path>()->default_value(
"state-history"),
446 "the location of the state-history directory (absolute path or relative to application data dir)");
447 cli.add_options()(
"delete-state-history", bpo::bool_switch()->default_value(
false),
"clear state history files");
448 options(
"trace-history", bpo::bool_switch()->default_value(
false),
"enable trace history");
449 options(
"chain-state-history", bpo::bool_switch()->default_value(
false),
"enable chain state history");
450 options(
"state-history-endpoint", bpo::value<string>()->default_value(
"127.0.0.1:8080"),
451 "the endpoint upon which to listen for incoming connections. Caution: only expose this port to "
452 "your internal network.");
453 options(
"trace-history-debug-mode", bpo::bool_switch()->default_value(
false),
"enable debug mode for trace history");
456 options(
"state-history-log-retain-blocks", bpo::value<uint32_t>(),
"if set, periodically prune the state history files to store only configured number of most recent blocks");
461 SYS_ASSERT(options.at(
"disable-replay-opts").as<
bool>(), plugin_exception,
462 "state_history_plugin requires --disable-replay-opts");
465 SYS_ASSERT(my->chain_plug, chain::missing_chain_plugin_exception,
"");
466 auto& chain = my->chain_plug->chain();
467 my->applied_transaction_connection.emplace(
468 chain.applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> t) {
469 my->on_applied_transaction(std::get<0>(t), std::get<1>(t));
471 my->accepted_block_connection.emplace(
472 chain.accepted_block.connect([&](
const block_state_ptr&
p) { my->on_accepted_block(p); }));
473 my->block_start_connection.emplace(
474 chain.block_start.connect([&](
uint32_t block_num) { my->on_block_start(block_num); }));
476 auto dir_option = options.at(
"state-history-dir").as<bfs::path>();
477 boost::filesystem::path state_history_dir;
478 if (dir_option.is_relative())
481 state_history_dir = dir_option;
482 if (
auto resmon_plugin =
app().find_plugin<resource_monitor_plugin>())
483 resmon_plugin->monitor_directory(state_history_dir);
485 auto ip_port = options.at(
"state-history-endpoint").as<
string>();
486 auto port = ip_port.substr(ip_port.find(
':') + 1, ip_port.size());
487 auto host = ip_port.substr(0, ip_port.find(
':'));
488 my->endpoint_address =
host;
489 my->endpoint_port = std::stoi(port);
490 idump((ip_port)(host)(port));
492 if (options.at(
"delete-state-history").as<
bool>()) {
493 ilog(
"Deleting state history");
494 boost::filesystem::remove_all(state_history_dir);
496 boost::filesystem::create_directories(state_history_dir);
498 if (options.at(
"trace-history-debug-mode").as<
bool>()) {
499 my->trace_debug_mode =
true;
502 std::optional<state_history_log_prune_config> ship_log_prune_conf;
503 if (options.count(
"state-history-log-retain-blocks")) {
504 ship_log_prune_conf.emplace();
505 ship_log_prune_conf->prune_blocks = options.at(
"state-history-log-retain-blocks").as<
uint32_t>();
508 SYS_ASSERT(ship_log_prune_conf->prune_blocks >= 1000, plugin_exception,
"state-history-log-retain-blocks must be 1000 blocks or greater");
511 if (options.at(
"trace-history").as<
bool>())
512 my->trace_log.emplace(
"trace_history", (state_history_dir /
"trace_history.log").
string(),
513 (state_history_dir /
"trace_history.index").
string(), ship_log_prune_conf);
514 if (options.at(
"chain-state-history").as<
bool>())
515 my->chain_state_log.emplace(
"chain_state_history", (state_history_dir /
"chain_state_history.log").
string(),
516 (state_history_dir /
"chain_state_history.index").
string(), ship_log_prune_conf);
524 my->applied_transaction_connection.reset();
525 my->accepted_block_connection.reset();
526 my->block_start_connection.reset();
527 while (!my->sessions.empty())
528 my->sessions.begin()->second->close();
std::string ws(int const level)
const char *const state_history_plugin_abi
#define SYS_THROW(exc_type, FORMAT,...)
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
abstract_plugin * find_plugin(const string &name) const
bfs::path data_dir() const
Get data directory.
auto post(int priority, Func &&func)
static bool supports_hole_punching()
Used to generate a useful error report when an exception is thrown.
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
const chainbase::database & db() const
signed_block_ptr fetch_block_by_number(uint32_t block_num) const
block_id_type get_block_id_for_num(uint32_t block_num) const
virtual void set_program_options(options_description &cli, options_description &cfg) override
void plugin_initialize(const variables_map &options)
virtual ~state_history_plugin()
#define FC_LOG_AND_RETHROW()
void check_ec(T *c, websocketpp::lib::error_code ec, websocketpp::connection_hdl hdl)
void unpack(Stream &s, std::deque< T > &value)
void pack(Stream &s, const std::deque< T > &value)
std::shared_ptr< transaction_trace > transaction_trace_ptr
std::shared_ptr< const packed_transaction > packed_transaction_ptr
std::shared_ptr< signed_block > signed_block_ptr
std::shared_ptr< block_state > block_state_ptr
std::variant< get_status_request_v0, get_blocks_request_v0, get_blocks_ack_request_v0 > state_request
std::vector< table_delta > create_deltas(const chainbase::database &db, bool full_snapshot)
bytes zlib_compress_bytes(const bytes &in)
std::variant< get_status_result_v0, get_blocks_result_v0 > state_result
bytes zlib_decompress(const bytes &in)
@ self
the connection is to itself
uint64_t ship_magic(uint16_t version, uint16_t features=0)
#define T(meth, val, expected)
schedule config_dir_name data_dir_name p2p_port http_port file_size name host(p2p_endpoint)) FC_REFLECT(tn_node_def
const char *const state_history_plugin_abi
unsigned __int64 uint64_t
static constexpr int medium
std::vector< block_position > have_positions
bytes pack(const chainbase::database &db, bool trace_debug_mode, const block_state_ptr &block_state)
std::optional< augmented_transaction_trace > onblock_trace
void add_transaction(const transaction_trace_ptr &trace, const chain::packed_transaction_ptr &transaction)
std::map< transaction_id_type, augmented_transaction_trace > cached_traces
std::shared_ptr< state_history_plugin_impl > plugin
void start(tcp::socket socket)
void on_fail(boost::system::error_code ec, const char *what)
session(std::shared_ptr< state_history_plugin_impl > plugin)
void operator()(get_blocks_request_v0 &req)
void operator()(get_blocks_ack_request_v0 &req)
void send_update(const block_state_ptr &block_state)
void send_update(get_blocks_result_v0 result, const block_state_ptr &block_state)
void operator()(get_status_request_v0 &)
void callback(boost::system::error_code ec, const char *what, F f)
void send_update(bool changed=false)
void catch_and_close(F f)
std::optional< get_blocks_request_v0 > current_request
std::unique_ptr< ws::stream< tcp::socket > > socket_stream
std::vector< std::vector< char > > send_queue
std::unique_ptr< tcp::acceptor > acceptor
std::optional< state_history_log > trace_log
std::map< session *, std::shared_ptr< session > > sessions
void store_traces(const block_state_ptr &block_state)
std::optional< scoped_connection > accepted_block_connection
state_history::trace_converter trace_converter
void get_log_entry(state_history_log &log, uint32_t block_num, std::optional< bytes > &result)
void on_block_start(uint32_t block_num)
chain_plugin * chain_plug
void on_applied_transaction(const transaction_trace_ptr &p, const packed_transaction_ptr &t)
void get_block(uint32_t block_num, const block_state_ptr &block_state, std::optional< bytes > &result)
std::optional< scoped_connection > applied_transaction_connection
void on_accepted_block(const block_state_ptr &block_state)
std::optional< state_history_log > chain_state_log
std::optional< scoped_connection > block_start_connection
void store_chain_state(const block_state_ptr &block_state)
std::optional< chain::block_id_type > get_block_id(uint32_t block_num)