Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
fc::detail::rate_limiting_group_impl Class Reference
Inheritance diagram for fc::detail::rate_limiting_group_impl:
Collaboration diagram for fc::detail::rate_limiting_group_impl:

Public Types

typedef std::list< rate_limited_operation * > rate_limited_operation_list
 

Public Member Functions

 rate_limiting_group_impl (uint32_t upload_bytes_per_second, uint32_t download_bytes_per_second, uint32_t burstiness_in_seconds=1)
 
 ~rate_limiting_group_impl ()
 
virtual size_t readsome (boost::asio::ip::tcp::socket &socket, char *buffer, size_t length) override
 
virtual size_t readsome (boost::asio::ip::tcp::socket &socket, const std::shared_ptr< char > &buffer, size_t length, size_t offset) override
 
template<typename BufferType >
size_t readsome_impl (boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
 
virtual size_t writesome (boost::asio::ip::tcp::socket &socket, const char *buffer, size_t length) override
 
virtual size_t writesome (boost::asio::ip::tcp::socket &socket, const std::shared_ptr< const char > &buffer, size_t length, size_t offset) override
 
template<typename BufferType >
size_t writesome_impl (boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
 
void process_pending_reads ()
 
void process_pending_writes ()
 
void process_pending_operations (time_point &last_iteration_start_time, uint32_t &limit_bytes_per_second, rate_limited_operation_list &operations_in_progress, rate_limited_operation_list &operations_for_next_iteration, uint32_t &tokens, uint32_t &unused_tokens)
 

Public Attributes

uint32_t _upload_bytes_per_second
 
uint32_t _download_bytes_per_second
 
uint32_t _burstiness_in_seconds
 
microseconds _granularity
 
uint32_t _read_tokens
 
uint32_t _unused_read_tokens
 
uint32_t _write_tokens
 
uint32_t _unused_write_tokens
 
rate_limited_operation_list _read_operations_in_progress
 
rate_limited_operation_list _read_operations_for_next_iteration
 
rate_limited_operation_list _write_operations_in_progress
 
rate_limited_operation_list _write_operations_for_next_iteration
 
time_point _last_read_iteration_time
 
time_point _last_write_iteration_time
 
future< void > _process_pending_reads_loop_complete
 
promise< void >::ptr _new_read_operation_available_promise
 
future< void > _process_pending_writes_loop_complete
 
promise< void >::ptr _new_write_operation_available_promise
 
average_rate_meter _actual_upload_rate
 
average_rate_meter _actual_download_rate
 

Detailed Description

Definition at line 191 of file rate_limiting.cpp.

Member Typedef Documentation

◆ rate_limited_operation_list

Constructor & Destructor Documentation

◆ rate_limiting_group_impl()

fc::detail::rate_limiting_group_impl::rate_limiting_group_impl ( uint32_t upload_bytes_per_second,
uint32_t download_bytes_per_second,
uint32_t burstiness_in_seconds = 1 )

◆ ~rate_limiting_group_impl()

fc::detail::rate_limiting_group_impl::~rate_limiting_group_impl ( )

Definition at line 257 of file rate_limiting.cpp.

258 {
259 try
260 {
261 _process_pending_reads_loop_complete.cancel_and_wait();
262 }
263 catch (...)
264 {
265 }
266 try
267 {
269 }
270 catch (...)
271 {
272 }
273 }

Member Function Documentation

◆ process_pending_operations()

void fc::detail::rate_limiting_group_impl::process_pending_operations ( time_point & last_iteration_start_time,
uint32_t & limit_bytes_per_second,
rate_limited_operation_list & operations_in_progress,
rate_limited_operation_list & operations_for_next_iteration,
uint32_t & tokens,
uint32_t & unused_tokens )

Definition at line 409 of file rate_limiting.cpp.

415 {
416 // lock here for multithreaded
417 std::copy(operations_for_next_iteration.begin(),
418 operations_for_next_iteration.end(),
419 std::back_inserter(operations_in_progress));
420 operations_for_next_iteration.clear();
421
422 // find out how much time since our last read/write
423 time_point this_iteration_start_time = time_point::now();
424 if (limit_bytes_per_second) // the we are limiting up/download speed
425 {
426 microseconds time_since_last_iteration = this_iteration_start_time - last_iteration_start_time;
427 if (time_since_last_iteration > seconds(1))
428 time_since_last_iteration = seconds(1);
429 else if (time_since_last_iteration < microseconds(0))
430 time_since_last_iteration = microseconds(0);
431
432 tokens += (uint32_t)((limit_bytes_per_second * time_since_last_iteration.count()) / 1000000);
433 tokens += unused_tokens;
434 unused_tokens = 0;
435 tokens = std::min(tokens, limit_bytes_per_second * _burstiness_in_seconds);
436
437 if (tokens)
438 {
439 // sort the pending reads/writes in order of the number of bytes they need to write, smallest first
440 std::vector<rate_limited_operation*> operations_sorted_by_length;
441 operations_sorted_by_length.reserve(operations_in_progress.size());
442 for (rate_limited_operation* operation_data : operations_in_progress)
443 operations_sorted_by_length.push_back(operation_data);
444 std::sort(operations_sorted_by_length.begin(), operations_sorted_by_length.end(), is_operation_shorter());
445
446 // figure out how many bytes each reader/writer is allowed to read/write
447 uint32_t bytes_remaining_to_allocate = tokens;
448 while (!operations_sorted_by_length.empty())
449 {
450 uint32_t bytes_permitted_for_this_operation = bytes_remaining_to_allocate / operations_sorted_by_length.size();
451 uint32_t bytes_allocated_for_this_operation = std::min<size_t>(operations_sorted_by_length.back()->length, bytes_permitted_for_this_operation);
452 operations_sorted_by_length.back()->permitted_length = bytes_allocated_for_this_operation;
453 bytes_remaining_to_allocate -= bytes_allocated_for_this_operation;
454 operations_sorted_by_length.pop_back();
455 }
456 tokens = bytes_remaining_to_allocate;
457
458 // kick off the reads/writes in first-come order
459 for (auto iter = operations_in_progress.begin(); iter != operations_in_progress.end();)
460 {
461 if ((*iter)->permitted_length > 0)
462 {
463 rate_limited_operation* operation_to_perform = *iter;
464 iter = operations_in_progress.erase(iter);
465 operation_to_perform->perform_operation();
466 }
467 else
468 ++iter;
469 }
470 }
471 }
472 else // down/upload speed is unlimited
473 {
474 // we shouldn't end up here often. If the rate is unlimited, we should just execute
475 // the operation immediately without being queued up. This should only be hit if
476 // we change from a limited rate to unlimited
477 for (auto iter = operations_in_progress.begin();
478 iter != operations_in_progress.end();)
479 {
480 rate_limited_operation* operation_to_perform = *iter;
481 iter = operations_in_progress.erase(iter);
482 operation_to_perform->permitted_length = operation_to_perform->length;
483 operation_to_perform->perform_operation();
484 }
485 }
486 last_iteration_start_time = this_iteration_start_time;
487 }
static time_point now()
Definition time.cpp:14
constexpr microseconds seconds(int64_t s)
Definition time.hpp:32
unsigned int uint32_t
Definition stdint.h:126
Here is the call graph for this function:
Here is the caller graph for this function:

◆ process_pending_reads()

void fc::detail::rate_limiting_group_impl::process_pending_reads ( )

Definition at line 367 of file rate_limiting.cpp.

368 {
369 for (;;)
370 {
373
374 _new_read_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_reads");
375 try
376 {
379 else
381 }
382 catch (const timeout_exception&)
383 {
384 }
386 }
387 }
rate_limited_operation_list _read_operations_in_progress
void process_pending_operations(time_point &last_iteration_start_time, uint32_t &limit_bytes_per_second, rate_limited_operation_list &operations_in_progress, rate_limited_operation_list &operations_for_next_iteration, uint32_t &tokens, uint32_t &unused_tokens)
promise< void >::ptr _new_read_operation_available_promise
rate_limited_operation_list _read_operations_for_next_iteration
Here is the call graph for this function:
Here is the caller graph for this function:

◆ process_pending_writes()

void fc::detail::rate_limiting_group_impl::process_pending_writes ( )

Definition at line 388 of file rate_limiting.cpp.

389 {
390 for (;;)
391 {
394
395 _new_write_operation_available_promise = new promise<void>("rate_limiting_group_impl::process_pending_writes");
396 try
397 {
400 else
402 }
403 catch (const timeout_exception&)
404 {
405 }
407 }
408 }
promise< void >::ptr _new_write_operation_available_promise
rate_limited_operation_list _write_operations_in_progress
rate_limited_operation_list _write_operations_for_next_iteration
Here is the call graph for this function:
Here is the caller graph for this function:

◆ readsome() [1/2]

size_t fc::detail::rate_limiting_group_impl::readsome ( boost::asio::ip::tcp::socket & socket,
char * buffer,
size_t length )
overridevirtual

Definition at line 280 of file rate_limiting.cpp.

281 {
282 return readsome_impl(socket, buffer, length, 0);
283 }
size_t readsome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
Here is the call graph for this function:

◆ readsome() [2/2]

size_t fc::detail::rate_limiting_group_impl::readsome ( boost::asio::ip::tcp::socket & socket,
const std::shared_ptr< char > & buffer,
size_t length,
size_t offset )
overridevirtual

Definition at line 275 of file rate_limiting.cpp.

276 {
277 return readsome_impl(socket, buffer, length, offset);
278 }
Here is the call graph for this function:

◆ readsome_impl()

template<typename BufferType >
size_t fc::detail::rate_limiting_group_impl::readsome_impl ( boost::asio::ip::tcp::socket & socket,
const BufferType & buffer,
size_t length,
size_t offset )

Definition at line 286 of file rate_limiting.cpp.

287 {
288 size_t bytes_read;
290 {
291 promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::readsome"));
292 rate_limited_tcp_read_operation read_operation(socket, buffer, length, offset, completion_promise);
293 _read_operations_for_next_iteration.push_back(&read_operation);
294
295 // launch the read processing loop it if isn't running, or signal it to resume if it's paused.
297 _process_pending_reads_loop_complete = async([=](){ process_pending_reads(); }, "process_pending_reads" );
300
301 try
302 {
303 bytes_read = completion_promise->wait();
304 }
305 catch (...)
306 {
307 _read_operations_for_next_iteration.remove(&read_operation);
308 _read_operations_in_progress.remove(&read_operation);
309 throw;
310 }
311 _unused_read_tokens += read_operation.permitted_length - bytes_read;
312 }
313 else
314 bytes_read = asio::read_some(socket, buffer, length, offset);
315
316 _actual_download_rate.update(bytes_read);
317
318 return bytes_read;
319 }
void update(uint32_t bytes_transferred=0)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ writesome() [1/2]

size_t fc::detail::rate_limiting_group_impl::writesome ( boost::asio::ip::tcp::socket & socket,
const char * buffer,
size_t length )
overridevirtual

Definition at line 321 of file rate_limiting.cpp.

322 {
323 return writesome_impl(socket, buffer, length, 0);
324 }
size_t writesome_impl(boost::asio::ip::tcp::socket &socket, const BufferType &buffer, size_t length, size_t offset)
Here is the call graph for this function:

◆ writesome() [2/2]

size_t fc::detail::rate_limiting_group_impl::writesome ( boost::asio::ip::tcp::socket & socket,
const std::shared_ptr< const char > & buffer,
size_t length,
size_t offset )
overridevirtual

Definition at line 326 of file rate_limiting.cpp.

327 {
328 return writesome_impl(socket, buffer, length, offset);
329 }
Here is the call graph for this function:

◆ writesome_impl()

template<typename BufferType >
size_t fc::detail::rate_limiting_group_impl::writesome_impl ( boost::asio::ip::tcp::socket & socket,
const BufferType & buffer,
size_t length,
size_t offset )

Definition at line 332 of file rate_limiting.cpp.

333 {
334 size_t bytes_written;
336 {
337 promise<size_t>::ptr completion_promise(new promise<size_t>("rate_limiting_group_impl::writesome"));
338 rate_limited_tcp_write_operation write_operation(socket, buffer, length, offset, completion_promise);
339 _write_operations_for_next_iteration.push_back(&write_operation);
340
341 // launch the write processing loop it if isn't running, or signal it to resume if it's paused.
343 _process_pending_writes_loop_complete = async([=](){ process_pending_writes(); }, "process_pending_writes");
346
347 try
348 {
349 bytes_written = completion_promise->wait();
350 }
351 catch (...)
352 {
353 _write_operations_for_next_iteration.remove(&write_operation);
354 _write_operations_in_progress.remove(&write_operation);
355 throw;
356 }
357 _unused_write_tokens += write_operation.permitted_length - bytes_written;
358 }
359 else
360 bytes_written = asio::write_some(socket, buffer, length, offset);
361
362 _actual_upload_rate.update(bytes_written);
363
364 return bytes_written;
365 }
Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ _actual_download_rate

average_rate_meter fc::detail::rate_limiting_group_impl::_actual_download_rate

Definition at line 219 of file rate_limiting.cpp.

◆ _actual_upload_rate

average_rate_meter fc::detail::rate_limiting_group_impl::_actual_upload_rate

Definition at line 218 of file rate_limiting.cpp.

◆ _burstiness_in_seconds

uint32_t fc::detail::rate_limiting_group_impl::_burstiness_in_seconds

Definition at line 196 of file rate_limiting.cpp.

◆ _download_bytes_per_second

uint32_t fc::detail::rate_limiting_group_impl::_download_bytes_per_second

Definition at line 195 of file rate_limiting.cpp.

◆ _granularity

microseconds fc::detail::rate_limiting_group_impl::_granularity

Definition at line 198 of file rate_limiting.cpp.

◆ _last_read_iteration_time

time_point fc::detail::rate_limiting_group_impl::_last_read_iteration_time

Definition at line 210 of file rate_limiting.cpp.

◆ _last_write_iteration_time

time_point fc::detail::rate_limiting_group_impl::_last_write_iteration_time

Definition at line 211 of file rate_limiting.cpp.

◆ _new_read_operation_available_promise

promise<void>::ptr fc::detail::rate_limiting_group_impl::_new_read_operation_available_promise

Definition at line 214 of file rate_limiting.cpp.

◆ _new_write_operation_available_promise

promise<void>::ptr fc::detail::rate_limiting_group_impl::_new_write_operation_available_promise

Definition at line 216 of file rate_limiting.cpp.

◆ _process_pending_reads_loop_complete

future<void> fc::detail::rate_limiting_group_impl::_process_pending_reads_loop_complete

Definition at line 213 of file rate_limiting.cpp.

◆ _process_pending_writes_loop_complete

future<void> fc::detail::rate_limiting_group_impl::_process_pending_writes_loop_complete

Definition at line 215 of file rate_limiting.cpp.

◆ _read_operations_for_next_iteration

rate_limited_operation_list fc::detail::rate_limiting_group_impl::_read_operations_for_next_iteration

Definition at line 206 of file rate_limiting.cpp.

◆ _read_operations_in_progress

rate_limited_operation_list fc::detail::rate_limiting_group_impl::_read_operations_in_progress

Definition at line 205 of file rate_limiting.cpp.

◆ _read_tokens

uint32_t fc::detail::rate_limiting_group_impl::_read_tokens

Definition at line 199 of file rate_limiting.cpp.

◆ _unused_read_tokens

uint32_t fc::detail::rate_limiting_group_impl::_unused_read_tokens

Definition at line 200 of file rate_limiting.cpp.

◆ _unused_write_tokens

uint32_t fc::detail::rate_limiting_group_impl::_unused_write_tokens

Definition at line 202 of file rate_limiting.cpp.

◆ _upload_bytes_per_second

uint32_t fc::detail::rate_limiting_group_impl::_upload_bytes_per_second

Definition at line 194 of file rate_limiting.cpp.

◆ _write_operations_for_next_iteration

rate_limited_operation_list fc::detail::rate_limiting_group_impl::_write_operations_for_next_iteration

Definition at line 208 of file rate_limiting.cpp.

◆ _write_operations_in_progress

rate_limited_operation_list fc::detail::rate_limiting_group_impl::_write_operations_in_progress

Definition at line 207 of file rate_limiting.cpp.

◆ _write_tokens

uint32_t fc::detail::rate_limiting_group_impl::_write_tokens

Definition at line 201 of file rate_limiting.cpp.


The documentation for this class was generated from the following file: