Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
message_buffer.hpp
Go to the documentation of this file.
1#pragma once
2#include <boost/asio/ip/tcp.hpp>
3#include <boost/pool/singleton_pool.hpp>
4#include <fc/io/raw.hpp>
5#include <deque>
6#include <array>
8
9namespace fc {
10 template <uint32_t buffer_len>
11 class mb_datastream;
12 template <uint32_t buffer_len>
13 class mb_peek_datastream;
14
24 template <uint32_t buffer_len>
26 public:
27 /*
28 * index abstraction that references a point in the chain of buffers.
29 * first refers to the buffer's index in the deque.
30 * second refers to the byte in the character buffer.
31 */
32 typedef std::pair<uint32_t, uint32_t> index_t;
33
34 message_buffer() : buffers{malloc()}, read_ind{0,0}, write_ind{0,0}, sanity_check (1) { }
35
37 while (buffers.size() > 0) {
38 free(buffers.back());
39 buffers.pop_back();
40 }
41 }
42
43 /*
44 * Returns the current read index referencing the byte in the buffer
45 * that is next to be read.
46 */
47 index_t read_index() const { return read_ind; }
48
49 /*
50 * Returns the current write index referencing the byte in the buffer
51 * that is next to be written to.
52 */
53 index_t write_index() const { return write_ind; }
54
55 /*
56 * Returns the current read pointer pointing to the memory location
57 * of the next byte to be read.
58 */
59 char* read_ptr() {
60 return &buffers[read_ind.first]->at(read_ind.second);
61 }
62
63 /*
64 * Returns the current write pointer pointing to the memory location
65 * of the next byte to be written to.
66 */
67 char* write_ptr() {
68 return &buffers[write_ind.first]->at(write_ind.second);
69 }
70
71 /*
72 * Adds an additional buffer of buffer_len to the chain of buffers.
73 * Does not affect the read or write pointer.
74 */
76 sanity_check++;
77 buffers.push_back(malloc());
78 }
79
80 /*
81 * Adds additional buffers of length buffer_len to the chain of buffers
82 * in order to add at least bytes to the space available.
83 * Does not affect the read or write pointer.
84 */
86 int buffers_to_add = bytes / buffer_len + 1;
87 for (int i = 0; i < buffers_to_add; i++) {
88 sanity_check++;
89 buffers.push_back(malloc());
90 }
91 }
92
93 /*
94 * Resets the message buffer to the initial state. Any unread data is
95 * discarded.
96 */
97 void reset() {
98 // some condition exists that can send *both* buffers.size() and sanity_check to well over 10^6.
99 // this seems to be related to some sort of memory overrun possibly. By forcing an exit here, an
100 // external watchdog can be used to restart the process and avoid hanging.
101 if( buffers.size() != sanity_check || buffers.size() > 1000000) {
102 elog( "read_ind = ${r1}, ${r2} write_ind = ${w1}, ${w2}, buff.size = ${bs}, sanity = ${s}",
103 ( "r1", read_ind.first )( "r2", read_ind.second )( "w1", write_ind.first )( "w2", write_ind.second )
104 ( "bs", buffers.size() )( "s", sanity_check ) );
105 elog("Buffer manager overwrite detected. Terminating to allow external restart");
106 exit(1);
107 }
108 while (buffers.size() > 1) {
109 sanity_check--;
110 free(buffers.back());
111 buffers.pop_back();
112 }
113
114 read_ind = { 0, 0 };
115 write_ind = { 0, 0 };
116 }
117
118 /*
119 * Returns the current number of bytes remaining to be read.
120 * Logically, this is the difference between where the read and write pointers are.
121 */
123 return bytes_to_read_from_index(read_ind);
124 }
125
126 /*
127 * Returns the current number of bytes remaining to be read from a given index
128 * Logically, this is the difference between where the given index is and the write pointer.
129 */
131 return (write_ind.first - ind.first) * buffer_len + write_ind.second - ind.second;
132 }
133
134 /*
135 * Returns the current number of bytes available to be written.
136 * Logically, this is the difference between the write pointer and the
137 * end of the buffer. If this is not enough room, call either
138 * add_buffer_to_chain() or add_room() to create more space.
139 */
141 return total_bytes() - write_ind.first * buffer_len - write_ind.second;
142 }
143
144 /*
145 * Returns the total number of bytes in the buffer chain.
146 */
148 return buffer_len * buffers.size();
149 }
150
151 /*
152 * Advances the read pointer/index the supplied number of bytes along
153 * the buffer chain. Any buffers that the read pointer moves beyond
154 * will be removed from the buffer chain. If the read pointer becomes
155 * equal to the write pointer, the message buffer will be reset to
156 * its initial state (one buffer with read and write pointers at the
157 * start).
158 */
160 advance_index(read_ind, bytes);
161 if (read_ind == write_ind) {
162 reset();
163 } else if (read_ind.first > 0) {
164 while (read_ind.first > 0) {
165 free(buffers.front());
166 buffers.pop_front();
167 sanity_check--;
168 read_ind.first--;
169 write_ind.first--;
170 }
171 }
172 }
173
174 /*
175 * Advances the write pointer/index the supplied number of bytes along
176 * the buffer chain.
177 */
179 advance_index(write_ind, bytes);
180 while (write_ind.first >= buffers.size()) {
181 sanity_check++;
182 buffers.push_back(malloc());
183 }
184 }
185
186 /*
187 * Creates and returns a vector of boost mutable_buffers that can
188 * be passed to boost async_read() and async_read_some() functions.
189 * The beginning of the vector will be the write pointer, which
190 * should be advanced the number of bytes read after the read returns.
191 */
192 std::vector<boost::asio::mutable_buffer> get_buffer_sequence_for_boost_async_read() {
193 std::vector<boost::asio::mutable_buffer> seq;
194 FC_ASSERT(write_ind.first < buffers.size());
195 seq.push_back(boost::asio::buffer(&buffers[write_ind.first]->at(write_ind.second),
196 buffer_len - write_ind.second));
197 for (std::size_t i = write_ind.first + 1; i < buffers.size(); i++) {
198 seq.push_back(boost::asio::buffer(&buffers[i]->at(0), buffer_len));
199 }
200 return seq;
201 }
202
203 /*
204 * Reads size bytes from the buffer chain starting at the read pointer.
205 * The read pointer is advanced size bytes.
206 */
207 bool read(void* s, uint32_t size) {
208 if (bytes_to_read() < size) {
209 FC_THROW_EXCEPTION( out_of_range_exception, "tried to read ${r} but only ${s} left",
210 ("r", size)( "s", bytes_to_read() ) );
211 }
212 if (read_ind.second + size <= buffer_len) {
213 memcpy(s, read_ptr(), size);
214 advance_read_ptr(size);
215 } else {
216 uint32_t num_in_buffer = buffer_len - read_ind.second;
217 memcpy(s, read_ptr(), num_in_buffer);
218 advance_read_ptr(num_in_buffer);
219 read((char*)s + num_in_buffer, size - num_in_buffer);
220 }
221 return true;
222 }
223
224 /*
225 * Reads size bytes from the buffer chain starting at the supplied index.
226 * The supplied index is advanced, but the read pointer is unaffected.
227 */
228 bool peek(void* s, uint32_t size, index_t& index) const {
229 if (bytes_to_read_from_index(index) < size) {
230 FC_THROW_EXCEPTION( out_of_range_exception, "tried to peek ${r} but only ${s} left",
231 ("r", size)( "s", bytes_to_read_from_index( index ) ) );
232 }
233 if (index.second + size <= buffer_len) {
234 memcpy(s, get_ptr(index), size);
235 advance_index(index, size);
236 } else {
237 uint32_t num_in_buffer = buffer_len - index.second;
238 memcpy(s, get_ptr(index), num_in_buffer);
239 advance_index(index, num_in_buffer);
240 peek((char*)s + num_in_buffer, size - num_in_buffer, index);
241 }
242 return true;
243 }
244
245 /*
246 * Advances the supplied index along the buffer chain the specified
247 * number of bytes.
248 */
249 static void advance_index(index_t& index, uint32_t bytes) {
250 index.first += (bytes + index.second) / buffer_len;
251 index.second = (bytes + index.second) % buffer_len;
252 }
253
254 /*
255 * Creates an mb_datastream object that can be used with the
256 * fc library's unpack functionality.
257 */
259
260 /*
261 * Creates an mb_peek_datastream object that can be used with the
262 * fc library's unpack functionality.
263 */
265
266 private:
267 using buffer_type = std::array<char, buffer_len>;
268 using pool_type = boost::singleton_pool<message_buffer, sizeof(buffer_type)>;
269 static buffer_type* malloc() { return static_cast<buffer_type*>(pool_type::malloc()); }
270 static void free(buffer_type* ptr) { pool_type::free(ptr); }
271
272 /*
273 * Returns the character pointer associated with the supplied index.
274 */
275 char* get_ptr(const index_t& index) const {
276 return &buffers[index.first]->at(index.second);
277 }
278
279 std::deque<std::array<char, buffer_len>* > buffers;
280 index_t read_ind;
281 index_t write_ind;
282 size_t sanity_check;
283
284 };
285
286 /*
287 * @brief datastream adapter that adapts message_buffer for use with fc unpack
288 *
289 * This class supports unpack functionality but not pack.
290 */
291 template <uint32_t buffer_len>
293 public:
295
296 inline void skip( size_t s ) { mb.advance_read_ptr(s); }
297 inline bool read( char* d, size_t s ) {
298 if (mb.bytes_to_read() >= s) {
299 mb.read(d, s);
300 return true;
301 }
302 fc::detail::throw_datastream_range_error( "read", mb.bytes_to_read(), s - mb.bytes_to_read());
303 }
304
305 inline bool get( unsigned char& c ) { return mb.read(&c, 1); }
306 inline bool get( char& c ) { return mb.read(&c, 1); }
307
308 private:
310 };
311
312 template <uint32_t buffer_len>
316
317 /*
318 * @brief peek datastream adapter that adapts message_buffer for use with fc unpack
319 *
320 * This class supports unpack functionality but not pack.
321 */
322 template <uint32_t buffer_len>
324 public:
326 : mb( m ), index( m.read_index() ) {}
327
328 inline void skip( size_t s ) { message_buffer<buffer_len>::advance_index( index, s ); }
329
330 inline bool read( char* d, size_t s ) {
331 if( mb.bytes_to_read_from_index(index) >= s ) {
332 mb.peek( d, s, index );
333 return true;
334 }
336 mb.bytes_to_read_from_index(index), s - mb.bytes_to_read_from_index(index) );
337 }
338
339 inline bool get( unsigned char& c ) { return mb.peek( &c, 1, index ); }
340 inline bool get( char& c ) { return mb.peek( &c, 1, index ); }
341
342 private:
344 typename message_buffer<buffer_len>::index_t index{0,0};
345 };
346
347 template <uint32_t buffer_len>
351
352} // namespace fc
void skip(size_t s)
mb_datastream(message_buffer< buffer_len > &m)
bool get(unsigned char &c)
bool read(char *d, size_t s)
bool get(unsigned char &c)
bool read(char *d, size_t s)
mb_peek_datastream(const message_buffer< buffer_len > &m)
abstraction for a message buffer that spans a chain of physical buffers
uint32_t bytes_to_read_from_index(const index_t &ind) const
mb_datastream< buffer_len > create_datastream()
uint32_t bytes_to_read() const
void add_space(uint32_t bytes)
bool read(void *s, uint32_t size)
void advance_write_ptr(uint32_t bytes)
static void advance_index(index_t &index, uint32_t bytes)
std::vector< boost::asio::mutable_buffer > get_buffer_sequence_for_boost_async_read()
mb_peek_datastream< buffer_len > create_peek_datastream()
index_t write_index() const
bool peek(void *s, uint32_t size, index_t &index) const
void advance_read_ptr(uint32_t bytes)
std::pair< uint32_t, uint32_t > index_t
uint32_t total_bytes() const
index_t read_index() const
uint32_t bytes_to_write() const
Defines exception's used by fc.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
#define elog(FORMAT,...)
Definition logger.hpp:130
NO_RETURN void throw_datastream_range_error(const char *file, size_t len, int64_t over)
Definition datastream.cpp:4
namespace sysio::chain
Definition authority.cpp:3
std::vector< char > bytes
Definition alt_bn128.hpp:10
unsigned int uint32_t
Definition stdint.h:126
CK_ULONG d
char * s
int seq
memcpy((char *) pInfo->slotDescription, s, l)