Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
buffered_iostream.cpp
Go to the documentation of this file.
3#include <boost/asio/streambuf.hpp>
4#include <iostream>
5
6#include <fc/log/logger.hpp>
7
8namespace fc
9{
10 namespace detail
11 {
13 {
14 public:
15 buffered_istream_impl( istream_ptr is ) :
16 _istr(fc::move(is))
17#ifndef NDEBUG
19#endif
20 {}
21
22 istream_ptr _istr;
23 boost::asio::streambuf _rdbuf;
24 std::shared_ptr<char> _shared_read_buffer;
25#ifndef NDEBUG
27#endif
28 };
29 static const size_t minimum_read_size = 1024;
30 }
31
33 :my( new detail::buffered_istream_impl( fc::move(is) ) )
34 {
35 FC_ASSERT( my->_istr != nullptr, " this shouldn't be null" );
36 }
37
40
42 {
43 my = fc::move(i.my);
44 return *this;
45 }
46
48
49 size_t buffered_istream::readsome( char* buf, size_t len )
50 {
51 size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(buf, len));
52 if (bytes_from_rdbuf)
53 return bytes_from_rdbuf;
54
55
56 if( len > detail::minimum_read_size )
57 return my->_istr->readsome(buf,len);
58
59 char tmp[detail::minimum_read_size];
60 size_t bytes_read = my->_istr->readsome( tmp, detail::minimum_read_size );
61
62 size_t bytes_to_deliver_immediately = std::min<size_t>(bytes_read,len);
63
64 memcpy( buf, tmp, bytes_to_deliver_immediately );
65
66
67 if( bytes_read > len )
68 {
69 my->_rdbuf.sputn( tmp + len, bytes_read - len );
70 }
71
72 return bytes_to_deliver_immediately;
73 }
74
75 size_t buffered_istream::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
76 {
77 size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(buf.get() + offset, len));
78 if (bytes_from_rdbuf)
79 return bytes_from_rdbuf;
80
81
82 if( len > detail::minimum_read_size )
83 return my->_istr->readsome(buf.get() + offset, len);
84
85#ifndef NDEBUG
86 // This code was written with the assumption that you'd only be making one call to readsome
87 // at a time so it reuses _shared_read_buffer. If you really need to make concurrent calls to
88 // readsome(), you'll need to prevent reusing _shared_read_buffer here
89 struct check_buffer_in_use {
90 bool& _buffer_in_use;
91 check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
92 ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
93 } buffer_in_use_checker(my->_shared_read_buffer_in_use);
94#endif
95
96 if (!my->_shared_read_buffer)
97 my->_shared_read_buffer.reset(new char[detail::minimum_read_size], [](char* p){ delete[] p; });
98 size_t bytes_read = my->_istr->readsome( my->_shared_read_buffer, detail::minimum_read_size, 0 );
99
100 size_t bytes_to_deliver_immediately = std::min<size_t>(bytes_read,len);
101
102 memcpy( buf.get() + offset, my->_shared_read_buffer.get(), bytes_to_deliver_immediately );
103
104 if( bytes_read > len )
105 {
106 my->_rdbuf.sputn( my->_shared_read_buffer.get() + len, bytes_read - len );
107 }
108
109 return bytes_to_deliver_immediately;
110 }
111
113 {
114 if( my->_rdbuf.size() )
115 {
116 return my->_rdbuf.sgetc();
117 }
118
119 char tmp[detail::minimum_read_size];
120 size_t bytes_read = my->_istr->readsome( tmp, detail::minimum_read_size );
121 my->_rdbuf.sputn( tmp, bytes_read );
122
123 if( my->_rdbuf.size() )
124 {
125 return my->_rdbuf.sgetc();
126 }
127 FC_THROW_EXCEPTION( assert_exception,
128 "at least one byte should be available, or eof should have been thrown" );
129 }
130
131
132 namespace detail
133 {
135 {
136 public:
137 buffered_ostream_impl( ostream_ptr os ) :
138 _ostr(fc::move(os))
139#ifndef NDEBUG
141#endif
142 {}
143
144 ostream_ptr _ostr;
145 boost::asio::streambuf _rdbuf;
146 std::shared_ptr<char> _shared_write_buffer;
147#ifndef NDEBUG
149#endif
150 };
151 }
152
153 buffered_ostream::buffered_ostream( ostream_ptr os, size_t bufsize )
154 :my( new detail::buffered_ostream_impl( fc::move(os) ) )
155 {
156 }
157
160
162 {
163 my = fc::move(i.my);
164 return *this;
165 }
166
168
169 size_t buffered_ostream::writesome( const char* buf, size_t len )
170 {
171 size_t written = static_cast<size_t>(my->_rdbuf.sputn( buf, len ));
172 if( written < len ) { flush(); }
173 return written + static_cast<size_t>(my->_rdbuf.sputn( buf+written, len-written ));
174 }
175
176 size_t buffered_ostream::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
177 {
178 return writesome(buf.get() + offset, len);
179 }
180
182 {
183#ifndef NDEBUG
184 // This code was written with the assumption that you'd only be making one call to flush
185 // at a time so it reuses _shared_write_buffer. If you really need to make concurrent calls to
186 // flush(), you'll need to prevent reusing _shared_write_buffer here
187 struct check_buffer_in_use {
188 bool& _buffer_in_use;
189 check_buffer_in_use(bool& buffer_in_use) : _buffer_in_use(buffer_in_use) { assert(!_buffer_in_use); _buffer_in_use = true; }
190 ~check_buffer_in_use() { assert(_buffer_in_use); _buffer_in_use = false; }
191 } buffer_in_use_checker(my->_shared_write_buffer_in_use);
192#endif
193 const size_t write_buffer_size = 2048;
194 if (!my->_shared_write_buffer)
195 my->_shared_write_buffer.reset(new char[write_buffer_size], [](char* p){ delete[] p; });
196
197 while( size_t bytes_from_rdbuf = static_cast<size_t>(my->_rdbuf.sgetn(my->_shared_write_buffer.get(), write_buffer_size)) )
198 my->_ostr->write( my->_shared_write_buffer, bytes_from_rdbuf );
199 my->_ostr->flush();
200 }
201
203 {
204 flush();
205 my->_ostr->close();
206 }
207
208
209}
const mie::Vuint & p
Definition bn.cpp:27
Reads data from an unbuffered stream and enables peek functionality.
buffered_istream(istream_ptr is)
virtual char peek() const
buffered_istream & operator=(buffered_istream &&i)
virtual std::size_t readsome(char *buf, std::size_t len)
buffered_ostream(ostream_ptr o, size_t bufsize=4096)
virtual size_t writesome(const char *buf, size_t len)
buffered_ostream & operator=(buffered_ostream &&m)
std::shared_ptr< char > _shared_read_buffer
std::shared_ptr< char > _shared_write_buffer
os_t os
Defines exception's used by fc.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
namespace sysio::chain
Definition authority.cpp:3
size_t len
uint8_t buf[2048]
memcpy((char *) pInfo->slotDescription, s, l)