Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
trx_retry_db.cpp
Go to the documentation of this file.
2
6
9
11
12#include <boost/multi_index_container.hpp>
13#include <boost/multi_index/ordered_index.hpp>
14#include <boost/multi_index/hashed_index.hpp>
15#include <boost/multi_index/composite_key.hpp>
16#include <boost/multi_index/member.hpp>
17
18
19using namespace sysio;
20using namespace sysio::chain;
21using namespace sysio::chain::literals;
22using namespace boost::multi_index;
23
24namespace {
25
// Sentinel stored in tracked_transaction::num_blocks when the caller gave no block count:
// the transaction is then acknowledged once its block becomes irreversible (LIB).
constexpr uint16_t lib_totem = std::numeric_limits<uint16_t>::max();
27
28struct tracked_transaction {
29 const packed_transaction_ptr ptrx;
30 const uint16_t num_blocks = 0; // lib is lib_totem
31 uint32_t block_num = 0;
32 fc::variant trx_trace_v;
33 fc::time_point last_try;
35
36 const transaction_id_type& id()const { return ptrx->id(); }
37 fc::time_point_sec expiry()const { return ptrx->expiration(); }
38
39 uint32_t ready_block_num()const {
40 if( block_num == 0 ) return std::numeric_limits<uint32_t>::max() - 1; // group not seen in middle
41 if( num_blocks == lib_totem ) return std::numeric_limits<uint32_t>::max(); // lib at the end
42 return block_num + num_blocks;
43 }
44
45 fc::time_point last_try_time()const {
46 if( block_num != 0 ) return fc::time_point::maximum();
47 return last_try;
48 }
49
50 bool is_ready()const {
51 return block_num != 0;
52 }
53
54 size_t memory_size()const { return ptrx->get_estimated_size() + trx_trace_v.estimated_size() + sizeof(*this); }
55
56 tracked_transaction(const tracked_transaction&) = delete;
57 tracked_transaction() = delete;
58 tracked_transaction& operator=(const tracked_transaction&) = delete;
59 tracked_transaction(tracked_transaction&&) = default;
60};
61
62struct by_trx_id;
63struct by_expiry;
64struct by_ready_block_num;
65struct by_block_num;
66struct by_last_try;
67
68using tracked_transaction_index_t = multi_index_container<tracked_transaction,
69 indexed_by<
70 hashed_unique<tag<by_trx_id>,
71 const_mem_fun<tracked_transaction, const transaction_id_type&, &tracked_transaction::id>, std::hash<transaction_id_type>
72 >,
73 ordered_non_unique<tag<by_expiry>,
74 const_mem_fun<tracked_transaction, fc::time_point_sec, &tracked_transaction::expiry>
75 >,
76 ordered_non_unique<tag<by_ready_block_num>,
77 const_mem_fun<tracked_transaction, uint32_t, &tracked_transaction::ready_block_num>
78 >,
79 ordered_non_unique<tag<by_block_num>,
80 member<tracked_transaction, uint32_t, &tracked_transaction::block_num>
81 >,
82 ordered_non_unique<tag<by_last_try>,
83 const_mem_fun<tracked_transaction, fc::time_point, &tracked_transaction::last_try_time>
84 >
85 >
86>;
87
88} // anonymous namespace
89
90namespace sysio::chain_apis {
91
93 explicit trx_retry_db_impl(const chain::controller& controller, size_t max_mem_usage_size,
94 fc::microseconds retry_interval, fc::microseconds max_expiration_time,
96 : _controller(controller)
97 , _transaction_ack_channel(appbase::app().get_channel<chain::plugin_interface::compat::channels::transaction_ack>())
98 , _abi_serializer_max_time(abi_serializer_max_time)
99 , _max_mem_usage_size(max_mem_usage_size)
100 , _retry_interval(retry_interval)
101 , _max_expiration_time(max_expiration_time)
102 {}
103
105 return _max_expiration_time;
106 }
107
108 size_t size()const {
109 return _tracked_trxs.index().size();
110 }
111
112 void track_transaction( packed_transaction_ptr ptrx, std::optional<uint16_t> num_blocks, next_function<std::unique_ptr<fc::variant>> next ) {
113 SYS_ASSERT( _tracked_trxs.memory_size() < _max_mem_usage_size, tx_resource_exhaustion,
114 "Transaction exceeded transaction-retry-max-storage-size-gb limit: ${m} bytes", ("m", _tracked_trxs.memory_size()) );
115 auto i = _tracked_trxs.index().get<by_trx_id>().find( ptrx->id() );
116 if( i == _tracked_trxs.index().end() ) {
117 _tracked_trxs.insert( {std::move(ptrx),
118 !num_blocks.has_value() ? lib_totem : *num_blocks,
119 0,
120 {},
122 std::move(next)} );
123 } else {
124 // already tracking transaction
125 }
126 }
127
129 if( !trace->receipt ) return;
130 // include only executed incoming transactions.
131 // soft_fail not included as only interested in incoming
132 if(trace->receipt->status != chain::transaction_receipt_header::executed) {
133 return;
134 }
135 // only incoming
136 if( trace->scheduled ) return;
137 // Only want transactions in a block, if no producer id then not in a block
138 if( !trace->producer_block_id ) return;
139 // Don't care about implicit
140 if( chain::is_onblock( *trace ) ) return;
141
142 // Is this a transaction we are tracking
143 auto& idx = _tracked_trxs.index().get<by_trx_id>();
144 auto itr = idx.find(trace->id);
145 if( itr != idx.end() ) {
146 _tracked_trxs.modify( itr, [&trace, &control=_controller, &abi_max_time=_abi_serializer_max_time]( tracked_transaction& tt ) {
147 tt.block_num = trace->block_num;
148 try {
149 // send_transaction trace output format.
150 // Convert to variant with abi here and now because abi could change in very next transaction.
151 // Alternatively, we could store off all the abis needed and do the conversion later, but as this is designed
152 // to run on an API node, probably the best trade off to perform the abi serialization during block processing.
153 tt.trx_trace_v = control.to_variant_with_abi( *trace, abi_serializer::create_yield_function( abi_max_time ) );
154 } catch( chain::abi_exception& ) {
155 tt.trx_trace_v = *trace;
156 }
157 } );
158 }
159 }
160
161 void on_block_start( uint32_t block_num ) {
162 // on forks rollback any accepted block transactions
163 rollback_to( block_num );
164 }
165
167 // good time to perform processing
168 ack_ready_trxs_by_block_num( bsp->block_num );
169 retry_trxs();
170 }
171
173 ack_ready_trxs_by_lib( bsp->block_num );
174 clear_expired( bsp->block->timestamp );
175 }
176
177private:
178
179 void rollback_to( uint32_t block_num ) {
180 const auto& idx = _tracked_trxs.index().get<by_block_num>();
181 // determine what to rollback
182 deque<decltype(_tracked_trxs.index().project<0>(idx.begin()))> to_process;
183 for( auto i = idx.rbegin(); i != idx.rend(); ++i ) {
184 // called on block_start, so any block_num greater or equal have been rolled back
185 if( i->block_num < block_num ) break;
186
187 auto ii = i.base(); // switch to forward iterator, then -- to get back to item
188 to_process.emplace_back( _tracked_trxs.index().project<0>( --ii ) );
189 }
190 // perform rollback
191 auto now = fc::time_point::now();
192 for( auto& i : to_process ) {
193 _tracked_trxs.modify( i, [&]( tracked_transaction& tt ) {
194 // if forked out, then need to retry, which will happen according to last_try.
195 // if last_try would cause it to immediately resend, then push it out 10 seconds to allow time for
196 // fork-switch to complete.
197 if( tt.last_try + _retry_interval <= now ) {
198 tt.last_try += fc::seconds( 10 );
199 }
200 tt.block_num = 0;
201 tt.trx_trace_v.clear();
202 } );
203 }
204 }
205
206 void retry_trxs() {
207 const auto& idx = _tracked_trxs.index().get<by_last_try>();
208 auto now = fc::time_point::now();
209 // determine what to retry
210 deque<decltype(_tracked_trxs.index().project<0>(idx.begin()))> to_process;
211 for( auto i = idx.begin(); i != idx.end(); ++i ) {
212 if( i->is_ready() ) break;
213
214 if( i->last_try + _retry_interval <= now ) {
215 to_process.emplace_back( _tracked_trxs.index().project<0>( i ) );
216 }
217 }
218 // retry
219 for( auto& i: to_process ) {
220 _transaction_ack_channel.publish(
221 appbase::priority::low, std::pair<fc::exception_ptr, packed_transaction_ptr>( nullptr, i->ptrx ) );
222 dlog( "retry send trx ${id}", ("id", i->ptrx->id()) );
223 _tracked_trxs.modify( i, [&]( tracked_transaction& tt ) {
224 tt.last_try = now;
225 } );
226 }
227 }
228
229 void ack_ready_trxs_by_block_num( uint32_t block_num ) {
230 const auto& idx = _tracked_trxs.index().get<by_ready_block_num>();
231 // if we have reached requested block height then ack to user
232 deque<decltype(_tracked_trxs.index().project<0>(idx.begin()))> to_process;
233 auto end = idx.upper_bound(block_num);
234 for( auto i = idx.begin(); i != end; ++i ) {
235 to_process.emplace_back( _tracked_trxs.index().project<0>( i ) );
236 }
237 // ack
238 for( auto& i: to_process ) {
239 _tracked_trxs.modify( i, [&]( tracked_transaction& tt ) {
240 tt.next( std::make_unique<fc::variant>( std::move( tt.trx_trace_v ) ) );
241 tt.trx_trace_v.clear();
242 } );
243 _tracked_trxs.erase( i );
244 }
245 }
246
247 void ack_ready_trxs_by_lib( uint32_t lib_block_num ) {
248 const auto& idx = _tracked_trxs.index().get<by_block_num>();
249 // determine what to ack
250 deque<decltype(_tracked_trxs.index().project<0>(idx.begin()))> to_process;
251 auto end = idx.upper_bound(lib_block_num); // process until lib_block_num
252 for( auto i = idx.lower_bound(1); i != end; ++i ) { // skip over not ready, block_num == 0
253 to_process.emplace_back( _tracked_trxs.index().project<0>( i ) );
254 }
255 // ack
256 for( auto& i: to_process ) {
257 _tracked_trxs.modify( i, [&]( tracked_transaction& tt ) {
258 tt.next( std::make_unique<fc::variant>( std::move( tt.trx_trace_v ) ) );
259 tt.trx_trace_v.clear();
260 } );
261 _tracked_trxs.erase( i );
262 }
263 }
264
265 void clear_expired(const block_timestamp_type& block_timestamp) {
266 const fc::time_point block_time = block_timestamp;
267 auto& idx = _tracked_trxs.index().get<by_expiry>();
268 while( !idx.empty() ) {
269 auto itr = idx.begin();
270 if( itr->expiry() > block_time ) {
271 break;
272 }
273 itr->next( std::static_pointer_cast<fc::exception>(
274 std::make_shared<expired_tx_exception>(
275 FC_LOG_MESSAGE( error, "expired retry transaction ${id}, expiration ${e}, block time ${bt}",
276 ("id", itr->id())("e", itr->ptrx->expiration())
277 ("bt", block_timestamp) ) ) ) );
278 _tracked_trxs.erase( _tracked_trxs.index().project<0>( itr ) );
279 }
280 }
281
282private:
283 const chain::controller& _controller;
285 const fc::microseconds _abi_serializer_max_time;
286 const size_t _max_mem_usage_size;
287 const fc::microseconds _retry_interval;
288 const fc::microseconds _max_expiration_time;
290};
291
292trx_retry_db::trx_retry_db( const chain::controller& controller, size_t max_mem_usage_size,
293 fc::microseconds retry_interval, fc::microseconds max_expiration_time,
295:_impl(std::make_unique<trx_retry_db_impl>(controller, max_mem_usage_size, retry_interval, max_expiration_time, abi_serializer_max_time))
296{
297}
298
300
301void trx_retry_db::track_transaction( chain::packed_transaction_ptr ptrx, std::optional<uint16_t> num_blocks, next_function<std::unique_ptr<fc::variant>> next ) {
302 _impl->track_transaction( std::move( ptrx ), num_blocks, next );
303}
304
306 // conversion from time_point to time_point_sec rounds down, round up to nearest second to avoid appearing expired
307 return fc::time_point::now() + _impl->get_max_expiration() + fc::microseconds(999'999);
308}
309
310size_t trx_retry_db::size()const {
311 return _impl->size();
312}
313
315 try {
316 _impl->on_applied_transaction(trace, ptrx);
317 } FC_LOG_AND_DROP(("trx retry on_applied_transaction ERROR"));
318}
319
321 try {
322 _impl->on_block_start(block_num);
323 } FC_LOG_AND_DROP(("trx retry block_start ERROR"));
324}
325
327 try {
328 _impl->on_accepted_block(block);
329 } FC_LOG_AND_DROP(("trx retry accepted_block ERROR"));
330}
331
333 try {
334 _impl->on_irreversible_block(block);
335 } FC_LOG_AND_DROP(("trx retry irreversible_block ERROR"));
336}
337
338} // namespace sysio::chain_apis
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
void publish(int priority, const Data &data)
static time_point now()
Definition time.cpp:14
static constexpr time_point maximum()
Definition time.hpp:46
tracks the size of storage allocated to its underlying multi_index
void erase(const Key &key)
size_t memory_size() const
std::pair< typename primary_index_type::iterator, bool > insert(typename ContainerType::value_type obj)
void modify(typename primary_index_type::iterator itr, Lam lam)
const ContainerType & index() const
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition variant.hpp:191
void clear()
Definition variant.cpp:168
size_t estimated_size() const
Definition variant.cpp:575
void on_applied_transaction(const chain::transaction_trace_ptr &trace, const chain::packed_transaction_ptr &ptrx)
trx_retry_db(const chain::controller &controller, size_t max_mem_usage_size, fc::microseconds retry_interval, fc::microseconds max_expiration_time, fc::microseconds abi_serializer_max_time)
fc::time_point_sec get_max_expiration_time() const
void on_irreversible_block(const chain::block_state_ptr &block)
void track_transaction(chain::packed_transaction_ptr ptrx, std::optional< uint16_t > num_blocks, next_function< std::unique_ptr< fc::variant > > next)
void on_accepted_block(const chain::block_state_ptr &block)
void on_block_start(uint32_t block_num)
uint64_t id
Definition code_cache.cpp:0
#define FC_LOG_AND_DROP(...)
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
#define dlog(FORMAT,...)
Definition logger.hpp:101
application & app()
size_t memory_size(const T &obj)
constexpr microseconds seconds(int64_t s)
Definition time.hpp:32
Definition name.hpp:106
std::function< void(const std::variant< fc::exception_ptr, T > &)> next_function
std::shared_ptr< transaction_trace > transaction_trace_ptr
Definition trace.hpp:20
std::shared_ptr< const packed_transaction > packed_transaction_ptr
bool is_onblock(const transaction_trace &tt)
Definition trace.hpp:71
std::shared_ptr< block_state > block_state_ptr
uint32_t next(octet_iterator &it, octet_iterator end)
Definition checked.h:137
const fc::microseconds abi_serializer_max_time
Definition main.cpp:173
unsigned short uint16_t
Definition stdint.h:125
unsigned int uint32_t
Definition stdint.h:126
static yield_function_t create_yield_function(const fc::microseconds &max_serialization_time)
@ executed
succeed, no error handler executed
Definition block.hpp:14
void on_irreversible_block(const chain::block_state_ptr &bsp)
void on_accepted_block(const chain::block_state_ptr &bsp)
void on_applied_transaction(const chain::transaction_trace_ptr &trace, const chain::packed_transaction_ptr &ptrx)
void on_block_start(uint32_t block_num)
const fc::microseconds & get_max_expiration() const
void track_transaction(packed_transaction_ptr ptrx, std::optional< uint16_t > num_blocks, next_function< std::unique_ptr< fc::variant > > next)
trx_retry_db_impl(const chain::controller &controller, size_t max_mem_usage_size, fc::microseconds retry_interval, fc::microseconds max_expiration_time, fc::microseconds abi_serializer_max_time)