10#include <boost/asio/bind_executor.hpp> 
   11#include <boost/asio/ip/host_name.hpp> 
   12#include <boost/asio/ip/tcp.hpp> 
   13#include <boost/asio/strand.hpp> 
   14#include <boost/beast/core.hpp> 
   15#include <boost/beast/websocket.hpp> 
   16#include <boost/signals2/connection.hpp> 
   18using tcp    = boost::asio::ip::tcp;
 
   19namespace ws = boost::beast::websocket;
 
   25using namespace state_history;
 
   26using boost::signals2::scoped_connection;
 
   36   } 
catch (
const std::exception& e) {
 
   39      elog(
"unknown exception");
 
 
   58      if (block_num < log.begin_block() || block_num >= log.end_block())
 
   61      auto&                    stream = log.get_entry(block_num, header);
 
   66      stream.read((
char*)&
s, 
sizeof(
s));
 
   70         stream.read(compressed.data(), s2);
 
 
   90      if (
trace_log && block_num >= 
trace_log->begin_block() && block_num < trace_log->end_block())
 
   91         return trace_log->get_block_id(block_num);
 
 
  100   struct session : std::enable_shared_from_this<session> {
 
  101      std::shared_ptr<state_history_plugin_impl> 
plugin;
 
  113         ilog(
"incoming connection");
 
  114         socket_stream = std::make_unique<ws::stream<tcp::socket>>(std::move(socket));
 
  116         socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(
true));
 
  117         socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
 
  118         socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024));
 
  119         socket_stream->async_accept([
self = shared_from_this()](boost::system::error_code ec) {
 
  120            self->callback(ec, 
"async_accept", [
self] {
 
 
  128         auto in_buffer = std::make_shared<boost::beast::flat_buffer>();
 
  130             *in_buffer, [
self = shared_from_this(), in_buffer](boost::system::error_code ec, 
size_t) {
 
  131                self->callback(ec, 
"async_read", [
self, in_buffer] {
 
  132                   auto d = boost::asio::buffer_cast<char const*>(boost::beast::buffers_front(in_buffer->data()));
 
  133                   auto s = boost::asio::buffer_size(in_buffer->data());
 
  137                   std::visit(*
self, req);
 
 
  148      template <
typename T>
 
  164             [
self = shared_from_this()](boost::system::error_code ec, 
size_t) {
 
  165                self->callback(ec, 
"async_write", [
self] {
 
  166                   self->send_queue.erase(
self->send_queue.begin());
 
  167                   self->sending = 
false;
 
 
  175         auto&                chain = 
plugin->chain_plug->chain();
 
  177         result.head              = {chain.head_block_num(), chain.head_block_id()};
 
  178         result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
 
  179         result.chain_id          = chain.get_chain_id();
 
  181            result.trace_begin_block = 
plugin->trace_log->begin_block();
 
  182            result.trace_end_block   = 
plugin->trace_log->end_block();
 
  184         if (
plugin->chain_state_log) {
 
  185            result.chain_state_begin_block = 
plugin->chain_state_log->begin_block();
 
  186            result.chain_state_end_block   = 
plugin->chain_state_log->end_block();
 
  188         send(std::move(result));
 
 
  195            auto id = 
plugin->get_block_id(cp.block_num);
 
  196            if (!
id || *
id != cp.block_id)
 
 
  215         auto& chain              = 
plugin->chain_plug->chain();
 
  216         result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
 
  218             current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;
 
  237         send(std::move(result));
 
 
  258         auto&                chain = 
plugin->chain_plug->chain();
 
  260         result.head = {chain.head_block_num(), chain.head_block_id()};
 
 
  264      template <
typename F>
 
  271         } 
catch (
const std::exception& e) {
 
  275            elog(
"unknown exception");
 
 
  280      template <
typename F>
 
  281      void callback(boost::system::error_code ec, 
const char* what, F 
f) {
 
 
  291      void on_fail(boost::system::error_code ec, 
const char* what) {
 
  293            if (ec == boost::asio::error::eof) {
 
  294               dlog(
"${w}: ${m}", (
"w", what)(
"m", ec.message()));
 
  296               elog(
"${w}: ${m}", (
"w", what)(
"m", ec.message()));
 
  300            elog(
"uncaught exception on close");
 
 
  306         plugin->sessions.erase(
this);
 
 
 
  309   std::map<session*, std::shared_ptr<session>> 
sessions;
 
  312      boost::system::error_code ec;
 
  316      acceptor      = std::make_unique<tcp::acceptor>(
app().get_io_service());
 
  318      auto check_ec = [&](
const char* what) {
 
  321         elog(
"${w}: ${m}", (
"w", what)(
"m", ec.message()));
 
  322         SYS_ASSERT(
false, plugin_exception, 
"unable to open listen socket");
 
  325      acceptor->open(endpoint.protocol(), ec);
 
  327      acceptor->set_option(boost::asio::socket_base::reuse_address(
true));
 
  330      acceptor->listen(boost::asio::socket_base::max_listen_connections, ec);
 
 
  336      auto socket = std::make_shared<tcp::socket>(
app().get_io_service());
 
  337      acceptor->async_accept(*socket, [
self = shared_from_this(), socket, 
this](
const boost::system::error_code& ec) {
 
  341            if (ec == boost::system::errc::too_many_files_open)
 
  346            auto s            = std::make_shared<session>(
self);
 
  348            s->start(std::move(*socket));
 
 
  370             chain::state_history_write_exception,
 
  371             "State history encountered an Error which it cannot recover from.  Please resolve the error and relaunch " 
 
  398      SYS_ASSERT(traces_bin.size() == (
uint32_t)traces_bin.size(), plugin_exception, 
"traces is too big");
 
  402                                      .payload_size = 
sizeof(
uint32_t) + traces_bin.size()};
 
  404         uint32_t s = (uint32_t)traces_bin.size();
 
  405         stream.write((char*)&s, sizeof(s));
 
  406         if (!traces_bin.empty())
 
  407            stream.write(traces_bin.data(), traces_bin.size());
 
 
  422                                      .payload_size = 
sizeof(
uint32_t) + deltas_bin.size()};
 
  428         uint32_t s = (uint32_t)deltas_bin.size();
 
  429         if (s != deltas_bin.size())
 
  431         stream.write((char*)&s, sizeof(s));
 
  432         if (!deltas_bin.empty())
 
  433            stream.write(deltas_bin.data(), deltas_bin.size());
 
 
 
  444   auto options = cfg.add_options();
 
  445   options(
"state-history-dir", bpo::value<bfs::path>()->default_value(
"state-history"),
 
  446           "the location of the state-history directory (absolute path or relative to application data dir)");
 
  447   cli.add_options()(
"delete-state-history", bpo::bool_switch()->default_value(
false), 
"clear state history files");
 
  448   options(
"trace-history", bpo::bool_switch()->default_value(
false), 
"enable trace history");
 
  449   options(
"chain-state-history", bpo::bool_switch()->default_value(
false), 
"enable chain state history");
 
  450   options(
"state-history-endpoint", bpo::value<string>()->default_value(
"127.0.0.1:8080"),
 
  451           "the endpoint upon which to listen for incoming connections. Caution: only expose this port to " 
  452           "your internal network.");
 
  453   options(
"trace-history-debug-mode", bpo::bool_switch()->default_value(
false), 
"enable debug mode for trace history");
 
  456      options(
"state-history-log-retain-blocks", bpo::value<uint32_t>(), 
"if set, periodically prune the state history files to store only configured number of most recent blocks");
 
 
  461      SYS_ASSERT(options.at(
"disable-replay-opts").as<
bool>(), plugin_exception,
 
  462                 "state_history_plugin requires --disable-replay-opts");
 
  465      SYS_ASSERT(my->chain_plug, chain::missing_chain_plugin_exception, 
"");
 
  466      auto& chain = my->chain_plug->chain();
 
  467      my->applied_transaction_connection.emplace(
 
  468          chain.applied_transaction.connect([&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> t) {
 
  469             my->on_applied_transaction(std::get<0>(t), std::get<1>(t));
 
  471      my->accepted_block_connection.emplace(
 
  472          chain.accepted_block.connect([&](
const block_state_ptr& 
p) { my->on_accepted_block(p); }));
 
  473      my->block_start_connection.emplace(
 
  474          chain.block_start.connect([&](
uint32_t block_num) { my->on_block_start(block_num); }));
 
  476      auto                    dir_option = options.at(
"state-history-dir").as<bfs::path>();
 
  477      boost::filesystem::path state_history_dir;
 
  478      if (dir_option.is_relative())
 
  481         state_history_dir = dir_option;
 
  482      if (
auto resmon_plugin = 
app().find_plugin<resource_monitor_plugin>())
 
  483         resmon_plugin->monitor_directory(state_history_dir);
 
  485      auto ip_port         = options.at(
"state-history-endpoint").as<
string>();
 
  486      auto port            = ip_port.substr(ip_port.find(
':') + 1, ip_port.size());
 
  487      auto host            = ip_port.substr(0, ip_port.find(
':'));
 
  488      my->endpoint_address = 
host;
 
  489      my->endpoint_port    = std::stoi(port);
 
  490      idump((ip_port)(host)(port));
 
  492      if (options.at(
"delete-state-history").as<
bool>()) {
 
  493         ilog(
"Deleting state history");
 
  494         boost::filesystem::remove_all(state_history_dir);
 
  496      boost::filesystem::create_directories(state_history_dir);
 
  498      if (options.at(
"trace-history-debug-mode").as<
bool>()) {
 
  499         my->trace_debug_mode = 
true;
 
  502      std::optional<state_history_log_prune_config> ship_log_prune_conf;
 
  503      if (options.count(
"state-history-log-retain-blocks")) {
 
  504         ship_log_prune_conf.emplace();
 
  505         ship_log_prune_conf->prune_blocks = options.at(
"state-history-log-retain-blocks").as<
uint32_t>();
 
  508         SYS_ASSERT(ship_log_prune_conf->prune_blocks >= 1000, plugin_exception, 
"state-history-log-retain-blocks must be 1000 blocks or greater");
 
  511      if (options.at(
"trace-history").as<
bool>())
 
  512         my->trace_log.emplace(
"trace_history", (state_history_dir / 
"trace_history.log").
string(),
 
  513                               (state_history_dir / 
"trace_history.index").
string(), ship_log_prune_conf);
 
  514      if (options.at(
"chain-state-history").as<
bool>())
 
  515         my->chain_state_log.emplace(
"chain_state_history", (state_history_dir / 
"chain_state_history.log").
string(),
 
  516                                     (state_history_dir / 
"chain_state_history.index").
string(), ship_log_prune_conf);
 
 
  524   my->applied_transaction_connection.reset();
 
  525   my->accepted_block_connection.reset();
 
  526   my->block_start_connection.reset();
 
  527   while (!my->sessions.empty())
 
  528      my->sessions.begin()->second->close();
 
 
std::string ws(int const level)
const char *const state_history_plugin_abi
#define SYS_THROW(exc_type, FORMAT,...)
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
abstract_plugin * find_plugin(const string &name) const
bfs::path data_dir() const
Get data directory.
auto post(int priority, Func &&func)
static bool supports_hole_punching()
Used to generate a useful error report when an exception is thrown.
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
const chainbase::database & db() const
signed_block_ptr fetch_block_by_number(uint32_t block_num) const
block_id_type get_block_id_for_num(uint32_t block_num) const
virtual void set_program_options(options_description &cli, options_description &cfg) override
void plugin_initialize(const variables_map &options)
virtual ~state_history_plugin()
#define FC_LOG_AND_RETHROW()
void check_ec(T *c, websocketpp::lib::error_code ec, websocketpp::connection_hdl hdl)
void unpack(Stream &s, std::deque< T > &value)
void pack(Stream &s, const std::deque< T > &value)
std::shared_ptr< transaction_trace > transaction_trace_ptr
std::shared_ptr< const packed_transaction > packed_transaction_ptr
std::shared_ptr< signed_block > signed_block_ptr
std::shared_ptr< block_state > block_state_ptr
std::variant< get_status_request_v0, get_blocks_request_v0, get_blocks_ack_request_v0 > state_request
std::vector< table_delta > create_deltas(const chainbase::database &db, bool full_snapshot)
bytes zlib_compress_bytes(const bytes &in)
std::variant< get_status_result_v0, get_blocks_result_v0 > state_result
bytes zlib_decompress(const bytes &in)
@ self
the connection is to itself
uint64_t ship_magic(uint16_t version, uint16_t features=0)
#define T(meth, val, expected)
schedule config_dir_name data_dir_name p2p_port http_port file_size name host(p2p_endpoint)) FC_REFLECT(tn_node_def
const char *const state_history_plugin_abi
unsigned __int64 uint64_t
static constexpr int medium
std::vector< block_position > have_positions
bytes pack(const chainbase::database &db, bool trace_debug_mode, const block_state_ptr &block_state)
std::optional< augmented_transaction_trace > onblock_trace
void add_transaction(const transaction_trace_ptr &trace, const chain::packed_transaction_ptr &transaction)
std::map< transaction_id_type, augmented_transaction_trace > cached_traces
std::shared_ptr< state_history_plugin_impl > plugin
void start(tcp::socket socket)
void on_fail(boost::system::error_code ec, const char *what)
session(std::shared_ptr< state_history_plugin_impl > plugin)
void operator()(get_blocks_request_v0 &req)
void operator()(get_blocks_ack_request_v0 &req)
void send_update(const block_state_ptr &block_state)
void send_update(get_blocks_result_v0 result, const block_state_ptr &block_state)
void operator()(get_status_request_v0 &)
void callback(boost::system::error_code ec, const char *what, F f)
void send_update(bool changed=false)
void catch_and_close(F f)
std::optional< get_blocks_request_v0 > current_request
std::unique_ptr< ws::stream< tcp::socket > > socket_stream
std::vector< std::vector< char > > send_queue
std::unique_ptr< tcp::acceptor > acceptor
std::optional< state_history_log > trace_log
std::map< session *, std::shared_ptr< session > > sessions
void store_traces(const block_state_ptr &block_state)
std::optional< scoped_connection > accepted_block_connection
state_history::trace_converter trace_converter
void get_log_entry(state_history_log &log, uint32_t block_num, std::optional< bytes > &result)
void on_block_start(uint32_t block_num)
chain_plugin * chain_plug
void on_applied_transaction(const transaction_trace_ptr &p, const packed_transaction_ptr &t)
void get_block(uint32_t block_num, const block_state_ptr &block_state, std::optional< bytes > &result)
std::optional< scoped_connection > applied_transaction_connection
void on_accepted_block(const block_state_ptr &block_state)
std::optional< state_history_log > chain_state_log
std::optional< scoped_connection > block_start_connection
void store_chain_state(const block_state_ptr &block_state)
std::optional< chain::block_id_type > get_block_id(uint32_t block_num)