Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
udt_socket.cpp
Go to the documentation of this file.
1#include <fc/network/udt_socket.hpp>
2#include <fc/thread/thread.hpp>
3#include <fc/thread/mutex.hpp>
4#include <fc/thread/unique_lock.hpp>
5#include <fc/network/ip.hpp>
6#include <udt.h>
7
8#ifndef WIN32
9# include <arpa/inet.h>
10#endif
11
12namespace fc {
13
15 {
16 UDT::ERRORINFO& error_info = UDT::getlasterror();
17 if( error_info.getErrorCode() )
18 {
19 std::string error_message = error_info.getErrorMessage();
20 error_info.clear();
21 FC_CAPTURE_AND_THROW( udt_exception, (error_message) );
22 }
23 }
24
26 {
27 public:
29 :_epoll_thread("udt_epoll")
30 {
31 UDT::startup();
33 _epoll_id = UDT::epoll_create();
34 _epoll_loop = _epoll_thread.async( [=](){ poll_loop(); }, "udt_poll_loop" );
35 }
36
38 {
39 _epoll_loop.cancel("udt_epoll_service is destructing");
40 _epoll_loop.wait();
41 UDT::cleanup();
42 }
43
44 void poll_loop()
45 {
46 std::set<UDTSOCKET> read_ready;
47 std::set<UDTSOCKET> write_ready;
48 while( !_epoll_loop.canceled() )
49 {
50 UDT::epoll_wait( _epoll_id,
51 &read_ready,
52 &write_ready, 100000000 );
53
54 { synchronized(_read_promises_mutex)
55 for( auto sock : read_ready )
56 {
57 auto itr = _read_promises.find( sock );
58 if( itr != _read_promises.end() )
59 {
60 itr->second->set_value();
61 _read_promises.erase(itr);
62 }
63 }
64 } // synchronized read promise mutex
65
66 { synchronized(_write_promises_mutex)
67 for( auto sock : write_ready )
68 {
69 auto itr = _write_promises.find( sock );
70 if( itr != _write_promises.end() )
71 {
72 itr->second->set_value();
73 _write_promises.erase(itr);
74 }
75 }
76 } // synchronized write promise mutex
77 } // while not canceled
78 } // poll_loop
79
80
81 void notify_read( int udt_socket_id,
82 const promise<void>::ptr& p )
83 {
84 int events = UDT_EPOLL_IN | UDT_EPOLL_ERR;
85 if( 0 != UDT::epoll_add_usock( _epoll_id,
86 udt_socket_id,
87 &events ) )
88 {
90 }
91 { synchronized(_read_promises_mutex)
92
93 _read_promises[udt_socket_id] = p;
94 }
95 }
96
97 void notify_write( int udt_socket_id,
98 const promise<void>::ptr& p )
99 {
100 int events = UDT_EPOLL_OUT | UDT_EPOLL_ERR;
101 if( 0 != UDT::epoll_add_usock( _epoll_id,
102 udt_socket_id,
103 &events ) )
104 {
106 }
107
108 { synchronized(_write_promises_mutex)
109 _write_promises[udt_socket_id] = p;
110 }
111 }
112 void remove( int udt_socket_id )
113 {
114 { synchronized(_read_promises_mutex)
115 auto read_itr = _read_promises.find( udt_socket_id );
116 if( read_itr != _read_promises.end() )
117 {
118 read_itr->second->set_exception( fc::copy_exception( fc::exception() ) );
119 _read_promises.erase(read_itr);
120 }
121 }
122 { synchronized(_write_promises_mutex)
123 auto write_itr = _write_promises.find( udt_socket_id );
124 if( write_itr != _write_promises.end() )
125 {
126 write_itr->second->set_exception( fc::copy_exception( fc::exception() ) );
127 _write_promises.erase(write_itr);
128 }
129 }
130 UDT::epoll_remove_usock( _epoll_id, udt_socket_id );
131 }
132
133 private:
134 fc::mutex _read_promises_mutex;
135 fc::mutex _write_promises_mutex;
136 std::unordered_map<int, promise<void>::ptr > _read_promises;
137 std::unordered_map<int, promise<void>::ptr > _write_promises;
138
139 fc::future<void> _epoll_loop;
140 fc::thread _epoll_thread;
141 int _epoll_id;
142 };
143
144
146 {
147 static udt_epoll_service* default_service = new udt_epoll_service();
148 return *default_service;
149 }
150
151
152
153 udt_socket::udt_socket()
154 :_udt_socket_id( UDT::INVALID_SOCK )
155 {
156 }
157
158 udt_socket::~udt_socket()
159 {
160 try {
161 close();
162 } catch ( const std::bad_alloc& ) {
163 throw;
164 } catch ( const boost::interprocess::bad_alloc& ) {
165 throw;
166 } catch ( const fc::exception& e ) {
167 wlog( "${e}", ("e", e.to_detail_string() ) );
168 } catch ( const std::exception& e ) {
169 wlog( "${e}", ("e", e.what() ) );
170 }
171 }
172
173 void udt_socket::bind( const fc::ip::endpoint& local_endpoint )
174 { try {
175 if( !is_open() )
176 open();
177
178 sockaddr_in local_addr;
179 local_addr.sin_family = AF_INET;
180 local_addr.sin_port = htons(local_endpoint.port());
181 local_addr.sin_addr.s_addr = htonl(local_endpoint.get_address());
182
183 if( UDT::ERROR == UDT::bind(_udt_socket_id, (sockaddr*)&local_addr, sizeof(local_addr)) )
186
187 void udt_socket::connect_to( const ip::endpoint& remote_endpoint )
188 { try {
189 if( !is_open() )
190 open();
191
192 sockaddr_in serv_addr;
193 serv_addr.sin_family = AF_INET;
194 serv_addr.sin_port = htons(remote_endpoint.port());
195 serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address());
196
197 // UDT doesn't allow now blocking connects...
198 fc::thread connect_thread("connect_thread");
199 connect_thread.async( [&](){
200 if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr, sizeof(serv_addr)) )
202 }, "udt_socket::connect_to").wait();
203
204 bool block = false;
205 UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool));
206 UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool));
208
209 } FC_CAPTURE_AND_RETHROW( (remote_endpoint) ) }
210
211 ip::endpoint udt_socket::remote_endpoint() const
212 { try {
213 sockaddr_in peer_addr;
214 int peer_addr_size = sizeof(peer_addr);
215 int error_code = UDT::getpeername( _udt_socket_id, (struct sockaddr*)&peer_addr, &peer_addr_size );
216 if( error_code == UDT::ERROR )
218 return ip::endpoint( ip::address( htonl( peer_addr.sin_addr.s_addr ) ), htons(peer_addr.sin_port) );
220
221 ip::endpoint udt_socket::local_endpoint() const
222 { try {
223 sockaddr_in sock_addr;
224 int addr_size = sizeof(sock_addr);
225 int error_code = UDT::getsockname( _udt_socket_id, (struct sockaddr*)&sock_addr, &addr_size );
226 if( error_code == UDT::ERROR )
228 return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) );
230
231
233 size_t udt_socket::readsome( char* buffer, size_t max )
234 { try {
235 auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
236 while( bytes_read == UDT::ERROR )
237 {
238 if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV )
239 {
240 UDT::getlasterror().clear();
241 promise<void>::ptr p(new promise<void>("udt_socket::readsome"));
242 default_epool_service().notify_read( _udt_socket_id, p );
243 p->wait();
244 bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
245 }
246 else
248 }
249 return bytes_read;
250 } FC_CAPTURE_AND_RETHROW( (max) ) }
251
252 size_t udt_socket::readsome( const std::shared_ptr<char>& buf, size_t len, size_t offset )
253 {
254 return readsome(buf.get() + offset, len);
255 }
256
257 bool udt_socket::eof()const
258 {
259 // TODO...
260 return false;
261 }
263
266 size_t udt_socket::writesome( const char* buffer, size_t len )
267 { try {
268 auto bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0);
269
270 while( UDT::ERROR == bytes_sent )
271 {
272 if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCSND )
273 {
274 UDT::getlasterror().clear();
275 promise<void>::ptr p(new promise<void>("udt_socket::writesome"));
276 default_epool_service().notify_write( _udt_socket_id, p );
277 p->wait();
278 bytes_sent = UDT::send(_udt_socket_id, buffer, len, 0);
279 continue;
280 }
281 else
283 }
284 return bytes_sent;
286
287 size_t udt_socket::writesome( const std::shared_ptr<const char>& buf, size_t len, size_t offset )
288 {
289 return writesome(buf.get() + offset, len);
290 }
291
292 void udt_socket::flush(){}
293
294 void udt_socket::close()
295 { try {
296 if( is_open() )
297 {
298 default_epool_service().remove( _udt_socket_id );
299 UDT::close( _udt_socket_id );
301 _udt_socket_id = UDT::INVALID_SOCK;
302 }
303 else
304 {
305 wlog( "already closed" );
306 }
309
310 void udt_socket::open()
311 {
312 _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0);
313 if( _udt_socket_id == UDT::INVALID_SOCK )
315 }
316
317 bool udt_socket::is_open()const
318 {
319 return _udt_socket_id != UDT::INVALID_SOCK;
320 }
321
322
323
324
325
326
327 udt_server::udt_server()
328 :_udt_socket_id( UDT::INVALID_SOCK )
329 {
330 _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0);
331 if( _udt_socket_id == UDT::INVALID_SOCK )
333
334 bool block = false;
335 UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block, sizeof(bool));
337 UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block, sizeof(bool));
339 }
340
341 udt_server::~udt_server()
342 {
343 try {
344 close();
345 } catch ( const std::bad_alloc& ) {
346 throw;
347 } catch ( const boost::interprocess::bad_alloc& ) {
348 throw;
349 } catch ( const fc::exception& e ) {
350 wlog( "${e}", ("e", e.to_detail_string() ) );
351 } catch ( const std::exception& e ) {
352 wlog( "${e}", ("e", e.what() ) );
353 }
354 }
355
356 void udt_server::close()
357 { try {
358 if( _udt_socket_id != UDT::INVALID_SOCK )
359 {
360 UDT::close( _udt_socket_id );
362 default_epool_service().remove( _udt_socket_id );
363 _udt_socket_id = UDT::INVALID_SOCK;
364 }
366
367 void udt_server::accept( udt_socket& s )
368 { try {
369 FC_ASSERT( !s.is_open() );
370
371
372 while( s._udt_socket_id == UDT::INVALID_SOCK )
373 {
374 s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen );
375 if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV )
376 {
377 UDT::getlasterror().clear();
378 promise<void>::ptr p(new promise<void>("udt_server::accept"));
379 default_epool_service().notify_read( _udt_socket_id, p );
380 p->wait();
381 s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen );
382 }
383 else
385 }
387
388 void udt_server::listen( const ip::endpoint& ep )
389 { try {
390 sockaddr_in my_addr;
391 my_addr.sin_family = AF_INET;
392 my_addr.sin_port = htons(ep.port());
393 my_addr.sin_addr.s_addr = INADDR_ANY;
394 memset(&(my_addr.sin_zero), '\0', 8);
395
396 if( UDT::ERROR == UDT::bind(_udt_socket_id, (sockaddr*)&my_addr, sizeof(my_addr)) )
398
399 UDT::listen(_udt_socket_id, 10);
401 } FC_CAPTURE_AND_RETHROW( (ep) ) }
402
403 fc::ip::endpoint udt_server::local_endpoint() const
404 { try {
405 sockaddr_in sock_addr;
406 int addr_size = sizeof(sock_addr);
407 int error_code = UDT::getsockname( _udt_socket_id, (struct sockaddr*)&sock_addr, &addr_size );
408 if( error_code == UDT::ERROR )
410 return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) );
412
413}
const mie::Vuint & p
Definition bn.cpp:27
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
const address & get_address() const
Definition ip.cpp:72
uint16_t port() const
Definition ip.cpp:71
void notify_read(int udt_socket_id, const promise< void >::ptr &p)
void remove(int udt_socket_id)
void notify_write(int udt_socket_id, const promise< void >::ptr &p)
#define FC_CAPTURE_AND_RETHROW(...)
#define FC_CAPTURE_AND_THROW(EXCEPTION_TYPE,...)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
void close(T *e, websocketpp::connection_hdl hdl)
#define wlog(FORMAT,...)
Definition logger.hpp:124
namespace sysio::chain
Definition authority.cpp:3
fc::exception_ptr copy_exception(T &&e)
udt_epoll_service & default_epool_service()
void check_udt_errors()
void wait()
char * s
size_t len
uint8_t buf[2048]
memset(pInfo->slotDescription, ' ', 64)