Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
local_endpoint.hpp
Go to the documentation of this file.
1#pragma once
2
6
7#include <websocketpp/uri.hpp>
9
11
12#include <sstream>
13#include <string>
14
15namespace websocketpp {
16namespace transport {
17namespace asio {
18
19namespace basic_socket {
20
21class local_connection : public lib::enable_shared_from_this<local_connection> {
22public:
26 typedef lib::shared_ptr<type> ptr;
27
29 typedef lib::asio::io_service* io_service_ptr;
31 typedef lib::shared_ptr<lib::asio::io_service::strand> strand_ptr;
33 typedef lib::asio::local::stream_protocol::socket socket_type;
35 typedef lib::shared_ptr<socket_type> socket_ptr;
36
37 explicit local_connection() : m_state(UNINITIALIZED) {
38 }
39
41 return shared_from_this();
42 }
43
44 bool is_secure() const {
45 return false;
46 }
47
48 lib::asio::local::stream_protocol::socket & get_socket() {
49 return *m_socket;
50 }
51
52 lib::asio::local::stream_protocol::socket & get_next_layer() {
53 return *m_socket;
54 }
55
56 lib::asio::local::stream_protocol::socket & get_raw_socket() {
57 return *m_socket;
58 }
59
60 std::string get_remote_endpoint(lib::error_code & ec) const {
61 return "UNIX Socket Endpoint";
62 }
63
64 void pre_init(init_handler callback) {
65 if (m_state != READY) {
67 return;
68 }
69
70 m_state = READING;
71
72 callback(lib::error_code());
73 }
74
75 void post_init(init_handler callback) {
76 callback(lib::error_code());
77 }
78protected:
79 lib::error_code init_asio (io_service_ptr service, strand_ptr, bool)
80 {
81 if (m_state != UNINITIALIZED) {
83 }
84
85 m_socket = lib::make_shared<lib::asio::local::stream_protocol::socket>(*service);
86
87 m_state = READY;
88
89 return lib::error_code();
90 }
91
93 m_hdl = hdl;
94 }
95
96 lib::asio::error_code cancel_socket() {
97 lib::asio::error_code ec;
98 m_socket->cancel(ec);
99 return ec;
100 }
101
103 lib::asio::error_code ec;
104 m_socket->shutdown(lib::asio::ip::tcp::socket::shutdown_both, ec);
105 h(ec);
106 }
107
108 lib::error_code get_ec() const {
109 return lib::error_code();
110 }
111
112 template <typename ErrorCodeType>
113 lib::error_code translate_ec(ErrorCodeType) {
114 return make_error_code(transport::error::pass_through);
115 }
116
117 lib::error_code translate_ec(lib::error_code ec) {
118 return ec;
119 }
120private:
121 enum state {
122 UNINITIALIZED = 0,
123 READY = 1,
124 READING = 2
125 };
126
127 socket_ptr m_socket;
128 state m_state;
129
130 connection_hdl m_hdl;
131 socket_init_handler m_socket_init_handler;
132};
133
135public:
138
144
145 explicit local_endpoint() {}
146
147 bool is_secure() const {
148 return false;
149 }
150};
151}
152
154
158template <typename config>
159class local_endpoint : public config::socket_type {
160public:
163
165 typedef typename config::concurrency_type concurrency_type;
167 typedef typename config::socket_type socket_type;
169 typedef typename config::elog_type elog_type;
171 typedef typename config::alog_type alog_type;
172
174 typedef typename socket_type::socket_con_type socket_con_type;
176 typedef typename socket_con_type::ptr socket_con_ptr;
177
180 typedef asio::connection<config> transport_con_type;
183 typedef typename transport_con_type::ptr transport_con_ptr;
184
186 typedef lib::asio::io_service * io_service_ptr;
188 typedef lib::shared_ptr<lib::asio::local::stream_protocol::acceptor> acceptor_ptr;
190 typedef lib::shared_ptr<lib::asio::steady_timer> timer_ptr;
192 typedef lib::shared_ptr<lib::asio::io_service::work> work_ptr;
193
194 // generate and manage our own io_service
195 explicit local_endpoint()
196 : m_io_service(NULL)
197 , m_state(UNINITIALIZED)
198 {
199 //std::cout << "transport::asio::endpoint constructor" << std::endl;
200 }
201
203 if (m_acceptor && m_state == LISTENING)
204 ::unlink(m_acceptor->local_endpoint().path().c_str());
205
206 // Explicitly destroy local objects
207 m_acceptor.reset();
208 m_work.reset();
209 }
210
214#ifdef _WEBSOCKETPP_DEFAULT_DELETE_FUNCTIONS_
215 local_endpoint(const local_endpoint & src) = delete;
216 local_endpoint& operator= (const local_endpoint & rhs) = delete;
217#else
218private:
219 local_endpoint(const local_endpoint & src);
220 local_endpoint & operator= (const local_endpoint & rhs);
221public:
222#endif // _WEBSOCKETPP_DEFAULT_DELETE_FUNCTIONS_
223
224#ifdef _WEBSOCKETPP_MOVE_SEMANTICS_
226 : config::socket_type(std::move(src))
227 , m_tcp_pre_init_handler(src.m_tcp_pre_init_handler)
228 , m_tcp_post_init_handler(src.m_tcp_post_init_handler)
229 , m_io_service(src.m_io_service)
230 , m_acceptor(src.m_acceptor)
231 , m_elog(src.m_elog)
232 , m_alog(src.m_alog)
233 , m_state(src.m_state)
234 {
235 src.m_io_service = NULL;
236 src.m_acceptor = NULL;
237 src.m_state = UNINITIALIZED;
238 }
239
240#endif // _WEBSOCKETPP_MOVE_SEMANTICS_
241
243 bool is_secure() const {
244 return socket_type::is_secure();
245 }
246
248
256 void init_asio(io_service_ptr ptr, lib::error_code & ec) {
257 if (m_state != UNINITIALIZED) {
258 m_elog->write(log::elevel::library,
259 "asio::init_asio called from the wrong state");
261 ec = make_error_code(websocketpp::error::invalid_state);
262 return;
263 }
264
265 m_alog->write(log::alevel::devel,"asio::init_asio");
266
267 m_io_service = ptr;
268 m_acceptor = lib::make_shared<lib::asio::local::stream_protocol::acceptor>(*m_io_service);
269
270 m_state = READY;
271 ec = lib::error_code();
272 }
273
275
283 lib::error_code ec;
284 init_asio(ptr,ec);
285 if (ec) { throw exception(ec); }
286 }
287
289
299 m_tcp_pre_init_handler = h;
300 }
301
303
315
317
328 m_tcp_post_init_handler = h;
329 }
330
332
342 lib::asio::io_service & get_io_service() {
343 return *m_io_service;
344 }
345
347
354 void listen(lib::asio::local::stream_protocol::endpoint const & ep, lib::error_code & ec)
355 {
356 if (m_state != READY) {
357 m_elog->write(log::elevel::library,
358 "asio::listen called from the wrong state");
360 ec = make_error_code(websocketpp::error::invalid_state);
361 return;
362 }
363
364 m_alog->write(log::alevel::devel,"asio::listen");
365
366 lib::asio::error_code bec;
367
368 {
369 boost::system::error_code test_ec;
370 lib::asio::local::stream_protocol::socket test_socket(get_io_service());
371 test_socket.connect(ep, test_ec);
372
373 //looks like a service is already running on that socket, probably another kiod, don't touch it
374 if(test_ec == boost::system::errc::success)
375 bec = boost::system::errc::make_error_code(boost::system::errc::address_in_use);
376 //socket exists but no one home, go ahead and remove it and continue on
377 else if(test_ec == boost::system::errc::connection_refused)
378 ::unlink(ep.path().c_str());
379 else if(test_ec != boost::system::errc::no_such_file_or_directory)
380 bec = test_ec;
381 }
382
383 if (!bec) {
384 m_acceptor->open(ep.protocol(),bec);
385 }
386 if (!bec) {
387 m_acceptor->bind(ep,bec);
388 }
389 if (!bec) {
390 m_acceptor->listen(boost::asio::socket_base::max_listen_connections,bec);
391 }
392 if (bec) {
393 if (m_acceptor->is_open()) {
394 m_acceptor->close();
395 }
396 log_err(log::elevel::info,"asio listen",bec);
397 ec = bec;
398 } else {
399 m_state = LISTENING;
400 ec = lib::error_code();
401 }
402 }
403
405
410 void listen(lib::asio::local::stream_protocol::endpoint const & ep) {
411 lib::error_code ec;
412 listen(ep,ec);
413 if (ec) { throw exception(ec); }
414 }
415
417
424 void stop_listening(lib::error_code & ec) {
425 if (m_state != LISTENING) {
426 m_elog->write(log::elevel::library,
427 "asio::listen called from the wrong state");
429 ec = make_error_code(websocketpp::error::invalid_state);
430 return;
431 }
432
433 ::unlink(m_acceptor->local_endpoint().path().c_str());
434 m_acceptor->close();
435 m_state = READY;
436 ec = lib::error_code();
437 }
438
440
447 lib::error_code ec;
448 stop_listening(ec);
449 if (ec) { throw exception(ec); }
450 }
451
453
456 bool is_listening() const {
457 return (m_state == LISTENING);
458 }
459
461 std::size_t run() {
462 return m_io_service->run();
463 }
464
466
469 std::size_t run_one() {
470 return m_io_service->run_one();
471 }
472
474 void stop() {
475 m_io_service->stop();
476 }
477
479 std::size_t poll() {
480 return m_io_service->poll();
481 }
482
484 std::size_t poll_one() {
485 return m_io_service->poll_one();
486 }
487
489 void reset() {
490 m_io_service->reset();
491 }
492
494 bool stopped() const {
495 return m_io_service->stopped();
496 }
497
499
511 m_work = lib::make_shared<lib::asio::io_service::work>(
512 lib::ref(*m_io_service)
513 );
514 }
515
517
525 m_work.reset();
526 }
527
529
540 timer_ptr set_timer(long duration, timer_handler callback) {
541 timer_ptr new_timer = lib::make_shared<lib::asio::steady_timer>(
542 *m_io_service,
544 );
545
546 new_timer->async_wait(
547 lib::bind(
549 this,
550 new_timer,
551 callback,
552 lib::placeholders::_1
553 )
554 );
555
556 return new_timer;
557 }
558
560
569 lib::asio::error_code const & ec)
570 {
571 if (ec) {
572 if (ec == lib::asio::error::operation_aborted) {
573 callback(make_error_code(transport::error::operation_aborted));
574 } else {
575 m_elog->write(log::elevel::info,
576 "asio handle_timer error: "+ec.message());
577 log_err(log::elevel::info,"asio handle_timer",ec);
578 callback(ec);
579 }
580 } else {
581 callback(lib::error_code());
582 }
583 }
584
586
592 lib::error_code & ec)
593 {
594 if (m_state != LISTENING) {
597 return;
598 }
599
600 m_alog->write(log::alevel::devel, "asio::async_accept");
601
602 if (config::enable_multithreading) {
603 m_acceptor->async_accept(
604 tcon->get_raw_socket(),
605 tcon->get_strand()->wrap(lib::bind(
607 this,
608 callback,
609 lib::placeholders::_1
610 ))
611 );
612 } else {
613 m_acceptor->async_accept(
614 tcon->get_raw_socket(),
615 lib::bind(
617 this,
618 callback,
619 lib::placeholders::_1
620 )
621 );
622 }
623 }
624
626
631 lib::error_code ec;
632 async_accept(tcon,callback,ec);
633 if (ec) { throw exception(ec); }
634 }
635protected:
637
647 m_alog = a;
648 m_elog = e;
649 }
650
651 void handle_accept(accept_handler callback, lib::asio::error_code const &
652 asio_ec)
653 {
654 lib::error_code ret_ec;
655
656 m_alog->write(log::alevel::devel, "asio::handle_accept");
657
658 if (asio_ec) {
659 if (asio_ec == lib::asio::errc::operation_canceled) {
660 ret_ec = make_error_code(websocketpp::error::operation_canceled);
661 } else {
662 log_err(log::elevel::info,"asio handle_accept",asio_ec);
663 ret_ec = asio_ec;
664 }
665 }
666
667 callback(ret_ec);
668 }
669
671
681 connect_handler callback, lib::error_code const & ec)
682 {
683 lib::error_code ret_ec;
684
685 if (ec) {
687 m_alog->write(log::alevel::devel,
688 "asio handle_connect_timeout timer cancelled");
689 return;
690 }
691
692 log_err(log::elevel::devel,"asio handle_connect_timeout",ec);
693 ret_ec = ec;
694 } else {
695 ret_ec = make_error_code(transport::error::timeout);
696 }
697
698 m_alog->write(log::alevel::devel,"TCP connect timed out");
699 tcon->cancel_socket_checked();
700 callback(ret_ec);
701 }
702
704 connect_handler callback, lib::asio::error_code const & ec)
705 {
706 if (ec == lib::asio::error::operation_aborted ||
707 lib::asio::is_neg(con_timer->expires_from_now()))
708 {
709 m_alog->write(log::alevel::devel,"async_connect cancelled");
710 return;
711 }
712
713 con_timer->cancel();
714
715 if (ec) {
716 log_err(log::elevel::info,"asio async_connect",ec);
717 callback(ec);
718 return;
719 }
720
721 if (m_alog->static_test(log::alevel::devel)) {
722 m_alog->write(log::alevel::devel,
723 "Async connect to "+tcon->get_remote_endpoint()+" successful.");
724 }
725
726 callback(lib::error_code());
727 }
728
730
740 lib::error_code init(transport_con_ptr tcon) {
741 m_alog->write(log::alevel::devel, "transport::asio::init");
742
743 lib::error_code ec;
744
745 ec = tcon->init_asio(m_io_service);
746 if (ec) {return ec;}
747
748 tcon->set_tcp_pre_init_handler(m_tcp_pre_init_handler);
749 tcon->set_tcp_post_init_handler(m_tcp_post_init_handler);
750
751 return lib::error_code();
752 }
753private:
755 template <typename error_type>
756 void log_err(log::level l, char const * msg, error_type const & ec) {
757 std::stringstream s;
758 s << msg << " error: " << ec << " (" << ec.message() << ")";
759 m_elog->write(l,s.str());
760 }
761
762 enum state {
763 UNINITIALIZED = 0,
764 READY = 1,
765 LISTENING = 2
766 };
767
768 // Handlers
769 tcp_init_handler m_tcp_pre_init_handler;
770 tcp_init_handler m_tcp_post_init_handler;
771
772 // Network Resources
773 io_service_ptr m_io_service;
774 acceptor_ptr m_acceptor;
775 work_ptr m_work;
776
777 elog_type* m_elog;
778 alog_type* m_alog;
779
780 // Transport state
781 state m_state;
782};
783
784} // namespace asio
785} // namespace transport
786} // namespace websocketpp
lib::asio::local::stream_protocol::socket socket_type
Type of the ASIO socket being used.
lib::shared_ptr< lib::asio::io_service::strand > strand_ptr
Type of a pointer to the Asio io_service strand being used.
lib::asio::local::stream_protocol::socket & get_raw_socket()
lib::shared_ptr< socket_type > socket_ptr
Type of a shared pointer to the socket being used.
lib::shared_ptr< type > ptr
Type of a shared pointer to this connection socket component.
lib::asio::local::stream_protocol::socket & get_socket()
lib::asio::local::stream_protocol::socket & get_next_layer()
lib::error_code init_asio(io_service_ptr service, strand_ptr, bool)
local_connection type
Type of this connection socket component.
lib::asio::io_service * io_service_ptr
Type of a pointer to the Asio io_service being used.
std::string get_remote_endpoint(lib::error_code &ec) const
local_connection socket_con_type
The type of the corresponding connection socket component.
local_endpoint type
The type of this endpoint socket component.
Asio based endpoint transport component.
lib::asio::io_service & get_io_service()
Retrieve a reference to the endpoint's io_service.
void handle_connect_timeout(transport_con_ptr tcon, timer_ptr, connect_handler callback, lib::error_code const &ec)
Asio connect timeout handler.
void reset()
wraps the reset method of the internal io_service object
config::alog_type alog_type
Type of the access logging policy.
bool is_secure() const
Return whether or not the endpoint produces secure connections.
void stop()
wraps the stop method of the internal io_service object
void listen(lib::asio::local::stream_protocol::endpoint const &ep)
Set up endpoint for listening manually.
std::size_t run()
wraps the run method of the internal io_service object
lib::shared_ptr< lib::asio::io_service::work > work_ptr
Type of a shared pointer to an io_service work object.
bool is_listening() const
Check if the endpoint is listening.
lib::shared_ptr< lib::asio::steady_timer > timer_ptr
Type of timer handle.
void handle_accept(accept_handler callback, lib::asio::error_code const &asio_ec)
config::socket_type socket_type
Type of the socket policy.
lib::error_code init(transport_con_ptr tcon)
Initialize a connection.
void init_asio(io_service_ptr ptr, lib::error_code &ec)
initialize asio transport with external io_service (exception free)
void set_tcp_pre_init_handler(tcp_init_handler h)
Sets the tcp pre init handler.
timer_ptr set_timer(long duration, timer_handler callback)
Call back a function after a period of time.
std::size_t poll()
wraps the poll method of the internal io_service object
void init_logging(alog_type *a, elog_type *e)
Initialize logging.
void init_asio(io_service_ptr ptr)
initialize asio transport with external io_service
local_endpoint< config > type
Type of this endpoint transport component.
config::elog_type elog_type
Type of the error logging policy.
void async_accept(transport_con_ptr tcon, accept_handler callback)
Accept the next connection attempt and assign it to con.
socket_con_type::ptr socket_con_ptr
Type of a shared pointer to the socket connection component.
socket_type::socket_con_type socket_con_type
Type of the socket connection component.
void handle_connect(transport_con_ptr tcon, timer_ptr con_timer, connect_handler callback, lib::asio::error_code const &ec)
void set_tcp_post_init_handler(tcp_init_handler h)
Sets the tcp post init handler.
void handle_timer(timer_ptr, timer_handler callback, lib::asio::error_code const &ec)
Timer handler.
config::concurrency_type concurrency_type
Type of the concurrency policy.
lib::asio::io_service * io_service_ptr
Type of a pointer to the ASIO io_service being used.
void set_tcp_init_handler(tcp_init_handler h)
Sets the tcp pre init handler (deprecated)
void stop_listening(lib::error_code &ec)
Stop listening (exception free)
void stop_perpetual()
Clears the endpoint's perpetual flag, allowing it to exit when empty.
lib::shared_ptr< lib::asio::local::stream_protocol::acceptor > acceptor_ptr
Type of a shared pointer to the acceptor being used.
bool stopped() const
wraps the stopped method of the internal io_service object
std::size_t poll_one()
wraps the poll_one method of the internal io_service object
void start_perpetual()
Marks the endpoint as perpetual, stopping it from exiting when empty.
void listen(lib::asio::local::stream_protocol::endpoint const &ep, lib::error_code &ec)
Set up endpoint for listening manually (exception free)
std::size_t run_one()
wraps the run_one method of the internal io_service object
void async_accept(transport_con_ptr tcon, accept_handler callback, lib::error_code &ec)
Accept the next connection attempt and assign it to con (exception free)
Definition name.hpp:106
@ operation_canceled
The requested operation was canceled.
Definition error.hpp:127
@ invalid_state
The connection was in the wrong state for this operation.
Definition error.hpp:74
lib::error_code make_error_code(error::value e)
Definition error.hpp:235
bool is_neg(T duration)
Definition asio.hpp:114
boost::posix_time::time_duration milliseconds(long duration)
Definition asio.hpp:117
uint32_t level
Type of a channel package.
Definition levels.hpp:37
lib::function< void(connection_hdl, lib::asio::ip::tcp::socket &)> socket_init_handler
The signature of the socket init handler for this socket policy.
Definition none.hpp:51
@ invalid_state
A function was called in a state that it was illegal to do so.
Definition base.hpp:86
lib::function< void(lib::asio::error_code const &)> shutdown_handler
Definition base.hpp:67
lib::error_code make_error_code(error::value e)
Definition base.hpp:147
lib::function< void(connection_hdl)> tcp_init_handler
@ pass_through
underlying transport pass through
@ operation_aborted
Operation aborted.
lib::function< void(lib::error_code const &)> accept_handler
The type and signature of the callback passed to the accept method.
Definition endpoint.hpp:69
lib::function< void(lib::error_code const &)> timer_handler
The type and signature of the callback passed to the read method.
lib::function< void(lib::error_code const &)> connect_handler
The type and signature of the callback passed to the connect method.
Definition endpoint.hpp:72
lib::function< void(lib::error_code const &)> init_handler
The type and signature of the callback passed to the init hook.
Namespace for the WebSocket++ project.
Definition base64.hpp:41
lib::weak_ptr< void > connection_hdl
A handle to uniquely identify a connection.
const GenericPointer< typename T::ValueType > T2 T::AllocatorType & a
Definition pointer.h:1181
static level const devel
Development messages (warning: very chatty)
Definition levels.hpp:141
static level const devel
Low level debugging information (warning: very chatty)
Definition levels.hpp:63
static level const library
Definition levels.hpp:66
static level const info
Definition levels.hpp:69
char * s
int l