1#include <fc/network/rate_limiting.hpp>
2#include <fc/network/tcp_socket_io_hooks.hpp>
3#include <fc/network/tcp_socket.hpp>
10#include <fc/io/stdio.hpp>
12#include <fc/thread/thread.hpp>
43 boost::asio::ip::tcp::socket&
socket;
63 const std::shared_ptr<const char>& buffer,
75 asio::async_write_some(
socket,
79 asio::async_write_some(
socket,
88 boost::asio::ip::tcp::socket&
socket;
102 const std::shared_ptr<char>& buffer,
114 asio::async_read_some(
socket,
118 asio::async_read_some(
socket,
137 mutable double _average_rate;
138 mutable uint32_t _unaccounted_bytes;
142 void update_const(
uint32_t bytes_transferred = 0)
const;
151 _unaccounted_bytes(0),
153 _time_constant(time_constant)
157 _time_constant = time_constant;
161 update_const(bytes_transferred);
163 void average_rate_meter::update_const(
uint32_t bytes_transferred )
const
166 if (now <= _last_update_time)
167 _unaccounted_bytes += bytes_transferred;
170 microseconds time_since_last_update = now - _last_update_time;
171 if (time_since_last_update > _time_constant)
172 _average_rate = bytes_transferred / (_time_constant.
count() / (
double)
seconds(1).
count());
175 bytes_transferred += _unaccounted_bytes;
176 double seconds_since_last_update = time_since_last_update.
count() / (double)
seconds(1).
count();
177 double rate_since_last_update = bytes_transferred / seconds_since_last_update;
178 double alpha = time_since_last_update.
count() / (double)_time_constant.
count();
179 _average_rate = rate_since_last_update * alpha + _average_rate * (1.0 - alpha);
181 _last_update_time = now;
182 _unaccounted_bytes = 0;
222 uint32_t burstiness_in_seconds = 1);
225 virtual size_t readsome(boost::asio::ip::tcp::socket& socket,
char* buffer,
size_t length)
override;
226 virtual size_t readsome(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<char>& buffer,
size_t length,
size_t offset)
override;
227 template <
typename BufferType>
228 size_t readsome_impl(boost::asio::ip::tcp::socket& socket,
const BufferType& buffer,
size_t length,
size_t offset);
229 virtual size_t writesome(boost::asio::ip::tcp::socket& socket,
const char* buffer,
size_t length)
override;
230 virtual size_t writesome(boost::asio::ip::tcp::socket& socket,
const std::shared_ptr<const char>& buffer,
size_t length,
size_t offset)
override;
231 template <
typename BufferType>
232 size_t writesome_impl(boost::asio::ip::tcp::socket& socket,
const BufferType& buffer,
size_t length,
size_t offset);
246 _upload_bytes_per_second(upload_bytes_per_second),
247 _download_bytes_per_second(download_bytes_per_second),
248 _burstiness_in_seconds(burstiness_in_seconds),
250 _read_tokens(_download_bytes_per_second),
251 _unused_read_tokens(0),
252 _write_tokens(_upload_bytes_per_second),
253 _unused_write_tokens(0)
285 template <
typename BufferType>
291 promise<size_t>::ptr completion_promise(
new promise<size_t>(
"rate_limiting_group_impl::readsome"));
303 bytes_read = completion_promise->wait();
314 bytes_read = asio::read_some(socket, buffer, length, offset);
331 template <
typename BufferType>
334 size_t bytes_written;
337 promise<size_t>::ptr completion_promise(
new promise<size_t>(
"rate_limiting_group_impl::writesome"));
349 bytes_written = completion_promise->wait();
360 bytes_written = asio::write_some(socket, buffer, length, offset);
364 return bytes_written;
382 catch (
const timeout_exception&)
403 catch (
const timeout_exception&)
417 std::copy(operations_for_next_iteration.begin(),
418 operations_for_next_iteration.end(),
419 std::back_inserter(operations_in_progress));
420 operations_for_next_iteration.clear();
424 if (limit_bytes_per_second)
426 microseconds time_since_last_iteration = this_iteration_start_time - last_iteration_start_time;
427 if (time_since_last_iteration >
seconds(1))
428 time_since_last_iteration =
seconds(1);
432 tokens += (
uint32_t)((limit_bytes_per_second * time_since_last_iteration.
count()) / 1000000);
433 tokens += unused_tokens;
440 std::vector<rate_limited_operation*> operations_sorted_by_length;
441 operations_sorted_by_length.reserve(operations_in_progress.size());
443 operations_sorted_by_length.push_back(operation_data);
444 std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(),
is_operation_shorter());
447 uint32_t bytes_remaining_to_allocate = tokens;
448 while (!operations_sorted_by_length.empty())
450 uint32_t bytes_permitted_for_this_operation = bytes_remaining_to_allocate / operations_sorted_by_length.size();
451 uint32_t bytes_allocated_for_this_operation = std::min<size_t>(operations_sorted_by_length.back()->length, bytes_permitted_for_this_operation);
452 operations_sorted_by_length.back()->permitted_length = bytes_allocated_for_this_operation;
453 bytes_remaining_to_allocate -= bytes_allocated_for_this_operation;
454 operations_sorted_by_length.pop_back();
456 tokens = bytes_remaining_to_allocate;
459 for (
auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();)
461 if ((*iter)->permitted_length > 0)
464 iter = operations_in_progress.erase(iter);
477 for (
auto iter = operations_in_progress.begin();
478 iter != operations_in_progress.end();)
481 iter = operations_in_progress.erase(iter);
486 last_iteration_start_time = this_iteration_start_time;
491 rate_limiting_group::rate_limiting_group(
uint32_t upload_bytes_per_second,
uint32_t download_bytes_per_second,
uint32_t burstiness_in_seconds ) :
492 my(new
detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second, burstiness_in_seconds))
496 rate_limiting_group::~rate_limiting_group()
500 uint32_t rate_limiting_group::get_actual_upload_rate()
const
502 return my->_actual_upload_rate.get_average_rate();
505 uint32_t rate_limiting_group::get_actual_download_rate()
const
507 return my->_actual_download_rate.get_average_rate();
510 void rate_limiting_group::set_actual_rate_time_constant(microseconds time_constant)
512 my->_actual_upload_rate.set_time_constant(time_constant);
513 my->_actual_download_rate.set_time_constant(time_constant);
516 void rate_limiting_group::set_upload_limit(
uint32_t upload_bytes_per_second)
518 my->_upload_bytes_per_second = upload_bytes_per_second;
521 uint32_t rate_limiting_group::get_upload_limit()
const
523 return my->_upload_bytes_per_second;
526 void rate_limiting_group::set_download_limit(
uint32_t download_bytes_per_second)
528 my->_download_bytes_per_second = download_bytes_per_second;
531 uint32_t rate_limiting_group::get_download_limit()
const
533 return my->_download_bytes_per_second;
536 void rate_limiting_group::add_tcp_socket(tcp_socket* tcp_socket_to_limit)
538 tcp_socket_to_limit->set_io_hooks(my.get());
541 void rate_limiting_group::remove_tcp_socket(tcp_socket* tcp_socket_to_stop_limiting)
543 tcp_socket_to_stop_limiting->set_io_hooks(NULL);
average_rate_meter(const microseconds &time_constant=seconds(10))
void set_time_constant(const microseconds &time_constant)
uint32_t get_average_rate() const
void update(uint32_t bytes_transferred=0)
virtual void perform_operation()=0
rate_limited_operation(size_t length, size_t offset, promise< size_t >::ptr &&completion_promise)
promise< size_t >::ptr completion_promise
std::shared_ptr< char > shared_buffer
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
virtual void perform_operation() override
boost::asio::ip::tcp::socket & socket
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
boost::asio::ip::tcp::socket & socket
virtual void perform_operation() override
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< const char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
std::shared_ptr< const char > shared_buffer
promise< void >::ptr _new_write_operation_available_promise
future< void > _process_pending_writes_loop_complete
void process_pending_reads()
rate_limited_operation_list _write_operations_in_progress
microseconds _granularity
rate_limited_operation_list _write_operations_for_next_iteration
virtual size_t writesome(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length) override
time_point _last_write_iteration_time
~rate_limiting_group_impl()
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
average_rate_meter _actual_download_rate
uint32_t _burstiness_in_seconds
virtual size_t readsome(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length) override
uint32_t _unused_read_tokens
uint32_t _unused_write_tokens
rate_limited_operation_list _read_operations_in_progress
average_rate_meter _actual_upload_rate
time_point _last_read_iteration_time
uint32_t _download_bytes_per_second
std::list< rate_limited_operation * > rate_limited_operation_list
void process_pending_operations(time_point &last_iteration_start_time, uint32_t &limit_bytes_per_second, rate_limited_operation_list &operations_in_progress, rate_limited_operation_list &operations_for_next_iteration, uint32_t &tokens, uint32_t &unused_tokens)
promise< void >::ptr _new_read_operation_available_promise
rate_limited_operation_list _read_operations_for_next_iteration
future< void > _process_pending_reads_loop_complete
size_t readsome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
void process_pending_writes()
uint32_t _upload_bytes_per_second
size_t writesome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
constexpr int64_t count() const
Defines exception's used by fc.
constexpr microseconds milliseconds(int64_t s)
constexpr microseconds seconds(int64_t s)
const T & min(const T &a, const T &b)
bool operator()(const rate_limited_operation *lhs, const rate_limited_operation *rhs)