Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
round_robin.hpp
Go to the documentation of this file.
1// Copyright Oliver Kowalke 2013.
2// Distributed under the Boost Software License, Version 1.0.
3// (See accompanying file LICENSE_1_0.txt or copy at
4// http://www.boost.org/LICENSE_1_0.txt)
5
6#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
7#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H
8
9#include <chrono>
10#include <cstddef>
11#include <mutex>
12#include <queue>
13
14#include <boost/asio.hpp>
15#include <boost/assert.hpp>
16#include <boost/asio/steady_timer.hpp>
17#include <boost/config.hpp>
18
19#include <boost/fiber/condition_variable.hpp>
20#include <boost/fiber/context.hpp>
21#include <boost/fiber/mutex.hpp>
22#include <boost/fiber/operations.hpp>
23#include <boost/fiber/scheduler.hpp>
24
25#include "yield.hpp"
26
27#ifdef BOOST_HAS_ABI_HEADERS
28# include BOOST_ABI_PREFIX
29#endif
30
31namespace boost {
32namespace fibers {
33namespace asio {
34
35class round_robin : public algo::algorithm {
36private:
37 typedef scheduler::ready_queue_t rqueue_t;
38
39//[asio_rr_suspend_timer
40 boost::asio::io_service & io_svc_;
41 boost::asio::steady_timer suspend_timer_;
42//]
43 rqueue_t rqueue_{};
44
45public:
46//[asio_rr_service_top
47 struct service : public boost::asio::io_service::service {
48 static boost::asio::io_service::id id;
49
50 std::unique_ptr< boost::asio::io_service::work > work_;
51
52 service( boost::asio::io_service & io_svc) :
53 boost::asio::io_service::service( io_svc),
54 work_{ new boost::asio::io_service::work( io_svc) } {
55 io_svc.post([&io_svc](){
56//]
57//[asio_rr_service_lambda
58 while ( ! io_svc.stopped() ) {
59 if ( boost::fibers::has_ready_fibers() ) {
60 // run all pending handlers in round_robin
61 while ( io_svc.poll() );
62 // run pending (ready) fibers
63 this_fiber::yield();
64 } else {
65 // run one handler inside io_service
66 // if no handler available, block this thread
67 if ( ! io_svc.run_one() ) {
68 break;
69 }
70 }
71 }
72//]
73//[asio_rr_service_bottom
74 });
75 }
76
77 virtual ~service() {}
78
79 service( service const&) = delete;
80 service & operator=( service const&) = delete;
81
82 void shutdown_service() override final {
83 work_.reset();
84 }
85 };
86//]
87
88//[asio_rr_ctor
89 round_robin( boost::asio::io_service & io_svc) :
90 io_svc_( io_svc),
91 suspend_timer_( io_svc_) {
92 // We use add_service() very deliberately. This will throw
93 // service_already_exists if you pass the same io_service instance to
94 // more than one round_robin instance.
95 boost::asio::add_service( io_svc_, new service( io_svc_));
96 }
97//]
98
99 void awakened( context * ctx) noexcept {
100 BOOST_ASSERT( nullptr != ctx);
101 ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
102 }
103
104 context * pick_next() noexcept {
105 context * ctx( nullptr);
106 if ( ! rqueue_.empty() ) { /*<
107 pop an item from the ready queue
108 >*/
109 ctx = & rqueue_.front();
110 rqueue_.pop_front();
111 BOOST_ASSERT( nullptr != ctx);
112 BOOST_ASSERT( context::active() != ctx);
113 }
114 return ctx;
115 }
116
117 bool has_ready_fibers() const noexcept {
118 return ! rqueue_.empty();
119 }
120
121//[asio_rr_suspend_until
122 void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
123 // Set a timer so at least one handler will eventually fire, causing
124 // run_one() to eventually return. Set a timer even if abs_time ==
125 // time_point::max() so the timer can be canceled by our notify()
126 // method -- which calls the handler.
127 if ( suspend_timer_.expires_at() != abs_time) {
128 // Each expires_at(time_point) call cancels any previous pending
129 // call. We could inadvertently spin like this:
130 // dispatcher calls suspend_until() with earliest wake time
131 // suspend_until() sets suspend_timer_
132 // lambda loop calls run_one()
133 // some other asio handler runs before timer expires
134 // run_one() returns to lambda loop
135 // lambda loop yields to dispatcher
136 // dispatcher finds no ready fibers
137 // dispatcher calls suspend_until() with SAME wake time
138 // suspend_until() sets suspend_timer_ to same time, canceling
139 // previous async_wait()
140 // lambda loop calls run_one()
141 // asio calls suspend_timer_ handler with operation_aborted
142 // run_one() returns to lambda loop... etc. etc.
143 // So only actually set the timer when we're passed a DIFFERENT
144 // abs_time value.
145 suspend_timer_.expires_at( abs_time);
146 // It really doesn't matter what the suspend_timer_ handler does,
147 // or even whether it's called because the timer ran out or was
148 // canceled. The whole point is to cause the run_one() call to
149 // return. So just pass a no-op lambda with proper signature.
150 suspend_timer_.async_wait([](boost::system::error_code const&){});
151 }
152 }
153//]
154
155//[asio_rr_notify
156 void notify() noexcept {
157 // Something has happened that should wake one or more fibers BEFORE
158 // suspend_timer_ expires. Reset the timer to cause it to fire
159 // immediately, causing the run_one() call to return. In theory we
160 // could use cancel() because we don't care whether suspend_timer_'s
161 // handler is called with operation_aborted or success. However --
162 // cancel() doesn't change the expiration time, and we use
163 // suspend_timer_'s expiration time to decide whether it's already
164 // set. If suspend_until() set some specific wake time, then notify()
165 // canceled it, then suspend_until() was called again with the same
166 // wake time, it would match suspend_timer_'s expiration time and we'd
167 // refrain from setting the timer. So instead of simply calling
168 // cancel(), reset the timer, which cancels the pending sleep AND sets
169 // a new expiration time. This will cause us to spin the loop twice --
170 // once for the operation_aborted handler, once for timer expiration
171 // -- but that shouldn't be a big problem.
172 suspend_timer_.expires_at( std::chrono::steady_clock::now() );
173 }
174//]
175};
176
177boost::asio::io_service::id round_robin::service::id;
178
179}}}
180
181#ifdef BOOST_HAS_ABI_HEADERS
182# include BOOST_ABI_SUFFIX
183#endif
184
185#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H
bool has_ready_fibers() const noexcept
void suspend_until(std::chrono::steady_clock::time_point const &abs_time) noexcept
round_robin(boost::asio::io_service &io_svc)
void awakened(context *ctx) noexcept
service(boost::asio::io_service &io_svc)
static boost::asio::io_service::id id
service & operator=(service const &)=delete
std::unique_ptr< boost::asio::io_service::work > work_