Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
broadcast_server.cpp
Go to the documentation of this file.
2
4
5#include <iostream>
6#include <set>
7
8/*#include <boost/thread.hpp>
9#include <boost/thread/mutex.hpp>
10#include <boost/thread/condition_variable.hpp>*/
12
14
16using websocketpp::lib::placeholders::_1;
17using websocketpp::lib::placeholders::_2;
18using websocketpp::lib::bind;
19
20using websocketpp::lib::thread;
21using websocketpp::lib::mutex;
22using websocketpp::lib::lock_guard;
23using websocketpp::lib::unique_lock;
24using websocketpp::lib::condition_variable;
25
26/* on_open insert connection_hdl into channel
27 * on_close remove connection_hdl from channel
28 * on_message queue send to all channels
29 */
30
36
46
47class broadcast_server {
48public:
50 // Initialize Asio Transport
51 m_server.init_asio();
52
53 // Register handler callbacks
54 m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1));
55 m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1));
56 m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2));
57 }
58
59 void run(uint16_t port) {
60 // listen on specified port
61 m_server.listen(port);
62
63 // Start the server accept loop
64 m_server.start_accept();
65
66 // Start the ASIO io_service run loop
67 try {
68 m_server.run();
69 } catch (const std::exception & e) {
70 std::cout << e.what() << std::endl;
71 }
72 }
73
75 {
76 lock_guard<mutex> guard(m_action_lock);
77 //std::cout << "on_open" << std::endl;
78 m_actions.push(action(SUBSCRIBE,hdl));
79 }
80 m_action_cond.notify_one();
81 }
82
84 {
85 lock_guard<mutex> guard(m_action_lock);
86 //std::cout << "on_close" << std::endl;
87 m_actions.push(action(UNSUBSCRIBE,hdl));
88 }
89 m_action_cond.notify_one();
90 }
91
93 // queue message up for sending by processing thread
94 {
95 lock_guard<mutex> guard(m_action_lock);
96 //std::cout << "on_message" << std::endl;
97 m_actions.push(action(MESSAGE,hdl,msg));
98 }
99 m_action_cond.notify_one();
100 }
101
103 while(1) {
104 unique_lock<mutex> lock(m_action_lock);
105
106 while(m_actions.empty()) {
107 m_action_cond.wait(lock);
108 }
109
110 action a = m_actions.front();
111 m_actions.pop();
112
113 lock.unlock();
114
115 if (a.type == SUBSCRIBE) {
116 lock_guard<mutex> guard(m_connection_lock);
117 m_connections.insert(a.hdl);
118 } else if (a.type == UNSUBSCRIBE) {
119 lock_guard<mutex> guard(m_connection_lock);
120 m_connections.erase(a.hdl);
121 } else if (a.type == MESSAGE) {
122 lock_guard<mutex> guard(m_connection_lock);
123
124 con_list::iterator it;
125 for (it = m_connections.begin(); it != m_connections.end(); ++it) {
126 m_server.send(*it,a.msg);
127 }
128 } else {
129 // undefined.
130 }
131 }
132 }
133private:
134 typedef std::set<connection_hdl,std::owner_less<connection_hdl> > con_list;
135
136 server m_server;
137 con_list m_connections;
138 std::queue<action> m_actions;
139
140 mutex m_action_lock;
141 mutex m_connection_lock;
142 condition_variable m_action_cond;
143};
144
145int main() {
146 try {
147 broadcast_server server_instance;
148
149 // Start a thread to run the processing loop
150 thread t(bind(&broadcast_server::process_messages,&server_instance));
151
152 // Run the asio loop with the main thread
153 server_instance.run(9002);
154
155 t.join();
156
157 } catch (websocketpp::exception const & e) {
158 std::cout << e.what() << std::endl;
159 }
160}
@ UNSUBSCRIBE
@ MESSAGE
@ SUBSCRIBE
int main()
websocketpp::server< websocketpp::config::asio > server
void on_open(connection_hdl hdl)
void on_message(connection_hdl hdl, server::message_ptr msg)
void on_close(connection_hdl hdl)
void run(uint16_t port)
void set_message_handler(message_handler h)
Definition endpoint.hpp:322
void send(connection_hdl hdl, std::string const &payload, frame::opcode::value op, lib::error_code &ec)
Create a message and add it to the outgoing send queue (exception free)
void set_open_handler(open_handler h)
Definition endpoint.hpp:277
void set_close_handler(close_handler h)
Definition endpoint.hpp:282
virtual char const * what() const
Definition error.hpp:263
Server endpoint role based on the given config.
void start_accept(lib::error_code &ec)
Starts the server's async connection acceptance loop (exception free)
lib::weak_ptr< void > connection_hdl
A handle to uniquely identify a connection.
const GenericPointer< typename T::ValueType > T2 T::AllocatorType & a
Definition pointer.h:1181
unsigned short uint16_t
Definition stdint.h:125
action(action_type t, connection_hdl h)
websocketpp::connection_hdl hdl
action_type type
action(action_type t, connection_hdl h, server::message_ptr m)
server::message_ptr msg
void lock()