Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
rate_limiting.cpp
Go to the documentation of this file.
1#include <fc/network/rate_limiting.hpp>
2#include <fc/network/tcp_socket_io_hooks.hpp>
3#include <fc/network/tcp_socket.hpp>
4#include <list>
5#include <algorithm>
6#include <fc/network/ip.hpp>
7#include <fc/fwd_impl.hpp>
8#include <fc/asio.hpp>
9#include <fc/log/logger.hpp>
10#include <fc/io/stdio.hpp>
12#include <fc/thread/thread.hpp>
13
14namespace fc
15{
16
17 namespace detail
18 {
19 // data about a read or write we're managing
21 {
22 public:
23 size_t length;
24 size_t offset;
26 promise<size_t>::ptr completion_promise;
27
29 size_t offset,
30 promise<size_t>::ptr&& completion_promise) :
31 length(length),
35 {}
36
37 virtual void perform_operation() = 0;
38 };
39
41 {
42 public:
43 boost::asio::ip::tcp::socket& socket;
44 const char* raw_buffer;
45 std::shared_ptr<const char> shared_buffer;
46
47 // QUESTION: Why would this version of the constructor ever be called if it will abort the program if built as DEBUG?
48 // Commenting out for now since this file is not even included in the fc library build.
49 /*
50 rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
51 const char* buffer,
52 size_t length,
53 size_t offset,
54 promise<size_t>::ptr completion_promise) :
55 rate_limited_operation(length, offset, std::move(completion_promise)),
56 socket(socket),
57 raw_buffer(buffer)
58 {
59 assert(false);
60 }
61 */
62 rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket& socket,
63 const std::shared_ptr<const char>& buffer,
64 size_t length,
65 size_t offset,
66 promise<size_t>::ptr completion_promise) :
69 raw_buffer(nullptr),
70 shared_buffer(buffer)
71 {}
72 virtual void perform_operation() override
73 {
74 if (raw_buffer)
75 asio::async_write_some(socket,
78 else
79 asio::async_write_some(socket,
82 }
83 };
84
86 {
87 public:
88 boost::asio::ip::tcp::socket& socket;
90 std::shared_ptr<char> shared_buffer;
91
92 rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
93 char* buffer,
94 size_t length,
95 size_t offset,
96 promise<size_t>::ptr completion_promise) :
99 raw_buffer(buffer)
100 {}
101 rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket& socket,
102 const std::shared_ptr<char>& buffer,
103 size_t length,
104 size_t offset,
105 promise<size_t>::ptr completion_promise) :
107 socket(socket),
108 raw_buffer(nullptr),
109 shared_buffer(buffer)
110 {}
111 virtual void perform_operation() override
112 {
113 if (raw_buffer)
114 asio::async_read_some(socket,
117 else
118 asio::async_read_some(socket,
121
122 }
123 };
124
126 {
127 // less than operator designed to bring the shortest operations to the end
129 {
130 return lhs->length > rhs->length;
131 }
132 };
133
135 {
136 private:
137 mutable double _average_rate;
138 mutable uint32_t _unaccounted_bytes;
139 mutable time_point _last_update_time;
140 microseconds _time_constant;
141
142 void update_const(uint32_t bytes_transferred = 0) const;
143 public:
144 average_rate_meter(const microseconds& time_constant = seconds(10));
145 void set_time_constant(const microseconds& time_constant);
146 void update(uint32_t bytes_transferred = 0);
148 };
150 _average_rate(0.),
151 _unaccounted_bytes(0),
152 _last_update_time(time_point_sec::min()),
153 _time_constant(time_constant)
154 {}
156 {
157 _time_constant = time_constant;
158 }
159 void average_rate_meter::update(uint32_t bytes_transferred /* = 0 */)
160 {
161 update_const(bytes_transferred);
162 }
// Folds `bytes_transferred` into the running average rate (bytes per second).
// Declared const; it can still modify _average_rate, _unaccounted_bytes and
// _last_update_time because those members are declared mutable.
// NOTE(review): the declaration of `now` (listing line 165) is absent from this
// listing; presumably it holds the current time -- confirm against the real file.
163 void average_rate_meter::update_const(uint32_t bytes_transferred /* = 0 */) const
164 {
// No time has elapsed since the last update (or the clock went backwards):
// a rate cannot be computed, so just bank the bytes for a later update.
166 if (now <= _last_update_time)
167 _unaccounted_bytes += bytes_transferred;
168 else
169 {
170 microseconds time_since_last_update = now - _last_update_time;
// More than one time constant has passed: discard the old average and restart
// it as this transfer spread over one time constant (expressed in seconds).
// NOTE(review): _unaccounted_bytes is not added in this branch but is still
// reset to 0 below, so banked bytes look like they are dropped -- confirm intended.
171 if (time_since_last_update > _time_constant)
172 _average_rate = bytes_transferred / (_time_constant.count() / (double)seconds(1).count());
173 else
174 {
// include the bytes banked by earlier zero-elapsed-time updates
175 bytes_transferred += _unaccounted_bytes;
176 double seconds_since_last_update = time_since_last_update.count() / (double)seconds(1).count();
177 double rate_since_last_update = bytes_transferred / seconds_since_last_update;
// alpha is the fraction of the time constant that has elapsed (at most 1 here);
// the new average is the recent rate weighted by alpha plus the old average
// weighted by (1 - alpha)
178 double alpha = time_since_last_update.count() / (double)_time_constant.count();
179 _average_rate = rate_since_last_update * alpha + _average_rate * (1.0 - alpha);
180 }
181 _last_update_time = now;
182 _unaccounted_bytes = 0;
183 }
184 }
186 {
187 update_const();
188 return (uint32_t)_average_rate;
189 }
190
191 class rate_limiting_group_impl : public tcp_socket_io_hooks
192 {
193 public:
197
198 microseconds _granularity; // how often to add tokens to the bucket
200 uint32_t _unused_read_tokens; // gets filled with tokens for unused bytes (if I'm allowed to read 200 bytes and I try to read 200 bytes, but can only read 50, tokens for the other 150 get returned here)
203
204 typedef std::list<rate_limited_operation*> rate_limited_operation_list;
209
212
217
220
221 rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second,
222 uint32_t burstiness_in_seconds = 1);
224
225 virtual size_t readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length) override;
226 virtual size_t readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset) override;
227 template <typename BufferType>
228 size_t readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset);
229 virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length) override;
230 virtual size_t writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset) override;
231 template <typename BufferType>
232 size_t writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset);
233
236 void process_pending_operations(time_point& last_iteration_start_time,
237 uint32_t& limit_bytes_per_second,
238 rate_limited_operation_list& operations_in_progress,
239 rate_limited_operation_list& operations_for_next_iteration,
240 uint32_t& tokens,
241 uint32_t& unused_tokens);
242 };
243
244 rate_limiting_group_impl::rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second,
245 uint32_t burstiness_in_seconds) :
246 _upload_bytes_per_second(upload_bytes_per_second),
247 _download_bytes_per_second(download_bytes_per_second),
248 _burstiness_in_seconds(burstiness_in_seconds),
249 _granularity(milliseconds(50)),
250 _read_tokens(_download_bytes_per_second),
251 _unused_read_tokens(0),
252 _write_tokens(_upload_bytes_per_second),
253 _unused_write_tokens(0)
254 {
255 }
256
258 {
259 try
260 {
261 _process_pending_reads_loop_complete.cancel_and_wait();
262 }
263 catch (...)
264 {
265 }
266 try
267 {
269 }
270 catch (...)
271 {
272 }
273 }
274
275 size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<char>& buffer, size_t length, size_t offset)
276 {
277 return readsome_impl(socket, buffer, length, offset);
278 }
279
280 size_t rate_limiting_group_impl::readsome(boost::asio::ip::tcp::socket& socket, char* buffer, size_t length)
281 {
282 return readsome_impl(socket, buffer, length, 0);
283 }
284
// Shared implementation behind both readsome() overloads. On the rate-limited
// path the read is queued and the caller blocks on a promise until the read
// has been performed; otherwise the read goes straight to asio::read_some.
// Either way the byte count is reported to the download rate meter and returned.
// NOTE(review): the condition that opens the `{ ... } else` below (listing line
// 289) and the lines around the async launch (296, 298-299) are absent from this
// listing; presumably the condition tests whether a download limit is set -- confirm.
285 template <typename BufferType>
286 size_t rate_limiting_group_impl::readsome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset)
287 {
288 size_t bytes_read;
290 {
291 promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::readsome"));
// The operation object lives on this stack frame; only its address is queued,
// so it must be out of both lists before this function returns or throws.
292 rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise);
293 _read_operations_for_next_iteration.push_back(&read_operation);
294
295 // launch the read processing loop if it isn't running, or signal it to resume if it's paused.
297 _process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }, "process_pending_reads" );
300
301 try
302 {
// blocks until the queued read completes and yields the number of bytes read
303 bytes_read = completion_promise->wait();
304 }
305 catch (...)
306 {
// the wait failed: drop the soon-to-be-dangling pointer from whichever list holds it, then rethrow
307 _read_operations_for_next_iteration.remove(&read_operation);
308 _read_operations_in_progress.remove(&read_operation);
309 throw;
310 }
// hand back the part of the allowance that the read did not use
311 _unused_read_tokens += read_operation.permitted_length - bytes_read;
312 }
313 else
314 bytes_read = asio::read_some(socket, buffer, length, offset);
315
316 _actual_download_rate.update(bytes_read);
317
318 return bytes_read;
319 }
320
321 size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const char* buffer, size_t length)
322 {
323 return writesome_impl(socket, buffer, length, 0);
324 }
325
326 size_t rate_limiting_group_impl::writesome(boost::asio::ip::tcp::socket& socket, const std::shared_ptr<const char>& buffer, size_t length, size_t offset)
327 {
328 return writesome_impl(socket, buffer, length, offset);
329 }
330
// Shared implementation behind both writesome() overloads. On the rate-limited
// path the write is queued and the caller blocks on a promise until the write
// has been performed; otherwise the write goes straight to asio::write_some.
// Either way the byte count is reported to the upload rate meter and returned.
// NOTE(review): the condition that opens the `{ ... } else` below (listing line
// 335) and the lines around the async launch (342, 344-345) are absent from this
// listing; presumably the condition tests whether an upload limit is set -- confirm.
331 template <typename BufferType>
332 size_t rate_limiting_group_impl::writesome_impl(boost::asio::ip::tcp::socket& socket, const BufferType& buffer, size_t length, size_t offset)
333 {
334 size_t bytes_written;
336 {
337 promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::writesome"));
// The operation object lives on this stack frame; only its address is queued,
// so it must be out of both lists before this function returns or throws.
338 rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise);
339 _write_operations_for_next_iteration.push_back(&write_operation);
340
341 // launch the write processing loop if it isn't running, or signal it to resume if it's paused.
343 _process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }, "process_pending_writes");
346
347 try
348 {
// blocks until the queued write completes and yields the number of bytes written
349 bytes_written = completion_promise->wait();
350 }
351 catch (...)
352 {
// the wait failed: drop the soon-to-be-dangling pointer from whichever list holds it, then rethrow
353 _write_operations_for_next_iteration.remove(&write_operation);
354 _write_operations_in_progress.remove(&write_operation);
355 throw;
356 }
// hand back the part of the allowance that the write did not use
357 _unused_write_tokens += write_operation.permitted_length - bytes_written;
358 }
359 else
360 bytes_written = asio::write_some(socket, buffer, length, offset);
361
362 _actual_upload_rate.update(bytes_written);
363
364 return bytes_written;
365 }
366
368 {
369 for (;;)
370 {
373
374 _new_read_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_reads");
375 try
376 {
379 else
381 }
382 catch (const timeout_exception&)
383 {
384 }
386 }
387 }
389 {
390 for (;;)
391 {
394
395 _new_write_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_writes");
396 try
397 {
400 else
402 }
403 catch (const timeout_exception&)
404 {
405 }
407 }
408 }
410 uint32_t& limit_bytes_per_second,
411 rate_limited_operation_list& operations_in_progress,
412 rate_limited_operation_list& operations_for_next_iteration,
413 uint32_t& tokens,
414 uint32_t& unused_tokens)
415 {
// One scheduling pass over a set of queued operations: merges newly queued
// operations into the in-progress list, refills the token bucket from the
// elapsed time, divides the tokens among the waiting operations, and starts
// every operation that received a non-zero allowance.
// All state is passed in by reference, so the pass can be run on whichever
// lists and counters the caller supplies.
// NOTE(review): the first line of this definition (return type, function name
// and the last_iteration_start_time parameter) is absent from this listing.
416 // lock here for multithreaded
417 std::copy(operations_for_next_iteration.begin(),
418 operations_for_next_iteration.end(),
419 std::back_inserter(operations_in_progress));
420 operations_for_next_iteration.clear();
421
422 // find out how much time since our last read/write
423 time_point this_iteration_start_time = time_point::now();
424 if (limit_bytes_per_second) // then we are limiting up/download speed
425 {
// clamp the elapsed time to the range [0, 1 second] so that a long pause or a
// backwards clock step cannot mint an arbitrary number of tokens
426 microseconds time_since_last_iteration = this_iteration_start_time - last_iteration_start_time;
427 if (time_since_last_iteration > seconds(1))
428 time_since_last_iteration = seconds(1);
429 else if (time_since_last_iteration < microseconds(0))
430 time_since_last_iteration = microseconds(0);
431
// refill: limit (bytes/second) * elapsed microseconds / 1,000,000
432 tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.count()) / 1000000);
// return the tokens that earlier operations were granted but did not use
433 tokens += unused_tokens;
434 unused_tokens = 0;
// cap the bucket at burstiness-seconds' worth of bandwidth
// NOTE(review): limit_bytes_per_second is 32-bit; if _burstiness_in_seconds is
// too (its declaration is not visible here) this product can wrap for large
// limits, and the additions above can wrap as well -- confirm.
435 tokens = std::min(tokens, limit_bytes_per_second * _burstiness_in_seconds);
436
437 if (tokens)
438 {
439 // sort the pending reads/writes in order of the number of bytes they need to write, smallest first
// (is_operation_shorter compares with '>', so the vector ends up longest-first
// and the shortest operation sits at back(), where it is consumed first)
440 std::vector<rate_limited_operation*> operations_sorted_by_length;
441 operations_sorted_by_length.reserve(operations_in_progress.size());
442 for (rate_limited_operation* operation_data : operations_in_progress)
443 operations_sorted_by_length.push_back(operation_data);
444 std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(), is_operation_shorter());
445
446 // figure out how many bytes each reader/writer is allowed to read/write
// Each operation is offered an equal share of what is left; one that needs
// less than its share takes only its length, leaving more for those after it.
447 uint32_t bytes_remaining_to_allocate = tokens;
448 while (!operations_sorted_by_length.empty())
449 {
450 uint32_t bytes_permitted_for_this_operation = bytes_remaining_to_allocate / operations_sorted_by_length.size();
451 uint32_t bytes_allocated_for_this_operation = std::min<size_t>(operations_sorted_by_length.back()->length, bytes_permitted_for_this_operation);
452 operations_sorted_by_length.back()->permitted_length = bytes_allocated_for_this_operation;
453 bytes_remaining_to_allocate -= bytes_allocated_for_this_operation;
454 operations_sorted_by_length.pop_back();
455 }
// whatever could not be handed out stays in the bucket for the next pass
456 tokens = bytes_remaining_to_allocate;
457
458 // kick off the reads/writes in first-come order
// Operations granted zero bytes stay queued for a later pass. An operation is
// erased from the list before it is started.
// NOTE(review): presumably because the queued pointer refers to an object on the
// waiting caller's stack, which may go away once the operation completes -- confirm.
459 for (auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();)
460 {
461 if ((*iter)->permitted_length > 0)
462 {
463 rate_limited_operation* operation_to_perform = *iter;
464 iter = operations_in_progress.erase(iter);
465 operation_to_perform->perform_operation();
466 }
467 else
468 ++iter;
469 }
470 }
471 }
472 else // down/upload speed is unlimited
473 {
474 // we shouldn't end up here often. If the rate is unlimited, we should just execute
475 // the operation immediately without being queued up. This should only be hit if
476 // we change from a limited rate to unlimited
// every queued operation is allowed its full length and is started immediately
477 for (auto iter = operations_in_progress.begin();
478 iter != operations_in_progress.end();)
479 {
480 rate_limited_operation* operation_to_perform = *iter;
481 iter = operations_in_progress.erase(iter);
482 operation_to_perform->permitted_length = operation_to_perform->length;
483 operation_to_perform->perform_operation();
484 }
485 }
// remember when this pass started so the next pass can measure the elapsed time
486 last_iteration_start_time = this_iteration_start_time;
487 }
488
489 }
490
491 rate_limiting_group::rate_limiting_group(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds /* = 1 */) :
492 my(new detail::rate_limiting_group_impl(upload_bytes_per_second, download_bytes_per_second, burstiness_in_seconds))
493 {
494 }
495
496 rate_limiting_group::~rate_limiting_group()
497 {
498 }
499
500 uint32_t rate_limiting_group::get_actual_upload_rate() const
501 {
502 return my->_actual_upload_rate.get_average_rate();
503 }
504
505 uint32_t rate_limiting_group::get_actual_download_rate() const
506 {
507 return my->_actual_download_rate.get_average_rate();
508 }
509
510 void rate_limiting_group::set_actual_rate_time_constant(microseconds time_constant)
511 {
512 my->_actual_upload_rate.set_time_constant(time_constant);
513 my->_actual_download_rate.set_time_constant(time_constant);
514 }
515
516 void rate_limiting_group::set_upload_limit(uint32_t upload_bytes_per_second)
517 {
518 my->_upload_bytes_per_second = upload_bytes_per_second;
519 }
520
521 uint32_t rate_limiting_group::get_upload_limit() const
522 {
523 return my->_upload_bytes_per_second;
524 }
525
526 void rate_limiting_group::set_download_limit(uint32_t download_bytes_per_second)
527 {
528 my->_download_bytes_per_second = download_bytes_per_second;
529 }
530
531 uint32_t rate_limiting_group::get_download_limit() const
532 {
533 return my->_download_bytes_per_second;
534 }
535
536 void rate_limiting_group::add_tcp_socket(tcp_socket* tcp_socket_to_limit)
537 {
538 tcp_socket_to_limit->set_io_hooks(my.get());
539 }
540
541 void rate_limiting_group::remove_tcp_socket(tcp_socket* tcp_socket_to_stop_limiting)
542 {
543 tcp_socket_to_stop_limiting->set_io_hooks(NULL);
544 }
545
546
547} // namespace fc
average_rate_meter(const microseconds &time_constant=seconds(10))
void set_time_constant(const microseconds &time_constant)
void update(uint32_t bytes_transferred=0)
rate_limited_operation(size_t length, size_t offset, promise< size_t >::ptr &&completion_promise)
promise< size_t >::ptr completion_promise
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
boost::asio::ip::tcp::socket & socket
rate_limited_tcp_read_operation(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
boost::asio::ip::tcp::socket & socket
rate_limited_tcp_write_operation(boost::asio::ip::tcp::socket &socket, const std::shared_ptr< const char > &buffer, size_t length, size_t offset, promise< size_t >::ptr completion_promise)
std::shared_ptr< const char > shared_buffer
promise< void >::ptr _new_write_operation_available_promise
rate_limited_operation_list _write_operations_in_progress
rate_limited_operation_list _write_operations_for_next_iteration
virtual size_t writesome(boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length) override
rate_limiting_group_impl(uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
virtual size_t readsome(boost::asio::ip::tcp::socket &socket, char *buffer, size_t length) override
rate_limited_operation_list _read_operations_in_progress
std::list< rate_limited_operation * > rate_limited_operation_list
void process_pending_operations(time_point &last_iteration_start_time, uint32_t &limit_bytes_per_second, rate_limited_operation_list &operations_in_progress, rate_limited_operation_list &operations_for_next_iteration, uint32_t &tokens, uint32_t &unused_tokens)
promise< void >::ptr _new_read_operation_available_promise
rate_limited_operation_list _read_operations_for_next_iteration
size_t readsome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
size_t writesome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
constexpr int64_t count() const
Definition time.hpp:26
static time_point now()
Definition time.cpp:14
Defines exception's used by fc.
int * count
namespace sysio::chain
Definition authority.cpp:3
constexpr microseconds milliseconds(int64_t s)
Definition time.hpp:33
constexpr microseconds seconds(int64_t s)
Definition time.hpp:32
const T & min(const T &a, const T &b)
Definition utility.hpp:140
Definition name.hpp:106
unsigned int uint32_t
Definition stdint.h:126
bool operator()(const rate_limited_operation *lhs, const rate_limited_operation *rhs)