Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
trx_finality_status_processing.cpp
Go to the documentation of this file.
3
4
5using namespace sysio;
6using namespace sysio::finality_status;
7
8namespace sysio::chain_apis {
9
42
// Constructor initializer list and body (the header line is not part of this listing).
// Allocates the implementation object held in _my, forwarding max_storage,
// success_duration and failure_duration to it unchanged. No other work is done here.
44 : _my(new trx_finality_status_processing_impl(max_storage, success_duration, failure_duration))
45 {
46 }
47
49
// Stores the id and timestamp of the block state `bsp` in the implementation's
// _irr_block_id / _irr_block_timestamp members.
// Anything thrown inside the try is handed to FC_LOG_AND_DROP with the message below
// (presumably logged and not rethrown - confirm against the macro definition).
51 try {
52 _my->_irr_block_id = bsp->id;
53 _my->_irr_block_timestamp = bsp->block->timestamp;
54 } FC_LOG_AND_DROP(("Failed to signal irreversible block for finality status"));
55 }
56
// Empties the implementation's list of speculative transaction ids.
// Anything thrown inside the try is handed to FC_LOG_AND_DROP with the message below
// (presumably logged and not rethrown - confirm against the macro definition).
58 try {
59 // since a new block is started, no block state was received, so the speculative block did not get eventually produced
60 _my->_speculative_trxs.clear();
61 } FC_LOG_AND_DROP(("Failed to signal block start for finality status"));
62 }
63
// Forwards `trace` and `ptrx` unchanged to the implementation's signal_applied_transaction.
// Anything thrown by that call is handed to FC_LOG_AND_DROP with the message below
// (presumably logged and not rethrown - confirm against the macro definition).
65 try {
66 _my->signal_applied_transaction(trace, ptrx);
67 } FC_LOG_AND_DROP(("Failed to signal applied transaction for finality status"));
68 }
69
// Forwards `bsp` unchanged to the implementation's signal_accepted_block.
// Anything thrown by that call is handed to FC_LOG_AND_DROP with the message below
// (presumably logged and not rethrown - confirm against the macro definition).
71 try {
72 _my->signal_accepted_block(bsp);
73 } FC_LOG_AND_DROP(("Failed to signal accepted block for finality status"));
74 }
75
78 // use the head block num if we are in a block, otherwise don't provide block number for speculative blocks
79 chain::block_id_type block_id;
81 bool modified = false;
82 if (trace->producer_block_id) {
83 block_id = *trace->producer_block_id;
84 const bool block_changed = block_id != _head_block_id;
85 if (block_changed) {
86 _head_block_id = block_id;
87 _head_block_timestamp = trace->block_time;
88 }
90
91 const auto head_block_num = chain::block_header::num_from_id(_head_block_id);
92 if (block_changed && head_block_num <= _last_proc_block_num) {
94 modified = true;
95 }
96
97 _last_proc_block_num = head_block_num;
98
99 if (status_expiry_of_trxs(now)) {
100 modified = true;
101 }
102 }
103
104 if (!trace->receipt) return;
105 if (trace->receipt->status != chain::transaction_receipt_header::executed) {
106 return;
107 }
108 if (trace->scheduled) return;
109 if (chain::is_onblock(*trace)) return;
110
111 if (!trace->producer_block_id) {
112 _speculative_trxs.push_back(trace->id);
113 }
114
115 if(ensure_storage()) {
116 modified = true;
117 }
118
119 const auto& trx_id = trace->id;
120 auto iter = _storage.find(trx_id);
121 if (iter != _storage.index().cend()) {
122 _storage.modify( iter, [&block_id,&block_timestamp]( finality_status_object& obj ) {
123 obj.block_id = block_id;
125 obj.forked_out = false;
126 } );
127 }
128 else {
130 finality_status_object{.trx_id = trx_id,
131 .trx_expiry = ptrx->expiration(),
132 .received = now,
133 .block_id = block_id,
134 .block_timestamp = block_timestamp});
135 }
136
139 }
140 }
141
143 // if this block had any transactions, then we have processed everything we need to already
144 if (bsp->id == _head_block_id) {
145 return;
146 }
147
148 _head_block_id = bsp->id;
149 _head_block_timestamp = bsp->block->timestamp;
150
151 const auto head_block_num = chain::block_header::num_from_id(_head_block_id);
152 if (head_block_num <= _last_proc_block_num) {
154 }
155
157 bool status_expiry = status_expiry_of_trxs(now);
158 if (status_expiry) {
160 }
161
162 // if this approve block was preceded by speculative transactions then we produced the block, update trx state.
164 obj.block_id = block_id;
165 obj.block_timestamp = block_timestamp;
166 obj.forked_out = false;
167 };
168 for (const auto& trx_id : _speculative_trxs) {
169 auto iter = _storage.find(trx_id);
170 FC_ASSERT( iter != _storage.index().cend(),
171 "CODE ERROR: Should not have speculative transactions that have not already"
172 "been identified prior to the block being accepted. trx id: ${trx_id}",
173 ("trx_id", trx_id) );
174 _storage.modify( iter, mod );
175 }
176 _speculative_trxs.clear();
177
178 _last_proc_block_num = head_block_num;
179 }
180
181
// Sets forked_out = true on every stored entry found from
// by_block_num.lower_bound(<block number encoded in _head_block_id>) to the end of that
// index (i.e. entries at or above that block number, assuming ascending index order).
// The function header is not part of this listing.
183 const auto& indx = _storage.index().get<by_block_num>();
// Primary-index iterators of the affected entries; they are gathered in a first pass and
// modified in a second pass rather than being modified while walking by_block_num.
184 chain::deque<decltype(_storage.index().project<0>(indx.begin()))> trxs;
185 for (auto iter = indx.lower_bound(chain::block_header::num_from_id(_head_block_id)); iter != indx.end(); ++iter) {
186 trxs.push_back(_storage.index().project<0>(iter));
187 }
188 for (const auto& trx_iter : trxs) {
189 _storage.modify( trx_iter, []( finality_status_object& obj ) {
190 obj.forked_out = true;
191 } );
192 }
193 }
194
// Erases stored entries whose retention window has passed and returns true when at least
// one entry was erased. Two windows are applied against `now`: _success_duration for the
// (true, ...) section of the by_status_expiry index and _failure_duration for the
// (false, ...) section. `now` and the function header are declared outside this listing.
196 const auto& indx = _storage.index().get<by_status_expiry>();
// Primary-index iterators of the entries to erase; both scans only collect, the erase
// happens afterwards in a separate loop.
197 chain::deque<decltype(_storage.index().project<0>(indx.begin()))> remove_trxs;
198
199 // find the successful (in any block) transactions that are past the success expiry time
200 auto success_iter = indx.lower_bound(boost::make_tuple(true, fc::time_point{}));
201
// every (true, t) key with t <= now - _success_duration falls in [success_iter, success_end)
202 const fc::time_point success_expiry = now - _success_duration;
203 const auto success_end = indx.upper_bound(boost::make_tuple(true, success_expiry));
204 for (; success_iter != success_end; ++success_iter) {
205 remove_trxs.push_back(_storage.index().project<0>(success_iter));
206 }
207
208 const fc::time_point fail_expiry = now - _failure_duration;
209 const auto fail_end = indx.upper_bound(boost::make_tuple(false, fail_expiry));
210 // find the failure (not in a block) transactions that are past the failure expiry time
// the scan starts at indx.begin(), so it covers everything ordered before fail_end
211 for (auto fail_iter = indx.begin(); fail_iter != fail_end; ++fail_iter) {
212 remove_trxs.push_back(_storage.index().project<0>(fail_iter));
213 }
214
215 for (const auto& trx_iter : remove_trxs) {
216 _storage.erase(trx_iter);
217 }
218 return !remove_trxs.empty();
219 }
220
// Keeps the tracked storage under _max_storage.
// Returns false without touching anything while memory_size() is below _max_storage.
// Otherwise queues entries for removal until the queued memory covers the overage plus
// 10% of _max_storage, erases them, logs a summary and returns true.
// NOTE: the function header and the declaration of `earliest_block` are not part of this
// listing; `earliest_block` is presumably initialised to finality_status::no_block_num - confirm.
222 const int64_t remaining_storage = _max_storage - _storage.memory_size();
223 if (remaining_storage > 0) {
224 return false;
225 }
226
// target fill level after trimming: 90% of the value passed in
227 auto percentage = [](uint64_t mem) {
228 const uint64_t pcnt = 90;
229 return (mem * pcnt)/100;
230 };
231 // determine how much we need to free to get back to at least the desired percentage of the storage
// remaining_storage is <= 0 here, so this is (10% of _max_storage) + (amount over the limit)
232 int64_t storage_to_free = _max_storage - percentage(_max_storage) - remaining_storage;
// both values are divided down to whole GiB before logging, so anything under 1 GiB prints as 0
233 ilog("Finality Status exceeded max storage (${max_storage}GB) need to free up ${storage_to_free} GB",
234 ("max_storage",_max_storage/1024/1024/1024)
235 ("storage_to_free",storage_to_free/1024/1024/1024));
236 const auto& block_indx = _storage.index().get<by_block_num>();
237 const auto& status_expiry_indx = _storage.index().get<by_status_expiry>();
238 using index_iter_type = decltype(_storage.index().project<0>(block_indx.begin()));
// entries are only queued (as primary-index iterators) during the scan; the erase happens after the loop
239 chain::deque<index_iter_type> remove_trxs;
240
// queue one entry for removal and credit its memory_size() against the amount still to free
241 auto reduce_storage = [&storage_to_free,&remove_trxs,&storage=this->_storage](auto iter) {
242 storage_to_free -= iter->memory_size();
243 remove_trxs.push_back(storage.index().project<0>(iter));
244 };
245
// last block number already queued; each pass looks for the next block number above it
// (how by_block_num orders no_block_num relative to real block numbers is not visible here - confirm)
246 auto block_upper_bound = finality_status::no_block_num;
247 // start at the beginning of the oldest_failure section and just keep iterating from there
248 auto oldest_failure_iter = status_expiry_indx.begin();
249 // the end of the oldest failure section
250 const auto oldest_failure_end = status_expiry_indx.lower_bound( std::make_tuple( true, fc::time_point{} ) );
252 while (storage_to_free > 0) {
253 auto oldest_block_iter = block_indx.upper_bound(block_upper_bound);
254 if (oldest_block_iter == block_indx.end()) {
// no block entries left to queue: only the remaining (false, ...) entries can still be freed
255 FC_ASSERT( oldest_failure_iter != oldest_failure_end,
256 "CODE ERROR: can not free more storage, but still exceeding limit. "
257 "Total entries: ${total_entries}, storage memory to free: ${storage}, "
258 "entries slated for removal: ${remove_entries}",
259 ("total_entries", _storage.index().size())
260 ("storage", storage_to_free)
261 ("remove_entries", remove_trxs.size()));
262 for (; oldest_failure_iter != oldest_failure_end && storage_to_free > 0; ++oldest_failure_iter) {
263 reduce_storage(oldest_failure_iter);
264 }
// asserts that queuing the remaining failure entries was enough to reach the target
265 FC_ASSERT( storage_to_free < 1,
266 "CODE ERROR: can not free more storage, but still exceeding limit. "
267 "Total entries: ${total_entries}, storage memory to free: ${storage}, "
268 "entries slated for removal: ${remove_entries}",
269 ("total_entries", _storage.index().size())
270 ("storage", storage_to_free)
271 ("remove_entries", remove_trxs.size()));
272 break;
273 }
274 else {
275 const auto block_num = oldest_block_iter->block_num();
// remember the first block number that was queued, for the summary log below
276 if (earliest_block == finality_status::no_block_num) {
277 earliest_block = block_num;
278 }
279 block_upper_bound = block_num;
280 const auto block_timestamp = oldest_block_iter->block_timestamp;
// queue every entry recorded for this block number, even if the target is reached part-way
281 for (; oldest_block_iter != block_indx.end() && oldest_block_iter->block_num() == block_num; ++oldest_block_iter) {
282 reduce_storage(oldest_block_iter);
283 }
// also queue the (false, ...) entries whose time key is not later than this block's timestamp
// NOTE(review): the != loop relies on oldest_failure_iter never being past this bound, which
// holds if block timestamps do not decrease as block numbers increase - confirm
284 const auto oldest_failure_upper_bound = status_expiry_indx.upper_bound( std::make_tuple( false, block_timestamp.to_time_point() ));
285 for (; oldest_failure_iter != oldest_failure_upper_bound; ++oldest_failure_iter) {
286 reduce_storage(oldest_failure_iter);
287 }
288 }
289 }
290
291 for (const auto& trx_iter : remove_trxs) {
292 _storage.erase(trx_iter);
293 }
294
// earliest_block still equal to no_block_num means no block entries were queued
295 if (earliest_block != finality_status::no_block_num) {
296 ilog( "Finality Status dropped ${trx_count} transactions, which were removed from block # ${block_num_start} to block # ${block_num_end}",
297 ("trx_count", remove_trxs.size())("block_num_start", earliest_block)("block_num_end", block_upper_bound) );
298 }
299 else {
300 ilog( "Finality Status dropped ${trx_count} transactions, all were failed transactions", ("trx_count", remove_trxs.size()) );
301 }
302
303 return true;
304 }
305
// Returns a copy of the chain-level values currently held by the implementation: head block
// id and timestamp, irreversible block id and timestamp, and the earliest tracked block id.
// The function header (and therefore the exact return type) is not part of this listing.
307 return { .head_id = _my->_head_block_id, .head_block_timestamp = _my->_head_block_timestamp, .irr_id = _my->_irr_block_id, .irr_block_timestamp = _my->_irr_block_timestamp, .earliest_tracked_block_id = _my->_earliest_tracked_block_id };
308 }
309
310 std::optional<trx_finality_status_processing::trx_state> trx_finality_status_processing::get_trx_state( const chain::transaction_id_type& id ) const {
311 auto iter = _my->_storage.find(id);
312 if (iter == _my->_storage.index().cend()) {
313 return {};
314 }
315
316 const char* status;
317 if (!iter->is_in_block()) {
318 if (fc::time_point::now() >= iter->trx_expiry) {
319 status = "FAILED";
320 }
321 else {
322 status = iter->forked_out ? "FORKED_OUT" : "LOCALLY_APPLIED";
323 }
324 }
325 else {
326 const auto block_num = iter->block_num();
327 const auto lib = chain::block_header::num_from_id(_my->_irr_block_id);
328 status = (block_num > lib) ? "IN_BLOCK" : "IRREVERSIBLE";
329 }
330 return trx_finality_status_processing::trx_state{ .block_id = iter->block_id, .block_timestamp = iter->block_timestamp, .received = iter->received, .expiration = iter->trx_expiry, .status = status };
331 }
332
// Returns the value reported by memory_size() on the implementation's tracked storage.
// The function header is not part of this listing.
334 return _my->_storage.memory_size();
335 }
336
// Refreshes _earliest_tracked_block_id from the first entry in the (true, ...) section of
// the by_status_expiry index. When there is no such entry the member keeps its previous
// value. The function header is not part of this listing.
338 const auto& indx = _storage.index().get<by_status_expiry>();
339
340 // find the lowest value successful block
341 auto success_iter = indx.lower_bound(boost::make_tuple(true, fc::time_point{}));
342 if (success_iter != indx.cend()) {
343 _earliest_tracked_block_id = success_iter->block_id;
344 }
345 }
346}
static time_point now()
Definition time.cpp:14
tracks the size of storage allocated to its underlying multi_index
void erase(const Key &key)
size_t memory_size() const
std::pair< typename primary_index_type::iterator, bool > insert(typename ContainerType::value_type obj)
void modify(typename primary_index_type::iterator itr, Lam lam)
const ContainerType & index() const
primary_index_type::iterator find(const Key &key)
fc::time_point to_time_point() const
void signal_applied_transaction(const chain::transaction_trace_ptr &trace, const chain::packed_transaction_ptr &ptrx)
trx_finality_status_processing(uint64_t max_storage, const fc::microseconds &success_duration, const fc::microseconds &failure_duration)
std::optional< trx_state > get_trx_state(const chain::transaction_id_type &id) const
void signal_irreversible_block(const chain::block_state_ptr &bsp)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
#define FC_LOG_AND_DROP(...)
#define ilog(FORMAT,...)
Definition logger.hpp:118
std::shared_ptr< transaction_trace > transaction_trace_ptr
Definition trace.hpp:20
std::shared_ptr< const packed_transaction > packed_transaction_ptr
bool is_onblock(const transaction_trace &tt)
Definition trace.hpp:71
std::shared_ptr< block_state > block_state_ptr
signed __int64 int64_t
Definition stdint.h:135
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
static uint32_t num_from_id(const block_id_type &id)
@ executed
succeed, no error handler executed
Definition block.hpp:14
trx_finality_status_processing_impl(uint64_t max_storage, const fc::microseconds &success_duration, const fc::microseconds &failure_duration)
void signal_applied_transaction(const chain::transaction_trace_ptr &trace, const chain::packed_transaction_ptr &ptrx)
fc::tracked_storage< finality_status_multi_index > _storage
tracks status related to a transaction in the blockchain
chain::block_timestamp_type block_timestamp