Wire Sysio Wire Sysio 1.0.0
Loading...
Searching...
No Matches
unapplied_transaction_queue.hpp
Go to the documentation of this file.
1#pragma once
2
7
8#include <boost/multi_index_container.hpp>
9#include <boost/multi_index/hashed_index.hpp>
10#include <boost/multi_index/member.hpp>
11
12namespace fc {
13 inline std::size_t hash_value( const fc::sha256& v ) {
14 return v._hash[3];
15 }
16}
17
18namespace sysio { namespace chain {
19
20using namespace boost::multi_index;
21
22enum class trx_enum_type {
23 unknown = 0,
24 persisted = 1,
25 forked = 2,
26 aborted = 3,
28 incoming = 5 // incoming_end() needs to be updated if this changes
29};
30
31using next_func_t = std::function<void(const std::variant<fc::exception_ptr, transaction_trace_ptr>&)>;
32
47
53private:
54 struct by_trx_id;
55 struct by_type;
56 struct by_expiry;
57
58 typedef multi_index_container< unapplied_transaction,
59 indexed_by<
60 hashed_unique< tag<by_trx_id>,
61 const_mem_fun<unapplied_transaction, const transaction_id_type&, &unapplied_transaction::id>
62 >,
63 ordered_non_unique< tag<by_type>, member<unapplied_transaction, trx_enum_type, &unapplied_transaction::trx_type> >,
64 ordered_non_unique< tag<by_expiry>, member<unapplied_transaction, const fc::time_point, &unapplied_transaction::expiry> >
65 >
66 > unapplied_trx_queue_type;
67
68 unapplied_trx_queue_type queue;
69 uint64_t max_transaction_queue_size = 1024*1024*1024; // enforced for incoming
70 uint64_t size_in_bytes = 0;
71 size_t incoming_count = 0;
72
73public:
74
75 void set_max_transaction_queue_size( uint64_t v ) { max_transaction_queue_size = v; }
76
77 bool empty() const {
78 return queue.empty();
79 }
80
81 size_t size() const {
82 return queue.size();
83 }
84
85 void clear() {
86 queue.clear();
87 }
88
89 size_t incoming_size()const {
90 return incoming_count;
91 }
92
94 auto itr = queue.get<by_trx_id>().find( id );
95 if( itr == queue.get<by_trx_id>().end() ) return {};
96 return itr->trx_meta;
97 }
98
99 template <typename Yield, typename Callback>
100 bool clear_expired( const time_point& pending_block_time, Yield&& yield, Callback&& callback ) {
101 auto& persisted_by_expiry = queue.get<by_expiry>();
102 while( !persisted_by_expiry.empty() ) {
103 const auto& itr = persisted_by_expiry.begin();
104 if( itr->expiry > pending_block_time ) {
105 break;
106 }
107 if( yield() ) {
108 return false;
109 }
110 callback( itr->trx_meta->packed_trx(), itr->trx_type );
111 if( itr->next ) {
112 itr->next( std::static_pointer_cast<fc::exception>(
113 std::make_shared<expired_tx_exception>(
114 FC_LOG_MESSAGE( error, "expired transaction ${id}, expiration ${e}, block time ${bt}",
115 ("id", itr->id())("e", itr->trx_meta->packed_trx()->expiration())
116 ("bt", pending_block_time) ) ) ) );
117 }
118 removed( itr );
119 persisted_by_expiry.erase( itr );
120 }
121 return true;
122 }
123
124 void clear_applied( const block_state_ptr& bs ) {
125 if( empty() ) return;
126 auto& idx = queue.get<by_trx_id>();
127 for( const auto& receipt : bs->block->transactions ) {
128 if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
129 const auto& pt = std::get<packed_transaction>(receipt.trx);
130 auto itr = idx.find( pt.id() );
131 if( itr != idx.end() ) {
132 if( itr->next ) {
133 itr->next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
134 FC_LOG_MESSAGE( info, "duplicate transaction ${id}", ("id", itr->trx_meta->id())))));
135 }
136 removed( itr );
137 idx.erase( itr );
138 }
139 }
140 }
141 }
142
143 void add_forked( const branch_type& forked_branch ) {
144 // forked_branch is in reverse order
145 for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
146 const block_state_ptr& bsptr = *ritr;
147 for( auto itr = bsptr->trxs_metas().begin(), end = bsptr->trxs_metas().end(); itr != end; ++itr ) {
148 const auto& trx = *itr;
149 fc::time_point expiry = trx->packed_trx()->expiration();
150 auto insert_itr = queue.insert( { trx, expiry, trx_enum_type::forked } );
151 if( insert_itr.second ) added( insert_itr.first );
152 }
153 }
154 }
155
156 void add_aborted( std::vector<transaction_metadata_ptr> aborted_trxs ) {
157 for( auto& trx : aborted_trxs ) {
158 fc::time_point expiry = trx->packed_trx()->expiration();
159 auto insert_itr = queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
160 if( insert_itr.second ) added( insert_itr.first );
161 }
162 }
163
165 auto itr = queue.get<by_trx_id>().find( trx->id() );
166 if( itr == queue.get<by_trx_id>().end() ) {
167 fc::time_point expiry = trx->packed_trx()->expiration();
168 auto insert_itr = queue.insert( { trx, expiry, trx_enum_type::persisted } );
169 if( insert_itr.second ) added( insert_itr.first );
170 } else if( itr->trx_type != trx_enum_type::persisted ) {
171 if (itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted)
172 --incoming_count;
173 queue.get<by_trx_id>().modify( itr, [](auto& un){
174 un.trx_type = trx_enum_type::persisted;
175 } );
176 }
177 }
178
179 void add_incoming( const transaction_metadata_ptr& trx, bool persist_until_expired, bool return_failure_trace, next_func_t next ) {
180 auto itr = queue.get<by_trx_id>().find( trx->id() );
181 if( itr == queue.get<by_trx_id>().end() ) {
182 fc::time_point expiry = trx->packed_trx()->expiration();
183 auto insert_itr = queue.insert(
184 { trx, expiry, persist_until_expired ? trx_enum_type::incoming_persisted : trx_enum_type::incoming, return_failure_trace, std::move( next ) } );
185 if( insert_itr.second ) added( insert_itr.first );
186 } else {
187 if( itr->trx_meta == trx ) return; // same trx meta pointer
188 if( next ) {
189 next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
190 FC_LOG_MESSAGE( info, "duplicate transaction ${id}", ("id", trx->id()) ) ) ) );
191 }
192 }
193 }
194
195 using iterator = unapplied_trx_queue_type::index<by_type>::type::iterator;
196
197 iterator begin() { return queue.get<by_type>().begin(); }
198 iterator end() { return queue.get<by_type>().end(); }
199
200 // persisted, forked, aborted
201 iterator unapplied_begin() { return queue.get<by_type>().begin(); }
202 iterator unapplied_end() { return queue.get<by_type>().upper_bound( trx_enum_type::aborted ); }
203
204 iterator persisted_begin() { return queue.get<by_type>().lower_bound( trx_enum_type::persisted ); }
205 iterator persisted_end() { return queue.get<by_type>().upper_bound( trx_enum_type::persisted ); }
206
207 iterator incoming_begin() { return queue.get<by_type>().lower_bound( trx_enum_type::incoming_persisted ); }
208 iterator incoming_end() { return queue.get<by_type>().end(); } // if changed to upper_bound, verify usage performance
209
212 removed( itr );
213 return queue.get<by_type>().erase( itr );
214 }
215
216private:
217 template<typename Itr>
218 void added( Itr itr ) {
219 auto size = calc_size( itr->trx_meta );
220 if( itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted ) {
221 ++incoming_count;
222 SYS_ASSERT( size_in_bytes + size < max_transaction_queue_size, tx_resource_exhaustion,
223 "Transaction ${id}, size ${s} bytes would exceed configured "
224 "incoming-transaction-queue-size-mb ${qs}, current queue size ${cs} bytes",
225 ("id", itr->trx_meta->id())("s", size)("qs", max_transaction_queue_size/(1024*1024))
226 ("cs", size_in_bytes) );
227 }
228 size_in_bytes += size;
229 }
230
231 template<typename Itr>
232 void removed( Itr itr ) {
233 if( itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted ) {
234 --incoming_count;
235 }
236 size_in_bytes -= calc_size( itr->trx_meta );
237 }
238
239 static uint64_t calc_size( const transaction_metadata_ptr& trx ) {
240 // packed_trx caches unpacked transaction so double
241 return (trx->packed_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size()) * 2 + sizeof( *trx );
242 }
243
244};
245
246} } //sysio::chain
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
uint64_t _hash[4]
Definition sha256.hpp:100
bool clear_expired(const time_point &pending_block_time, Yield &&yield, Callback &&callback)
void add_forked(const branch_type &forked_branch)
iterator erase(iterator itr)
caller's responsibility to call next() if applicable
void add_incoming(const transaction_metadata_ptr &trx, bool persist_until_expired, bool return_failure_trace, next_func_t next)
void add_aborted(std::vector< transaction_metadata_ptr > aborted_trxs)
unapplied_trx_queue_type::index< by_type >::type::iterator iterator
void add_persisted(const transaction_metadata_ptr &trx)
transaction_metadata_ptr get_trx(const transaction_id_type &id) const
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
namespace sysio::chain
Definition authority.cpp:3
std::size_t hash_value(const fc::sha256 &v)
deque< block_state_ptr > branch_type
std::function< void(const std::variant< fc::exception_ptr, transaction_trace_ptr > &)> next_func_t
std::shared_ptr< block_state > block_state_ptr
std::shared_ptr< transaction_metadata > transaction_metadata_ptr
unsigned __int64 uint64_t
Definition stdint.h:136
const transaction_id_type & id() const
unapplied_transaction(unapplied_transaction &&)=default
unapplied_transaction(const unapplied_transaction &)=delete
unapplied_transaction & operator=(const unapplied_transaction &)=delete