14#include <fc/io/json.hpp>
16#include <fc/scoped_exit.hpp>
18#include <boost/asio.hpp>
19#include <boost/date_time/posix_time/posix_time.hpp>
21#include <iostream>
22#include <algorithm>
23#include <boost/algorithm/string.hpp>
24#include <boost/range/adaptor/map.hpp>
25#include <boost/function_output_iterator.hpp>
26#include <boost/multi_index_container.hpp>
27#include <boost/multi_index/member.hpp>
28#include <boost/multi_index/hashed_index.hpp>
29#include <boost/multi_index/ordered_index.hpp>
30#include <boost/signals2/connection.hpp>
32namespace bmi = boost::multi_index;
33using bmi::indexed_by;
34using bmi::ordered_non_unique;
35using bmi::member;
36using bmi::tag;
37using bmi::hashed_unique;
39using boost::multi_index_container;
41using std::string;
42using std::vector;
43using std::deque;
44using boost::signals2::scoped_connection;
46#undef FC_LOG_AND_DROP
// Handler chain intended to be placed directly after a `try { ... }` block.
//  - guard_exception, std::bad_alloc and boost::interprocess::bad_alloc are forwarded to
//    the corresponding chain_plugin::handle_* routine.
//  - fc::exception, std::exception and any other exception are logged with wlog and dropped.
// No clause rethrows. The specific clauses come first; the generic std::exception and
// catch(...) clauses must stay last. All exceptions are caught by const reference.
#define LOG_AND_DROP() \
   catch ( const guard_exception& e ) { \
      chain_plugin::handle_guard_exception(e); \
   } catch ( const std::bad_alloc& ) { \
      chain_plugin::handle_bad_alloc(); \
   } catch ( const boost::interprocess::bad_alloc& ) { \
      chain_plugin::handle_db_exhaustion(); \
   } catch( const fc::exception& er ) { \
      wlog( "${details}", ("details",er.to_detail_string()) ); \
   } catch( const std::exception& e ) { \
      fc::exception fce( \
                FC_LOG_MESSAGE( warn, "std::exception: ${what}: ",("what",e.what()) ), \
                fc::std_exception_code,\
                BOOST_CORE_TYPEID(e).name(), \
                e.what() ) ; \
      wlog( "${details}", ("details",fce.to_detail_string()) ); \
   } catch( ... ) { \
      fc::unhandled_exception e( \
                FC_LOG_MESSAGE( warn, "unknown: ", ), \
                std::current_exception() ); \
      wlog( "${details}", ("details",e.to_detail_string()) ); \
   }
// Logger name constants. NOTE(review): presumably each one is the key under which the
// matching fc::logger is looked up in the logging configuration — confirm against the
// logger declarations, which are not part of this excerpt.

// Name for the plugin's general logger.
const std::string logger_name{"producer_plugin"};
// Name for the logger of successfully traced transactions.
const std::string trx_successful_trace_logger_name{"transaction_success_tracing"};
// Name for the logger of failed traced transactions.
const std::string trx_failed_trace_logger_name{"transaction_failure_tracing"};
// Name for the logger receiving full traces of successful transactions.
const std::string trx_trace_success_logger_name{"transaction_trace_success"};
// Name for the logger receiving full traces of failed transactions.
const std::string trx_trace_failure_logger_name{"transaction_trace_failure"};
// Name for the logger receiving the transactions themselves.
const std::string trx_logger_name{"transaction"};
88namespace sysio {
90static appbase::abstract_plugin& _producer_plugin = app().register_plugin<producer_plugin>();
92using namespace sysio::chain;
93using namespace sysio::chain::plugin_interface;
95namespace {
96 bool exception_is_exhausted(const fc::exception& e) {
97 auto code = e.code();
98 return (code == block_cpu_usage_exceeded::code_value) ||
99 (code == block_net_usage_exceeded::code_value) ||
100 (code == deadline_exception::code_value);
101 }
109struct by_id;
110struct by_expiry;
112using transaction_id_with_expiry_index = multi_index_container<
114 indexed_by<
115 hashed_unique<tag<by_id>, BOOST_MULTI_INDEX_MEMBER(transaction_id_with_expiry, transaction_id_type, trx_id)>,
116 ordered_non_unique<tag<by_expiry>, BOOST_MULTI_INDEX_MEMBER(transaction_id_with_expiry, fc::time_point, expiry)>
117 >
120struct by_height;
137 static bfs::path get_final_path(const block_id_type& block_id, const bfs::path& snapshots_dir) {
138 return snapshots_dir / fc::format_string("snapshot-${id}.bin", fc::mutable_variant_object()("id", block_id));
139 }
141 static bfs::path get_pending_path(const block_id_type& block_id, const bfs::path& snapshots_dir) {
142 return snapshots_dir / fc::format_string(".pending-snapshot-${id}.bin", fc::mutable_variant_object()("id", block_id));
143 }
145 static bfs::path get_temp_path(const block_id_type& block_id, const bfs::path& snapshots_dir) {
146 return snapshots_dir / fc::format_string(".incomplete-snapshot-${id}.bin", fc::mutable_variant_object()("id", block_id));
147 }
150 auto block_ptr = chain.fetch_block_by_id( block_id );
151 auto in_chain = (bool)block_ptr;
152 boost::system::error_code ec;
154 if (!in_chain) {
155 bfs::remove(bfs::path(pending_path), ec);
156 SYS_THROW(snapshot_finalization_exception,
157 "Snapshotted block was forked out of the chain. ID: ${block_id}",
158 ("block_id", block_id));
159 }
161 bfs::rename(bfs::path(pending_path), bfs::path(final_path), ec);
162 SYS_ASSERT(!ec, snapshot_finalization_exception,
163 "Unable to finalize valid snapshot of block number ${bn}: [code: ${ec}] ${message}",
164 ("bn", get_height())
165 ("ec", ec.value())
166 ("message", ec.message()));
168 return {block_id, block_ptr->block_num(), block_ptr->timestamp, chain_snapshot_header::current_version, final_path};
169 }
173 std::string pending_path;
174 std::string final_path;
177using pending_snapshot_index = multi_index_container<
179 indexed_by<
180 hashed_unique<tag<by_id>, BOOST_MULTI_INDEX_MEMBER(pending_snapshot, block_id_type, block_id)>,
181 ordered_non_unique<tag<by_height>, BOOST_MULTI_INDEX_CONST_MEM_FUN( pending_snapshot, uint32_t, get_height)>
182 >
186 producing,
190namespace {
192// track multiple failures on unapplied transactions
193class account_failures {
196 //lifetime of sb must outlive account_failures
197 explicit account_failures( const sysio::subjective_billing& sb )
199 {
200 }
202 void set_max_failures_per_account( uint32_t max_failures ) {
203 max_failures_per_account = max_failures;
204 }
206 void add( const account_name& n, int64_t exception_code ) {
207 auto& fa = failed_accounts[n];
208 ++fa.num_failures;
209 fa.add( n, exception_code );
210 }
212 // return true if exceeds max_failures_per_account and should be dropped
213 bool failure_limit( const account_name& n ) {
214 auto fitr = failed_accounts.find( n );
215 bool is_whitelisted = subjective_billing.is_account_disabled( n );
216 if( !is_whitelisted && fitr != failed_accounts.end() && fitr->second.num_failures >= max_failures_per_account ) {
217 ++fitr->second.num_failures;
218 return true;
219 }
220 return false;
221 }
223 void report() const {
225 auto now = fc::time_point::now();
226 for( const auto& e : failed_accounts ) {
227 std::string reason;
228 if( e.second.is_deadline() ) reason += "deadline";
229 if( e.second.is_tx_cpu_usage() ) {
230 if( !reason.empty() ) reason += ", ";
231 reason += "tx_cpu_usage";
232 }
233 if( e.second.is_sysio_assert() ) {
234 if( !reason.empty() ) reason += ", ";
235 reason += "assert";
236 }
237 if( e.second.is_other() ) {
238 if( !reason.empty() ) reason += ", ";
239 reason += "other";
240 }
241 fc_dlog( _log, "Failed ${n} trxs, account: ${a}, sub bill: ${b}us, reason: ${r}",
242 ("n", e.second.num_failures)("b", subjective_billing.get_subjective_bill(e.first, now))
243 ("a", e.first)("r", reason) );
244 }
245 }
246 }
248 void clear() {
249 failed_accounts.clear();
250 }
253 struct account_failure {
254 enum class ex_fields : uint8_t {
255 ex_deadline_exception = 1,
256 ex_tx_cpu_usage_exceeded = 2,
257 ex_sysio_assert_exception = 4,
258 ex_other_exception = 8
259 };
261 void add( const account_name& n, int64_t exception_code ) {
262 if( exception_code == tx_cpu_usage_exceeded::code_value ) {
263 ex_flags = set_field( ex_flags, ex_fields::ex_tx_cpu_usage_exceeded );
264 } else if( exception_code == deadline_exception::code_value ) {
265 ex_flags = set_field( ex_flags, ex_fields::ex_deadline_exception );
266 } else if( exception_code == sysio_assert_message_exception::code_value ||
267 exception_code == sysio_assert_code_exception::code_value ) {
268 ex_flags = set_field( ex_flags, ex_fields::ex_sysio_assert_exception );
269 } else {
270 ex_flags = set_field( ex_flags, ex_fields::ex_other_exception );
271 fc_dlog( _log, "Failed trx, account: ${a}, reason: ${r}",
272 ("a", n)("r", exception_code) );
273 }
274 }
276 bool is_deadline() const { return has_field( ex_flags, ex_fields::ex_deadline_exception ); }
277 bool is_tx_cpu_usage() const { return has_field( ex_flags, ex_fields::ex_tx_cpu_usage_exceeded ); }
278 bool is_sysio_assert() const { return has_field( ex_flags, ex_fields::ex_sysio_assert_exception ); }
279 bool is_other() const { return has_field( ex_flags, ex_fields::ex_other_exception ); }
283 };
285 std::map<account_name, account_failure> failed_accounts;
286 uint32_t max_failures_per_account = 3;
290} // anonymous namespace
292class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl> {
293 public:
294 producer_plugin_impl(boost::asio::io_service& io)
295 :_timer(io)
296 ,_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
297 {
298 }
300 std::optional<fc::time_point> calculate_next_block_time(const account_name& producer_name, const block_timestamp_type& current_block_time) const;
303 void produce_block();
304 bool maybe_produce_block();
305 bool block_is_exhausted() const;
306 bool remove_expired_trxs( const fc::time_point& deadline );
307 bool remove_expired_blacklisted_trxs( const fc::time_point& deadline );
308 bool process_unapplied_trxs( const fc::time_point& deadline );
309 void process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit );
310 bool process_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit );
312 boost::program_options::variables_map _options;
314 bool _pause_production = false;
317 std::map<chain::public_key_type, signature_provider_type> _signature_providers;
318 std::set<chain::account_name> _producers;
319 boost::asio::deadline_timer _timer;
320 using producer_watermark = std::pair<uint32_t, block_timestamp_type>;
321 std::map<chain::account_name, producer_watermark> _producer_watermarks;
324 std::optional<named_thread_pool> _thread_pool;
326 std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
327 std::atomic<bool> _received_block{false}; // modified by net_plugin thread pool and app thread
340 std::vector<chain::digest_type> _protocol_features_to_activate;
341 bool _protocol_features_signaled = false; // to mark whether it has been signaled in start_block
347 incoming::methods::block_sync::method_type::handle _incoming_block_sync_provider;
348 incoming::methods::transaction_async::method_type::handle _incoming_transaction_async_provider;
355 std::optional<scoped_connection> _accepted_block_connection;
356 std::optional<scoped_connection> _accepted_block_header_connection;
357 std::optional<scoped_connection> _irreversible_block_connection;
359 /*
361 * Boost timers can be in a state where a handler has not yet executed but is not abortable.
362 * As this method needs to mutate state handlers depend on for proper functioning to maintain
363 * invariants for other code (namely accepting incoming transactions in a nearly full block)
 * the handlers capture a correlation ID at the time they are set. When they are executed
365 * they must check that correlation_id against the global ordinal. If it does not match that
366 * implies that this method has been called with the handler in the state where it should be
367 * cancelled but wasn't able to be.
368 */
 // keep an expected ratio between deferred and incoming transactions
372 double _incoming_defer_ratio = 1.0; // 1:1
374 // path to write the snapshots to
375 bfs::path _snapshots_dir;
377 void consider_new_watermark( account_name producer, uint32_t block_num, block_timestamp_type timestamp) {
378 auto itr = _producer_watermarks.find( producer );
379 if( itr != _producer_watermarks.end() ) {
380 itr->second.first = std::max( itr->second.first, block_num );
381 itr->second.second = std::max( itr->second.second, timestamp );
382 } else if( _producers.count( producer ) > 0 ) {
383 _producer_watermarks.emplace( producer, std::make_pair(block_num, timestamp) );
384 }
385 }
387 std::optional<producer_watermark> get_watermark( account_name producer ) const {
388 auto itr = _producer_watermarks.find( producer );
390 if( itr == _producer_watermarks.end() ) return {};
392 return itr->second;
393 }
395 void on_block( const block_state_ptr& bsp ) {
396 auto before = _unapplied_transactions.size();
399 fc_dlog( _log, "Removed applied transactions before: ${before}, after: ${after}",
400 ("before", before)("after", _unapplied_transactions.size()) );
401 }
403 void on_block_header( const block_state_ptr& bsp ) {
404 consider_new_watermark( bsp->header.producer, bsp->block_num, bsp->block->timestamp );
405 }
408 _irreversible_block_time = lib->timestamp.to_time_point();
409 const chain::controller& chain = chain_plug->chain();
411 // promote any pending snapshots
412 auto& snapshots_by_height = _pending_snapshot_index.get<by_height>();
413 uint32_t lib_height = lib->block_num();
415 while (!snapshots_by_height.empty() && snapshots_by_height.begin()->get_height() <= lib_height) {
416 const auto& pending = snapshots_by_height.begin();
417 auto next = pending->next;
419 try {
420 next(pending->finalize(chain));
421 } CATCH_AND_CALL(next);
423 snapshots_by_height.erase(snapshots_by_height.begin());
424 }
425 }
427 void abort_block() {
428 auto& chain = chain_plug->chain();
430 _unapplied_transactions.add_aborted( chain.abort_block() );
432 }
434 bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id, const block_state_ptr& bsp) {
435 auto& chain = chain_plug->chain();
437 fc_wlog( _log, "dropped incoming block #${num} id: ${id}",
438 ("num", block->block_num())("id", block_id ? (*block_id).str() : "UNKNOWN") );
439 return false;
440 }
442 // start a new speculative block, speculative start_block may have been interrupted
443 auto ensure = fc::make_scoped_exit([this](){
445 });
447 const auto& id = block_id ? *block_id : block->calculate_id();
448 auto blk_num = block->block_num();
450 fc_dlog(_log, "received incoming block ${n} ${id}", ("n", blk_num)("id", id));
452 SYS_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds( 7 )), block_from_the_future,
453 "received a block from the future, ignoring it: ${id}", ("id", id) );
455 /* de-dupe here... no point in aborting block if we already know the block */
456 auto existing = chain.fetch_block_by_id( id );
457 if( existing ) { return false; }
459 // start processing of block
460 std::future<block_state_ptr> bsf;
461 if( !bsp ) {
462 bsf = chain.create_block_state_future( id, block );
463 }
465 // abort the pending block
466 abort_block();
468 // push the new block
469 auto handle_error = [&](const auto& e)
470 {
471 elog((e.to_detail_string()));
473 throw;
474 };
476 try {
477 const block_state_ptr& bspr = bsp ? bsp : bsf.get();
478 chain.push_block( bspr, [this]( const branch_type& forked_branch ) {
479 _unapplied_transactions.add_forked( forked_branch );
480 }, [this]( const transaction_id_type& id ) {
481 return _unapplied_transactions.get_trx( id );
482 } );
483 } catch ( const guard_exception& e ) {
485 return false;
486 } catch ( const std::bad_alloc& ) {
488 } catch ( boost::interprocess::bad_alloc& ) {
490 } catch ( const fork_database_exception& e ) {
491 elog("Cannot recover from ${e}. Shutting down.", ("e", e.to_detail_string()));
492 appbase::app().quit();
493 } catch( const fc::exception& e ) {
494 handle_error(e);
495 } catch (const std::exception& e) {
497 }
499 const auto& hbs = chain.head_block_state();
500 if( hbs->header.timestamp.next().to_time_point() >= fc::time_point::now() ) {
501 _production_enabled = true;
502 }
504 if( fc::time_point::now() - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0) ) {
505 ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, conf: ${confs}, latency: ${latency} ms]",
506 ("p",block->producer)("id",id.str().substr(8,16))("n",blk_num)("t",block->timestamp)
507 ("count",block->transactions.size())("lib",chain.last_irreversible_block_num())
508 ("confs", block->confirmed)("latency", (fc::time_point::now() - block->timestamp).count()/1000 ) );
509 if( chain.get_read_mode() != db_read_mode::IRREVERSIBLE && hbs->id != id && hbs->block != nullptr ) { // not applied to head
510 ilog("Block not applied to head ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, dpos: ${dpos}, conf: ${confs}, latency: ${latency} ms]",
511 ("p",hbs->block->producer)("id",hbs->id.str().substr(8,16))("n",hbs->block_num)("t",hbs->block->timestamp)
512 ("count",hbs->block->transactions.size())("dpos", hbs->dpos_irreversible_blocknum)
513 ("confs", hbs->block->confirmed)("latency", (fc::time_point::now() - hbs->block->timestamp).count()/1000 ) );
514 }
515 }
517 return true;
518 }
522 // abort the pending block
526 }
529 bool persist_until_expired,
530 bool read_only,
531 bool return_failure_traces,
534 const auto max_trx_time_ms = _max_transaction_time_ms.load();
535 fc::microseconds max_trx_cpu_usage = max_trx_time_ms < 0 ? fc::microseconds::maximum() : fc::milliseconds( max_trx_time_ms );
537 auto future = transaction_metadata::start_recover_keys( trx, _thread_pool->get_executor(),
538 chain.get_chain_id(), fc::microseconds( max_trx_cpu_usage ),
539 read_only ? transaction_metadata::trx_type::read_only : transaction_metadata::trx_type::input,
542 boost::asio::post(_thread_pool->get_executor(), [self = this, future{std::move(future)}, persist_until_expired, return_failure_traces,
543 next{std::move(next)}, trx]() mutable {
544 if( future.valid() ) {
545 future.wait();
546 app().post( priority::low, [self, future{std::move(future)}, persist_until_expired, next{std::move( next )}, trx{std::move(trx)}, return_failure_traces]() mutable {
547 auto exception_handler = [self, &next, trx{std::move(trx)}](fc::exception_ptr ex) {
548 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${txid}, auth: ${a} : ${why} ",
549 ("txid", trx->id())("a",trx->get_transaction().first_authorizer())("why",ex->what()));
550 next(ex);
552 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${entire_trx}",
553 ("entire_trx", self->chain_plug->get_log_trx(trx->get_transaction())));
554 fc_dlog(_trx_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${trx}",
555 ("trx", self->chain_plug->get_log_trx(trx->get_transaction())));
556 };
557 try {
558 auto result = future.get();
559 if( !self->process_incoming_transaction_async( result, persist_until_expired, return_failure_traces, next) ) {
560 if( self->_pending_block_mode == pending_block_mode::producing ) {
561 self->schedule_maybe_produce_block( true );
562 } else {
563 self->restart_speculative_block();
564 }
565 }
566 } CATCH_AND_CALL(exception_handler);
567 } );
568 }
569 });
570 }
573 bool persist_until_expired,
574 bool return_failure_traces,
576 bool exhausted = false;
577 chain::controller& chain = chain_plug->chain();
579 auto send_response = [this, &trx, &chain, &next](const std::variant<fc::exception_ptr, transaction_trace_ptr>& response) {
580 next(response);
582 auto get_trace = [&](const std::variant<fc::exception_ptr, transaction_trace_ptr>& response) -> fc::variant {
583 if (std::holds_alternative<fc::exception_ptr>(response)) {
584 return fc::variant{std::get<fc::exception_ptr>(response)};
585 } else {
586 return chain_plug->get_log_trx_trace( std::get<transaction_trace_ptr>(response) );
587 }
589 };
591 fc::exception_ptr except_ptr; // rejected
592 if (std::holds_alternative<fc::exception_ptr>(response)) {
593 except_ptr = std::get<fc::exception_ptr>(response);
594 } else if (std::get<transaction_trace_ptr>(response)->except) {
595 except_ptr = std::get<transaction_trace_ptr>(response)->except->dynamic_copy_exception();
596 }
598 if (!trx->read_only) {
599 _transaction_ack_channel.publish(priority::low, std::pair<fc::exception_ptr, packed_transaction_ptr>(except_ptr, trx->packed_trx()));
600 }
602 if (except_ptr) {
603 if (_pending_block_mode == pending_block_mode::producing) {
604 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING tx: ${txid}, auth: ${a} : ${why} ",
605 ("block_num", chain.head_block_num() + 1)
606 ("prod", get_pending_block_producer())
607 ("txid", trx->id())
608 ("a", trx->packed_trx()->get_transaction().first_authorizer())
609 ("why", except_ptr->what()));
610 fc_dlog(_trx_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING tx: ${trx}",
611 ("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
612 ("trx", chain_plug->get_log_trx(trx->packed_trx()->get_transaction())));
613 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING tx: ${entire_trace}",
614 ("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
615 ("entire_trace", get_trace(response)));
616 } else {
617 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${txid}, auth: ${a} : ${why} ",
618 ("txid", trx->id())
619 ("a", trx->packed_trx()->get_transaction().first_authorizer())
620 ("why", except_ptr->what()));
621 fc_dlog(_trx_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${trx} ",
622 ("trx", chain_plug->get_log_trx(trx->packed_trx()->get_transaction())));
623 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${entire_trace} ",
624 ("entire_trace", get_trace(response)));
625 }
626 } else {
627 if (_pending_block_mode == pending_block_mode::producing) {
628 fc_dlog(_trx_successful_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING tx: ${txid}, auth: ${a}, cpu: ${cpu}",
629 ("block_num", chain.head_block_num() + 1)
630 ("prod", get_pending_block_producer())
631 ("txid", trx->id())
632 ("a", trx->packed_trx()->get_transaction().first_authorizer())
633 ("cpu", trx->billed_cpu_time_us));
634 fc_dlog(_trx_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING tx: ${trx}",
635 ("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
636 ("trx", chain_plug->get_log_trx(trx->packed_trx()->get_transaction())));
637 fc_dlog(_trx_trace_success_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING tx: ${entire_trace}",
638 ("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
639 ("entire_trace", get_trace(response)));
640 } else {
641 fc_dlog(_trx_successful_trace_log, "[TRX_TRACE] Speculative execution is ACCEPTING tx: ${txid}, auth: ${a}, cpu: ${cpu}",
642 ("txid", trx->id())
643 ("a", trx->packed_trx()->get_transaction().first_authorizer())
644 ("cpu", trx->billed_cpu_time_us));
645 fc_dlog(_trx_trace_success_log, "[TRX_TRACE] Speculative execution is ACCEPTING tx: ${entire_trace}",
646 ("entire_trace", get_trace(response)));
647 fc_dlog(_trx_log, "[TRX_TRACE] Speculative execution is ACCEPTING tx: ${trx}",
648 ("trx", chain_plug->get_log_trx(trx->packed_trx()->get_transaction())));
649 }
650 }
651 };
653 try {
654 const auto& id = trx->id();
657 const fc::time_point expire = trx->packed_trx()->expiration();
658 if( expire < bt ) {
659 send_response( std::static_pointer_cast<fc::exception>(
660 std::make_shared<expired_tx_exception>(
661 FC_LOG_MESSAGE( error, "expired transaction ${id}, expiration ${e}, block time ${bt}",
662 ("id", id)("e", expire)( "bt", bt )))));
663 return true;
664 }
666 if( chain.is_known_unexpired_transaction( id )) {
667 send_response( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
668 FC_LOG_MESSAGE( error, "duplicate transaction ${id}", ("id", id)))) );
669 return true;
670 }
672 if( !chain.is_building_block()) {
673 _unapplied_transactions.add_incoming( trx, persist_until_expired, return_failure_traces, next );
674 return true;
675 }
677 auto first_auth = trx->packed_trx()->get_transaction().first_authorizer();
678 if( _account_fails.failure_limit( first_auth ) ) {
679 send_response( std::static_pointer_cast<fc::exception>( std::make_shared<tx_cpu_usage_exceeded>(
680 FC_LOG_MESSAGE( error, "transaction ${id} exceeded failure limit for account ${a}",
681 ("id", trx->id())("a", first_auth) ) ) ) );
682 return true;
683 }
685 auto start = fc::time_point::now();
686 fc::microseconds max_trx_time = fc::milliseconds( _max_transaction_time_ms.load() );
687 if( max_trx_time.count() < 0 ) max_trx_time = fc::microseconds::maximum();
688 const auto block_deadline = calculate_block_deadline( chain.pending_block_time() );
690 bool disable_subjective_billing = ( _pending_block_mode == pending_block_mode::producing )
691 || ( persist_until_expired && _disable_subjective_api_billing )
692 || ( !persist_until_expired && _disable_subjective_p2p_billing )
693 || trx->read_only;
695 int64_t sub_bill = 0;
696 if( !disable_subjective_billing )
697 sub_bill = _subjective_billing.get_subjective_bill( first_auth, fc::time_point::now() );
699 auto prev_billed_cpu_time_us = trx->billed_cpu_time_us;
700 auto trace = chain.push_transaction( trx, block_deadline, max_trx_time, prev_billed_cpu_time_us, false, sub_bill );
701 fc_dlog( _trx_failed_trace_log, "Subjective bill for ${a}: ${b} elapsed ${t}us", ("a",first_auth)("b",sub_bill)("t",trace->elapsed));
702 if( trace->except ) {
703 if( exception_is_exhausted( *trace->except ) ) {
704 _unapplied_transactions.add_incoming( trx, persist_until_expired, return_failure_traces, next );
705 if( _pending_block_mode == pending_block_mode::producing ) {
706 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
707 ("block_num", chain.head_block_num() + 1)
708 ("prod", get_pending_block_producer())
709 ("txid", trx->id()));
710 } else {
711 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING",
712 ("txid", trx->id()));
713 }
714 exhausted = block_is_exhausted();
715 } else {
716 if (!disable_subjective_billing)
717 _subjective_billing.subjective_bill_failure( first_auth, trace->elapsed, fc::time_point::now() );
719 if( _pending_block_mode == pending_block_mode::producing ) {
720 auto failure_code = trace->except->code();
721 if( failure_code != tx_duplicate::code_value ) {
722 // this failed our configured maximum transaction time, we don't want to replay it
723 fc_dlog( _log, "Failed ${c} trx, prev billed: ${p}us, ran: ${r}us, id: ${id}",
724 ("c", trace->except->code())( "p", prev_billed_cpu_time_us )
725 ( "r", fc::time_point::now() - start )( "id", trx->id() ) );
726 _account_fails.add( first_auth, failure_code );
727 }
728 }
729 if( return_failure_traces ) {
730 send_response( trace );
731 } else {
732 auto e_ptr = trace->except->dynamic_copy_exception();
733 send_response( e_ptr );
734 }
735 }
736 } else {
737 if( persist_until_expired && !_disable_persist_until_expired ) {
738 // if this trx didnt fail/soft-fail and the persist flag is set, store its ID so that we can
739 // ensure its applied to all future speculative blocks as well.
740 // No need to subjective bill since it will be re-applied
741 _unapplied_transactions.add_persisted( trx );
742 } else {
743 // if db_read_mode SPECULATIVE then trx is in the pending block and not immediately reverted
744 if (!disable_subjective_billing)
745 _subjective_billing.subjective_bill( trx->id(), expire, first_auth, trace->elapsed,
747 }
748 send_response( trace );
749 }
751 } catch ( const guard_exception& e ) {
753 } catch ( boost::interprocess::bad_alloc& ) {
755 } catch ( std::bad_alloc& ) {
757 } CATCH_AND_CALL(send_response);
759 return !exhausted;
760 }
764 auto now = fc::time_point::now();
765 if (now < _irreversible_block_time) {
766 return fc::microseconds(0);
767 } else {
768 return now - _irreversible_block_time;
769 }
770 }
773 auto& chain = chain_plug->chain();
774 if (chain.is_building_block()) {
775 return chain.pending_block_producer();
776 } else {
777 return {};
778 }
779 }
782 return !_production_enabled || _pause_production || (_max_irreversible_block_age_us.count() >= 0 && get_irreversible_block_age() >= _max_irreversible_block_age_us);
783 }
786 succeeded,
787 failed,
788 waiting_for_block,
789 waiting_for_production,
790 exhausted
791 };
793 inline bool should_interrupt_start_block( const fc::time_point& deadline ) const;
794 start_block_result start_block();
796 fc::time_point calculate_pending_block_time() const;
797 fc::time_point calculate_block_deadline( const fc::time_point& ) const;
798 void schedule_delayed_production_loop(const std::weak_ptr<producer_plugin_impl>& weak_this, std::optional<fc::time_point> wake_up_time);
799 std::optional<fc::time_point> calculate_producer_wake_up_time( const block_timestamp_type& ref_block_time ) const;
805 std::cerr << "\n"
806 "*******************************\n"
807 "* *\n"
808 "* ------ NEW CHAIN ------ *\n"
809 "* - Welcome to SYSIO! - *\n"
810 "* ----------------------- *\n"
811 "* *\n"
812 "*******************************\n"
813 "\n";
815 if( db.head_block_state()->header.timestamp.to_time_point() < (fc::time_point::now() - fc::milliseconds(200 * config::block_interval_ms)))
816 {
817 std::cerr << "Your genesis seems to have an old timestamp\n"
818 "Please consider using the --genesis-timestamp option to give your genesis a recent timestamp\n"
819 "\n"
820 ;
821 }
822 return;
826 : my(new producer_plugin_impl(app().get_io_service())){
827 }
832 boost::program_options::options_description& command_line_options,
833 boost::program_options::options_description& config_file_options)
835 auto default_priv_key = private_key_type::regenerate<fc::ecc::private_key_shim>(fc::sha256::hash(std::string("nathan")));
836 auto private_key_default = std::make_pair(default_priv_key.get_public_key(), default_priv_key );
838 boost::program_options::options_description producer_options;
840 producer_options.add_options()
841 ("enable-stale-production,e", boost::program_options::bool_switch()->notifier([this](bool e){my->_production_enabled = e;}), "Enable block production, even if the chain is stale.")
842 ("pause-on-startup,x", boost::program_options::bool_switch()->notifier([this](bool p){my->_pause_production = p;}), "Start this node in a state where production is paused")
843 ("max-transaction-time", bpo::value<int32_t>()->default_value(30),
844 "Limits the maximum time (in milliseconds) that is allowed a pushed transaction's code to execute before being considered invalid")
845 ("max-irreversible-block-age", bpo::value<int32_t>()->default_value( -1 ),
846 "Limits the maximum age (in seconds) of the DPOS Irreversible Block for a chain this node will produce blocks on (use negative value to indicate unlimited)")
847 ("producer-name,p", boost::program_options::value<vector<string>>()->composing()->multitoken(),
848 "ID of producer controlled by this node (e.g. inita; may specify multiple times)")
849 ("private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(),
850 "(DEPRECATED - Use signature-provider instead) Tuple of [public key, WIF private key] (may specify multiple times)")
851 ("signature-provider", boost::program_options::value<vector<string>>()->composing()->multitoken()->default_value(
852 {default_priv_key.get_public_key().to_string() + "=KEY:" + default_priv_key.to_string()},
853 default_priv_key.get_public_key().to_string() + "=KEY:" + default_priv_key.to_string()),
854 "Key=Value pairs in the form <public-key>=<provider-spec>\n"
855 "Where:\n"
856 " <public-key> \tis a string form of a vaild SYSIO public key\n\n"
857 " <provider-spec> \tis a string in the form <provider-type>:<data>\n\n"
858 " <provider-type> \tis KEY, or KSYSD\n\n"
859 " KEY:<data> \tis a string form of a valid SYSIO private key which maps to the provided public key\n\n"
860 " KSYSD:<data> \tis the URL where kiod is available and the approptiate wallet(s) are unlocked")
861 ("kiod-provider-timeout", boost::program_options::value<int32_t>()->default_value(5),
862 "Limits the maximum time (in milliseconds) that is allowed for sending blocks to a kiod provider for signing")
863 ("greylist-account", boost::program_options::value<vector<string>>()->composing()->multitoken(),
864 "account that can not access to extended CPU/NET virtual resources")
865 ("greylist-limit", boost::program_options::value<uint32_t>()->default_value(1000),
866 "Limit (between 1 and 1000) on the multiple that CPU/NET virtual resources can extend during low usage (only enforced subjectively; use 1000 to not enforce any limit)")
867 ("produce-time-offset-us", boost::program_options::value<int32_t>()->default_value(0),
868 "Offset of non last block producing time in microseconds. Valid range 0 .. -block_time_interval.")
869 ("last-block-time-offset-us", boost::program_options::value<int32_t>()->default_value(-200000),
870 "Offset of last block producing time in microseconds. Valid range 0 .. -block_time_interval.")
871 ("cpu-effort-percent", bpo::value<uint32_t>()->default_value(config::default_block_cpu_effort_pct / config::percent_1),
872 "Percentage of cpu block production time used to produce block. Whole number percentages, e.g. 80 for 80%")
873 ("last-block-cpu-effort-percent", bpo::value<uint32_t>()->default_value(config::default_block_cpu_effort_pct / config::percent_1),
874 "Percentage of cpu block production time used to produce last block. Whole number percentages, e.g. 80 for 80%")
875 ("max-block-cpu-usage-threshold-us", bpo::value<uint32_t>()->default_value( 5000 ),
876 "Threshold of CPU block production to consider block full; when within threshold of max-block-cpu-usage block can be produced immediately")
877 ("max-block-net-usage-threshold-bytes", bpo::value<uint32_t>()->default_value( 1024 ),
878 "Threshold of NET block production to consider block full; when within threshold of max-block-net-usage block can be produced immediately")
879 ("max-scheduled-transaction-time-per-block-ms", boost::program_options::value<int32_t>()->default_value(100),
880 "Maximum wall-clock time, in milliseconds, spent retiring scheduled transactions in any block before returning to normal transaction processing.")
881 ("subjective-cpu-leeway-us", boost::program_options::value<int32_t>()->default_value( config::default_subjective_cpu_leeway_us ),
882 "Time in microseconds allowed for a transaction that starts with insufficient CPU quota to complete and cover its CPU usage.")
883 ("subjective-account-max-failures", boost::program_options::value<uint32_t>()->default_value(3),
884 "Sets the maximum amount of failures that are allowed for a given account per block.")
885 ("subjective-account-decay-time-minutes", bpo::value<uint32_t>()->default_value( config::account_cpu_usage_average_window_ms / 1000 / 60 ),
886 "Sets the time to return full subjective cpu for accounts")
887 ("incoming-defer-ratio", bpo::value<double>()->default_value(1.0),
888 "ratio between incoming transactions and deferred transactions when both are queued for execution")
889 ("incoming-transaction-queue-size-mb", bpo::value<uint16_t>()->default_value( 1024 ),
890 "Maximum size (in MiB) of the incoming transaction queue. Exceeding this value will subjectively drop transaction with resource exhaustion.")
891 ("disable-api-persisted-trx", bpo::bool_switch()->default_value(false),
892 "Disable the re-apply of API transactions.")
893 ("disable-subjective-billing", bpo::value<bool>()->default_value(true),
894 "Disable subjective CPU billing for API/P2P transactions")
895 ("disable-subjective-account-billing", boost::program_options::value<vector<string>>()->composing()->multitoken(),
896 "Account which is excluded from subjective CPU billing")
897 ("disable-subjective-p2p-billing", bpo::value<bool>()->default_value(true),
898 "Disable subjective CPU billing for P2P transactions")
899 ("disable-subjective-api-billing", bpo::value<bool>()->default_value(true),
900 "Disable subjective CPU billing for API transactions")
901 ("producer-threads", bpo::value<uint16_t>()->default_value(config::default_controller_thread_pool_size),
902 "Number of worker threads in producer thread pool")
903 ("snapshots-dir", bpo::value<bfs::path>()->default_value("snapshots"),
904 "the location of the snapshots directory (absolute path or relative to application data dir)")
905 ;
906 config_file_options.add(producer_options);
// Body of a key-lookup predicate (its signature line is not part of this excerpt):
// yields true exactly when `key` has an entry in my->_signature_providers.
911 auto private_key_itr = my->_signature_providers.find(key);
912 if(private_key_itr != my->_signature_providers.end())
913 return true;
914 return false;
// Forwards to the subjective-billing tracker and returns its bill for `first_auth` at time `now`
// (the enclosing signature is not part of this excerpt).
919 return my->_subjective_billing.get_subjective_bill( first_auth, now );
// Body of a digest-signing routine (its signature line is not part of this excerpt).
// A default-constructed public key is treated as "no key" and produces a default-constructed signature.
924 if(key != chain::public_key_type()) {
925 auto private_key_itr = my->_signature_providers.find(key);
// Fails with producer_priv_key_not_found when no provider was registered for `key`.
926 SYS_ASSERT(private_key_itr != my->_signature_providers.end(), producer_priv_key_not_found, "Local producer has no private key in config.ini corresponding to public key ${key}", ("key", key));
// Invoke the registered provider callable on the digest and return its signature.
928 return private_key_itr->second(digest);
929 }
930 else {
931 return chain::signature_type();
932 }
935template<typename T>
936T dejsonify(const string& s) {
937 return fc::json::from_string(s).as<T>();
// LOAD_VALUE_SET(options, op_name, container): when option `op_name` is present, reads it as a
// vector of strings and emplaces each entry into `container` as a sysio::chain::name.
// NOTE(review): the last visible line still ends in a continuation backslash; the line that
// terminates the macro is not part of this excerpt.
940#define LOAD_VALUE_SET(options, op_name, container) \
941if( options.count(op_name) ) { \
942 const std::vector<std::string>& ops = options[op_name].as<std::vector<std::string>>(); \
943 for( const auto& v : ops ) { \
944 container.emplace( sysio::chain::name( v ) ); \
945 } \
// Builds a signing callable around a local private key: the returned lambda owns a copy of `key`
// and signs whatever digest it is handed. (The return-type line is not part of this excerpt.)
949make_key_signature_provider(const private_key_type& key) {
950 return [key]( const chain::digest_type& digest ) {
951 return key.sign(digest);
952 };
// Builds a signing callable that asks a remote kiod endpoint (via the http_client_plugin) to sign
// a digest with `pubkey`. `url_str` is either a regular URL or a "unix://" socket address.
// (The return-type line is not part of this excerpt.)
956make_kiod_signature_provider(const std::shared_ptr<producer_plugin_impl>& impl, const string& url_str, const public_key_type pubkey) {
957 fc::url kiod_url;
958 if(boost::algorithm::starts_with(url_str, "unix://"))
959 //send the entire string after unix:// to http_plugin. It'll auto-detect which part
960 // is the unix socket path, and which part is the url to hit on the server
961 kiod_url = fc::url("unix", url_str.substr(7), ostring(), ostring(), ostring(), ostring(), ovariant_object(), std::optional<uint16_t>());
962 else
963 kiod_url = fc::url(url_str);
// The lambda holds only a weak reference to the implementation, so it does not extend its lifetime.
964 std::weak_ptr<producer_plugin_impl> weak_impl = impl;
966 return [weak_impl, kiod_url, pubkey]( const chain::digest_type& digest ) {
967 auto impl = weak_impl.lock();
968 if (impl) {
// NOTE(review): `params` is used below but its declaration is not visible in this excerpt
// (a line number is skipped here) — confirm against the full file.
970 fc::to_variant(std::make_pair(digest, pubkey), params);
// A negative configured timeout means "no deadline" (fc::time_point::maximum()).
971 auto deadline = impl->_kiod_provider_timeout_us.count() >= 0 ? fc::time_point::now() + impl->_kiod_provider_timeout_us : fc::time_point::maximum();
972 return app().get_plugin<http_client_plugin>().get_client().post_sync(kiod_url, params, deadline).as<chain::signature_type>();
973 } else {
// Implementation object already destroyed: return a default-constructed signature.
974 return signature_type();
975 }
976 };
979void producer_plugin::plugin_initialize(const boost::program_options::variables_map& options)
980{ try {
981 my->chain_plug = app().find_plugin<chain_plugin>();
982 SYS_ASSERT( my->chain_plug, plugin_config_exception, "chain_plugin not found" );
983 my->_options = &options;
984 LOAD_VALUE_SET(options, "producer-name", my->_producers)
986 chain::controller& chain = my->chain_plug->chain();
988 if( options.count("private-key") )
989 {
990 const std::vector<std::string> key_id_to_wif_pair_strings = options["private-key"].as<std::vector<std::string>>();
991 for (const std::string& key_id_to_wif_pair_string : key_id_to_wif_pair_strings)
992 {
993 try {
994 auto key_id_to_wif_pair = dejsonify<std::pair<public_key_type, private_key_type>>(key_id_to_wif_pair_string);
995 my->_signature_providers[key_id_to_wif_pair.first] = make_key_signature_provider(key_id_to_wif_pair.second);
996 auto blanked_privkey = std::string(key_id_to_wif_pair.second.to_string().size(), '*' );
997 wlog("\"private-key\" is DEPRECATED, use \"signature-provider=${pub}=KEY:${priv}\"", ("pub",key_id_to_wif_pair.first)("priv", blanked_privkey));
998 } catch ( const std::exception& e ) {
999 elog("Malformed private key pair");
1000 }
1001 }
1002 }
1004 if( options.count("signature-provider") ) {
1005 const std::vector<std::string> key_spec_pairs = options["signature-provider"].as<std::vector<std::string>>();
1006 for (const auto& key_spec_pair : key_spec_pairs) {
1007 try {
1008 auto delim = key_spec_pair.find("=");
1009 SYS_ASSERT(delim != std::string::npos, plugin_config_exception, "Missing \"=\" in the key spec pair");
1010 auto pub_key_str = key_spec_pair.substr(0, delim);
1011 auto spec_str = key_spec_pair.substr(delim + 1);
1013 auto spec_delim = spec_str.find(":");
1014 SYS_ASSERT(spec_delim != std::string::npos, plugin_config_exception, "Missing \":\" in the key spec pair");
1015 auto spec_type_str = spec_str.substr(0, spec_delim);
1016 auto spec_data = spec_str.substr(spec_delim + 1);
1018 auto pubkey = public_key_type(pub_key_str);
1020 if (spec_type_str == "KEY") {
1021 my->_signature_providers[pubkey] = make_key_signature_provider(private_key_type(spec_data));
1022 } else if (spec_type_str == "KSYSD") {
1023 my->_signature_providers[pubkey] = make_kiod_signature_provider(my, spec_data, pubkey);
1024 }
1026 } catch (...) {
1027 elog("Malformed signature provider: \"${val}\", ignoring!", ("val", key_spec_pair));
1028 }
1029 }
1030 }
1032 my->_kiod_provider_timeout_us = fc::milliseconds(options.at("kiod-provider-timeout").as<int32_t>());
1034 my->_account_fails.set_max_failures_per_account( options.at("subjective-account-max-failures").as<uint32_t>() );
1036 my->_produce_time_offset_us = options.at("produce-time-offset-us").as<int32_t>();
1037 SYS_ASSERT( my->_produce_time_offset_us <= 0 && my->_produce_time_offset_us >= -config::block_interval_us, plugin_config_exception,
1038 "produce-time-offset-us ${o} must be 0 .. -${bi}", ("bi", config::block_interval_us)("o", my->_produce_time_offset_us) );
1040 my->_last_block_time_offset_us = options.at("last-block-time-offset-us").as<int32_t>();
1041 SYS_ASSERT( my->_last_block_time_offset_us <= 0 && my->_last_block_time_offset_us >= -config::block_interval_us, plugin_config_exception,
1042 "last-block-time-offset-us ${o} must be 0 .. -${bi}", ("bi", config::block_interval_us)("o", my->_last_block_time_offset_us) );
1044 uint32_t cpu_effort_pct = options.at("cpu-effort-percent").as<uint32_t>();
1045 SYS_ASSERT( cpu_effort_pct >= 0 && cpu_effort_pct <= 100, plugin_config_exception,
1046 "cpu-effort-percent ${pct} must be 0 - 100", ("pct", cpu_effort_pct) );
1047 cpu_effort_pct *= config::percent_1;
1048 int32_t cpu_effort_offset_us =
1049 -SYS_PERCENT( config::block_interval_us, chain::config::percent_100 - cpu_effort_pct );
1051 uint32_t last_block_cpu_effort_pct = options.at("last-block-cpu-effort-percent").as<uint32_t>();
1052 SYS_ASSERT( last_block_cpu_effort_pct >= 0 && last_block_cpu_effort_pct <= 100, plugin_config_exception,
1053 "last-block-cpu-effort-percent ${pct} must be 0 - 100", ("pct", last_block_cpu_effort_pct) );
1054 last_block_cpu_effort_pct *= config::percent_1;
1055 int32_t last_block_cpu_effort_offset_us =
1056 -SYS_PERCENT( config::block_interval_us, chain::config::percent_100 - last_block_cpu_effort_pct );
1058 my->_produce_time_offset_us = std::min( my->_produce_time_offset_us, cpu_effort_offset_us );
1059 my->_last_block_time_offset_us = std::min( my->_last_block_time_offset_us, last_block_cpu_effort_offset_us );
1061 my->_max_block_cpu_usage_threshold_us = options.at( "max-block-cpu-usage-threshold-us" ).as<uint32_t>();
1062 SYS_ASSERT( my->_max_block_cpu_usage_threshold_us < config::block_interval_us, plugin_config_exception,
1063 "max-block-cpu-usage-threshold-us ${t} must be 0 .. ${bi}", ("bi", config::block_interval_us)("t", my->_max_block_cpu_usage_threshold_us) );
1065 my->_max_block_net_usage_threshold_bytes = options.at( "max-block-net-usage-threshold-bytes" ).as<uint32_t>();
1067 my->_max_scheduled_transaction_time_per_block_ms = options.at("max-scheduled-transaction-time-per-block-ms").as<int32_t>();
1069 if( options.at( "subjective-cpu-leeway-us" ).as<int32_t>() != config::default_subjective_cpu_leeway_us ) {
1070 chain.set_subjective_cpu_leeway( fc::microseconds( options.at( "subjective-cpu-leeway-us" ).as<int32_t>() ) );
1071 }
1073 fc::microseconds subjective_account_decay_time = fc::minutes(options.at( "subjective-account-decay-time-minutes" ).as<uint32_t>());
1074 SYS_ASSERT( subjective_account_decay_time.count() > 0, plugin_config_exception,
1075 "subjective-account-decay-time-minutes ${dt} must be greater than 0", ("dt", subjective_account_decay_time.to_seconds() / 60));
1076 my->_subjective_billing.set_expired_accumulator_average_window( subjective_account_decay_time );
1078 my->_max_transaction_time_ms = options.at("max-transaction-time").as<int32_t>();
1080 my->_max_irreversible_block_age_us = fc::seconds(options.at("max-irreversible-block-age").as<int32_t>());
1082 auto max_incoming_transaction_queue_size = options.at("incoming-transaction-queue-size-mb").as<uint16_t>() * 1024*1024;
1084 SYS_ASSERT( max_incoming_transaction_queue_size > 0, plugin_config_exception,
1085 "incoming-transaction-queue-size-mb ${mb} must be greater than 0", ("mb", max_incoming_transaction_queue_size) );
1087 my->_unapplied_transactions.set_max_transaction_queue_size( max_incoming_transaction_queue_size );
1089 my->_incoming_defer_ratio = options.at("incoming-defer-ratio").as<double>();
1091 my->_disable_persist_until_expired = options.at("disable-api-persisted-trx").as<bool>();
1092 bool disable_subjective_billing = options.at("disable-subjective-billing").as<bool>();
1093 my->_disable_subjective_p2p_billing = options.at("disable-subjective-p2p-billing").as<bool>();
1094 my->_disable_subjective_api_billing = options.at("disable-subjective-api-billing").as<bool>();
1095 dlog( "disable-subjective-billing: ${s}, disable-subjective-p2p-billing: ${p2p}, disable-subjective-api-billing: ${api}",
1096 ("s", disable_subjective_billing)("p2p", my->_disable_subjective_p2p_billing)("api", my->_disable_subjective_api_billing) );
1097 if( !disable_subjective_billing ) {
1098 my->_disable_subjective_p2p_billing = my->_disable_subjective_api_billing = false;
1099 } else if( !my->_disable_subjective_p2p_billing || !my->_disable_subjective_api_billing ) {
1100 disable_subjective_billing = false;
1101 }
1102 if( disable_subjective_billing ) {
1103 my->_subjective_billing.disable();
1104 ilog( "Subjective CPU billing disabled" );
1105 } else if( !my->_disable_subjective_p2p_billing && !my->_disable_subjective_api_billing ) {
1106 ilog( "Subjective CPU billing enabled" );
1107 } else {
1108 if( my->_disable_subjective_p2p_billing ) ilog( "Subjective CPU billing of P2P trxs disabled " );
1109 if( my->_disable_subjective_api_billing ) ilog( "Subjective CPU billing of API trxs disabled " );
1110 }
1112 auto thread_pool_size = options.at( "producer-threads" ).as<uint16_t>();
1113 SYS_ASSERT( thread_pool_size > 0, plugin_config_exception,
1114 "producer-threads ${num} must be greater than 0", ("num", thread_pool_size));
1115 my->_thread_pool.emplace( "prod", thread_pool_size );
1117 if( options.count( "snapshots-dir" )) {
1118 auto sd = options.at( "snapshots-dir" ).as<bfs::path>();
1119 if( sd.is_relative()) {
1120 my->_snapshots_dir = app().data_dir() / sd;
1121 if (!fc::exists(my->_snapshots_dir)) {
1122 fc::create_directories(my->_snapshots_dir);
1123 }
1124 } else {
1125 my->_snapshots_dir = sd;
1126 }
1128 SYS_ASSERT( fc::is_directory(my->_snapshots_dir), snapshot_directory_not_found_exception,
1129 "No such directory '${dir}'", ("dir", my->_snapshots_dir.generic_string()) );
1131 if (auto resmon_plugin = app().find_plugin<resource_monitor_plugin>()) {
1132 resmon_plugin->monitor_directory(my->_snapshots_dir);
1133 }
1134 }
1136 my->_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider(
1137 [this](const signed_block_ptr& block, const std::optional<block_id_type>& block_id, const block_state_ptr& bsp) {
1138 return my->on_incoming_block(block, block_id, bsp);
1139 });
1141 my->_incoming_transaction_async_provider = app().get_method<incoming::methods::transaction_async>().register_provider(
1142 [this](const packed_transaction_ptr& trx, bool persist_until_expired, bool read_only, bool return_failure_traces, next_function<transaction_trace_ptr> next) -> void {
1143 return my->on_incoming_transaction_async(trx, persist_until_expired, read_only, return_failure_traces, next );
1144 });
1146 if (options.count("greylist-account")) {
1147 std::vector<std::string> greylist = options["greylist-account"].as<std::vector<std::string>>();
1148 greylist_params param;
1149 for (auto &a : greylist) {
1150 param.accounts.push_back(account_name(a));
1151 }
1152 add_greylist_accounts(param);
1153 }
1155 {
1156 uint32_t greylist_limit = options.at("greylist-limit").as<uint32_t>();
1157 chain.set_greylist_limit( greylist_limit );
1158 }
1160 if( options.count("disable-subjective-account-billing") ) {
1161 std::vector<std::string> accounts = options["disable-subjective-account-billing"].as<std::vector<std::string>>();
1162 for( const auto& a : accounts ) {
1163 my->_subjective_billing.disable_account( account_name(a) );
1164 }
1165 }
// Startup body (the signature line is not part of this excerpt; the log text below refers to it
// as plugin_startup()). Validates the node mode, hooks controller signals, seeds the
// irreversible-block state and starts the production loop.
1170{ try {
1171 handle_sighup(); // Sets loggers
1173 try {
1174 ilog("producer plugin: plugin_startup() begin");
1176 chain::controller& chain = my->chain_plug->chain();
// With producers configured, the node must be in speculative read mode, full validation mode,
// and accepting transactions; each check is skipped when no producer is configured.
1177 SYS_ASSERT( my->_producers.empty() || chain.get_read_mode() == chain::db_read_mode::SPECULATIVE, plugin_config_exception,
1178 "node cannot have any producer-name configured because block production is impossible when read_mode is not \"speculative\"" );
1180 SYS_ASSERT( my->_producers.empty() || chain.get_validation_mode() == chain::validation_mode::FULL, plugin_config_exception,
1181 "node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" );
1183 SYS_ASSERT( my->_producers.empty() || my->chain_plug->accept_transactions(), plugin_config_exception,
1184 "node cannot have any producer-name configured because no block production is possible with no [api|p2p]-accepted-transactions" );
// Forward the controller's block signals to the implementation; the connections are stored on `my`.
1186 my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } ));
1187 my->_accepted_block_header_connection.emplace(chain.accepted_block_header.connect( [this]( const auto& bsp ){ my->on_block_header( bsp ); } ));
1188 my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } ));
// Seed irreversible-block state from the current LIB block when it can be fetched; otherwise
// record the maximum time point as the irreversible block time.
1190 const auto lib_num = chain.last_irreversible_block_num();
1191 const auto lib = chain.fetch_block_by_number(lib_num);
1192 if (lib) {
1193 my->on_irreversible_block(lib);
1194 } else {
1195 my->_irreversible_block_time = fc::time_point::maximum();
1196 }
1198 if (!my->_producers.empty()) {
1199 ilog("Launching block production for ${n} producers at ${time}.", ("n", my->_producers.size())("time",fc::time_point::now()));
// The new-chain banner is shown only when production is enabled and the head block number is 0.
1201 if (my->_production_enabled) {
1202 if (chain.head_block_num() == 0) {
1203 new_chain_banner(chain);
1204 }
1205 }
1206 }
1208 my->schedule_production_loop();
1210 ilog("producer plugin: plugin_startup() end");
1211 } catch( ... ) {
1212 // always call plugin_shutdown, even on exception
// NOTE(review): no shutdown call is visible between the comment above and the rethrow; the
// excerpt skips a line number here — confirm against the full file.
1214 throw;
1215 }
// Shutdown body (signature line not part of this excerpt): cancels the timer, stops the thread
// pool when one was created, then posts an empty task that holds a copy of `my`.
// NOTE(review): several catch handlers below look empty here, but the excerpt skips line
// numbers inside them — confirm against the full file before treating them as swallowing errors.
1219 try {
1220 my->_timer.cancel();
1221 } catch ( const std::bad_alloc& ) {
1223 } catch ( const boost::interprocess::bad_alloc& ) {
1225 } catch(const fc::exception& e) {
1226 edump((e.to_detail_string()));
1227 } catch(const std::exception& e) {
1229 }
1231 if( my->_thread_pool ) {
1232 my->_thread_pool->stop();
1233 }
1235 app().post( 0, [me = my](){} ); // keep my pointer alive until queue is drained
1236 fc_ilog(_log, "exit shutdown");
// Pause body (signature line not part of this excerpt): logs the pause and sets the pause flag.
1249 fc_ilog(_log, "Producer paused.");
1250 my->_pause_production = true;
// Resume body (signature line not part of this excerpt): clears the pause flag and, if the
// pending block was only being speculated on, aborts it and reschedules the production loop.
1254 my->_pause_production = false;
1255 // it is possible that we are only speculating because of this policy which we have now changed
1256 // re-evaluate that now
1257 //
1258 if (my->_pending_block_mode == pending_block_mode::speculating) {
1259 my->abort_block();
1260 fc_ilog(_log, "Producer resumed. Scheduling production.");
1261 my->schedule_production_loop();
1262 } else {
1263 fc_ilog(_log, "Producer resumed.");
1264 }
// Returns the current value of the pause flag (signature line not part of this excerpt).
1268 return my->_pause_production;
1271void producer_plugin::update_runtime_options(const runtime_options& options) {
1272 chain::controller& chain = my->chain_plug->chain();
1273 bool check_speculating = false;
1275 if (options.max_transaction_time) {
1276 my->_max_transaction_time_ms = *options.max_transaction_time;
1277 }
1279 if (options.max_irreversible_block_age) {
1280 my->_max_irreversible_block_age_us = fc::seconds(*options.max_irreversible_block_age);
1281 check_speculating = true;
1282 }
1284 if (options.produce_time_offset_us) {
1285 my->_produce_time_offset_us = *options.produce_time_offset_us;
1286 }
1288 if (options.last_block_time_offset_us) {
1289 my->_last_block_time_offset_us = *options.last_block_time_offset_us;
1290 }
1292 if (options.max_scheduled_transaction_time_per_block_ms) {
1293 my->_max_scheduled_transaction_time_per_block_ms = *options.max_scheduled_transaction_time_per_block_ms;
1294 }
1296 if (options.incoming_defer_ratio) {
1297 my->_incoming_defer_ratio = *options.incoming_defer_ratio;
1298 }
1300 if (check_speculating && my->_pending_block_mode == pending_block_mode::speculating) {
1301 my->abort_block();
1302 my->schedule_production_loop();
1303 }
1305 if (options.subjective_cpu_leeway_us) {
1306 chain.set_subjective_cpu_leeway(fc::microseconds(*options.subjective_cpu_leeway_us));
1307 }
1309 if (options.greylist_limit) {
1310 chain.set_greylist_limit(*options.greylist_limit);
1311 }
1314producer_plugin::runtime_options producer_plugin::get_runtime_options() const {
1315 return {
1316 my->_max_transaction_time_ms,
1317 my->_max_irreversible_block_age_us.count() < 0 ? -1 : my->_max_irreversible_block_age_us.count() / 1'000'000,
1318 my->_produce_time_offset_us,
1319 my->_last_block_time_offset_us,
1320 my->_max_scheduled_transaction_time_per_block_ms,
1321 my->chain_plug->chain().get_subjective_cpu_leeway() ?
1322 my->chain_plug->chain().get_subjective_cpu_leeway()->count() :
1323 std::optional<int32_t>(),
1324 my->_incoming_defer_ratio,
1325 my->chain_plug->chain().get_greylist_limit()
1326 };
// Greylist-add body (signature line not part of this excerpt): rejects an empty account list
// with invalid_http_request, then adds every listed account to the controller's resource greylist.
1330 SYS_ASSERT(params.accounts.size() > 0, chain::invalid_http_request, "At least one account is required");
1332 chain::controller& chain = my->chain_plug->chain();
1333 for (auto &acc : params.accounts) {
1334 chain.add_resource_greylist(acc);
1335 }
// Greylist-remove body (signature line not part of this excerpt): rejects an empty account list
// with invalid_http_request, then removes every listed account from the controller's resource greylist.
1339 SYS_ASSERT(params.accounts.size() > 0, chain::invalid_http_request, "At least one account is required");
1341 chain::controller& chain = my->chain_plug->chain();
1342 for (auto &acc : params.accounts) {
1343 chain.remove_resource_greylist(acc);
1344 }
// Greylist-query body (signature line not part of this excerpt): copies the controller's
// resource greylist into a greylist_params result.
1348 chain::controller& chain = my->chain_plug->chain();
1349 greylist_params result;
1350 const auto& list = chain.get_resource_greylist();
// Reserve up front so the copy loop below does not reallocate.
1351 result.accounts.reserve(list.size());
1352 for (auto &acc: list) {
1353 result.accounts.push_back(acc);
1354 }
1355 return result;
// Whitelist/blacklist query body (signature line not part of this excerpt): returns the
// controller's six lists, in the order actor whitelist, actor blacklist, contract whitelist,
// contract blacklist, action blacklist, key blacklist.
1359 chain::controller& chain = my->chain_plug->chain();
1360 return {
1361 chain.get_actor_whitelist(),
1362 chain.get_actor_blacklist(),
1363 chain.get_contract_whitelist(),
1364 chain.get_contract_blacklist(),
1365 chain.get_action_blacklist(),
1366 chain.get_key_blacklist()
1367 };
// Whitelist/blacklist update body (signature line not part of this excerpt): requires at least
// one list in `params` (invalid_http_request otherwise) and replaces only the lists supplied.
1371 SYS_ASSERT(params.actor_whitelist || params.actor_blacklist || params.contract_whitelist || params.contract_blacklist || params.action_blacklist || params.key_blacklist,
1372 chain::invalid_http_request,
1373 "At least one of actor_whitelist, actor_blacklist, contract_whitelist, contract_blacklist, action_blacklist, and key_blacklist is required"
1374 );
1376 chain::controller& chain = my->chain_plug->chain();
1377 if(params.actor_whitelist) chain.set_actor_whitelist(*params.actor_whitelist);
1378 if(params.actor_blacklist) chain.set_actor_blacklist(*params.actor_blacklist);
1379 if(params.contract_whitelist) chain.set_contract_whitelist(*params.contract_whitelist);
1380 if(params.contract_blacklist) chain.set_contract_blacklist(*params.contract_blacklist);
1381 if(params.action_blacklist) chain.set_action_blacklist(*params.action_blacklist);
1382 if(params.key_blacklist) chain.set_key_blacklist(*params.key_blacklist);
// Integrity-hash body (signature line not part of this excerpt): returns the head block id
// together with the controller's calculated integrity hash.
1386 chain::controller& chain = my->chain_plug->chain();
// Scoped-exit callback that reschedules the production loop when this scope ends, unless cancelled below.
1388 auto reschedule = fc::make_scoped_exit([this](){
1389 my->schedule_production_loop();
1390 });
1392 if (chain.is_building_block()) {
1393 // abort the pending block
1394 my->abort_block();
1395 } else {
// No block was being built, so nothing was aborted and no reschedule is needed.
1396 reschedule.cancel();
1397 }
1399 return {chain.head_block_id(), chain.calculate_integrity_hash()};
// Snapshot-creation body (signature line not part of this excerpt). Writes a snapshot of the
// current head state to a temp file and reports the outcome through the `next` callback:
// immediately in irreversible read mode, otherwise once the pending snapshot entry is resolved
// elsewhere.
1403 chain::controller& chain = my->chain_plug->chain();
1405 auto head_id = chain.head_block_id();
1406 const auto head_block_num = chain.head_block_num();
1407 const auto head_block_time = chain.head_block_time();
1408 const auto& snapshot_path = pending_snapshot::get_final_path(head_id, my->_snapshots_dir);
1409 const auto& temp_path = pending_snapshot::get_temp_path(head_id, my->_snapshots_dir);
1411 // maintain legacy exception if the snapshot exists
1412 if( fc::is_regular_file(snapshot_path) ) {
1413 auto ex = snapshot_exists_exception( FC_LOG_MESSAGE( error, "snapshot named ${name} already exists", ("name", snapshot_path.generic_string()) ) );
1414 next(ex.dynamic_copy_exception());
1415 return;
1416 }
// Helper: aborts any block being built (rescheduling production on scope exit in that case),
// then streams the chain snapshot into file `p`.
1418 auto write_snapshot = [&]( const bfs::path& p ) -> void {
1419 auto reschedule = fc::make_scoped_exit([this](){
1420 my->schedule_production_loop();
1421 });
1423 if (chain.is_building_block()) {
1424 // abort the pending block
1425 my->abort_block();
1426 } else {
1427 reschedule.cancel();
1428 }
1430 bfs::create_directory( p.parent_path() );
1432 // create the snapshot
1433 auto snap_out = std::ofstream(p.generic_string(), (std::ios::out | std::ios::binary));
1434 auto writer = std::make_shared<ostream_snapshot_writer>(snap_out);
1435 chain.write_snapshot(writer);
1436 writer->finalize();
1437 snap_out.flush();
1438 snap_out.close();
1439 };
1441 // If in irreversible mode, create snapshot and return path to snapshot immediately.
1442 if( chain.get_read_mode() == db_read_mode::IRREVERSIBLE ) {
1443 try {
1444 write_snapshot( temp_path );
// Move the temp file to its final name; the error-code overload is used so a failure becomes
// a snapshot_finalization_exception via the assert below.
1446 boost::system::error_code ec;
1447 bfs::rename(temp_path, snapshot_path, ec);
1448 SYS_ASSERT(!ec, snapshot_finalization_exception,
1449 "Unable to finalize valid snapshot of block number ${bn}: [code: ${ec}] ${message}",
1450 ("bn", head_block_num)
1451 ("ec", ec.value())
1452 ("message", ec.message()));
1454 next( producer_plugin::snapshot_information{head_id, head_block_num, head_block_time, chain_snapshot_header::current_version, snapshot_path.generic_string()} );
1455 } CATCH_AND_CALL (next);
1456 return;
1457 }
1459 // Otherwise, the result will be returned when the snapshot becomes irreversible.
1461 // determine if this snapshot is already in-flight
1462 auto& pending_by_id = my->_pending_snapshot_index.get<by_id>();
1463 auto existing = pending_by_id.find(head_id);
1464 if( existing != pending_by_id.end() ) {
1465 // if a snapshot at this block is already pending, attach this requests handler to it
// The replacement handler calls the previously stored handler first, then this request's `next`.
1466 pending_by_id.modify(existing, [&next]( auto& entry ){
1467 entry.next = [prev = entry.next, next](const std::variant<fc::exception_ptr, producer_plugin::snapshot_information>& res){
1468 prev(res);
1469 next(res);
1470 };
1471 });
1472 } else {
1473 const auto& pending_path = pending_snapshot::get_pending_path(head_id, my->_snapshots_dir);
1475 try {
1476 write_snapshot( temp_path ); // create a new pending snapshot
1478 boost::system::error_code ec;
1479 bfs::rename(temp_path, pending_path, ec);
1480 SYS_ASSERT(!ec, snapshot_finalization_exception,
1481 "Unable to promote temp snapshot to pending for block number ${bn}: [code: ${ec}] ${message}",
1482 ("bn", head_block_num)
1483 ("ec", ec.value())
1484 ("message", ec.message()));
// Record the pending snapshot together with the handler and both paths.
1486 my->_pending_snapshot_index.emplace(head_id, next, pending_path.generic_string(), snapshot_path.generic_string());
1487 } CATCH_AND_CALL (next);
1488 }
// Returns the currently stored list of protocol features to activate, wrapped in the result
// type (signature line not part of this excerpt).
1493 return {my->_protocol_features_to_activate};
// Protocol-feature scheduling body (signature line not part of this excerpt): validates the
// requested feature digests and, if all checks pass, stores them for activation.
1497 const chain::controller& chain = my->chain_plug->chain();
// Building a set collapses duplicates, so a size mismatch means the request repeated a digest.
1498 std::set<digest_type> set_of_features_to_activate( schedule.protocol_features_to_activate.begin(),
1499 schedule.protocol_features_to_activate.end() );
1500 SYS_ASSERT( set_of_features_to_activate.size() == schedule.protocol_features_to_activate.size(),
1501 invalid_protocol_features_to_activate, "duplicate digests" );
1502 chain.validate_protocol_features( schedule.protocol_features_to_activate );
1503 const auto& pfs = chain.get_protocol_feature_manager().get_protocol_feature_set();
// Features flagged as requiring preactivation are rejected here.
1504 for (auto &feature_digest : set_of_features_to_activate) {
1505 const auto& pf = pfs.get_protocol_feature(feature_digest);
1506 SYS_ASSERT( !pf.preactivation_required, protocol_feature_exception,
1507 "protocol feature requires preactivation: ${digest}",
1508 ("digest", feature_digest));
1509 }
1510 my->_protocol_features_to_activate = schedule.protocol_features_to_activate;
// Reset the "signaled" flag for the newly stored list.
1511 my->_protocol_features_signaled = false;
// Supported-protocol-features body (signature line not part of this excerpt): collects the
// variant form of each protocol feature that passes the filters in `params`, always emitting a
// feature's dependencies before the feature itself.
// NOTE(review): `results` is used below but its declaration is not visible in this excerpt
// (line numbers are skipped before this point) — confirm against the full file.
1516 const chain::controller& chain = my->chain_plug->chain();
1517 const auto& pfs = chain.get_protocol_feature_manager().get_protocol_feature_set();
1518 const auto next_block_time = chain.head_block_time() + fc::milliseconds(config::block_interval_ms);
// Memo of features already visited: digest -> whether the feature was added to `results`.
1520 flat_map<digest_type, bool> visited_protocol_features;
1521 visited_protocol_features.reserve( pfs.size() );
// Recursive helper; a std::function is used so the lambda can capture and call itself.
1523 std::function<bool(const protocol_feature&)> add_feature =
1524 [&results, &pfs, &params, next_block_time, &visited_protocol_features, &add_feature]
1525 ( const protocol_feature& pf ) -> bool {
// Filters: disabled features are dropped under either exclusion flag; under
// exclude_unactivatable, so are features whose earliest activation time is after the next block time.
1526 if( ( params.exclude_disabled || params.exclude_unactivatable ) && !pf.enabled ) return false;
1527 if( params.exclude_unactivatable && ( next_block_time < pf.earliest_allowed_activation_time ) ) return false;
// Already visited: reuse the recorded outcome instead of recursing again.
1529 auto res = visited_protocol_features.emplace( pf.feature_digest, false );
1530 if( !res.second ) return res.first->second;
// Add dependencies first; if any one is rejected, roll `results` back to its prior size.
1532 const auto original_size = results.size();
1533 for( const auto& dependency : pf.dependencies ) {
1534 if( !add_feature( pfs.get_protocol_feature( dependency ) ) ) {
1535 results.resize( original_size );
1536 return false;
1537 }
1538 }
1540 res.first->second = true;
1541 results.emplace_back( pf.to_variant(true) );
1542 return true;
1543 };
1545 for( const auto& pf : pfs ) {
1546 add_feature( pf );
1547 }
1549 return results;
// RAM-corrections query body (signature line not part of this excerpt): walks the
// account_ram_correction index by name between the requested bounds, forward or in reverse,
// returning at most params.limit rows.
// NOTE(review): `result` is used below but its declaration is not visible in this excerpt
// (line numbers are skipped before this point) — confirm against the full file.
1555 const auto& db = my->chain_plug->chain().db();
1557 const auto& idx = db.get_index<chain::account_ram_correction_index, chain::by_name>();
// Default bounds span the whole uint64_t range; explicit bounds in `params` override them.
1558 account_name lower_bound_value{ std::numeric_limits<uint64_t>::lowest() };
1559 account_name upper_bound_value{ std::numeric_limits<uint64_t>::max() };
1561 if( params.lower_bound ) {
1562 lower_bound_value = *params.lower_bound;
1563 }
1565 if( params.upper_bound ) {
1566 upper_bound_value = *params.upper_bound;
1567 }
// Inverted range: return `result` without walking the index.
1569 if( upper_bound_value < lower_bound_value )
1570 return result;
// Appends up to params.limit rows from [itr, end_itr); if rows remain afterwards, records the
// name of the next row in result.more.
1572 auto walk_range = [&]( auto itr, auto end_itr ) {
1573 for( unsigned int count = 0;
1574 count < params.limit && itr != end_itr;
1575 ++itr )
1576 {
1577 result.rows.push_back( fc::variant( *itr ) );
1578 ++count;
1579 }
1580 if( itr != end_itr ) {
1581 result.more = itr->name;
1582 }
1583 };
1585 auto lower = idx.lower_bound( lower_bound_value );
1586 auto upper = idx.upper_bound( upper_bound_value );
// Reverse iteration walks the same range starting from the upper end.
1587 if( params.reverse ) {
1588 walk_range( boost::make_reverse_iterator(upper), boost::make_reverse_iterator(lower) );
1589 } else {
1590 walk_range( lower, upper );
1591 }
1593 return result;
// Computes the earliest time at which `producer_name` could next be scheduled to produce,
// counting forward from `current_block_time`.
// Returns an empty optional when the producer is not found in the head block's active schedule.
// NOTE(review): `chain` is used below but its declaration is not present in this listing — confirm.
1596std::optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const account_name& producer_name, const block_timestamp_type& current_block_time) const {
1598 const auto& hbs = chain.head_block_state();
1599 const auto& active_schedule = hbs->active_schedule.producers;
1601 // determine if this producer is in the active schedule and if so, where
1602 auto itr = std::find_if(active_schedule.begin(), active_schedule.end(), [&](const auto& asp){ return asp.producer_name == producer_name; });
1603 if (itr == active_schedule.end()) {
1604 // this producer is not in the active producer set
1605 return std::optional<fc::time_point>();
1606 }
1608 size_t producer_index = itr - active_schedule.begin();
1609 uint32_t minimum_offset = 1; // must at least be the "next" block
1611 // account for a watermark in the future which is disqualifying this producer for now
1612 // this is conservative assuming no blocks are dropped. If blocks are dropped the watermark will
1613 // disqualify this producer for longer but it is assumed they will wake up, determine that they
1614 // are disqualified for longer due to skipped blocks and re-calculate their next block with better
1615 // information then
1616 auto current_watermark = get_watermark(producer_name);
1617 if (current_watermark) {
 // watermark is used as a pair: .first is compared against block numbers, .second against block timestamps
1618 const auto watermark = *current_watermark;
1619 auto block_num = chain.head_block_state()->block_num;
1620 if (chain.is_building_block()) {
1621 ++block_num;
1622 }
1623 if (watermark.first > block_num) {
1624 // if I have a watermark block number then I need to wait until after that watermark
1625 minimum_offset = watermark.first - block_num + 1;
1626 }
1627 if (watermark.second > current_block_time) {
1628 // if I have a watermark block timestamp then I need to wait until after that watermark timestamp
 // the subtraction cannot wrap here because the enclosing condition established watermark.second > current_block_time
1629 minimum_offset = std::max(minimum_offset, watermark.second.slot - current_block_time.slot + 1);
1630 }
1631 }
1633 // this producer's next opportunity to produce is the next time its slot arrives after or at the calculated minimum
1634 uint32_t minimum_slot = current_block_time.slot + minimum_offset;
 // each schedule entry covers config::producer_repetitions consecutive slots; map minimum_slot to its schedule index
1635 size_t minimum_slot_producer_index = (minimum_slot % (active_schedule.size() * config::producer_repetitions)) / config::producer_repetitions;
1636 if ( producer_index == minimum_slot_producer_index ) {
1637 // this is the producer for the minimum slot, go with that
1638 return block_timestamp_type(minimum_slot).to_time_point();
1639 } else {
1640 // calculate how many rounds are between the minimum producer and the producer in question
1641 size_t producer_distance = producer_index - minimum_slot_producer_index;
1642 // check for unsigned underflow
 // a wrapped size_t difference is larger than producer_index; adding the schedule size wraps it back
 // to the forward distance around the schedule
1643 if (producer_distance > producer_index) {
1644 producer_distance += active_schedule.size();
1645 }
1647 // align the minimum slot to the first of its set of reps
1648 uint32_t first_minimum_producer_slot = minimum_slot - (minimum_slot % config::producer_repetitions);
1650 // offset the aligned minimum to the *earliest* next set of slots for this producer
1651 uint32_t next_block_slot = first_minimum_producer_slot + (producer_distance * config::producer_repetitions);
1652 return block_timestamp_type(next_block_slot).to_time_point();
1653 }
// Computes the time of the next block boundary: takes the later of the current time and the
// head block time, then advances to the first multiple of config::block_interval_us strictly after it.
// NOTE(review): the function header is missing from this listing — presumably
// calculate_pending_block_time(), which start_block calls; confirm.
1657 const chain::controller& chain = chain_plug->chain();
1658 const fc::time_point now = fc::time_point::now();
1659 const fc::time_point base = std::max<fc::time_point>(now, chain.head_block_time());
 // when base already sits on a boundary the remainder is 0, so a full interval is added
1660 const int64_t min_time_to_next_block = (config::block_interval_us) - (base.time_since_epoch().count() % (config::block_interval_us) );
1661 fc::time_point block_time = base + fc::microseconds(min_time_to_next_block);
1662 return block_time;
// Returns block_time shifted by a configured offset in microseconds.
// In the first branch the last slot of a producer_repetitions group uses _last_block_time_offset_us
// and every other slot uses _produce_time_offset_us; the else branch always uses _produce_time_offset_us.
// NOTE(review): the function header and the `if` condition selecting the first branch are missing
// from this listing — presumably calculate_block_deadline(block_time); confirm.
1667 bool last_block = ((block_timestamp_type( block_time ).slot % config::producer_repetitions) == config::producer_repetitions - 1);
1668 return block_time + fc::microseconds(last_block ? _last_block_time_offset_us : _produce_time_offset_us);
1669 } else {
1670 return block_time + fc::microseconds(_produce_time_offset_us);
1671 }
// Decides whether pre-block processing should stop.
// First branch: stop only once `deadline` has passed.
// Fall-through: stop when a block has been received (_received_block), or when producers are
// configured and `deadline` has passed.
// NOTE(review): the function header and the condition guarding the first return are missing from
// this listing — presumably should_interrupt_start_block(deadline); confirm.
1676 return deadline <= fc::time_point::now();
1677 }
1678 // if we can produce then honor deadline so production starts on time
1679 return (!_producers.empty() && deadline <= fc::time_point::now()) || _received_block;
// Body that prepares and starts the next pending block on the controller and then uses the time up to
// preprocess_deadline to clear expired transactions and to process unapplied, scheduled and incoming
// transactions.
// Along the way it logs the conditions under which this node does not produce (production disabled,
// producer not local, no matching signature provider, paused, irreversible block too old, watermark).
// NOTE(review): the function header and a number of statements (return statements, assignments to
// _pending_block_mode, the declaration of `chain`) are missing from this listing; the comments below
// describe only what the remaining lines show.
1688 const auto& hbs = chain.head_block_state();
1690 if( chain.get_terminate_at_block() > 0 && chain.get_terminate_at_block() < chain.head_block_num() ) {
1691 ilog("Reached configured maximum block ${num}; terminating", ("num", chain.get_terminate_at_block()));
1692 app().quit();
1694 }
1696 const fc::time_point now = fc::time_point::now();
1697 const fc::time_point block_time = calculate_pending_block_time();
1698 const fc::time_point preprocess_deadline = calculate_block_deadline(block_time);
1700 const pending_block_mode previous_pending_mode = _pending_block_mode;
1703 // Not our turn
1704 const auto& scheduled_producer = hbs->get_scheduled_producer(block_time);
1706 const auto current_watermark = get_watermark(scheduled_producer.producer_name);
 // count how many of the scheduled producer's keys have a signature provider configured on this node
1708 size_t num_relevant_signatures = 0;
1709 scheduled_producer.for_each_key([&](const public_key_type& key){
1710 const auto& iter = _signature_providers.find(key);
1711 if(iter != _signature_providers.end()) {
1712 num_relevant_signatures++;
1713 }
1714 });
1716 auto irreversible_block_age = get_irreversible_block_age();
1718 // If the next block production opportunity is in the present or future, we're synced.
1719 if( !_production_enabled ) {
1721 } else if( _producers.find(scheduled_producer.producer_name) == _producers.end()) {
1723 } else if (num_relevant_signatures == 0) {
1724 elog("Not producing block because I don't have any private keys relevant to authority: ${authority}", ("authority", scheduled_producer.authority));
1726 } else if ( _pause_production ) {
1727 elog("Not producing block because production is explicitly paused");
1729 } else if ( _max_irreversible_block_age_us.count() >= 0 && irreversible_block_age >= _max_irreversible_block_age_us ) {
1730 elog("Not producing block because the irreversible block is too old [age:${age}s, max:${max}s]", ("age", irreversible_block_age.count() / 1'000'000)( "max", _max_irreversible_block_age_us.count() / 1'000'000 ));
1732 }
1735 // determine if our watermark excludes us from producing at this point
1736 if (current_watermark) {
1737 const block_timestamp_type block_timestamp{block_time};
1738 if (current_watermark->first > hbs->block_num) {
1739 elog("Not producing block because \"${producer}\" signed a block at a higher block number (${watermark}) than the current fork's head (${head_block_num})",
1740 ("producer", scheduled_producer.producer_name)
1741 ("watermark", current_watermark->first)
1742 ("head_block_num", hbs->block_num));
1744 } else if (current_watermark->second >= block_timestamp) {
1745 elog("Not producing block because \"${producer}\" signed a block at the next block time or later (${watermark}) than the pending block time (${block_timestamp})",
1746 ("producer", scheduled_producer.producer_name)
1747 ("watermark", current_watermark->second)
1748 ("block_timestamp", block_timestamp));
1750 }
1751 }
1752 }
1755 auto head_block_age = now - chain.head_block_time();
1756 if (head_block_age > fc::seconds(5))
1758 }
1761 const auto start_block_time = block_time - fc::microseconds( config::block_interval_us );
1762 if( now < start_block_time ) {
1763 fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", hbs->block_num + 1)("bt", block_time) );
1764 // start_block_time instead of block_time because schedule_delayed_production_loop calculates next block time from given time
1765 schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(start_block_time));
1767 }
1768 } else if (previous_pending_mode == pending_block_mode::producing) {
1769 // just produced our last block of our round
1770 const auto start_block_time = block_time - fc::microseconds( config::block_interval_us );
1771 fc_dlog(_log, "Not starting speculative block until ${bt}", ("bt", start_block_time) );
1772 schedule_delayed_production_loop( weak_from_this(), start_block_time);
1774 }
1776 fc_dlog(_log, "Starting block #${n} at ${time} producer ${p}",
1777 ("n", hbs->block_num + 1)("time", now)("p", scheduled_producer.producer_name));
1779 try {
1780 uint16_t blocks_to_confirm = 0;
1783 // determine how many blocks this producer can confirm
1784 // 1) if it is not a producer from this node, assume no confirmations (we will discard this block anyway)
1785 // 2) if it is a producer on this node that has never produced, the conservative approach is to assume no
1786 // confirmations to make sure we don't double sign after a crash TODO: make these watermarks durable?
1787 // 3) if it is a producer on this node where this node knows the last block it produced, safely set it -UNLESS-
1788 // 4) the producer on this node's last watermark is higher (meaning on a different fork)
1789 if (current_watermark) {
1790 auto watermark_bn = current_watermark->first;
1791 if (watermark_bn < hbs->block_num) {
 // clamp to what fits in uint16_t
1792 blocks_to_confirm = (uint16_t)(std::min<uint32_t>(std::numeric_limits<uint16_t>::max(), (uint32_t)(hbs->block_num - watermark_bn)));
1793 }
1794 }
1796 // can not confirm irreversible blocks
1797 blocks_to_confirm = (uint16_t)(std::min<uint32_t>(blocks_to_confirm, (uint32_t)(hbs->block_num - hbs->dpos_irreversible_blocknum)));
1798 }
1800 abort_block();
1802 auto features_to_activate = chain.get_preactivated_protocol_features();
1804 bool drop_features_to_activate = false;
1805 try {
1807 } catch ( const std::bad_alloc& ) {
1809 } catch ( const boost::interprocess::bad_alloc& ) {
1811 } catch( const fc::exception& e ) {
1812 wlog( "protocol features to activate are no longer all valid: ${details}",
1813 ("details",e.to_detail_string()) );
1814 drop_features_to_activate = true;
1815 } catch( const std::exception& e ) {
1816 wlog( "protocol features to activate are no longer all valid: ${details}",
1817 ("details",fc::std_exception_wrapper::from_current_exception(e).to_detail_string()) );
1818 drop_features_to_activate = true;
1819 }
1821 if( drop_features_to_activate ) {
1823 } else {
1824 auto protocol_features_to_activate = _protocol_features_to_activate; // do a copy as pending_block might be aborted
1825 if( features_to_activate.size() > 0 ) {
 // append the preactivated features that are not already listed, preserving order and skipping duplicates
1826 protocol_features_to_activate.reserve( protocol_features_to_activate.size()
1827 + features_to_activate.size() );
1828 std::set<digest_type> set_of_features_to_activate( protocol_features_to_activate.begin(),
1829 protocol_features_to_activate.end() );
1830 for( const auto& f : features_to_activate ) {
1831 auto res = set_of_features_to_activate.insert( f );
1832 if( res.second ) {
1833 protocol_features_to_activate.push_back( f );
1834 }
1835 }
1836 features_to_activate.clear();
1837 }
 // features_to_activate now holds the merged list that is passed to start_block below
1838 std::swap( features_to_activate, protocol_features_to_activate );
1840 ilog( "signaling activation of the following protocol features in block ${num}: ${features_to_activate}",
1841 ("num", hbs->block_num + 1)("features_to_activate", features_to_activate) );
1842 }
1843 }
1845 chain.start_block( block_time, blocks_to_confirm, features_to_activate, preprocess_deadline );
1846 } LOG_AND_DROP();
1848 if( chain.is_building_block() ) {
1849 const auto& pending_block_signing_authority = chain.pending_block_signing_authority();
1851 if (_pending_block_mode == pending_block_mode::producing && pending_block_signing_authority != scheduled_producer.authority) {
 // the ${actual} placeholder must be closed or the value is never substituted into the message
1852 elog("Unexpected block signing authority, reverting to speculative mode! [expected: \"${expected}\", actual: \"${actual}\"]", ("expected", scheduled_producer.authority)("actual", pending_block_signing_authority));
1854 }
1856 try {
1857 _account_fails.clear();
1859 if( !remove_expired_trxs( preprocess_deadline ) )
1861 if( !remove_expired_blacklisted_trxs( preprocess_deadline ) )
1864 [&](){ return should_interrupt_start_block( preprocess_deadline ); } ) ) {
1866 }
1868 // limit execution of pending incoming to once per block
1869 size_t pending_incoming_process_limit = _unapplied_transactions.incoming_size();
1871 if( !process_unapplied_trxs( preprocess_deadline ) )
1875 auto scheduled_trx_deadline = preprocess_deadline;
1877 scheduled_trx_deadline = std::min<fc::time_point>(
1878 scheduled_trx_deadline,
1880 );
1881 }
1882 // may exhaust scheduled_trx_deadline but not preprocess_deadline, exhausted preprocess_deadline checked below
1883 process_scheduled_and_incoming_trxs( scheduled_trx_deadline, pending_incoming_process_limit );
1884 }
1886 if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit()
1888 if ( should_interrupt_start_block( preprocess_deadline ) || block_is_exhausted() ) {
1890 } else {
1891 if( !process_incoming_trxs( preprocess_deadline, pending_incoming_process_limit ) )
1894 }
1896 } catch ( const guard_exception& e ) {
1899 } catch ( std::bad_alloc& ) {
1901 } catch ( boost::interprocess::bad_alloc& ) {
1903 }
1905 }
// Body that purges expired entries from _unapplied_transactions relative to the pending block time,
// counting persisted and other expirations separately for the summary log line.
// Returns the result of clear_expired() (`exhausted` is its negation and the body returns `!exhausted`).
// NOTE(review): the function header (presumably remove_expired_trxs(deadline), as called from
// start_block), the declaration of `chain`, and the leading `fc_dlog(` line of some log statements are
// missing from this listing — confirm.
1913 auto pending_block_time = chain.pending_block_time();
1915 // remove all expired transactions
1916 size_t num_expired_persistent = 0;
1917 size_t num_expired_other = 0;
1918 size_t orig_count = _unapplied_transactions.size();
 // the second argument lets clear_expired poll should_interrupt_start_block(deadline); the third is
 // invoked per expired transaction with its packed transaction and type
1919 bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block( deadline ); },
1920 [chain_plug = chain_plug, &num_expired_persistent, &num_expired_other, pbm = _pending_block_mode,
1921 &chain, has_producers = !_producers.empty()]( const packed_transaction_ptr& packed_trx_ptr, trx_enum_type trx_type ) {
1922 if( trx_type == trx_enum_type::persisted ) {
1923 if( pbm == pending_block_mode::producing ) {
1925 "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}",
1926 ("block_num", chain.head_block_num() + 1)("txid", packed_trx_ptr->id())
1927 ("prod", chain.is_building_block() ? chain.pending_block_producer() : name()) );
1928 fc_dlog(_trx_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${trx}",
1929 ("block_num", chain.head_block_num() + 1)
1930 ("prod", chain.is_building_block() ? chain.pending_block_producer() : name())
1931 ("trx", chain_plug->get_log_trx(packed_trx_ptr->get_transaction())));
1932 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${entire_trx}",
1933 ("block_num", chain.head_block_num() + 1)
1934 ("prod", chain.is_building_block() ? chain.pending_block_producer() : name())
1935 ("entire_trx", chain_plug->get_log_trx(packed_trx_ptr->get_transaction())));
1937 } else {
1938 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${txid}", ("txid", packed_trx_ptr->id()));
1940 fc_dlog(_trx_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${trx}",
1941 ("trx", chain_plug->get_log_trx(packed_trx_ptr->get_transaction())));
1942 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${entire_trx}",
1943 ("entire_trx", chain_plug->get_log_trx(packed_trx_ptr->get_transaction())));
1944 }
1945 ++num_expired_persistent;
1946 } else {
 // non-persisted expirations are only logged when this node has producers configured, but always counted
1947 if (has_producers) {
1949 "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}",
1950 ("txid", packed_trx_ptr->id()));
1952 fc_dlog(_trx_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED: ${trx}",
1953 ("trx", chain_plug->get_log_trx(packed_trx_ptr->get_transaction())));
1954 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED: ${entire_trx}",
1955 ("entire_trx", chain_plug->get_log_trx(packed_trx_ptr->get_transaction())));
1956 }
1957 ++num_expired_other;
1958 }
1959 });
1962 fc_wlog( _log, "Unable to process all expired transactions in unapplied queue before deadline, "
1963 "Persistent expired ${persistent_expired}, Other expired ${other_expired}",
1964 ("persistent_expired", num_expired_persistent)("other_expired", num_expired_other) );
1965 } else {
1966 fc_dlog( _log, "Processed ${m} expired transactions of the ${n} transactions in the unapplied queue, "
1967 "Persistent expired ${persistent_expired}, Other expired ${other_expired}",
1968 ("m", num_expired_persistent+num_expired_other)("n", orig_count)
1969 ("persistent_expired", num_expired_persistent)("other_expired", num_expired_other) );
1970 }
1972 return !exhausted;
// Body that drops blacklisted transaction ids whose expiry is at or before the last irreversible
// block time. Returns false when it stopped early because should_interrupt_start_block(deadline)
// fired, true otherwise (including when the blacklist is empty).
// NOTE(review): the function header is missing from this listing — presumably
// remove_expired_blacklisted_trxs(deadline), as called from start_block; confirm.
1977 bool exhausted = false;
1978 auto& blacklist_by_expiry = _blacklisted_transactions.get<by_expiry>();
1979 if(!blacklist_by_expiry.empty()) {
1980 const chain::controller& chain = chain_plug->chain();
1981 const auto lib_time = chain.last_irreversible_block_time();
1983 int num_expired = 0;
1984 int orig_count = _blacklisted_transactions.size();
 // the loop always inspects begin() of the by_expiry index and erases it until the front entry is newer than lib_time
1986 while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= lib_time) {
1987 if ( should_interrupt_start_block( deadline ) ) {
1988 exhausted = true;
1989 break;
1990 }
1991 blacklist_by_expiry.erase(blacklist_by_expiry.begin());
1992 num_expired++;
1993 }
1995 fc_dlog(_log, "Processed ${n} blacklisted transactions, Expired ${expired}",
1996 ("n", orig_count)("expired", num_expired));
1997 }
1998 return !exhausted;
// Body that re-pushes transactions from _unapplied_transactions into the pending block until the
// range is finished, should_interrupt_start_block(deadline) fires, or the block is exhausted.
// Returns false when processing stopped early (`exhausted`), true otherwise.
// NOTE(review): the function header (presumably process_unapplied_trxs(deadline)), the declarations of
// `chain`, `itr`, `end_itr` and `max_trx_time`, and the continuation of the subjective_bill(...) call
// are missing from this listing — confirm.
2003 bool exhausted = false;
2006 const auto& rl = chain.get_resource_limits_manager();
2007 int num_applied = 0, num_failed = 0, num_processed = 0;
2008 auto unapplied_trxs_size = _unapplied_transactions.size();
2009 // unapplied and persisted do not have a next method to call
2014 while( itr != end_itr ) {
2015 if( should_interrupt_start_block( deadline ) ) {
2016 exhausted = true;
2017 break;
2018 }
2020 const transaction_metadata_ptr trx = itr->trx_meta;
2021 ++num_processed;
2022 try {
2023 auto start = fc::time_point::now();
2025 auto first_auth = trx->packed_trx()->get_transaction().first_authorizer();
 // accounts that hit the failure limit get their transaction dropped without being executed
2026 if( _account_fails.failure_limit( first_auth ) ) {
2027 ++num_failed;
2028 if( itr->next ) {
2029 itr->next( std::make_shared<tx_cpu_usage_exceeded>(
2030 FC_LOG_MESSAGE( error, "transaction ${id} exceeded failure limit for account ${a}",
2031 ("id", trx->id())("a", first_auth) ) ) );
2032 }
2033 itr = _unapplied_transactions.erase( itr );
2034 continue;
2035 }
2038 if( max_trx_time.count() < 0 ) max_trx_time = fc::microseconds::maximum();
2040 auto prev_billed_cpu_time_us = trx->billed_cpu_time_us;
 // cap the allowed time at the previously billed CPU plus a SYS_PERCENT(.., 100 * config::percent_1) margin
 // when that is below max_trx_time
2041 if( prev_billed_cpu_time_us > 0 && !_subjective_billing.is_account_disabled( first_auth ) && !rl.is_unlimited_cpu( first_auth )) {
2042 uint64_t prev_billed_plus100_us = prev_billed_cpu_time_us + SYS_PERCENT( prev_billed_cpu_time_us, 100 * config::percent_1 );
2043 if( prev_billed_plus100_us < max_trx_time.count() ) max_trx_time = fc::microseconds( prev_billed_plus100_us );
2044 }
2046 // no subjective billing since we are producing or processing persisted trxs
2047 const int64_t sub_bill = 0;
2048 bool disable_subjective_billing = ( _pending_block_mode == pending_block_mode::producing )
2049 || ( (itr->trx_type == trx_enum_type::persisted) && _disable_subjective_api_billing )
2050 || ( !(itr->trx_type == trx_enum_type::persisted) && _disable_subjective_p2p_billing )
2051 || trx->read_only;
2053 auto trace = chain.push_transaction( trx, deadline, max_trx_time, prev_billed_cpu_time_us, false, sub_bill );
2054 fc_dlog( _trx_failed_trace_log, "Subjective unapplied bill for ${a}: ${b} prev ${t}us", ("a",first_auth)("b",prev_billed_cpu_time_us)("t",trace->elapsed));
2055 if( trace->except ) {
2056 if( exception_is_exhausted( *trace->except ) ) {
2057 if( block_is_exhausted() ) {
2058 exhausted = true;
2059 // don't erase, subjective failure so try again next time
2060 break;
2061 }
2062 } else {
2063 fc_dlog( _trx_failed_trace_log, "Subjective unapplied bill for failed ${a}: ${b} prev ${t}us", ("a",first_auth)("b",prev_billed_cpu_time_us)("t",trace->elapsed));
2064 auto failure_code = trace->except->code();
 // duplicates are dropped below without being recorded as an account failure or billed
2065 if( failure_code != tx_duplicate::code_value ) {
2066 // this failed our configured maximum transaction time, we don't want to replay it
2067 fc_dlog( _log, "Failed ${c} trx, prev billed: ${p}us, ran: ${r}us, id: ${id}",
2068 ("c", trace->except->code())("p", prev_billed_cpu_time_us)
2069 ("r", fc::time_point::now() - start)("id", trx->id()) );
2070 _account_fails.add( first_auth, failure_code );
2071 if (!disable_subjective_billing)
2072 _subjective_billing.subjective_bill_failure( first_auth, trace->elapsed, fc::time_point::now() );
2073 }
2074 ++num_failed;
2075 if( itr->next ) {
2076 itr->next( trace->except->dynamic_copy_exception() );
2077 }
2078 itr = _unapplied_transactions.erase( itr );
2079 continue;
2080 }
2081 } else {
2082 fc_dlog( _trx_successful_trace_log, "Subjective unapplied bill for success ${a}: ${b} prev ${t}us", ("a",first_auth)("b",prev_billed_cpu_time_us)("t",trace->elapsed));
2083 // if db_read_mode SPECULATIVE then trx is in the pending block and not immediately reverted
2084 if (!disable_subjective_billing)
2085 _subjective_billing.subjective_bill( trx->id(), trx->packed_trx()->expiration(), first_auth, trace->elapsed,
2087 ++num_applied;
 // persisted transactions stay in the queue after a successful apply; all others are erased
2088 if( itr->trx_type != trx_enum_type::persisted ) {
2089 if( itr->next ) itr->next( trace );
2090 itr = _unapplied_transactions.erase( itr );
2091 continue;
2092 }
2093 }
2094 } LOG_AND_DROP();
 // reached when the entry was kept, including when LOG_AND_DROP handled an exception
2095 ++itr;
2096 }
2098 fc_dlog( _log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}",
2099 ("m", num_processed)( "n", unapplied_trxs_size )("applied", num_applied)("failed", num_failed) );
2100 }
2101 return !exhausted;
// Walks the generated-transaction index in by_delay order and pushes each scheduled transaction that
// is due, interleaving incoming transactions according to _incoming_defer_ratio.
// `deadline` bounds the work; `pending_incoming_process_limit` is decremented once per incoming
// transaction taken here.
// NOTE(review): the declarations of `chain`, `itr`, `end` and `max_trx_time` are missing from this
// listing; `itr`/`end` presumably range over the incoming part of _unapplied_transactions — confirm.
2104void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit )
2106 // scheduled transactions
2107 int num_applied = 0;
2108 int num_failed = 0;
2109 int num_processed = 0;
2110 bool exhausted = false;
2111 double incoming_trx_weight = 0.0;
2113 auto& blacklist_by_id = _blacklisted_transactions.get<by_id>();
2115 time_point pending_block_time = chain.pending_block_time();
2118 const auto& sch_idx = chain.db().get_index<generated_transaction_multi_index,by_delay>();
2119 const auto scheduled_trxs_size = sch_idx.size();
2120 auto sch_itr = sch_idx.begin();
2121 while( sch_itr != sch_idx.end() ) {
2122 if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet
2123 if( exhausted || deadline <= fc::time_point::now() ) {
2124 exhausted = true;
2125 break;
2126 }
2127 if( sch_itr->published >= pending_block_time ) {
2128 ++sch_itr;
2129 continue; // do not allow schedule and execute in same block
2130 }
 // skip scheduled transactions that were blacklisted after an earlier failure
2132 if (blacklist_by_id.find(sch_itr->trx_id) != blacklist_by_id.end()) {
2133 ++sch_itr;
2134 continue;
2135 }
2137 const transaction_id_type trx_id = sch_itr->trx_id; // make copy since reference could be invalidated
2138 const auto sch_expiration = sch_itr->expiration;
2139 auto sch_itr_next = sch_itr; // save off next since sch_itr may be invalidated by loop
2140 ++sch_itr_next;
 // remember the key of the following entry so the position can be re-found with lower_bound after the push
2141 const auto next_delay_until = sch_itr_next != sch_idx.end() ? sch_itr_next->delay_until : sch_itr->delay_until;
2142 const auto next_id = sch_itr_next != sch_idx.end() ? sch_itr_next->id : sch_itr->id;
2144 num_processed++;
2146 // configurable ratio of incoming txns vs deferred txns
2147 while (incoming_trx_weight >= 1.0 && pending_incoming_process_limit && itr != end ) {
2148 if (deadline <= fc::time_point::now()) {
2149 exhausted = true;
2150 break;
2151 }
2153 --pending_incoming_process_limit;
2154 incoming_trx_weight -= 1.0;
 // copy what is needed out of the entry before erasing it from the queue
2156 auto trx_meta = itr->trx_meta;
2157 auto next = itr->next;
2158 bool persist_until_expired = itr->trx_type == trx_enum_type::incoming_persisted;
2159 bool return_failure_trace = itr->return_failure_trace;
2160 itr = _unapplied_transactions.erase( itr );
2161 if( !process_incoming_transaction_async( trx_meta, persist_until_expired, return_failure_trace, next ) ) {
2162 exhausted = true;
2163 break;
2164 }
2165 }
2167 if (exhausted || deadline <= fc::time_point::now()) {
2168 exhausted = true;
2169 break;
2170 }
2172 try {
2174 if( max_trx_time.count() < 0 ) max_trx_time = fc::microseconds::maximum();
2176 auto trace = chain.push_scheduled_transaction(trx_id, deadline, max_trx_time, 0, false);
2177 if (trace->except) {
2178 if (exception_is_exhausted(*trace->except)) {
2179 if( block_is_exhausted() ) {
2180 exhausted = true;
2181 break;
2182 }
2183 } else {
2184 // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist
2185 _blacklisted_transactions.insert(transaction_id_with_expiry{trx_id, sch_expiration});
2186 num_failed++;
2187 }
2188 } else {
2189 num_applied++;
2190 }
2191 } LOG_AND_DROP();
 // each scheduled transaction earns _incoming_defer_ratio worth of incoming-transaction budget;
 // the budget is reset once the incoming limit is used up
2193 incoming_trx_weight += _incoming_defer_ratio;
2194 if (!pending_incoming_process_limit) incoming_trx_weight = 0.0;
2196 if( sch_itr_next == sch_idx.end() ) break;
2197 sch_itr = sch_idx.lower_bound( boost::make_tuple( next_delay_until, next_id ) );
2198 }
2200 if( scheduled_trxs_size > 0 ) {
2201 fc_dlog( _log,
2202 "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}",
2203 ( "m", num_processed )( "n", scheduled_trxs_size )( "applied", num_applied )( "failed", num_failed ) );
2204 }
// Processes up to `pending_incoming_process_limit` incoming transactions from the unapplied queue,
// decrementing the limit for each one taken.
// Returns false when it stopped early — should_interrupt_start_block(deadline) fired or
// process_incoming_transaction_async() returned false — and true otherwise.
// NOTE(review): the declarations of `itr` and `end` are missing from this listing; presumably they
// range over the incoming part of _unapplied_transactions — confirm.
2207bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline, size_t& pending_incoming_process_limit )
2209 bool exhausted = false;
2210 if( pending_incoming_process_limit ) {
2211 size_t processed = 0;
2212 fc_dlog( _log, "Processing ${n} pending transactions", ("n", pending_incoming_process_limit) );
2215 while( pending_incoming_process_limit && itr != end ) {
2216 if ( should_interrupt_start_block( deadline ) ) {
2217 exhausted = true;
2218 break;
2219 }
2220 --pending_incoming_process_limit;
 // copy what is needed out of the entry before erasing it from the queue
2222 auto trx_meta = itr->trx_meta;
2223 auto next = itr->next;
2224 bool persist_until_expired = itr->trx_type == trx_enum_type::incoming_persisted;
2225 bool return_failure_trace = itr->return_failure_trace;
2226 itr = _unapplied_transactions.erase( itr );
2227 if( !process_incoming_transaction_async( trx_meta, persist_until_expired, return_failure_trace, next ) ) {
2228 exhausted = true;
2229 break;
2230 }
 // only transactions whose processing call returned true are counted
2231 ++processed;
2232 }
2233 fc_dlog( _log, "Processed ${n} pending transactions, ${p} left", ("n", processed)("p", _unapplied_transactions.incoming_size()) );
2234 }
2235 return !exhausted;
// Returns true when the block's CPU limit (from the resource limits manager) has fallen below
// _max_block_cpu_usage_threshold_us, or its NET limit below _max_block_net_usage_threshold_bytes.
// NOTE(review): the function header is missing from this listing — presumably block_is_exhausted(),
// which is called elsewhere in this file; confirm.
2239 const chain::controller& chain = chain_plug->chain();
2240 const auto& rl = chain.get_resource_limits_manager();
2242 const uint64_t cpu_limit = rl.get_block_cpu_limit();
2243 if( cpu_limit < _max_block_cpu_usage_threshold_us ) return true;
2244 const uint64_t net_limit = rl.get_block_net_limit();
2245 if( net_limit < _max_block_net_usage_threshold_bytes ) return true;
2246 return false;
2249// Example:
2250// --> Start block A (block time x.500) at time x.000
2251// -> start_block()
2252// --> deadline, produce block x.500 at time x.400 (assuming 80% cpu block effort)
2253// -> Idle
2254// --> Start block B (block time y.000) at time x.500
// Cancels the pending timer, calls start_block(), and then arms _timer according to the result:
// retry after a tenth of a block interval on failure, or fire at the block deadline (or immediately)
// to call maybe_produce_block().
// NOTE(review): the function header (presumably schedule_production_loop(), which the timer callbacks
// below invoke), several branch conditions, and the declarations of `chain` and `exhausted` are
// missing from this listing — confirm.
2256 _received_block = false;
2257 _timer.cancel();
2259 auto result = start_block();
2261 if (result == start_block_result::failed) {
2262 elog("Failed to start a pending block, will try again later");
2263 _timer.expires_from_now( boost::posix_time::microseconds( config::block_interval_us / 10 ));
2265 // we failed to start a block, so try again later?
 // the callback holds only a weak_ptr and checks the correlation id so a superseded or cancelled timer does nothing
2266 _timer.async_wait( app().get_priority_queue().wrap( priority::high,
2267 [weak_this = weak_from_this(), cid = ++_timer_corelation_id]( const boost::system::error_code& ec ) {
2268 auto self = weak_this.lock();
2269 if( self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id ) {
2270 self->schedule_production_loop();
2271 }
2272 } ) );
2273 } else if (result == start_block_result::waiting_for_block){
2274 if (!_producers.empty() && !production_disabled_by_policy()) {
2275 fc_dlog(_log, "Waiting till another block is received and scheduling Speculative/Production Change");
2277 } else {
2278 fc_dlog(_log, "Waiting till another block is received");
2279 // nothing to do until more blocks arrive
2280 }
2282 } else if (result == start_block_result::waiting_for_production) {
2283 // scheduled in start_block()
2290 fc_dlog(_log, "Speculative Block Created; Scheduling Speculative/Production Change");
2291 SYS_ASSERT( chain.is_building_block(), missing_pending_block_state, "speculating without pending_block_state" );
2293 } else {
2294 fc_dlog(_log, "Speculative Block Created");
2295 }
2301 // we succeeded but block may be exhausted
 // the timer takes a boost::posix_time::ptime, so the deadline is converted from microseconds since 1970-01-01
2302 static const boost::posix_time::ptime epoch( boost::gregorian::date( 1970, 1, 1 ) );
2303 auto deadline = calculate_block_deadline( chain.pending_block_time() );
2305 if( !exhausted && deadline > fc::time_point::now() ) {
2306 // ship this block off no later than its deadline
2307 SYS_ASSERT( chain.is_building_block(), missing_pending_block_state,
2308 "producing without pending_block_state, start_block succeeded" );
2309 _timer.expires_at( epoch + boost::posix_time::microseconds( deadline.time_since_epoch().count() ) );
2310 fc_dlog( _log, "Scheduling Block Production on Normal Block #${num} for ${time}",
2311 ("num", chain.head_block_num() + 1)( "time", deadline ) );
2312 } else {
2313 SYS_ASSERT( chain.is_building_block(), missing_pending_block_state, "producing without pending_block_state" );
2314 _timer.expires_from_now( boost::posix_time::microseconds( 0 ) );
2315 fc_dlog( _log, "Scheduling Block Production on ${desc} Block #${num} immediately",
2316 ("num", chain.head_block_num() + 1)("desc", block_is_exhausted() ? "Exhausted" : "Deadline exceeded") );
2317 }
2319 _timer.async_wait( app().get_priority_queue().wrap( priority::high,
2320 [&chain, weak_this = weak_from_this(), cid=++_timer_corelation_id](const boost::system::error_code& ec) {
2321 auto self = weak_this.lock();
2322 if( self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id ) {
2323 // pending_block_state expected, but can't assert inside async_wait
2324 auto block_num = chain.is_building_block() ? chain.head_block_num() + 1 : 0;
2325 fc_dlog( _log, "Produce block timer for ${num} running at ${time}", ("num", block_num)("time", fc::time_point::now()) );
2326 auto res = self->maybe_produce_block();
2327 fc_dlog( _log, "Producing Block #${num} returned: ${res}", ("num", block_num)( "res", res ) );
2328 }
2329 } ) );
// Returns the earliest wake-up time across all configured producers, where each producer's wake-up
// time is its next block time (from calculate_next_block_time) minus one block interval.
// Returns an empty optional, after a debug log, when no producer in _producers yields a next block time.
2332std::optional<fc::time_point> producer_plugin_impl::calculate_producer_wake_up_time( const block_timestamp_type& ref_block_time ) const {
2333 // if we have any producers then we should at least set a timer for our next available slot
2334 std::optional<fc::time_point> wake_up_time;
2335 for (const auto& p : _producers) {
2336 auto next_producer_block_time = calculate_next_block_time(p, ref_block_time);
2337 if (next_producer_block_time) {
2338 auto producer_wake_up_time = *next_producer_block_time - fc::microseconds(config::block_interval_us);
 // keep the minimum seen so far
2339 if (wake_up_time) {
2340 // wake up with a full block interval to the deadline
2341 if( producer_wake_up_time < *wake_up_time ) {
2342 wake_up_time = producer_wake_up_time;
2343 }
2344 } else {
2345 wake_up_time = producer_wake_up_time;
2346 }
2347 }
2348 }
2349 if( !wake_up_time ) {
2350 fc_dlog(_log, "Not Scheduling Speculative/Production, no local producers had valid wake up times");
2351 }
2353 return wake_up_time;
// Arms _timer to call schedule_production_loop() at `wake_up_time`.
// Does nothing in the lines shown here when `wake_up_time` is empty.
// `weak_this` is captured by the timer callback so the callback becomes a no-op if the object has
// been destroyed by the time it fires.
2356void producer_plugin_impl::schedule_delayed_production_loop(const std::weak_ptr<producer_plugin_impl>& weak_this, std::optional<fc::time_point> wake_up_time) {
2357 if (wake_up_time) {
2358 fc_dlog(_log, "Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time));
 // the timer takes a boost::posix_time::ptime, so convert from microseconds since 1970-01-01
2359 static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
2360 _timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count()));
2361 _timer.async_wait( app().get_priority_queue().wrap( priority::high,
2362 [weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
2363 auto self = weak_this.lock();
 // ignore aborted waits and callbacks whose correlation id has been superseded by a later timer
2364 if( self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id ) {
2365 self->schedule_production_loop();
2366 }
2367 } ) );
2368 }
// Attempts produce_block(); returns true when it completes without throwing.
// When produce_block() throws, LOG_AND_DROP handles the exception, the pending block is aborted and
// false is returned.
// NOTE(review): the function header (presumably maybe_produce_block(), called from the production
// timer callback) and the body of the scoped-exit callback are missing from this listing; the
// callback presumably reschedules the production loop — confirm.
2373 auto reschedule = fc::make_scoped_exit([this]{
2375 });
2377 try {
2378 produce_block();
2379 return true;
2380 } LOG_AND_DROP();
2382 fc_dlog(_log, "Aborting block due to produce_block error");
2383 abort_block();
2384 return false;
// Captures the current time and returns an fc scoped-exit object whose callback writes a debug log
// line with the time elapsed since this function was called ("Signing took ...").
2387static auto make_debug_time_logger() {
2388 auto start = fc::time_point::now();
 // capture `start` by value: the callback runs after this function has returned
2389 return fc::make_scoped_exit([=](){
2390 fc_dlog(_log, "Signing took ${ms}us", ("ms", fc::time_point::now() - start) );
2391 });
// Returns an engaged optional holding the timing logger from make_debug_time_logger() in the first
// branch, and an empty optional otherwise.
// NOTE(review): the `if` condition selecting the first branch is missing from this listing;
// presumably it checks whether debug logging is enabled — confirm.
2394static auto maybe_make_debug_time_logger() -> std::optional<decltype(make_debug_time_logger())> {
2396 return make_debug_time_logger();
2397 } else {
2398 return {};
2399 }
// Body fragment of the block-production routine (the assertion text below names it produce_block).
// NOTE(review): this listing omits several lines of the routine — its signature, the declaration
// of `chain`, and the opening lines of a few statements — so the notes here describe only what
// is visible.
// Visible flow:
//   1. assert that we are in producing mode and that a block is being built
//   2. gather the signature providers matching the pending block's signing authority
//   3. compute the sub-chain S-root / S-ID and attach an s_header when relevant transactions exist
//   4. finalize_block() with a callback that signs the digest, then commit_block()
//   5. report account failures and log a summary of the new head block
2403 //ilog("produce_block ${t}", ("t", fc::time_point::now())); // for testing _produce_time_offset_us
2404 SYS_ASSERT(_pending_block_mode == pending_block_mode::producing, producer_exception, "called produce_block while not actually producing");
2406 SYS_ASSERT(chain.is_building_block(), missing_pending_block_state, "pending_block_state does not exist but it should, another plugin may have corrupted it");
// Collect references to the configured signature providers usable for the pending block.
2408 const auto& auth = chain.pending_block_signing_authority();
2409 std::vector<std::reference_wrapper<const signature_provider_type>> relevant_providers;
2411 relevant_providers.reserve(_signature_providers.size());
// NOTE(review): the line that introduces `key` and opens this callback is not shown here.
// The visible body looks `key` up in _signature_providers and keeps a reference to the provider
// when one is found.
2414 const auto& iter = _signature_providers.find(key);
2415 if (iter != _signature_providers.end()) {
2416 relevant_providers.emplace_back(iter->second);
2417 }
2418 });
// Production cannot continue without at least one usable signature provider.
2420 SYS_ASSERT(relevant_providers.size() > 0, producer_priv_key_not_found, "Attempting to produce a block for which we don't have any relevant private keys");
// NOTE(review): the condition guarding this clear() is not visible in this listing.
2423 _protocol_features_to_activate.clear(); // clear _protocol_features_to_activate as it is already set in pending_block
2425 }
// Sub-chain handling: derive this block's S-root and S-ID from the relevant transactions.
2430 auto& sub_chain_plug = app().get_plugin<sub_chain_plugin>();
2431 // TODO: verify sub chain is enabled, otherwise skip? or require?
2432 // Step 1: Scans transactions to find relevant transactions by comparing to a list of known contracts.
2433 auto relevant_s_transactions = sub_chain_plug.find_relevant_transactions(chain);
2434 // Only calculate if there are relevant s-transactions
2435 if (!relevant_s_transactions.empty()) {
2436 ilog("Relevant S-Transactions found: ${count}", ("count", relevant_s_transactions.size()));
2437 // Step 2: Builds array of leaves in the same order as they appear in the transaction list.
2438 // Step 3: Computes a Merkle root hash: the S-Root.
2439 checksum256_type s_root = sub_chain_plug.calculate_s_root(relevant_s_transactions);
2440 ilog("s_root calculated: ${root}", ("root", s_root.str()));
2441 // Step 4: S-Root is hashed with the previous S-ID using SHA-256
2442 // Step 5: Takes the 32 least-significant bits from the previous S-ID to get the previous S-Block number, and increment to get the new S-Block number
2443 // Step 6: Hashes the S-Root with the previous S-ID with SHA-256, then replace the 32 least significant bits with the new S-Block number to produce the new S-ID
2444 checksum256_type curr_s_id = sub_chain_plug.compute_curr_s_id(s_root);
2445 ilog("curr_s_id calculated: ${curr_s_id}", ("curr_s_id", curr_s_id.str()));
2446 // Prepare the s_header for the current block to be added to the header extension
// NOTE(review): the line declaring `s_header` and opening its initializer is not shown here.
// The visible initializers are, in order: contract name, previous S-ID, current S-ID, S-root.
2448 sub_chain_plug.get_contract_name(),
2449 sub_chain_plug.get_prev_s_id(),
2450 curr_s_id,
2451 s_root
2452 };
2453 ilog("S-Header prepared: ${s_header}", ("s_header", s_header.to_string()));
2455 // Set the s_root in the chain controller for the building block state
2456 // to be referenced and passed to make_block_header on finalize_block below
2457 chain.set_s_header( s_header );
2459 // Update the plugin's stored prev_s_id with the newly calculated S-ID
// NOTE(review): prev_s_id is advanced here, before finalize_block()/commit_block() below. If
// either of those throws, the stored S-ID would already reflect a block that was never
// committed — confirm this is handled elsewhere.
2460 sub_chain_plug.update_prev_s_id(curr_s_id);
2461 }
2463 //idump( (fc::time_point::now() - chain.pending_block_time()) );
// Finalize the block. The callback receives the digest `d` and returns one signature per entry
// in relevant_providers; `debug_logger` is an optional scope guard that lives for the duration
// of the callback.
// NOTE(review): the declaration of `sigs` is not visible in this listing.
2464 chain.finalize_block( [&]( const digest_type& d ) {
2465 auto debug_logger = maybe_make_debug_time_logger();
2467 sigs.reserve(relevant_providers.size());
2469 // sign with all relevant public keys
2470 for (const auto& p : relevant_providers) {
2471 sigs.emplace_back(p.get()(d));
2472 }
2473 return sigs;
2474 } );
2476 chain.commit_block();
// Fetch the head block state after the commit; it feeds the summary log below.
2478 block_state_ptr new_bs = chain.head_block_state();
// Emit and then reset the accumulated account-failure records.
2480 _account_fails.report();
2481 _account_fails.clear();
// The logged id is a 16-character slice of the block id string, starting at offset 8.
2483 ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
2484 ("p",new_bs->header.producer)("id",new_bs->id.str().substr(8,16))
2485 ("n",new_bs->block_num)("t",new_bs->header.timestamp)
2486 ("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", new_bs->header.confirmed));
// Sets the _received_block flag on the implementation object (`my`).
// NOTE(review): the enclosing function's signature and closing brace are not present in this
// listing, so its name and parameters cannot be confirmed from here.
2491 my->_received_block = true;
2494void producer_plugin::log_failed_transaction(const transaction_id_type& trx_id, const packed_transaction_ptr& packed_trx_ptr, const char* reason) const {
2495 fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${txid} : ${why}",
2496 ("txid", trx_id)("why", reason));
2497 fc_dlog(_trx_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${trx}",
2498 ("entire_trx", packed_trx_ptr ? my->chain_plug->get_log_trx(packed_trx_ptr->get_transaction()) : fc::variant{trx_id}));
2499 fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${entire_trx}",
2500 ("entire_trx", packed_trx_ptr ? my->chain_plug->get_log_trx(packed_trx_ptr->get_transaction()) : fc::variant{trx_id}));
2503} // namespace sysio
