2#include <boost/asio/ip/tcp.hpp>
3#include <boost/pool/singleton_pool.hpp>
10 template <u
int32_t buffer_len>
12 template <u
int32_t buffer_len>
13 class mb_peek_datastream;
24 template <u
int32_t buffer_len>
32 typedef std::pair<uint32_t, uint32_t>
index_t;
34 message_buffer() : buffers{malloc()}, read_ind{0,0}, write_ind{0,0}, sanity_check (1) { }
37 while (buffers.size() > 0) {
60 return &buffers[read_ind.first]->at(read_ind.second);
68 return &buffers[write_ind.first]->at(write_ind.second);
77 buffers.push_back(malloc());
86 int buffers_to_add =
bytes / buffer_len + 1;
87 for (
int i = 0; i < buffers_to_add; i++) {
89 buffers.push_back(malloc());
101 if( buffers.size() != sanity_check || buffers.size() > 1000000) {
102 elog(
"read_ind = ${r1}, ${r2} write_ind = ${w1}, ${w2}, buff.size = ${bs}, sanity = ${s}",
103 (
"r1", read_ind.first )(
"r2", read_ind.second )(
"w1", write_ind.first )(
"w2", write_ind.second )
104 (
"bs", buffers.size() )(
"s", sanity_check ) );
105 elog(
"Buffer manager overwrite detected. Terminating to allow external restart");
108 while (buffers.size() > 1) {
110 free(buffers.back());
115 write_ind = { 0, 0 };
131 return (write_ind.first - ind.first) * buffer_len + write_ind.second - ind.second;
141 return total_bytes() - write_ind.first * buffer_len - write_ind.second;
148 return buffer_len * buffers.size();
161 if (read_ind == write_ind) {
163 }
else if (read_ind.first > 0) {
164 while (read_ind.first > 0) {
165 free(buffers.front());
180 while (write_ind.first >= buffers.size()) {
182 buffers.push_back(malloc());
193 std::vector<boost::asio::mutable_buffer>
seq;
194 FC_ASSERT(write_ind.first < buffers.size());
195 seq.push_back(boost::asio::buffer(&buffers[write_ind.first]->at(write_ind.second),
196 buffer_len - write_ind.second));
197 for (std::size_t i = write_ind.first + 1; i < buffers.size(); i++) {
198 seq.push_back(boost::asio::buffer(&buffers[i]->at(0), buffer_len));
212 if (read_ind.second + size <= buffer_len) {
216 uint32_t num_in_buffer = buffer_len - read_ind.second;
219 read((
char*)
s + num_in_buffer, size - num_in_buffer);
233 if (index.second + size <= buffer_len) {
234 memcpy(
s, get_ptr(index), size);
237 uint32_t num_in_buffer = buffer_len - index.second;
238 memcpy(
s, get_ptr(index), num_in_buffer);
240 peek((
char*)
s + num_in_buffer, size - num_in_buffer, index);
250 index.first += (
bytes + index.second) / buffer_len;
251 index.second = (
bytes + index.second) % buffer_len;
267 using buffer_type = std::array<char, buffer_len>;
268 using pool_type = boost::singleton_pool<
message_buffer,
sizeof(buffer_type)>;
269 static buffer_type* malloc() {
return static_cast<buffer_type*
>(pool_type::malloc()); }
270 static void free(buffer_type* ptr) { pool_type::free(ptr); }
275 char* get_ptr(
const index_t& index)
const {
276 return &buffers[index.first]->at(index.second);
279 std::deque<std::array<char, buffer_len>* > buffers;
291 template <u
int32_t buffer_len>
296 inline void skip(
size_t s ) { mb.advance_read_ptr(
s); }
297 inline bool read(
char* d,
size_t s ) {
298 if (mb.bytes_to_read() >=
s) {
305 inline bool get(
unsigned char& c ) {
return mb.read(&c, 1); }
306 inline bool get(
char& c ) {
return mb.read(&c, 1); }
312 template <u
int32_t buffer_len>
322 template <u
int32_t buffer_len>
326 : mb( m ), index( m.read_index() ) {}
330 inline bool read(
char* d,
size_t s ) {
331 if( mb.bytes_to_read_from_index(index) >=
s ) {
332 mb.peek(
d,
s, index );
336 mb.bytes_to_read_from_index(index),
s - mb.bytes_to_read_from_index(index) );
339 inline bool get(
unsigned char& c ) {
return mb.peek( &c, 1, index ); }
340 inline bool get(
char& c ) {
return mb.peek( &c, 1, index ); }
347 template <u
int32_t buffer_len>
mb_datastream(message_buffer< buffer_len > &m)
bool get(unsigned char &c)
bool read(char *d, size_t s)
bool get(unsigned char &c)
bool read(char *d, size_t s)
mb_peek_datastream(const message_buffer< buffer_len > &m)
abstraction for a message buffer that spans a chain of physical buffers
uint32_t bytes_to_read_from_index(const index_t &ind) const
mb_datastream< buffer_len > create_datastream()
uint32_t bytes_to_read() const
void add_space(uint32_t bytes)
bool read(void *s, uint32_t size)
void advance_write_ptr(uint32_t bytes)
static void advance_index(index_t &index, uint32_t bytes)
std::vector< boost::asio::mutable_buffer > get_buffer_sequence_for_boost_async_read()
mb_peek_datastream< buffer_len > create_peek_datastream()
index_t write_index() const
bool peek(void *s, uint32_t size, index_t &index) const
void advance_read_ptr(uint32_t bytes)
std::pair< uint32_t, uint32_t > index_t
uint32_t total_bytes() const
index_t read_index() const
uint32_t bytes_to_write() const
void add_buffer_to_chain()
Defines exception's used by fc.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
NO_RETURN void throw_datastream_range_error(const char *file, size_t len, int64_t over)
std::vector< char > bytes
memcpy((char *) pInfo->slotDescription, s, l)