1#include <fc/network/udt_socket.hpp>
2#include <fc/thread/thread.hpp>
3#include <fc/thread/mutex.hpp>
4#include <fc/thread/unique_lock.hpp>
16 UDT::ERRORINFO& error_info = UDT::getlasterror();
17 if( error_info.getErrorCode() )
19 std::string error_message = error_info.getErrorMessage();
29 :_epoll_thread(
"udt_epoll")
33 _epoll_id = UDT::epoll_create();
34 _epoll_loop = _epoll_thread.async( [=](){
poll_loop(); },
"udt_poll_loop" );
39 _epoll_loop.cancel(
"udt_epoll_service is destructing");
46 std::set<UDTSOCKET> read_ready;
47 std::set<UDTSOCKET> write_ready;
48 while( !_epoll_loop.canceled() )
50 UDT::epoll_wait( _epoll_id,
52 &write_ready, 100000000 );
54 {
synchronized(_read_promises_mutex)
55 for(
auto sock : read_ready )
57 auto itr = _read_promises.find( sock );
58 if( itr != _read_promises.end() )
60 itr->second->set_value();
61 _read_promises.erase(itr);
66 {
synchronized(_write_promises_mutex)
67 for(
auto sock : write_ready )
69 auto itr = _write_promises.find( sock );
70 if( itr != _write_promises.end() )
72 itr->second->set_value();
73 _write_promises.erase(itr);
82 const promise<void>::ptr&
p )
84 int events = UDT_EPOLL_IN | UDT_EPOLL_ERR;
85 if( 0 != UDT::epoll_add_usock( _epoll_id,
91 {
synchronized(_read_promises_mutex)
93 _read_promises[udt_socket_id] =
p;
98 const promise<void>::ptr&
p )
100 int events = UDT_EPOLL_OUT | UDT_EPOLL_ERR;
101 if( 0 != UDT::epoll_add_usock( _epoll_id,
108 {
synchronized(_write_promises_mutex)
109 _write_promises[udt_socket_id] =
p;
114 {
synchronized(_read_promises_mutex)
115 auto read_itr = _read_promises.find( udt_socket_id );
116 if( read_itr != _read_promises.end() )
119 _read_promises.erase(read_itr);
122 {
synchronized(_write_promises_mutex)
123 auto write_itr = _write_promises.find( udt_socket_id );
124 if( write_itr != _write_promises.end() )
127 _write_promises.erase(write_itr);
130 UDT::epoll_remove_usock( _epoll_id, udt_socket_id );
134 fc::mutex _read_promises_mutex;
135 fc::mutex _write_promises_mutex;
136 std::unordered_map<int, promise<void>::ptr > _read_promises;
137 std::unordered_map<int, promise<void>::ptr > _write_promises;
139 fc::future<void> _epoll_loop;
140 fc::thread _epoll_thread;
148 return *default_service;
153 udt_socket::udt_socket()
154 :_udt_socket_id( UDT::INVALID_SOCK )
158 udt_socket::~udt_socket()
162 }
catch (
const std::bad_alloc& ) {
164 }
catch (
const boost::interprocess::bad_alloc& ) {
168 }
catch (
const std::exception& e ) {
169 wlog(
"${e}", (
"e", e.what() ) );
178 sockaddr_in local_addr;
179 local_addr.sin_family = AF_INET;
180 local_addr.sin_port = htons(local_endpoint.
port());
181 local_addr.sin_addr.s_addr = htonl(local_endpoint.
get_address());
183 if( UDT::ERROR == UDT::bind(_udt_socket_id, (sockaddr*)&local_addr,
sizeof(local_addr)) )
187 void udt_socket::connect_to(
const ip::endpoint& remote_endpoint )
192 sockaddr_in serv_addr;
193 serv_addr.sin_family = AF_INET;
194 serv_addr.sin_port = htons(remote_endpoint.port());
195 serv_addr.sin_addr.s_addr = htonl(remote_endpoint.get_address());
198 fc::thread connect_thread(
"connect_thread");
199 connect_thread.async( [&](){
200 if( UDT::ERROR == UDT::connect(_udt_socket_id, (sockaddr*)&serv_addr,
sizeof(serv_addr)) )
202 },
"udt_socket::connect_to").
wait();
205 UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block,
sizeof(
bool));
206 UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block,
sizeof(
bool));
211 ip::endpoint udt_socket::remote_endpoint()
const
213 sockaddr_in peer_addr;
214 int peer_addr_size =
sizeof(peer_addr);
215 int error_code = UDT::getpeername( _udt_socket_id, (
struct sockaddr*)&peer_addr, &peer_addr_size );
216 if( error_code == UDT::ERROR )
218 return ip::endpoint( ip::address( htonl( peer_addr.sin_addr.s_addr ) ), htons(peer_addr.sin_port) );
221 ip::endpoint udt_socket::local_endpoint()
const
223 sockaddr_in sock_addr;
224 int addr_size =
sizeof(sock_addr);
225 int error_code = UDT::getsockname( _udt_socket_id, (
struct sockaddr*)&sock_addr, &addr_size );
226 if( error_code == UDT::ERROR )
228 return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) );
233 size_t udt_socket::readsome(
char* buffer,
size_t max )
235 auto bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
236 while( bytes_read == UDT::ERROR )
238 if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV )
240 UDT::getlasterror().clear();
241 promise<void>::ptr
p(
new promise<void>(
"udt_socket::readsome"));
244 bytes_read = UDT::recv( _udt_socket_id, buffer, max, 0 );
252 size_t udt_socket::readsome(
const std::shared_ptr<char>&
buf,
size_t len,
size_t offset )
254 return readsome(
buf.get() + offset,
len);
257 bool udt_socket::eof()
const
266 size_t udt_socket::writesome(
const char* buffer,
size_t len )
268 auto bytes_sent = UDT::send(_udt_socket_id, buffer,
len, 0);
270 while( UDT::ERROR == bytes_sent )
272 if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCSND )
274 UDT::getlasterror().clear();
275 promise<void>::ptr
p(
new promise<void>(
"udt_socket::writesome"));
278 bytes_sent = UDT::send(_udt_socket_id, buffer,
len, 0);
287 size_t udt_socket::writesome(
const std::shared_ptr<const char>&
buf,
size_t len,
size_t offset )
289 return writesome(
buf.get() + offset,
len);
292 void udt_socket::flush(){}
294 void udt_socket::close()
299 UDT::close( _udt_socket_id );
301 _udt_socket_id = UDT::INVALID_SOCK;
305 wlog(
"already closed" );
310 void udt_socket::open()
312 _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0);
313 if( _udt_socket_id == UDT::INVALID_SOCK )
317 bool udt_socket::is_open()
const
319 return _udt_socket_id != UDT::INVALID_SOCK;
327 udt_server::udt_server()
328 :_udt_socket_id( UDT::INVALID_SOCK )
330 _udt_socket_id = UDT::socket(AF_INET, SOCK_STREAM, 0);
331 if( _udt_socket_id == UDT::INVALID_SOCK )
335 UDT::setsockopt(_udt_socket_id, 0, UDT_SNDSYN, &block,
sizeof(
bool));
337 UDT::setsockopt(_udt_socket_id, 0, UDT_RCVSYN, &block,
sizeof(
bool));
341 udt_server::~udt_server()
345 }
catch (
const std::bad_alloc& ) {
347 }
catch (
const boost::interprocess::bad_alloc& ) {
351 }
catch (
const std::exception& e ) {
352 wlog(
"${e}", (
"e", e.what() ) );
356 void udt_server::close()
358 if( _udt_socket_id != UDT::INVALID_SOCK )
360 UDT::close( _udt_socket_id );
363 _udt_socket_id = UDT::INVALID_SOCK;
367 void udt_server::accept( udt_socket&
s )
372 while(
s._udt_socket_id == UDT::INVALID_SOCK )
374 s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen );
375 if( UDT::getlasterror().getErrorCode() == CUDTException::EASYNCRCV )
377 UDT::getlasterror().clear();
378 promise<void>::ptr
p(
new promise<void>(
"udt_server::accept"));
381 s._udt_socket_id = UDT::accept( _udt_socket_id, (sockaddr*)&their_addr, &namelen );
388 void udt_server::listen(
const ip::endpoint& ep )
391 my_addr.sin_family = AF_INET;
392 my_addr.sin_port = htons(ep.port());
393 my_addr.sin_addr.s_addr = INADDR_ANY;
394 memset(&(my_addr.sin_zero),
'\0', 8);
396 if( UDT::ERROR == UDT::bind(_udt_socket_id, (sockaddr*)&my_addr,
sizeof(my_addr)) )
399 UDT::listen(_udt_socket_id, 10);
405 sockaddr_in sock_addr;
406 int addr_size =
sizeof(sock_addr);
407 int error_code = UDT::getsockname( _udt_socket_id, (
struct sockaddr*)&sock_addr, &addr_size );
408 if( error_code == UDT::ERROR )
410 return ip::endpoint( ip::address( htonl( sock_addr.sin_addr.s_addr ) ), htons(sock_addr.sin_port) );
Used to generate a useful error report when an exception is thrown.
std::string to_detail_string(log_level ll=log_level::all) const
const address & get_address() const
void notify_read(int udt_socket_id, const promise< void >::ptr &p)
void remove(int udt_socket_id)
void notify_write(int udt_socket_id, const promise< void >::ptr &p)
#define FC_CAPTURE_AND_RETHROW(...)
#define FC_CAPTURE_AND_THROW(EXCEPTION_TYPE,...)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
void close(T *e, websocketpp::connection_hdl hdl)
fc::exception_ptr copy_exception(T &&e)
udt_epoll_service & default_epool_service()
memset(pInfo->slotDescription, ' ', 64)