Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
trace_api_plugin.cpp
Go to the documentation of this file.
2
7
9
11
12#include <boost/signals2/connection.hpp>
13
14using namespace sysio::trace_api;
16using boost::signals2::scoped_connection;
17
18namespace {
20
21 const std::string logger_name("trace_api");
23
// Render the exception held by `e` as a detailed string.
// The pointer is rethrown so the handlers below can convert it according to
// its dynamic type; every handler returns the result of a to_detail_string() call.
24 std::string to_detail_string(const std::exception_ptr& e) {
25 try {
26 std::rethrow_exception(e);
27 } catch (fc::exception& er) {
// An fc::exception is asked to describe itself directly.
28 return er.to_detail_string();
29 } catch (const std::exception& e) {
// A standard exception is wrapped in an fc::exception built from a warn-level
// log message, the exception's type name and its what() text.
// NOTE(review): this handler's parameter shadows the function parameter `e`.
30 fc::exception fce(
31 FC_LOG_MESSAGE(warn, "std::exception: ${what}: ", ("what", e.what())),
// NOTE(review): the listing jumps from source line 31 to 33, so one constructor
// argument is not visible here.
33 BOOST_CORE_TYPEID(e).name(),
34 e.what());
35 return fce.to_detail_string();
36 } catch (...) {
// NOTE(review): source line 37, which presumably declares `ue`, is missing from
// this listing; only its constructor arguments are visible below.
38 FC_LOG_MESSAGE(warn, "unknown: ",),
39 std::current_exception());
40 return ue.to_detail_string();
41 }
42 }
43
44 void log_exception( const exception_with_context& e, fc::log_level level ) {
45 if( _log.is_enabled( level ) ) {
46 auto detail_string = to_detail_string(std::get<0>(e));
47 auto context = fc::log_context( level, std::get<1>(e), std::get<2>(e), std::get<3>(e) );
48 _log.log(fc::log_message( context, detail_string ));
49 }
50 }
51
62 template<typename F>
63 void emit_killer(F&& f) {
64 try {
65 f();
66 } catch (const yield_exception& ) {
67 SYS_THROW(chain::controller_emit_signal_exception, "Trace API encountered an Error which it cannot recover from. Please resolve the error and relaunch the process")
68 }
69 }
70
71 template<typename Store>
72 struct shared_store_provider {
73 shared_store_provider(const std::shared_ptr<Store>& store)
74 :store(store)
75 {}
76
77 template <typename BlockTrace>
78 void append( const BlockTrace& trace ) {
79 store->append(trace);
80 }
81
82 void append_lib( uint32_t new_lib ) {
83 store->append_lib(new_lib);
84 }
85
86 get_block_t get_block(uint32_t height, const yield_function& yield) {
87 return store->get_block(height, yield);
88 }
89
90 void append_trx_ids(block_trxs_entry tt){
91 store->append_trx_ids(std::move(tt));
92 }
93
94 std::shared_ptr<Store> store;
95 };
96}
97
98namespace sysio {
99
104 static void set_program_options(appbase::options_description& cli, appbase::options_description& cfg) {
105 auto cfg_options = cfg.add_options();
106 cfg_options("trace-dir", bpo::value<bfs::path>()->default_value("traces"),
107 "the location of the trace directory (absolute path or relative to application data dir)");
108 cfg_options("trace-slice-stride", bpo::value<uint32_t>()->default_value(10'000),
109 "the number of blocks each \"slice\" of trace data will contain on the filesystem");
110 cfg_options("trace-minimum-irreversible-history-blocks", boost::program_options::value<int32_t>()->default_value(-1),
111 "Number of blocks to ensure are kept past LIB for retrieval before \"slice\" files can be automatically removed.\n"
112 "A value of -1 indicates that automatic removal of \"slice\" files will be turned off.");
113 cfg_options("trace-minimum-uncompressed-irreversible-history-blocks", boost::program_options::value<int32_t>()->default_value(-1),
114 "Number of blocks to ensure are uncompressed past LIB. Compressed \"slice\" files are still accessible but may carry a performance loss on retrieval\n"
115 "A value of -1 indicates that automatic compression of \"slice\" files will be turned off.");
116 }
117
// Read the shared trace API settings from `options`, validate them and create
// the store used by the other trace API components.
118 void plugin_initialize(const appbase::variables_map& options) {
// A relative "trace-dir" is resolved against the application data directory;
// an absolute one is used as given.
119 auto dir_option = options.at("trace-dir").as<bfs::path>();
120 if (dir_option.is_relative())
121 trace_dir = app().data_dir() / dir_option;
122 else
123 trace_dir = dir_option;
// The directory is registered with the resource monitor only when that plugin is found.
124 if (auto resmon_plugin = app().find_plugin<resource_monitor_plugin>())
125 resmon_plugin->monitor_directory(trace_dir);
126
127 slice_stride = options.at("trace-slice-stride").as<uint32_t>();
128
// Values below -1 are rejected as a configuration error.
129 const int32_t blocks = options.at("trace-minimum-irreversible-history-blocks").as<int32_t>();
130 SYS_ASSERT(blocks >= -1, chain::plugin_config_exception,
131 "\"trace-minimum-irreversible-history-blocks\" must be greater to or equal to -1.");
// NOTE(review): the body of this branch (source line 133) is missing from the
// listing; presumably it stores `blocks` in minimum_irreversible_history_blocks — confirm.
132 if (blocks > manual_slice_file_value) {
134 }
135
// Same validation for the uncompressed-history threshold.
136 const int32_t uncompressed_blocks = options.at("trace-minimum-uncompressed-irreversible-history-blocks").as<int32_t>();
137 SYS_ASSERT(uncompressed_blocks >= -1, chain::plugin_config_exception,
138 "\"trace-minimum-uncompressed-irreversible-history-blocks\" must be greater to or equal to -1.");
139
// NOTE(review): the body of this branch (source line 141) is also missing from the listing.
140 if (uncompressed_blocks > manual_slice_file_value) {
142 }
143
// NOTE(review): source lines 146-149 (the remaining constructor arguments) are
// not visible in this listing; only `trace_dir` can be seen being passed.
144 store = std::make_shared<store_provider>(
145 trace_dir,
150 );
151 }
152
154 store->start_maintenance_thread([](const std::string& msg ){
155 fc_dlog( _log, msg );
156 });
157 }
158
160 store->stop_maintenance_thread();
161 }
162
163 // common configuration parameters
164 boost::filesystem::path trace_dir;
166
167 std::optional<uint32_t> minimum_irreversible_history_blocks;
169
170 static constexpr int32_t manual_slice_file_value = -1;
171 static constexpr uint32_t compression_seek_point_stride = 6 * 1024 * 1024; // 6 MiB strides for clog seek points
172
173 std::shared_ptr<store_provider> store;
174};
175
179struct trace_api_rpc_plugin_impl : public std::enable_shared_from_this<trace_api_rpc_plugin_impl>
180{
181 trace_api_rpc_plugin_impl( const std::shared_ptr<trace_api_common_impl>& common )
182 :common(common) {}
183
184 static void set_program_options(appbase::options_description& cli, appbase::options_description& cfg) {
185 auto cfg_options = cfg.add_options();
186 cfg_options("trace-rpc-abi", bpo::value<vector<string>>()->composing(),
187 "ABIs used when decoding trace RPC responses.\n"
188 "There must be at least one ABI specified OR the flag trace-no-abis must be used.\n"
189 "ABIs are specified as \"Key=Value\" pairs in the form <account-name>=<abi-def>\n"
190 "Where <abi-def> can be:\n"
191 " an absolute path to a file containing a valid JSON-encoded ABI\n"
192 " a relative path from `data-dir` to a file containing a valid JSON-encoded ABI\n"
193 );
194 cfg_options("trace-no-abis",
195 "Use to indicate that the RPC responses will not use ABIs.\n"
196 "Failure to specify this option when there are no trace-rpc-abi configuations will result in an Error.\n"
197 "This option is mutually exclusive with trace-rpc-api"
198 );
199 }
200
// Load the configured ABIs (or verify that running without ABIs was requested)
// and create the request handler that serves the RPC endpoints.
201 void plugin_initialize(const appbase::variables_map& options) {
202 ilog("initializing trace api rpc plugin");
// Exceptions reported by the ABI data handler are logged at debug level.
203 std::shared_ptr<abi_data_handler> data_handler = std::make_shared<abi_data_handler>([](const exception_with_context& e){
204 log_exception(e, fc::log_level::debug);
205 });
206
// "trace-rpc-abi" and "trace-no-abis" are mutually exclusive, and one of them must be given.
207 if( options.count("trace-rpc-abi") ) {
208 SYS_ASSERT(options.count("trace-no-abis") == 0, chain::plugin_config_exception,
209 "Trace API is configured with ABIs however trace-no-abis is set");
210 const std::vector<std::string> key_value_pairs = options["trace-rpc-abi"].as<std::vector<std::string>>();
211 for (const auto& entry : key_value_pairs) {
212 try {
// Each entry is split into a key/value pair: the key becomes the account name,
// the value is passed to abi_def_from_file together with the data directory.
213 auto kv = parse_kv_pairs(entry);
214 auto account = chain::name(kv.first);
215 auto abi = abi_def_from_file(kv.second, app().data_dir());
216 data_handler->add_abi(account, abi);
217 } catch (...) {
// Log which entry failed, then let the original exception propagate.
218 elog("Malformed trace-rpc-abi provider: \"${val}\"", ("val", entry));
219 throw;
220 }
221 }
222 } else {
223 SYS_ASSERT(options.count("trace-no-abis") != 0, chain::plugin_config_exception,
224 "Trace API is not configured with ABIs and trace-no-abis is not set");
225 }
226
// NOTE(review): source line 229 (an argument between the store provider and the
// logging callback) is missing from this listing — presumably it passes
// `data_handler`, which is otherwise unused after this point; confirm.
227 req_handler = std::make_shared<request_handler_t>(
228 shared_store_provider<store_provider>(common->store),
230 [](const std::string& msg ) {
231 fc_dlog( _log, msg );
232 }
233 );
234 }
235
// Turn a time budget into an absolute deadline.
// NOTE(review): `deadline` is declared on source line 237, which is missing from
// this listing; presumably it starts from the current time — confirm.
236 fc::time_point calc_deadline( const fc::microseconds& max_serialization_time ) {
// If adding the budget would exceed the representable maximum, clamp to
// fc::time_point::maximum() rather than performing the addition.
238 if( max_serialization_time > fc::microseconds::maximum() - deadline.time_since_epoch() ) {
239 deadline = fc::time_point::maximum();
240 } else {
241 deadline += max_serialization_time;
242 }
243 return deadline;
244 }
245
// NOTE(review): the function header (source line 246) is missing from this
// listing; this body is presumably plugin_startup. It registers the two
// trace API HTTP endpoints on the http_plugin.
247 auto& http = app().get_plugin<http_plugin>();
248 fc::microseconds max_response_time = http.get_max_response_time();
249
// POST /v1/trace_api/get_block
//   400 when the body is empty, unparsable, or block_num does not fit in uint32_t
//   404 when the request handler returns a null trace
//   200 with the trace otherwise
250 http.add_async_handler("/v1/trace_api/get_block",
251 [wthis=weak_from_this(), max_response_time](std::string, std::string body, url_response_callback cb)
252 {
// The handler holds only a weak reference; lock() keeps the object alive for the call.
// NOTE(review): when the object is gone the handler returns without invoking `cb`.
253 auto that = wthis.lock();
254 if (!that) {
255 return;
256 }
257
// Immediately-invoked lambda: yields the block number, or an empty optional on any failure.
258 auto block_number = ([&body]() -> std::optional<uint32_t> {
259 if (body.empty()) {
260 return {};
261 }
262
263 try {
264 auto input = fc::json::from_string(body);
265 auto block_num = input.get_object()["block_num"].as_uint64();
// Reject values that cannot be represented as uint32_t before narrowing.
266 if (block_num > std::numeric_limits<uint32_t>::max()) {
267 return {};
268 }
269 return block_num;
270 } catch (...) {
271 return {};
272 }
273 })();
274
275 if (!block_number) {
276 error_results results{400, "Bad or missing block_num"};
277 cb( 400, fc::variant( results ));
278 return;
279 }
280
281 try {
282
// The callback passed to the request handler checks the deadline each time it is invoked.
283 const auto deadline = that->calc_deadline( max_response_time );
284 auto resp = that->req_handler->get_block_trace(*block_number, [deadline]() { FC_CHECK_DEADLINE(deadline); });
285 if (resp.is_null()) {
286 error_results results{404, "Trace API: block trace missing"};
287 cb( 404, fc::variant( results ));
288 } else {
289 cb( 200, std::move(resp) );
290 }
291 } catch (...) {
// Any exception is handed to the http_plugin's common exception handler.
292 http_plugin::handle_exception("trace_api", "get_block", body, cb);
293 }
294 });
295
296
// POST /v1/trace_api/get_transaction_trace
//   400 when the body is empty, unparsable, or the id length is outside 8..64 characters
//   404 when the id is not found in the id logs, or the trace itself is null
//   200 with the trace otherwise
// NOTE(review): unlike the get_block handler, this lambda also captures raw `this`
// and reaches `common` through it, while everything else goes through the locked
// `that`. Using `that->common` would make the raw capture unnecessary.
297 http.add_async_handler("/v1/trace_api/get_transaction_trace",
298 [wthis=weak_from_this(), max_response_time, this](std::string, std::string body, url_response_callback cb)
299 {
300 auto that = wthis.lock();
301 if (!that) {
302 return;
303 }
304
// Immediately-invoked lambda: yields the transaction id, or an empty optional on any failure.
305 auto trx_id = ([&body]() -> std::optional<transaction_id_type> {
306 if (body.empty()) {
307 return {};
308 }
309 try {
310 auto input = fc::json::from_string(body);
311 auto trxid = input.get_object()["id"].as_string();
312 if (trxid.size() < 8 || trxid.size() > 64) {
313 return {};
314 }
315 return transaction_id_type(trxid);
316 } catch (...) {
317 return {};
318 }
319 })();
320
321 if (!trx_id) {
322 error_results results{400, "Bad or missing transaction ID"};
323 cb( 400, fc::variant( results ));
324 return;
325 }
326
327 try {
328 const auto deadline = that->calc_deadline( max_response_time );
329 // search for the block that contains the transaction
330 get_block_n blk_num = common->store->get_trx_block_number(*trx_id, common->minimum_irreversible_history_blocks, [deadline]() { FC_CHECK_DEADLINE(deadline); });
331 if (!blk_num.has_value()){
332 error_results results{404, "Trace API: transaction id missing in the transaction id log files"};
333 cb( 404, fc::variant( results ));
334 } else {
335 auto resp = that->req_handler->get_transaction_trace(*trx_id, *blk_num, [deadline]() { FC_CHECK_DEADLINE(deadline); });
336 if (resp.is_null()) {
337 error_results results{404, "Trace API: transaction trace missing"};
338 cb( 404, fc::variant( results ));
339 } else {
340 cb( 200, std::move(resp) );
341 }
342 }
343 } catch (...) {
// NOTE(review): the call name reported here is "get_transaction" although the
// endpoint is get_transaction_trace — confirm whether that is intentional.
344 http_plugin::handle_exception("trace_api", "get_transaction", body, cb);
345 }
346 });
347 }
348
350 }
351
352 std::shared_ptr<trace_api_common_impl> common;
353
355 std::shared_ptr<request_handler_t> req_handler;
356};
357
359 trace_api_plugin_impl( const std::shared_ptr<trace_api_common_impl>& common )
360 :common(common) {}
361
362 static void set_program_options(appbase::options_description& cli, appbase::options_description& cfg) {
363 }
364
// Create the chain extraction object and subscribe it to the chain
// controller's signals.
365 void plugin_initialize(const appbase::variables_map& options) {
366 ilog("initializing trace api plugin");
// Error callback given to the extraction: log at error level, ask the
// application to quit, then throw yield_exception (which emit_killer catches).
367 auto log_exceptions_and_shutdown = [](const exception_with_context& e) {
368 log_exception(e, fc::log_level::error);
369 app().quit();
370 throw yield_exception("shutting down");
371 };
372 extraction = std::make_shared<chain_extraction_t>(shared_store_provider<store_provider>(common->store), log_exceptions_and_shutdown);
373
// NOTE(review): the result of find_plugin<chain_plugin>() is dereferenced
// without a null check — confirm the plugin is always present here.
374 auto& chain = app().find_plugin<chain_plugin>()->chain();
375
// Every handler below runs the extraction call through emit_killer.
// NOTE(review): source line 376 is missing from the listing; presumably it is
// `applied_transaction_connection.emplace(`, matching the pattern used further down.
377 chain.applied_transaction.connect([this](std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t) {
378 emit_killer([&](){
379 extraction->signal_applied_transaction(std::get<0>(t), std::get<1>(t));
380 });
381 }));
382
// NOTE(review): source line 383 is missing as well; presumably
// `block_start_connection.emplace(`.
384 chain.block_start.connect([this](uint32_t block_num) {
385 emit_killer([&](){
386 extraction->signal_block_start(block_num);
387 });
388 }));
389
390 accepted_block_connection.emplace(
391 chain.accepted_block.connect([this](const chain::block_state_ptr& p) {
392 emit_killer([&](){
393 extraction->signal_accepted_block(p);
394 });
395 }));
396
397 irreversible_block_connection.emplace(
398 chain.irreversible_block.connect([this](const chain::block_state_ptr& p) {
399 emit_killer([&](){
400 extraction->signal_irreversible_block(p);
401 });
402 }));
403
404 }
405
407 common->plugin_startup();
408 }
409
411 common->plugin_shutdown();
412 }
413
414 std::shared_ptr<trace_api_common_impl> common;
415
417 std::shared_ptr<chain_extraction_t> extraction;
418
419 std::optional<scoped_connection> applied_transaction_connection;
420 std::optional<scoped_connection> block_start_connection;
421 std::optional<scoped_connection> accepted_block_connection;
422 std::optional<scoped_connection> irreversible_block_connection;
423};
424
425trace_api_plugin::trace_api_plugin()
426{}
427
430
436
437void trace_api_plugin::plugin_initialize(const appbase::variables_map& options) {
438 auto common = std::make_shared<trace_api_common_impl>();
439 common->plugin_initialize(options);
440
441 my = std::make_shared<trace_api_plugin_impl>(common);
442 my->plugin_initialize(options);
443
444 rpc = std::make_shared<trace_api_rpc_plugin_impl>(common);
445 rpc->plugin_initialize(options);
446}
447
449 handle_sighup(); // setup logging
450
451 my->plugin_startup();
452 rpc->plugin_startup();
453}
454
456 my->plugin_shutdown();
457 rpc->plugin_shutdown();
458 fc_ilog( _log, "exit shutdown");
459}
460
464
467
470
471void trace_api_rpc_plugin::set_program_options(appbase::options_description& cli, appbase::options_description& cfg) {
474}
475
476void trace_api_rpc_plugin::plugin_initialize(const appbase::variables_map& options) {
477 auto common = std::make_shared<trace_api_common_impl>();
478 common->plugin_initialize(options);
479
480 rpc = std::make_shared<trace_api_rpc_plugin_impl>(common);
481 rpc->plugin_initialize(options);
482}
483
485 rpc->plugin_startup();
486}
487
489 rpc->plugin_shutdown();
490}
491
495
496}
const mie::Vuint & p
Definition bn.cpp:27
std::string name
#define SYS_THROW(exc_type, FORMAT,...)
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
abstract_plugin * find_plugin(const string &name) const
abstract_plugin & get_plugin(const string &name) const
bfs::path data_dir() const
Get data directory.
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
static variant from_string(const string &utf8_str, const parse_type ptype=parse_type::legacy_parser, uint32_t max_depth=DEFAULT_MAX_RECURSION_DEPTH)
Definition json.cpp:442
provides information about where and when a log message was generated.
aggregates a message along with the context and associated meta-information.
static void update(const fc::string &name, logger &log)
Definition logger.cpp:92
void log(log_message m)
Definition logger.cpp:62
bool is_enabled(log_level e) const
Definition logger.cpp:58
static constexpr microseconds maximum()
Definition time.hpp:14
static time_point now()
Definition time.cpp:14
static constexpr time_point maximum()
Definition time.hpp:46
re-thrown whenever an unhandled exception is caught. Any exceptions thrown by 3rd party libraries that...
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition variant.hpp:191
static void handle_exception(const char *api_name, const char *call_name, const string &body, url_response_callback cb)
void plugin_initialize(const appbase::variables_map &options)
virtual void set_program_options(appbase::options_description &cli, appbase::options_description &cfg) override
void plugin_initialize(const appbase::variables_map &options)
virtual void set_program_options(appbase::options_description &cli, appbase::options_description &cfg) override
#define FC_CHECK_DEADLINE(DEADLINE,...)
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
#define fc_ilog(LOGGER, FORMAT,...)
Definition logger.hpp:83
#define ilog(FORMAT,...)
Definition logger.hpp:118
#define elog(FORMAT,...)
Definition logger.hpp:130
#define fc_dlog(LOGGER, FORMAT,...)
Definition logger.hpp:77
application & app()
@ std_exception_code
Definition exception.hpp:32
checksum_type transaction_id_type
Definition types.hpp:236
std::shared_ptr< block_state > block_state_ptr
std::pair< std::string, std::string > parse_kv_pairs(const std::string &input)
chain::abi_def abi_def_from_file(const std::string &file_name, const fc::path &data_dir)
std::tuple< const std::exception_ptr &, char const *, uint64_t, char const * > exception_with_context
Definition common.hpp:40
std::optional< std::tuple< data_log_entry, bool > > get_block_t
Definition common.hpp:49
std::optional< uint32_t > get_block_n
Definition common.hpp:51
std::function< void(int, fc::variant)> url_response_callback
A callback function provided to a URL handler to allow it to specify the HTTP response code and body.
const fc::string logger_name("net_plugin_impl")
fc::logger _log
unsigned int uint32_t
Definition stdint.h:126
signed int int32_t
Definition stdint.h:123
Immutable except for fc::from_variant.
Definition name.hpp:43
Structure used to create JSON error responses.
Definition trace.hpp:93
static constexpr int32_t manual_slice_file_value
boost::filesystem::path trace_dir
std::optional< uint32_t > minimum_irreversible_history_blocks
static void set_program_options(appbase::options_description &cli, appbase::options_description &cfg)
static constexpr uint32_t compression_seek_point_stride
void plugin_initialize(const appbase::variables_map &options)
std::shared_ptr< store_provider > store
std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks
void plugin_initialize(const appbase::variables_map &options)
std::optional< scoped_connection > applied_transaction_connection
std::optional< scoped_connection > block_start_connection
static void set_program_options(appbase::options_description &cli, appbase::options_description &cfg)
std::optional< scoped_connection > irreversible_block_connection
std::shared_ptr< trace_api_common_impl > common
std::optional< scoped_connection > accepted_block_connection
trace_api_plugin_impl(const std::shared_ptr< trace_api_common_impl > &common)
std::shared_ptr< chain_extraction_t > extraction
static void set_program_options(appbase::options_description &cli, appbase::options_description &cfg)
fc::time_point calc_deadline(const fc::microseconds &max_serialization_time)
trace_api_rpc_plugin_impl(const std::shared_ptr< trace_api_common_impl > &common)
std::shared_ptr< trace_api_common_impl > common
std::shared_ptr< request_handler_t > req_handler
void plugin_initialize(const appbase::variables_map &options)
void cli()