Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
sysio::state_history_plugin_impl::session Struct Reference
Inheritance diagram for sysio::state_history_plugin_impl::session:
Collaboration diagram for sysio::state_history_plugin_impl::session:

Public Types

using result_type = void
 

Public Member Functions

 session (std::shared_ptr< state_history_plugin_impl > plugin)
 
void start (tcp::socket socket)
 
void start_read ()
 
void send (const char *s)
 
template<typename T >
void send (T obj)
 
void send ()
 
void operator() (get_status_request_v0 &)
 
void operator() (get_blocks_request_v0 &req)
 
void operator() (get_blocks_ack_request_v0 &req)
 
void send_update (get_blocks_result_v0 result, const block_state_ptr &block_state)
 
void send_update (const block_state_ptr &block_state)
 
void send_update (bool changed=false)
 
template<typename F >
void catch_and_close (F f)
 
template<typename F >
void callback (boost::system::error_code ec, const char *what, F f)
 
void on_fail (boost::system::error_code ec, const char *what)
 
void close ()
 

Public Attributes

std::shared_ptr< state_history_plugin_implplugin
 
std::unique_ptr< ws::stream< tcp::socket > > socket_stream
 
bool sending = false
 
bool sent_abi = false
 
std::vector< std::vector< char > > send_queue
 
std::optional< get_blocks_request_v0current_request
 
bool need_to_send_update = false
 

Detailed Description

Definition at line 100 of file state_history_plugin.cpp.

Member Typedef Documentation

◆ result_type

Constructor & Destructor Documentation

◆ session()

sysio::state_history_plugin_impl::session::session ( std::shared_ptr< state_history_plugin_impl > plugin)
inline

Definition at line 109 of file state_history_plugin.cpp.

110 : plugin(std::move(plugin)) {}
std::shared_ptr< state_history_plugin_impl > plugin

Member Function Documentation

◆ callback()

template<typename F >
void sysio::state_history_plugin_impl::session::callback ( boost::system::error_code ec,
const char * what,
F f )
inline

Definition at line 281 of file state_history_plugin.cpp.

281 {
282 app().post(priority::medium, [=]() {
283 if (plugin->stopping)
284 return;
285 if (ec)
286 return on_fail(ec, what);
288 });
289 }
auto post(int priority, Func &&func)
application & app()
static constexpr int medium
void on_fail(boost::system::error_code ec, const char *what)
Here is the call graph for this function:

◆ catch_and_close()

template<typename F >
void sysio::state_history_plugin_impl::session::catch_and_close ( F f)
inline

Definition at line 265 of file state_history_plugin.cpp.

265 {
266 try {
267 f();
268 } catch (const fc::exception& e) {
269 elog("${e}", ("e", e.to_detail_string()));
270 close();
271 } catch (const std::exception& e) {
272 elog("${e}", ("e", e.what()));
273 close();
274 } catch (...) {
275 elog("unknown exception");
276 close();
277 }
278 }
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
#define elog(FORMAT,...)
Definition logger.hpp:130
Here is the call graph for this function:
Here is the caller graph for this function:

◆ close()

void sysio::state_history_plugin_impl::session::close ( )
inline

Definition at line 304 of file state_history_plugin.cpp.

304 {
305 socket_stream->next_layer().close();
306 plugin->sessions.erase(this);
307 }
std::unique_ptr< ws::stream< tcp::socket > > socket_stream
Here is the caller graph for this function:

◆ on_fail()

void sysio::state_history_plugin_impl::session::on_fail ( boost::system::error_code ec,
const char * what )
inline

Definition at line 291 of file state_history_plugin.cpp.

291 {
292 try {
293 if (ec == boost::asio::error::eof) {
294 dlog("${w}: ${m}", ("w", what)("m", ec.message()));
295 } else {
296 elog("${w}: ${m}", ("w", what)("m", ec.message()));
297 }
298 close();
299 } catch (...) {
300 elog("uncaught exception on close");
301 }
302 }
#define dlog(FORMAT,...)
Definition logger.hpp:101
Here is the call graph for this function:
Here is the caller graph for this function:

◆ operator()() [1/3]

void sysio::state_history_plugin_impl::session::operator() ( get_blocks_ack_request_v0 & req)
inline

Definition at line 204 of file state_history_plugin.cpp.

204 {
205 if (!current_request)
206 return;
207 current_request->max_messages_in_flight += req.num_messages;
208 send_update();
209 }
void send_update(get_blocks_result_v0 result, const block_state_ptr &block_state)
std::optional< get_blocks_request_v0 > current_request
Here is the call graph for this function:

◆ operator()() [2/3]

void sysio::state_history_plugin_impl::session::operator() ( get_blocks_request_v0 & req)
inline

Definition at line 191 of file state_history_plugin.cpp.

191 {
192 for (auto& cp : req.have_positions) {
193 if (req.start_block_num <= cp.block_num)
194 continue;
195 auto id = plugin->get_block_id(cp.block_num);
196 if (!id || *id != cp.block_id)
197 req.start_block_num = std::min(req.start_block_num, cp.block_num);
198 }
199 req.have_positions.clear();
200 current_request = req;
201 send_update(true);
202 }
Here is the call graph for this function:

◆ operator()() [3/3]

void sysio::state_history_plugin_impl::session::operator() ( get_status_request_v0 & )
inline

Definition at line 174 of file state_history_plugin.cpp.

174 {
175 auto& chain = plugin->chain_plug->chain();
176 get_status_result_v0 result;
177 result.head = {chain.head_block_num(), chain.head_block_id()};
178 result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
179 result.chain_id = chain.get_chain_id();
180 if (plugin->trace_log) {
181 result.trace_begin_block = plugin->trace_log->begin_block();
182 result.trace_end_block = plugin->trace_log->end_block();
183 }
184 if (plugin->chain_state_log) {
185 result.chain_state_begin_block = plugin->chain_state_log->begin_block();
186 result.chain_state_end_block = plugin->chain_state_log->end_block();
187 }
188 send(std::move(result));
189 }
Here is the call graph for this function:

◆ send() [1/3]

void sysio::state_history_plugin_impl::session::send ( )
inline

Definition at line 154 of file state_history_plugin.cpp.

154 {
155 if (sending)
156 return;
157 if (send_queue.empty())
158 return send_update();
159 sending = true;
160 socket_stream->binary(sent_abi);
161 sent_abi = true;
162 socket_stream->async_write( //
163 boost::asio::buffer(send_queue[0]), //
164 [self = shared_from_this()](boost::system::error_code ec, size_t) {
165 self->callback(ec, "async_write", [self] {
166 self->send_queue.erase(self->send_queue.begin());
167 self->sending = false;
168 self->send();
169 });
170 });
171 }
@ self
the connection is to itself
Definition protocol.hpp:48
std::vector< std::vector< char > > send_queue
Here is the call graph for this function:
Here is the caller graph for this function:

◆ send() [2/3]

void sysio::state_history_plugin_impl::session::send ( const char * s)
inline

Definition at line 143 of file state_history_plugin.cpp.

143 {
144 send_queue.push_back({s, s + strlen(s)});
145 send();
146 }
char * s
Here is the call graph for this function:

◆ send() [3/3]

template<typename T >
void sysio::state_history_plugin_impl::session::send ( T obj)
inline

Definition at line 149 of file state_history_plugin.cpp.

149 {
150 send_queue.push_back(fc::raw::pack(state_result{std::move(obj)}));
151 send();
152 }
void pack(Stream &s, const std::deque< T > &value)
Definition raw.hpp:531
std::variant< get_status_result_v0, get_blocks_result_v0 > state_result
Definition types.hpp:115
Here is the call graph for this function:

◆ send_update() [1/3]

void sysio::state_history_plugin_impl::session::send_update ( bool changed = false)
inline

Definition at line 252 of file state_history_plugin.cpp.

252 {
253 if (changed)
254 need_to_send_update = true;
255 if (!send_queue.empty() || !need_to_send_update || !current_request ||
256 !current_request->max_messages_in_flight)
257 return;
258 auto& chain = plugin->chain_plug->chain();
259 get_blocks_result_v0 result;
260 result.head = {chain.head_block_num(), chain.head_block_id()};
261 send_update(std::move(result), {});
262 }
Here is the call graph for this function:

◆ send_update() [2/3]

void sysio::state_history_plugin_impl::session::send_update ( const block_state_ptr & block_state)
inline

Definition at line 243 of file state_history_plugin.cpp.

243 {
244 need_to_send_update = true;
245 if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
246 return;
247 get_blocks_result_v0 result;
248 result.head = {block_state->block_num, block_state->id};
249 send_update(std::move(result), block_state);
250 }
Here is the call graph for this function:

◆ send_update() [3/3]

void sysio::state_history_plugin_impl::session::send_update ( get_blocks_result_v0 result,
const block_state_ptr & block_state )
inline

Definition at line 211 of file state_history_plugin.cpp.

211 {
212 need_to_send_update = true;
213 if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
214 return;
215 auto& chain = plugin->chain_plug->chain();
216 result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
217 uint32_t current =
218 current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;
219 if (current_request->start_block_num <= current &&
220 current_request->start_block_num < current_request->end_block_num) {
221 auto block_id = plugin->get_block_id(current_request->start_block_num);
222 if (block_id) {
223 result.this_block = block_position{current_request->start_block_num, *block_id};
224 auto prev_block_id = plugin->get_block_id(current_request->start_block_num - 1);
225 if (prev_block_id)
226 result.prev_block = block_position{current_request->start_block_num - 1, *prev_block_id};
227 if (current_request->fetch_block) {
228 plugin->get_block( current_request->start_block_num, block_state, result.block );
229 }
230 if (current_request->fetch_traces && plugin->trace_log)
231 plugin->get_log_entry(*plugin->trace_log, current_request->start_block_num, result.traces);
232 if (current_request->fetch_deltas && plugin->chain_state_log)
233 plugin->get_log_entry(*plugin->chain_state_log, current_request->start_block_num, result.deltas);
234 }
235 ++current_request->start_block_num;
236 }
237 send(std::move(result));
238 --current_request->max_messages_in_flight;
239 need_to_send_update = current_request->start_block_num <= current &&
240 current_request->start_block_num < current_request->end_block_num;
241 }
unsigned int uint32_t
Definition stdint.h:126
Here is the call graph for this function:
Here is the caller graph for this function:

◆ start()

void sysio::state_history_plugin_impl::session::start ( tcp::socket socket)
inline

Definition at line 112 of file state_history_plugin.cpp.

112 {
113 ilog("incoming connection");
114 socket_stream = std::make_unique<ws::stream<tcp::socket>>(std::move(socket));
115 socket_stream->binary(true);
116 socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(true));
117 socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
118 socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024));
119 socket_stream->async_accept([self = shared_from_this()](boost::system::error_code ec) {
120 self->callback(ec, "async_accept", [self] {
121 self->start_read();
123 });
124 });
125 }
const char *const state_history_plugin_abi
#define ilog(FORMAT,...)
Definition logger.hpp:118

◆ start_read()

void sysio::state_history_plugin_impl::session::start_read ( )
inline

Definition at line 127 of file state_history_plugin.cpp.

127 {
128 auto in_buffer = std::make_shared<boost::beast::flat_buffer>();
129 socket_stream->async_read(
130 *in_buffer, [self = shared_from_this(), in_buffer](boost::system::error_code ec, size_t) {
131 self->callback(ec, "async_read", [self, in_buffer] {
132 auto d = boost::asio::buffer_cast<char const*>(boost::beast::buffers_front(in_buffer->data()));
133 auto s = boost::asio::buffer_size(in_buffer->data());
135 state_request req;
136 fc::raw::unpack(ds, req);
137 std::visit(*self, req);
138 self->start_read();
139 });
140 });
141 }
static const Segment ds(Segment::ds)
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
std::variant< get_status_request_v0, get_blocks_request_v0, get_blocks_ack_request_v0 > state_request
Definition types.hpp:114
CK_ULONG d
Here is the call graph for this function:

Member Data Documentation

◆ current_request

std::optional<get_blocks_request_v0> sysio::state_history_plugin_impl::session::current_request

Definition at line 106 of file state_history_plugin.cpp.

◆ need_to_send_update

bool sysio::state_history_plugin_impl::session::need_to_send_update = false

Definition at line 107 of file state_history_plugin.cpp.

◆ plugin

std::shared_ptr<state_history_plugin_impl> sysio::state_history_plugin_impl::session::plugin

Definition at line 101 of file state_history_plugin.cpp.

◆ send_queue

std::vector<std::vector<char> > sysio::state_history_plugin_impl::session::send_queue

Definition at line 105 of file state_history_plugin.cpp.

◆ sending

bool sysio::state_history_plugin_impl::session::sending = false

Definition at line 103 of file state_history_plugin.cpp.

◆ sent_abi

bool sysio::state_history_plugin_impl::session::sent_abi = false

Definition at line 104 of file state_history_plugin.cpp.

◆ socket_stream

std::unique_ptr<ws::stream<tcp::socket> > sysio::state_history_plugin_impl::session::socket_stream

Definition at line 102 of file state_history_plugin.cpp.


The documentation for this struct was generated from the following file: