Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
zipkin.cpp
Go to the documentation of this file.
1#include <fc/log/zipkin.hpp>
6#include <fc/crypto/hex.hpp>
8#include <fc/variant.hpp>
9
10#include <boost/asio.hpp>
11
12#include <thread>
13#include <random>
14
15namespace fc {
16
17zipkin_config& zipkin_config::get() {
18 static zipkin_config the_one;
19 return the_one;
20}
21
22void zipkin_config::init( const std::string& url, const std::string& service_name, uint32_t timeout_us ) {
23 get().zip = std::make_unique<zipkin>( url, service_name, timeout_us );
24}
25
27 if( !get().zip ) {
28 FC_THROW_EXCEPTION( fc::assert_exception, "uninitialized zipkin" );
29 }
30 return *get().zip;
31}
32
34 if( zipkin* z = get_zipkin_() ) {
35 z->shutdown();
36 }
37}
38
40 if( !get().zip ) {
41 FC_THROW_EXCEPTION( fc::assert_exception, "uninitialized zipkin" );
42 }
43 return get().zip->get_next_unique_id();
44}
45
47public:
48 static constexpr uint32_t max_consecutive_errors = 9;
49
50 const std::string zipkin_url;
51 const std::string service_name;
53 std::mutex mtx;
56 std::atomic<uint32_t> consecutive_errors = 0;
57 std::atomic<unsigned char> stopped = 0;
58 std::optional<url> endpoint;
59 std::thread thread;
60 boost::asio::io_context ctx;
61 boost::asio::io_context::strand work_strand{ctx};
62 boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard = boost::asio::make_work_guard(ctx);
63
64 impl( std::string url, std::string service_name, uint32_t timeout_us )
65 : zipkin_url( std::move(url) )
66 , service_name( std::move(service_name) )
68 }
69
70 void init();
71 void shutdown();
72
73 void log( zipkin_span::span_data&& span );
74
75 ~impl();
76};
77
79 thread = std::thread( [this]() {
80 fc::set_os_thread_name( "zipkin" );
81 while( true ) {
82 try {
83 ctx.run();
84 break;
86 }
87 } );
88}
89
91 try {
92 shutdown();
93 } catch (...) {}
94}
95
97 if( stopped ^= 1 ) return;
98 work_guard.reset(); // drain the queue
99 thread.join();
100}
101
102zipkin::zipkin( const std::string& url, const std::string& service_name, uint32_t timeout_us ) :
103 my( new impl( url, service_name, timeout_us ) ) {
104 my->init();
105}
106
108 std::scoped_lock g( my->mtx );
109 if( my->next_id == 0 ) {
110 std::mt19937_64 engine( std::random_device{}() );
111 std::uniform_int_distribution<uint64_t> distribution(1);
112 my->next_id = distribution( engine );
113 }
114 return my->next_id++;
115}
116
118 my->shutdown();
119}
120
/// Converts one span into the variant payload posted to zipkin.
///
/// The result is an array (fc::variants) holding a single object with the
/// fields "id", "traceId", "parentId" (only when span.parent_id != 0), "name",
/// "timestamp", "duration", "localEndpoint" ({"serviceName": service_name})
/// and "tags".
///
/// @param span         span to serialize; its name and tags are moved out and
///                     its id is reset to 0 before returning
/// @param service_name value emitted as localEndpoint.serviceName
/// @return a one-element array containing the serialized span
121fc::variant create_zipkin_variant( zipkin_span::span_data&& span, const std::string& service_name ) {
122 // https://zipkin.io/zipkin-api/
123 // std::string traceId; // [a-f0-9]{16,32} unique id for trace, all children spans share the same id
124 // std::string name; // logical operation, should have low cardinality
125 // std::string parentId; // The parent span id, or absent if root span
126 // std::string id // [a-f0-9]{16}
127 // int64_t timestamp // epoch microseconds of start of span
128 // int64_t duration // microseconds of span
129
   // A span with a parent reports the parent's id as its trace id; a root
   // span (parent_id == 0) reports its own id.
   // NOTE(review): this uses the direct parent's id — confirm spans are never
   // nested deeper than one level, otherwise grandchildren would not carry the
   // root span's traceId.
130 uint64_t trace_id;
131 if( span.parent_id != 0 ) {
132 trace_id = span.parent_id;
133 } else {
134 trace_id = span.id;
135 }
136
   // NOTE(review): the declaration of `mvo` is not visible in this listing
   // (listing line 137 is missing) — presumably an fc::mutable_variant_object.
   // The ids are hex-encoded from the integers' in-memory bytes
   // (reinterpret_cast to char*), so the digit order follows the host's byte
   // order — presumably little-endian hosts only; TODO confirm.
138 mvo( "id", fc::to_hex( reinterpret_cast<const char*>(&span.id), sizeof( span.id ) ) );
139 mvo( "traceId", fc::to_hex( reinterpret_cast<const char*>(&trace_id), sizeof( trace_id ) ) );
140 if( span.parent_id != 0 ) {
141 mvo( "parentId", fc::to_hex( reinterpret_cast<const char*>(&span.parent_id), sizeof( span.parent_id ) ) );
142 }
143 mvo( "name", std::move( span.name ) );
144 mvo( "timestamp", span.start.time_since_epoch().count() );
145 mvo( "duration", (span.stop - span.start).count() );
146 mvo( "localEndpoint", fc::variant_object( "serviceName", service_name ) );
147
148 mvo( "tags", std::move( span.tags ) );
149 span.id = 0; // stop destructor of span from calling log again
150
151 // /api/v2/spans takes an array of spans
152 fc::variants result;
153 result.emplace_back( std::move( mvo ) );
154
155 return result;
156}
157
159 if( my->consecutive_errors > my->max_consecutive_errors || my->stopped )
160 return;
161
162 boost::asio::post(my->work_strand, [this, span{std::move(span)}]() mutable {
163 my->log( std::move( span ) );
164 });
165}
166
169 return;
170
171 try {
172 auto deadline = fc::time_point::now() + fc::microseconds( timeout_us );
173 if( !endpoint ) {
175 dlog( "connecting to zipkin: ${p}", ("p", *endpoint) );
176 }
177
178 http.post_sync( *endpoint, create_zipkin_variant( std::move( span ), service_name ), deadline );
179
181 return;
182 } catch( const fc::exception& e ) {
183 wlog( "unable to connect to zipkin: ${u}, error: ${e}", ("u", zipkin_url)("e", e.to_detail_string()) );
184 } catch( const std::exception& e ) {
185 wlog( "unable to connect to zipkin: ${u}, error: ${e}", ("u", zipkin_url)("e", e.what()) );
186 } catch( ... ) {
187 wlog( "unable to connect to zipkin: ${u}, error: unknown", ("u", zipkin_url) );
188 }
190}
191
193 // avoid 0 since id of 0 is used as a flag
194 return id._hash[3] == 0 ? 1 : id._hash[3];
195}
196
198 if( data.id == 0 )
199 return;
200 try {
202 data.stop = time_point::now();
203 zipkin_config::get_zipkin().log( std::move( data ) );
204 }
205 } catch( ... ) {}
206}
207
208} // fc
Used to generate a useful error report when an exception is thrown.
Definition exception.hpp:58
std::string to_detail_string(log_level ll=log_level::all) const
const char * what() const noexcept override
An order-preserving dictionary of variants.
static time_point now()
Definition time.cpp:14
An order-preserving dictionary of variants.
Stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_objects.
Definition variant.hpp:191
std::optional< url > endpoint
Definition zipkin.cpp:58
const uint32_t timeout_us
Definition zipkin.cpp:52
void log(zipkin_span::span_data &&span)
Definition zipkin.cpp:167
boost::asio::io_context ctx
Definition zipkin.cpp:60
http_client http
Definition zipkin.cpp:55
impl(std::string url, std::string service_name, uint32_t timeout_us)
Definition zipkin.cpp:64
const std::string service_name
Definition zipkin.cpp:51
std::atomic< unsigned char > stopped
Definition zipkin.cpp:57
static constexpr uint32_t max_consecutive_errors
Definition zipkin.cpp:48
std::atomic< uint32_t > consecutive_errors
Definition zipkin.cpp:56
std::mutex mtx
Definition zipkin.cpp:53
boost::asio::executor_work_guard< boost::asio::io_context::executor_type > work_guard
Definition zipkin.cpp:62
void shutdown()
Definition zipkin.cpp:96
const std::string zipkin_url
Definition zipkin.cpp:50
std::thread thread
Definition zipkin.cpp:59
boost::asio::io_context::strand work_strand
Definition zipkin.cpp:61
uint64_t next_id
Definition zipkin.cpp:54
static void shutdown()
Definition zipkin.cpp:33
static uint64_t get_next_unique_id()
Starts with a random id and increments on each call, will not return 0.
Definition zipkin.cpp:39
static bool is_enabled()
Thread safe only if init() called from main thread before spawning of any threads.
Definition zipkin.hpp:29
static zipkin & get_zipkin()
Definition zipkin.cpp:26
static void init(const std::string &url, const std::string &service_name, uint32_t timeout_us)
Definition zipkin.cpp:22
void log(zipkin_span::span_data &&span)
Definition zipkin.cpp:158
uint64_t get_next_unique_id()
Starts with a random id and increments on each call, will not return 0.
Definition zipkin.cpp:107
void shutdown()
Definition zipkin.cpp:117
zipkin(const std::string &url, const std::string &service_name, uint32_t timeout_us)
Definition zipkin.cpp:102
Defines exceptions used by fc.
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_LOG_AND_DROP(...)
#define wlog(FORMAT,...)
Definition logger.hpp:124
#define dlog(FORMAT,...)
Definition logger.hpp:101
namespace sysio::chain
Definition authority.cpp:3
fc::variant create_zipkin_variant(zipkin_span::span_data &&span, const std::string &service_name)
Definition zipkin.cpp:121
std::vector< fc::variant > variants
Definition variant.hpp:173
fc::string to_hex(const char *d, uint32_t s)
Definition hex.cpp:17
void set_os_thread_name(const string &name)
Definition name.hpp:106
string url
Definition main.cpp:166
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
static uint64_t to_id(const fc::sha256 &id)
Definition zipkin.cpp:192