Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
gelf_appender.cpp
Go to the documentation of this file.
2#include <fc/network/ip.hpp>
7#include <fc/variant.hpp>
8#include <fc/io/json.hpp>
9#include <fc/crypto/city.hpp>
10#include <fc/compress/zlib.hpp>
11
12#include <boost/lexical_cast.hpp>
13#include <iomanip>
14#include <iostream>
15#include <queue>
16#include <sstream>
17#include <iostream>
18
19namespace fc
20{
21 namespace detail
22 {
23 boost::asio::ip::udp::endpoint to_asio_ep( const fc::ip::endpoint& e )
24 {
25 return boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4(e.get_address()), e.port() );
26 }
27 }
28
29 const std::vector<std::string> gelf_appender::config::reserved_field_names = {
30 "_id", // per GELF specification
31 "_timestamp_ns", // Remaining names all populated by appender
32 "_log_id",
33 "_line",
34 "_file",
35 "_method_name",
36 "_thread_name",
37 "_task_name"
38 };
39
40 const std::regex gelf_appender::config::user_field_name_pattern{"^_[\\w\\.\\-]*$"}; // per GELF specification
41
43 {
44 public:
46 std::optional<boost::asio::ip::udp::endpoint> gelf_endpoint;
48
// Builds the appender configuration from the variant c.
// "endpoint" and "host" are read as strings and removed from the object;
// every entry that remains is kept as a user field.
// Throws invalid_arg_exception when a user field uses a reserved name or
// does not match config::user_field_name_pattern.
49 impl(const variant& c)
50 {
// NOTE(review): mvo is not declared on any line visible in this listing;
// its declaration appears to have been dropped — confirm against the real file.
52 from_variant(c, mvo);
53
54 cfg.endpoint = mvo["endpoint"].as<std::string>();
55 mvo.erase("endpoint");
56 cfg.host = mvo["host"].as<std::string>();
57 mvo.erase("host");
// whatever is left after removing the two known keys is treated as user fields
58 cfg.user_fields = mvo;
59
// reject user fields whose names collide with config::reserved_field_names
60 for(auto&& field_name : config::reserved_field_names) {
61 if (cfg.user_fields.contains(field_name.c_str())) {
62 FC_THROW_EXCEPTION(invalid_arg_exception, "Field name '${field_name}' is reserved",
63 ("field_name", field_name));
64 }
65 }
// every user field name must match the allowed pattern
// NOTE(review): the message below opens a quote before ${field_name} but never
// closes it, unlike the "is reserved" message above — looks like a typo.
66 for(auto&& field : cfg.user_fields) {
67 if (!std::regex_match(field.key(), config::user_field_name_pattern)) {
68 FC_THROW_EXCEPTION(invalid_arg_exception, "Field name '${field_name} must begin with an underscore and contain only letters, numbers, underscores, dashes, and dots.",
69 ("field_name", field.key()));
70 }
71 }
72 }
73
75 {
76 }
77 };
78
80 my(new impl(args))
81 {
82 }
83
84 void gelf_appender::initialize(boost::asio::io_service &io_service)
85 {
86 try
87 {
88 try
89 {
90 // if it's a numeric address:port, this will parse it
91 my->gelf_endpoint = detail::to_asio_ep(ip::endpoint::from_string(my->cfg.endpoint));
92 }
93 catch (...)
94 {
95 }
96 if (!my->gelf_endpoint)
97 {
98 // couldn't parse as a numeric ip address, try resolving as a DNS name.
99 // This can yield, so don't do it in the catch block above
100 string::size_type colon_pos = my->cfg.endpoint.find(':');
101 try
102 {
103 uint16_t port = boost::lexical_cast<uint16_t>(my->cfg.endpoint.substr(colon_pos + 1, my->cfg.endpoint.size()));
104
105 string hostname = my->cfg.endpoint.substr( 0, colon_pos );
106 auto endpoints = resolve(io_service, hostname, port);
107 if (endpoints.empty())
108 FC_THROW_EXCEPTION(unknown_host_exception, "The logging destination host name can not be resolved: ${hostname}",
109 ("hostname", hostname));
110 my->gelf_endpoint = endpoints.back();
111 }
112 catch (const boost::bad_lexical_cast&)
113 {
114 FC_THROW("Bad port: ${port}", ("port", my->cfg.endpoint.substr(colon_pos + 1, my->cfg.endpoint.size())));
115 }
116 }
117
118 if (my->gelf_endpoint)
119 {
120 my->gelf_socket.initialize(io_service);
121 my->gelf_socket.open();
122 std::cerr << "opened GELF socket to endpoint " << my->cfg.endpoint << "\n";
123 }
124 }
125 catch (...)
126 {
127 std::cerr << "error opening GELF socket to endpoint " << my->cfg.endpoint << "\n";
128 }
129 }
130
133
// Formats one log message as a GELF 1.1 JSON document, zlib-compresses it and
// sends it over UDP to the configured endpoint. A compressed payload larger
// than max_payload_size is split into chunks, each with a 12-byte header.
// Does nothing when no endpoint has been set.
134 void gelf_appender::log(const log_message& message)
135 {
136 if (!my->gelf_endpoint)
137 return;
138
139 log_context context = message.get_context();
140
141 mutable_variant_object gelf_message;
142 gelf_message["version"] = "1.1";
143 gelf_message["host"] = my->cfg.host;
144 gelf_message["short_message"] = format_string(message.get_format(), message.get_data(), true);
145
146 // use now() instead of context.get_timestamp() because log_message construction can include user provided long running calls
// NOTE(review): time_since_epoch() is listed as returning a microseconds value,
// so time_ns may actually hold microseconds despite its name (the division by
// 1000000. would then yield seconds) — confirm.
147 const auto time_ns = time_point::now().time_since_epoch().count();
148 gelf_message["timestamp"] = time_ns / 1000000.;
149 gelf_message["_timestamp_ns"] = time_ns;
150
// function-local static counter, sent as a string
// NOTE(review): incremented with no synchronization visible here — confirm
// whether log() can be called from more than one thread.
151 static uint64_t gelf_log_counter;
152 gelf_message["_log_id"] = fc::to_string(++gelf_log_counter);
153
// map the fc log level onto the numeric "level" field
154 switch (context.get_log_level())
155 {
156 case log_level::debug:
157 gelf_message["level"] = 7; // debug
158 break;
159 case log_level::info:
160 gelf_message["level"] = 6; // info
161 break;
162 case log_level::warn:
163 gelf_message["level"] = 4; // warning
164 break;
165 case log_level::error:
166 gelf_message["level"] = 3; // error
167 break;
168 case log_level::all:
169 case log_level::off:
170 // these shouldn't be used in log messages, but do something deterministic just in case
171 gelf_message["level"] = 6; // info
172 break;
173 }
174
// source-location and thread details; context and task name are only added when non-empty
175 if (!context.get_context().empty())
176 gelf_message["context"] = context.get_context();
177 gelf_message["_line"] = context.get_line_number();
178 gelf_message["_file"] = context.get_file();
179 gelf_message["_method_name"] = context.get_method();
180 gelf_message["_thread_name"] = context.get_thread_name();
181 if (!context.get_task_name().empty())
182 gelf_message["_task_name"] = context.get_task_name();
183
// add the user fields from the configuration
184 for(auto&& field : my->cfg.user_fields) {
185 gelf_message[field.key()] = field.value();
186 }
187
// NOTE(review): the json::to_string overload listed for this file takes a yield
// argument between the variant and the format; no such argument is visible in
// this call, so the listing may have dropped a line — confirm against the real file.
188 string gelf_message_as_string = json::to_string(gelf_message,
190 json::output_formatting::legacy_generator); // GELF 1.1 specifies unstringified numbers
// from here on the string holds the compressed bytes, not JSON text
191 gelf_message_as_string = zlib_compress(gelf_message_as_string);
192
193 // packets are sent by UDP, and they tend to disappear if they
194 // get too large. It's hard to find any solid numbers on how
195 // large they can be before they get dropped -- datagrams can
196 // be up to 64k, but anything over 512 is not guaranteed.
197 // You can play with this number, intermediate values like
198 // 1400 and 8100 are likely to work on most intranets.
199 const unsigned max_payload_size = 512;
200
201 if (gelf_message_as_string.size() <= max_payload_size)
202 {
203 // no need to split
// copy the payload into a heap buffer owned by a shared_ptr and hand it to send_to
204 std::shared_ptr<char> send_buffer(new char[gelf_message_as_string.size()],
205 [](char* p){ delete[] p; });
206 memcpy(send_buffer.get(), gelf_message_as_string.c_str(),
207 gelf_message_as_string.size());
208
209 my->gelf_socket.send_to(send_buffer, gelf_message_as_string.size(),
210 *my->gelf_endpoint);
211 }
212 else
213 {
214 // split the message
215 // we need to generate an 8-byte ID for this message.
216 // city hash should do
217 uint64_t message_id = city_hash64(gelf_message_as_string.c_str(), gelf_message_as_string.size());
// chunk layout: 12 header bytes followed by up to body_length payload bytes
218 const unsigned header_length = 2 /* magic */ + 8 /* msg id */ + 1 /* seq */ + 1 /* count */;
219 const unsigned body_length = max_payload_size - header_length;
// ceiling division: number of chunks needed to carry the whole payload
220 unsigned total_number_of_packets = (gelf_message_as_string.size() + body_length - 1) / body_length;
221 unsigned bytes_sent = 0;
222 unsigned number_of_packets_sent = 0;
223 while (bytes_sent < gelf_message_as_string.size())
224 {
// the last chunk may carry fewer than body_length bytes
225 unsigned bytes_to_send = std::min((unsigned)gelf_message_as_string.size() - bytes_sent,
226 body_length);
227
// a fresh max_payload_size buffer per chunk; only header_length + bytes_to_send bytes are sent
228 std::shared_ptr<char> send_buffer(new char[max_payload_size],
229 [](char* p){ delete[] p; });
230 char* ptr = send_buffer.get();
231 // magic number for chunked message
232 *(unsigned char*)ptr++ = 0x1e;
233 *(unsigned char*)ptr++ = 0x0f;
234
235 // message id
// raw copy of the 64-bit id, i.e. in host byte order
236 memcpy(ptr, (char*)&message_id, sizeof(message_id));
237 ptr += sizeof(message_id);
238
// NOTE(review): sequence number and chunk count are each narrowed to one byte;
// a payload needing more than 255 chunks would wrap — confirm such sizes cannot occur.
239 *(unsigned char*)(ptr++) = number_of_packets_sent;
240 *(unsigned char*)(ptr++) = total_number_of_packets;
241 memcpy(ptr, gelf_message_as_string.c_str() + bytes_sent,
242 bytes_to_send);
243 my->gelf_socket.send_to(send_buffer, header_length + bytes_to_send,
244 *my->gelf_endpoint);
245 ++number_of_packets_sent;
246 bytes_sent += bytes_to_send;
247 }
// sanity check: the loop must have produced exactly the precomputed number of chunks
248 FC_ASSERT(number_of_packets_sent == total_number_of_packets);
249 }
250 }
251} // fc
const mie::Vuint & p
Definition bn.cpp:27
std::shared_ptr< appender > ptr
Definition appender.hpp:37
static constexpr fc::microseconds format_time_limit
Definition exception.hpp:60
std::optional< boost::asio::ip::udp::endpoint > gelf_endpoint
impl(const variant &c)
virtual void log(const log_message &m) override
gelf_appender(const variant &args)
void initialize(boost::asio::io_service &io_service) override
Required for name resolution and socket initialization.
const address & get_address() const
Definition ip.cpp:72
static endpoint from_string(const string &s)
Definition ip.cpp:74
uint16_t port() const
Definition ip.cpp:71
static string to_string(const variant &v, const yield_function_t &yield, const output_formatting format=output_formatting::stringify_large_ints_and_doubles)
Definition json.cpp:674
provides information about where and when a log message was generated.
aggregates a message along with the context and associated meta-information.
constexpr int64_t count() const
Definition time.hpp:26
An order-preserving dictionary of variants.
void erase(const string &key)
static time_point now()
Definition time.cpp:14
constexpr const microseconds & time_since_epoch() const
Definition time.hpp:52
bool contains(const char *key) const
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition variant.hpp:191
Defines exceptions used by fc.
#define FC_THROW( ...)
#define FC_THROW_EXCEPTION(EXCEPTION, FORMAT,...)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
ehm field
boost::asio::ip::udp::endpoint to_asio_ep(const fc::ip::endpoint &e)
namespace sysio::chain
Definition authority.cpp:3
fc::string to_string(double)
Definition string.cpp:131
string zlib_compress(const string &in)
Definition zlib.cpp:11
uint64_t city_hash64(const char *buf, size_t len)
Definition city.cpp:394
std::vector< boost::asio::ip::udp::endpoint > resolve(boost::asio::io_service &io_service, const std::string &host, uint16_t port)
Definition resolve.cpp:6
void from_variant(const fc::variant &v, sysio::chain::chain_id_type &cid)
fc::string format_string(const fc::string &, const variant_object &, bool minimize=false)
Definition variant.cpp:773
unsigned short uint16_t
Definition stdint.h:125
unsigned __int64 uint64_t
Definition stdint.h:136
static const std::regex user_field_name_pattern
static const std::vector< std::string > reserved_field_names
memcpy((char *) pInfo->slotDescription, s, l)