8#include <boost/multi_index_container.hpp>
9#include <boost/multi_index/hashed_index.hpp>
10#include <boost/multi_index/member.hpp>
18namespace sysio {
namespace chain {
20using namespace boost::multi_index;
// Completion-callback type: a callable taking a const reference to a variant
// that holds either an fc::exception_ptr or a transaction_trace_ptr and
// returning nothing. The visible call sites in this file invoke it with an
// exception pointer (expired or duplicate transaction).
31using next_func_t = std::function<void(
const std::variant<fc::exception_ptr, transaction_trace_ptr>&)>;
// by_trx_id: hashed, unique index keyed on unapplied_transaction::id().
60 hashed_unique< tag<by_trx_id>,
61 const_mem_fun<unapplied_transaction, const transaction_id_type&, &unapplied_transaction::id>
// by_type: ordered, non-unique index keyed on the trx_type member.
63 ordered_non_unique< tag<by_type>, member<unapplied_transaction, trx_enum_type, &unapplied_transaction::trx_type> >,
// by_expiry: ordered, non-unique index keyed on the expiry member; no custom
// comparator is given, so begin() is the entry with the smallest expiry.
64 ordered_non_unique< tag<by_expiry>, member<unapplied_transaction, const fc::time_point, &unapplied_transaction::expiry> >
66 > unapplied_trx_queue_type;
// Container holding every queued unapplied_transaction; accessed through the
// by_trx_id, by_type and by_expiry indices.
68 unapplied_trx_queue_type queue;
// Upper bound compared against size_in_bytes + size in added(). The value is
// in bytes (the assert message divides it by 1024*1024 to print MB), so the
// default is 1 GiB.
69 uint64_t max_transaction_queue_size = 1024*1024*1024;
// Counter of queued incoming transactions, starting at zero.
// NOTE(review): the code that increments/decrements it is not visible in this excerpt.
71 size_t incoming_count = 0;
// Report the current value of the incoming_count member.
90 return incoming_count;
// Look the transaction up by id through the hashed by_trx_id index.
94 auto itr = queue.get<by_trx_id>().find(
id );
// Unknown id: hand back a value-initialized (empty) result.
95 if( itr == queue.get<by_trx_id>().end() )
return {};
// Expiry sweep over the queue.
// NOTE(review): the function signature and several body lines are not visible
// in this excerpt; the comments below describe only the lines shown.
99 template <
typename Yield,
typename Callback>
// Work on the by_expiry index so that begin() is always the entry with the
// smallest expiry still in the queue.
101 auto& persisted_by_expiry = queue.get<by_expiry>();
102 while( !persisted_by_expiry.empty() ) {
103 const auto& itr = persisted_by_expiry.begin();
// Front entry expires after pending_block_time.
// NOTE(review): the branch body is not visible here — presumably the scan stops.
104 if( itr->expiry > pending_block_time ) {
// Pass the packed transaction and its type to the caller-supplied callback.
110 callback( itr->trx_meta->packed_trx(), itr->trx_type );
// Deliver an expired_tx_exception through the entry's next callback.
// NOTE(review): whether this call is guarded by a check that next is set is
// not visible in this excerpt.
112 itr->next( std::static_pointer_cast<fc::exception>(
113 std::make_shared<expired_tx_exception>(
114 FC_LOG_MESSAGE( error,
"expired transaction ${id}, expiration ${e}, block time ${bt}",
115 (
"id", itr->id())(
"e", itr->trx_meta->packed_trx()->expiration())
116 (
"bt", pending_block_time) ) ) ) );
// Drop the processed entry from the queue via the by_expiry index.
119 persisted_by_expiry.erase( itr );
// Nothing to reconcile when the queue holds no entries.
125 if(
empty() )
return;
126 auto& idx = queue.get<by_trx_id>();
// Walk the transaction receipts of the block referenced by bs.
127 for(
const auto& receipt : bs->block->transactions ) {
// Only receipts whose variant holds a packed_transaction are looked up.
128 if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
129 const auto& pt = std::get<packed_transaction>(receipt.trx);
130 auto itr = idx.find( pt.id() );
131 if( itr != idx.end() ) {
// The transaction is queued here as well: report tx_duplicate through its next
// callback.
// NOTE(review): any guard on next and the removal of the entry are not visible
// in this excerpt.
133 itr->next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
134 FC_LOG_MESSAGE( info,
"duplicate transaction ${id}", (
"id", itr->trx_meta->id())))));
// Traverse forked_branch in reverse order (rbegin to rend).
145 for(
auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
// Visit every transaction metadata entry of the current block state.
// NOTE(review): bsptr is defined on a line not visible here — presumably *ritr.
147 for(
auto itr = bsptr->trxs_metas().begin(), end = bsptr->trxs_metas().end(); itr != end; ++itr ) {
148 const auto& trx = *itr;
// Call added() only when the insertion actually took place.
// NOTE(review): insert_itr is presumably the pair returned by queue.insert on
// a line not visible here.
151 if( insert_itr.second ) added( insert_itr.first );
// Queue the transactions in aborted_trxs. The vector is taken by value.
// NOTE(review): the insertion statement itself is not visible in this excerpt.
156 void add_aborted( std::vector<transaction_metadata_ptr> aborted_trxs ) {
157 for(
auto& trx : aborted_trxs ) {
// Call added() only when the insertion actually took place.
160 if( insert_itr.second ) added( insert_itr.first );
// Look for an existing entry with the same transaction id.
165 auto itr = queue.get<by_trx_id>().find( trx->id() );
// Not queued yet: a new entry is inserted (statement not visible here) and
// accounted for via added() when the insertion succeeded.
166 if( itr == queue.get<by_trx_id>().end() ) {
169 if( insert_itr.second ) added( insert_itr.first );
// Update an entry in place through the by_trx_id index.
// NOTE(review): presumably the already-queued branch; the lambda body is not
// visible in this excerpt.
173 queue.get<by_trx_id>().modify( itr, [](
auto& un){
// Look for an existing entry with the same transaction id.
180 auto itr = queue.get<by_trx_id>().find( trx->id() );
// Not queued yet: insert a new entry and account for it via added() when the
// insertion succeeded.
// NOTE(review): the arguments of the insert call are not visible in this excerpt.
181 if( itr == queue.get<by_trx_id>().end() ) {
183 auto insert_itr = queue.insert(
185 if( insert_itr.second ) added( insert_itr.first );
// The queued entry refers to the very same metadata object: return without
// invoking next.
187 if( itr->trx_meta == trx )
return;
// Report tx_duplicate to the caller through next.
// NOTE(review): presumably reached only for an already-queued id; whether the
// call is guarded by a check that next is set is not visible in this excerpt.
189 next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
190 FC_LOG_MESSAGE( info,
"duplicate transaction ${id}", (
"id", trx->id()) ) ) ) );
// Iterator type of the by_type index; ranges expressed with it are therefore
// ordered by the trx_type member.
195 using iterator = unapplied_trx_queue_type::index<by_type>::type::iterator;
// Erase the entry through the by_type index and forward the iterator that the
// index's erase() returns.
213 return queue.get<by_type>().
erase( itr );
// Size-accounting hook for an entry referenced by itr: computes the entry's
// size with calc_size() and adds it to size_in_bytes.
217 template<
typename Itr>
218 void added( Itr itr ) {
219 auto size = calc_size( itr->trx_meta );
// Queue-size limit check: the condition requires size_in_bytes + size to be
// strictly below max_transaction_queue_size; the exception type named is
// tx_resource_exhaustion.
// NOTE(review): the lines between the size computation and this assert are not
// visible, so the check may apply only to some transaction types — confirm.
222 SYS_ASSERT( size_in_bytes +
size < max_transaction_queue_size, tx_resource_exhaustion,
223 "Transaction ${id}, size ${s} bytes would exceed configured "
224 "incoming-transaction-queue-size-mb ${qs}, current queue size ${cs} bytes",
225 (
"id", itr->trx_meta->id())(
"s",
size)(
"qs", max_transaction_queue_size/(1024*1024))
226 (
"cs", size_in_bytes) );
// Record the entry's size in the running total.
228 size_in_bytes +=
size;
// Size-accounting counterpart of added(): subtracts the entry's calc_size()
// from size_in_bytes.
// NOTE(review): the lines preceding the subtraction are not visible in this excerpt.
231 template<
typename Itr>
232 void removed( Itr itr ) {
236 size_in_bytes -= calc_size( itr->trx_meta );
// Size estimate for one entry: twice the sum of the packed transaction's
// unprunable and prunable sizes, plus sizeof the object trx points to.
// NOTE(review): the rationale for the factor of 2 is not stated here.
241 return (trx->packed_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size()) * 2 +
sizeof( *trx );
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
bool clear_expired(const time_point &pending_block_time, Yield &&yield, Callback &&callback)
size_t incoming_size() const
iterator incoming_begin()
iterator unapplied_begin()
void add_forked(const branch_type &forked_branch)
iterator erase(iterator itr)
caller's responsibility to call next() if applicable
void add_incoming(const transaction_metadata_ptr &trx, bool persist_until_expired, bool return_failure_trace, next_func_t next)
void add_aborted(std::vector< transaction_metadata_ptr > aborted_trxs)
void set_max_transaction_queue_size(uint64_t v)
unapplied_trx_queue_type::index< by_type >::type::iterator iterator
iterator persisted_begin()
void add_persisted(const transaction_metadata_ptr &trx)
void clear_applied(const block_state_ptr &bs)
transaction_metadata_ptr get_trx(const transaction_id_type &id) const
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
std::size_t hash_value(const fc::sha256 &v)
deque< block_state_ptr > branch_type
std::function< void(const std::variant< fc::exception_ptr, transaction_trace_ptr > &)> next_func_t
std::shared_ptr< block_state > block_state_ptr
std::shared_ptr< transaction_metadata > transaction_metadata_ptr
unsigned __int64 uint64_t
unapplied_transaction()=delete
const transaction_id_type & id() const
unapplied_transaction(unapplied_transaction &&)=default
unapplied_transaction(const unapplied_transaction &)=delete
unapplied_transaction & operator=(const unapplied_transaction &)=delete
const fc::time_point expiry
bool return_failure_trace
const transaction_metadata_ptr trx_meta