12#include <boost/multi_index_container.hpp>
13#include <boost/multi_index/ordered_index.hpp>
14#include <boost/multi_index/hashed_index.hpp>
15#include <boost/multi_index/composite_key.hpp>
16#include <boost/multi_index/member.hpp>
22using namespace boost::multi_index;
26constexpr uint16_t lib_totem = std::numeric_limits<uint16_t>::max();
28struct tracked_transaction {
40 if( block_num == 0 )
return std::numeric_limits<uint32_t>::max() - 1;
41 if( num_blocks == lib_totem )
return std::numeric_limits<uint32_t>::max();
42 return block_num + num_blocks;
50 bool is_ready()
const {
51 return block_num != 0;
56 tracked_transaction(
const tracked_transaction&) =
delete;
57 tracked_transaction() =
delete;
58 tracked_transaction& operator=(
const tracked_transaction&) =
delete;
59 tracked_transaction(tracked_transaction&&) =
default;
64struct by_ready_block_num;
68using tracked_transaction_index_t = multi_index_container<tracked_transaction,
70 hashed_unique<tag<by_trx_id>,
71 const_mem_fun<tracked_transaction, const transaction_id_type&, &tracked_transaction::id>, std::hash<transaction_id_type>
73 ordered_non_unique<tag<by_expiry>,
74 const_mem_fun<tracked_transaction, fc::time_point_sec, &tracked_transaction::expiry>
76 ordered_non_unique<tag<by_ready_block_num>,
77 const_mem_fun<tracked_transaction, uint32_t, &tracked_transaction::ready_block_num>
79 ordered_non_unique<tag<by_block_num>,
80 member<tracked_transaction, uint32_t, &tracked_transaction::block_num>
82 ordered_non_unique<tag<by_last_try>,
83 const_mem_fun<tracked_transaction, fc::time_point, &tracked_transaction::last_try_time>
97 , _transaction_ack_channel(
appbase::
app().get_channel<chain::plugin_interface::compat::channels::transaction_ack>())
99 , _max_mem_usage_size(max_mem_usage_size)
100 , _retry_interval(retry_interval)
101 , _max_expiration_time(max_expiration_time)
105 return _max_expiration_time;
109 return _tracked_trxs.
index().size();
114 "Transaction exceeded transaction-retry-max-storage-size-gb limit: ${m} bytes", (
"m", _tracked_trxs.
memory_size()) );
115 auto i = _tracked_trxs.
index().get<by_trx_id>().find( ptrx->id() );
116 if( i == _tracked_trxs.
index().end() ) {
117 _tracked_trxs.
insert( {std::move(ptrx),
118 !num_blocks.has_value() ? lib_totem : *num_blocks,
129 if( !trace->receipt )
return;
136 if( trace->scheduled )
return;
138 if( !trace->producer_block_id )
return;
143 auto& idx = _tracked_trxs.
index().get<by_trx_id>();
144 auto itr = idx.find(trace->id);
145 if( itr != idx.end() ) {
146 _tracked_trxs.
modify( itr, [&trace, &control=_controller, &abi_max_time=_abi_serializer_max_time]( tracked_transaction& tt ) {
147 tt.block_num = trace->block_num;
154 }
catch( chain::abi_exception& ) {
155 tt.trx_trace_v = *trace;
163 rollback_to( block_num );
168 ack_ready_trxs_by_block_num( bsp->block_num );
173 ack_ready_trxs_by_lib( bsp->block_num );
174 clear_expired( bsp->block->timestamp );
179 void rollback_to(
uint32_t block_num ) {
180 const auto& idx = _tracked_trxs.
index().get<by_block_num>();
182 deque<
decltype(_tracked_trxs.
index().project<0>(idx.begin()))> to_process;
183 for(
auto i = idx.rbegin(); i != idx.rend(); ++i ) {
185 if( i->block_num < block_num )
break;
188 to_process.emplace_back( _tracked_trxs.
index().project<0>( --ii ) );
192 for(
auto& i : to_process ) {
193 _tracked_trxs.
modify( i, [&]( tracked_transaction& tt ) {
197 if( tt.last_try + _retry_interval <= now ) {
201 tt.trx_trace_v.
clear();
207 const auto& idx = _tracked_trxs.
index().get<by_last_try>();
210 deque<
decltype(_tracked_trxs.
index().project<0>(idx.begin()))> to_process;
211 for(
auto i = idx.begin(); i != idx.end(); ++i ) {
212 if( i->is_ready() )
break;
214 if( i->last_try + _retry_interval <= now ) {
215 to_process.emplace_back( _tracked_trxs.
index().project<0>( i ) );
219 for(
auto& i: to_process ) {
220 _transaction_ack_channel.
publish(
222 dlog(
"retry send trx ${id}", (
"id", i->ptrx->id()) );
223 _tracked_trxs.
modify( i, [&]( tracked_transaction& tt ) {
229 void ack_ready_trxs_by_block_num(
uint32_t block_num ) {
230 const auto& idx = _tracked_trxs.
index().get<by_ready_block_num>();
232 deque<
decltype(_tracked_trxs.
index().project<0>(idx.begin()))> to_process;
233 auto end = idx.upper_bound(block_num);
234 for(
auto i = idx.begin(); i != end; ++i ) {
235 to_process.emplace_back( _tracked_trxs.
index().project<0>( i ) );
238 for(
auto& i: to_process ) {
239 _tracked_trxs.
modify( i, [&]( tracked_transaction& tt ) {
240 tt.next( std::make_unique<fc::variant>( std::move( tt.trx_trace_v ) ) );
241 tt.trx_trace_v.
clear();
243 _tracked_trxs.
erase( i );
247 void ack_ready_trxs_by_lib(
uint32_t lib_block_num ) {
248 const auto& idx = _tracked_trxs.
index().get<by_block_num>();
250 deque<
decltype(_tracked_trxs.
index().project<0>(idx.begin()))> to_process;
251 auto end = idx.upper_bound(lib_block_num);
252 for(
auto i = idx.lower_bound(1); i != end; ++i ) {
253 to_process.emplace_back( _tracked_trxs.
index().project<0>( i ) );
256 for(
auto& i: to_process ) {
257 _tracked_trxs.
modify( i, [&]( tracked_transaction& tt ) {
258 tt.next( std::make_unique<fc::variant>( std::move( tt.trx_trace_v ) ) );
259 tt.trx_trace_v.
clear();
261 _tracked_trxs.
erase( i );
267 auto& idx = _tracked_trxs.
index().get<by_expiry>();
268 while( !idx.empty() ) {
269 auto itr = idx.begin();
270 if( itr->expiry() > block_time ) {
273 itr->next( std::static_pointer_cast<fc::exception>(
274 std::make_shared<expired_tx_exception>(
275 FC_LOG_MESSAGE( error,
"expired retry transaction ${id}, expiration ${e}, block time ${bt}",
276 (
"id", itr->id())(
"e", itr->ptrx->expiration())
278 _tracked_trxs.
erase( _tracked_trxs.
index().project<0>( itr ) );
286 const size_t _max_mem_usage_size;
302 _impl->track_transaction( std::move( ptrx ), num_blocks, next );
311 return _impl->size();
316 _impl->on_applied_transaction(trace, ptrx);
322 _impl->on_block_start(block_num);
328 _impl->on_accepted_block(block);
334 _impl->on_irreversible_block(block);
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
void publish(int priority, const Data &data)
static constexpr time_point maximum()
tracks the size of storage allocated to its underlying multi_index
void erase(const Key &key)
size_t memory_size() const
std::pair< typename primary_index_type::iterator, bool > insert(typename ContainerType::value_type obj)
void modify(typename primary_index_type::iterator itr, Lam lam)
const ContainerType & index() const
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
size_t estimated_size() const
void on_applied_transaction(const chain::transaction_trace_ptr &trace, const chain::packed_transaction_ptr &ptrx)
trx_retry_db(const chain::controller &controller, size_t max_mem_usage_size, fc::microseconds retry_interval, fc::microseconds max_expiration_time, fc::microseconds abi_serializer_max_time)
fc::time_point_sec get_max_expiration_time() const
void on_irreversible_block(const chain::block_state_ptr &block)
void track_transaction(chain::packed_transaction_ptr ptrx, std::optional< uint16_t > num_blocks, next_function< std::unique_ptr< fc::variant > > next)
void on_accepted_block(const chain::block_state_ptr &block)
void on_block_start(uint32_t block_num)
#define FC_LOG_AND_DROP(...)
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
size_t memory_size(const T &obj)
constexpr microseconds seconds(int64_t s)
std::function< void(const std::variant< fc::exception_ptr, T > &)> next_function
std::shared_ptr< transaction_trace > transaction_trace_ptr
std::shared_ptr< const packed_transaction > packed_transaction_ptr
bool is_onblock(const transaction_trace &tt)
std::shared_ptr< block_state > block_state_ptr
uint32_t next(octet_iterator &it, octet_iterator end)
const fc::microseconds abi_serializer_max_time
static yield_function_t create_yield_function(const fc::microseconds &max_serialization_time)
void on_irreversible_block(const chain::block_state_ptr &bsp)
void on_accepted_block(const chain::block_state_ptr &bsp)
void on_applied_transaction(const chain::transaction_trace_ptr &trace, const chain::packed_transaction_ptr &ptrx)
void on_block_start(uint32_t block_num)
const fc::microseconds & get_max_expiration() const
void track_transaction(packed_transaction_ptr ptrx, std::optional< uint16_t > num_blocks, next_function< std::unique_ptr< fc::variant > > next)
trx_retry_db_impl(const chain::controller &controller, size_t max_mem_usage_size, fc::microseconds retry_interval, fc::microseconds max_expiration_time, fc::microseconds abi_serializer_max_time)