Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
log.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <boost/filesystem.hpp>
4#include <fstream>
5#include <stdint.h>
6
10#include <fc/io/cfile.hpp>
11#include <fc/log/logger.hpp>
12
13namespace sysio {
14
15/*
16 * *.log:
17 * +---------+----------------+-----------+------------------+-----+---------+----------------+
18 * | Entry i | Pos of Entry i | Entry i+1 | Pos of Entry i+1 | ... | Entry z | Pos of Entry z |
19 * +---------+----------------+-----------+------------------+-----+---------+----------------+
20 *
21 * *.index:
22 * +----------------+------------------+-----+----------------+
23 * | Pos of Entry i | Pos of Entry i+1 | ... | Pos of Entry z |
24 * +----------------+------------------+-----+----------------+
25 *
26 * each entry:
27 * state_history_log_header
28 * payload
29 *
 * When block pruning is enabled, the format is modified slightly as follows:
 * - The first entry's header uses a distinct magic value (the "pruned log" feature bit is set) to mark
 *   the log as a pruned log; this prevents older versions from trying to read a log with holes in it.
 * - The end of the log has a 4-byte value giving the guaranteed number of blocks the log holds at its
 *   end (this can be used to reconstruct an index of the log from the end even when there is a hole
 *   in the middle of the log).
36 */
37
38inline uint64_t ship_magic(uint16_t version, uint16_t features = 0) {
39 using namespace sysio::chain::literals;
40 return "ship"_n.to_uint64_t() | version | features<<16;
41}
42inline bool is_ship(uint64_t magic) {
43 using namespace sysio::chain::literals;
44 return (magic & 0xffff'ffff'0000'0000) == "ship"_n.to_uint64_t();
45}
46inline uint16_t get_ship_version(uint64_t magic) { return magic; }
47inline uint16_t get_ship_features(uint64_t magic) { return magic>>16; }
48inline bool is_ship_supported_version(uint64_t magic) { return get_ship_version(magic) == 0; }
49static const uint16_t ship_current_version = 0;
50static const uint16_t ship_feature_pruned_log = 1;
51inline bool is_ship_log_pruned(uint64_t magic) { return get_ship_features(magic) & ship_feature_pruned_log; }
52inline uint64_t clear_ship_log_pruned_feature(uint64_t magic) { return ship_magic(get_ship_version(magic), get_ship_features(magic) & ~ship_feature_pruned_log); }
53
59static const int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) +
63 uint32_t prune_blocks; //number of blocks to prune to when doing a prune
64 size_t prune_threshold = 4*1024*1024; //(approximately) how many bytes need to be added before a prune is performed
65 std::optional<size_t> vacuum_on_close; //when set, a vacuum is performed on dtor if log contains less than this many bytes
66};
67
69 private:
70 const char* const name = "";
71 std::string log_filename;
72 std::string index_filename;
73 std::optional<state_history_log_prune_config> prune_config; //is set, log is in pruned mode
74 fc::cfile log;
75 fc::cfile index;
76 uint32_t _begin_block = 0; //always tracks the first block available even after pruning
77 uint32_t _index_begin_block = 0; //the first block of the file; even after pruning. it's what index 0 in the index file points to
78 uint32_t _end_block = 0;
79 chain::block_id_type last_block_id;
80
81 public:
82 state_history_log(const char* const name, std::string log_filename, std::string index_filename,
83 std::optional<state_history_log_prune_config> prune_conf = std::optional<state_history_log_prune_config>())
84 : name(name)
85 , log_filename(std::move(log_filename))
86 , index_filename(std::move(index_filename))
87 , prune_config(prune_conf) {
88 open_log();
89 open_index();
90
91 if(prune_config) {
92 SYS_ASSERT(prune_config->prune_blocks, chain::plugin_exception, "state history log prune configuration requires at least one block");
93 SYS_ASSERT(__builtin_popcount(prune_config->prune_threshold) == 1, chain::plugin_exception, "state history prune threshold must be power of 2");
94 //switch this over to the mask that will be used
95 prune_config->prune_threshold = ~(prune_config->prune_threshold-1);
96 }
97
98 //check for conversions to/from pruned log, as long as log contains something
99 if(_begin_block != _end_block) {
100 state_history_log_header first_header;
101 log.seek(0);
102 read_header(first_header);
103
104 if((is_ship_log_pruned(first_header.magic) == false) && prune_config) {
105 //need to convert non-pruned to pruned; first prune any ranges we can (might be none)
106 prune(fc::log_level::info);
107
108 //update first header to indicate prune feature is enabled
109 log.seek(0);
110 first_header.magic = ship_magic(get_ship_version(first_header.magic), ship_feature_pruned_log);
111 write_header(first_header);
112
113 //write trailer on log with num blocks
114 log.seek_end(0);
115 const uint32_t num_blocks_in_log = _end_block - _begin_block;
116 fc::raw::pack(log, num_blocks_in_log);
117 }
118 else if(is_ship_log_pruned(first_header.magic) && !prune_config) {
119 vacuum();
120 }
121 }
122 }
123
125 //nothing to do if log is empty or we aren't pruning
126 if(_begin_block == _end_block)
127 return;
128 if(!prune_config || !prune_config->vacuum_on_close)
129 return;
130
131 const size_t first_data_pos = get_pos(_begin_block);
132 const size_t last_data_pos = fc::file_size(log.get_file_path());
133 if(last_data_pos - first_data_pos < *prune_config->vacuum_on_close)
134 vacuum();
135 }
136
137 uint32_t begin_block() const { return _begin_block; }
138 uint32_t end_block() const { return _end_block; }
139
140 void read_header(state_history_log_header& header, bool assert_version = true) {
141 char bytes[state_history_log_header_serial_size];
142 log.read(bytes, sizeof(bytes));
144 fc::raw::unpack(ds, header);
145 SYS_ASSERT(!ds.remaining(), chain::plugin_exception, "state_history_log_header_serial_size mismatch");
146 if (assert_version)
147 SYS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic), chain::plugin_exception,
148 "corrupt ${name}.log (0)", ("name", name));
149 }
150
152 char bytes[state_history_log_header_serial_size];
153 fc::datastream<char*> ds(bytes, sizeof(bytes));
154 fc::raw::pack(ds, header);
155 SYS_ASSERT(!ds.remaining(), chain::plugin_exception, "state_history_log_header_serial_size mismatch");
156 log.write(bytes, sizeof(bytes));
157 }
158
159 template <typename F>
160 void write_entry(state_history_log_header header, const chain::block_id_type& prev_id, F write_payload) {
161 auto block_num = chain::block_header::num_from_id(header.block_id);
162 SYS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception,
163 "missed a block in ${name}.log", ("name", name));
164
165 if (_begin_block != _end_block && block_num > _begin_block) {
166 if (block_num == _end_block) {
167 SYS_ASSERT(prev_id == last_block_id, chain::plugin_exception, "missed a fork change in ${name}.log",
168 ("name", name));
169 } else {
171 get_entry(block_num - 1, prev);
172 SYS_ASSERT(prev_id == prev.block_id, chain::plugin_exception, "missed a fork change in ${name}.log",
173 ("name", name));
174 }
175 }
176
177 if (block_num < _end_block)
178 truncate(block_num); //truncate is expected to always leave file pointer at the end
179 else if (!prune_config)
180 log.seek_end(0);
181 else if (prune_config && _begin_block != _end_block)
182 log.seek_end(-sizeof(uint32_t)); //overwrite the trailing block count marker on this write
183
184 //if we're operating on a pruned block log and this is the first entry in the log, make note of the feature in the header
185 if(prune_config && _begin_block == _end_block)
186 header.magic = ship_magic(get_ship_version(header.magic), ship_feature_pruned_log);
187
188 uint64_t pos = log.tellp();
189 write_header(header);
190 write_payload(log);
191 SYS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception,
192 "wrote payload with incorrect size to ${name}.log", ("name", name));
193 fc::raw::pack(log, pos);
194
195 fc::raw::pack(index, pos);
196 if (_begin_block == _end_block)
197 _index_begin_block = _begin_block = block_num;
198 _end_block = block_num + 1;
199 last_block_id = header.block_id;
200
201 if(prune_config) {
202 if((pos&prune_config->prune_threshold) != (log.tellp()&prune_config->prune_threshold))
204
205 const uint32_t num_blocks_in_log = _end_block - _begin_block;
206 fc::raw::pack(log, num_blocks_in_log);
207 }
208 }
209
210 // returns cfile positioned at payload
212 SYS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception,
213 "read non-existing block in ${name}.log", ("name", name));
214 log.seek(get_pos(block_num));
215 read_header(header);
216 return log;
217 }
218
221 get_entry(block_num, header);
222 return header.block_id;
223 }
224
225 private:
226 //file position must be at start of last block's suffix (back pointer)
227 bool get_last_block() {
229 uint64_t suffix;
230
231 fc::raw::unpack(log, suffix);
232 const size_t after_suffix_pos = log.tellp();
233 if (suffix > after_suffix_pos || suffix + state_history_log_header_serial_size > after_suffix_pos) {
234 elog("corrupt ${name}.log (2)", ("name", name));
235 return false;
236 }
237 log.seek(suffix);
238 read_header(header, false);
239 if (!is_ship(header.magic) || !is_ship_supported_version(header.magic) ||
240 suffix + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) != after_suffix_pos) {
241 elog("corrupt ${name}.log (3)", ("name", name));
242 return false;
243 }
244 _end_block = chain::block_header::num_from_id(header.block_id) + 1;
245 last_block_id = header.block_id;
246 if (_begin_block >= _end_block) {
247 elog("corrupt ${name}.log (4)", ("name", name));
248 return false;
249 }
250 return true;
251 }
252
253 void prune(const fc::log_level& loglevel) {
254 if(!prune_config)
255 return;
256 if(_end_block - _begin_block <= prune_config->prune_blocks)
257 return;
258
259 const uint32_t prune_to_num = _end_block - prune_config->prune_blocks;
260 uint64_t prune_to_pos = get_pos(prune_to_num);
261
262 log.punch_hole(state_history_log_header_serial_size, prune_to_pos);
263
264 _begin_block = prune_to_num;
265 log.flush();
266
267 if(auto l = fc::logger::get(); l.is_enabled(loglevel))
268 l.log(fc::log_message(fc::log_context(loglevel, __FILE__, __LINE__, __func__),
269 "${name}.log pruned to blocks ${b}-${e}", fc::mutable_variant_object()("name", name)("b", _begin_block)("e", _end_block - 1)));
270 }
271
272 //only works on non-pruned logs
273 void recover_blocks() {
274 ilog("recover ${name}.log", ("name", name));
275 uint64_t pos = 0;
276 uint32_t num_found = 0;
277 log.seek_end(0);
278 const size_t size = log.tellp();
279
280 while (true) {
282 if (pos + state_history_log_header_serial_size > size)
283 break;
284 log.seek(pos);
285 read_header(header, false);
286 uint64_t suffix;
287 if (!is_ship(header.magic) || !is_ship_supported_version(header.magic) || header.payload_size > size ||
288 pos + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) > size) {
289 SYS_ASSERT(!is_ship(header.magic) || is_ship_supported_version(header.magic), chain::plugin_exception,
290 "${name}.log has an unsupported version", ("name", name));
291 break;
292 }
293 log.seek(pos + state_history_log_header_serial_size + header.payload_size);
294 log.read((char*)&suffix, sizeof(suffix));
295 if (suffix != pos)
296 break;
297 pos = pos + state_history_log_header_serial_size + header.payload_size + sizeof(suffix);
298 if (!(++num_found % 10000)) {
299 ilog("${num_found} blocks found, log pos = ${pos}", ("num_found", num_found)("pos", pos));
300 }
301 }
302 log.flush();
303 boost::filesystem::resize_file(log_filename, pos);
304 log.flush();
305
306 log.seek_end(-sizeof(pos));
307 SYS_ASSERT(get_last_block(), chain::plugin_exception, "recover ${name}.log failed", ("name", name));
308 }
309
310 void open_log() {
311 log.set_file_path(log_filename);
312 log.open("a+b");
313 log.seek_end(0);
314 uint64_t size = log.tellp();
315 log.close();
316
317 log.open("r+b");
318 if (size >= state_history_log_header_serial_size) {
320 log.seek(0);
321 read_header(header, false);
323 state_history_log_header_serial_size + header.payload_size + sizeof(uint64_t) <= size,
324 chain::plugin_exception, "corrupt ${name}.log (1)", ("name", name));
325
326 log.seek_end(0);
327
328 std::optional<uint32_t> pruned_count;
329 if(is_ship_log_pruned(header.magic)) {
330 //the existing log is a prune'ed log. find the count of blocks at the end
331 log.skip(-sizeof(uint32_t));
334 pruned_count = count;
335 log.skip(-sizeof(uint32_t));
336 }
337
338 _index_begin_block = _begin_block = chain::block_header::num_from_id(header.block_id);
339 last_block_id = header.block_id;
340 log.skip(-sizeof(uint64_t));
341 if(!get_last_block()) {
342 SYS_ASSERT(!is_ship_log_pruned(header.magic), chain::plugin_exception, "${name}.log is pruned and cannot have recovery attempted", ("name", name));
343 recover_blocks();
344 }
345
346 if(pruned_count)
347 _begin_block = _end_block - *pruned_count;
348
349 ilog("${name}.log has blocks ${b}-${e}", ("name", name)("b", _begin_block)("e", _end_block - 1));
350 } else {
351 SYS_ASSERT(!size, chain::plugin_exception, "corrupt ${name}.log (5)", ("name", name));
352 ilog("${name}.log is empty", ("name", name));
353 }
354 }
355
356 void open_index() {
357 index.set_file_path(index_filename);
358 index.open("a+b");
359 index.seek_end(0);
360 if (index.tellp() == (static_cast<int>(_end_block) - _index_begin_block) * sizeof(uint64_t))
361 return;
362 ilog("Regenerate ${name}.index", ("name", name));
363 index.close();
364
365 index.open("wb");
366 log.seek_end(0);
367 if(log.tellp()) {
368 uint32_t remaining = _end_block - _begin_block;
369 index.seek((_end_block - _index_begin_block)*sizeof(uint64_t)); //this can make the index sparse for a pruned log; but that's okay
370
371 log.seek(0);
372 state_history_log_header first_entry_header;
373 read_header(first_entry_header);
374 log.seek_end(0);
375 if(is_ship_log_pruned(first_entry_header.magic))
376 log.skip(-sizeof(uint32_t));
377
378 while(remaining--) {
379 uint64_t pos = 0;
381 log.skip(-sizeof(pos));
382 fc::raw::unpack(log, pos);
383 log.seek(pos);
384 read_header(header, false);
385 log.seek(pos);
386 SYS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic), chain::plugin_exception, "corrupt ${name}.log (6)", ("name", name));
387
388 index.skip(-sizeof(uint64_t));
389 fc::raw::pack(index, pos);
390 index.skip(-sizeof(uint64_t));
391
392 if (!(remaining % 10000))
393 ilog("${r} blocks remaining, log pos = ${pos}", ("r", remaining)("pos", pos));
394 }
395 }
396
397 index.close();
398 index.open("a+b");
399 }
400
401 uint64_t get_pos(uint32_t block_num) {
402 uint64_t pos;
403 index.seek((block_num - _index_begin_block) * sizeof(pos));
404 index.read((char*)&pos, sizeof(pos));
405 return pos;
406 }
407
408 void truncate(uint32_t block_num) {
409 log.flush();
410 index.flush();
411 uint64_t num_removed = 0;
412 if (block_num <= _begin_block) {
413 num_removed = _end_block - _begin_block;
414 log.seek(0);
415 boost::filesystem::resize_file(log_filename, 0);
416 boost::filesystem::resize_file(index_filename, 0);
417 _begin_block = _end_block = 0;
418 } else {
419 num_removed = _end_block - block_num;
420 uint64_t pos = get_pos(block_num);
421 log.seek(0);
422 boost::filesystem::resize_file(log_filename, pos);
423 boost::filesystem::resize_file(index_filename, (block_num - _index_begin_block) * sizeof(uint64_t));
424 _end_block = block_num;
425 //this will leave the end of the log with the last block's suffix no matter if the log is operating in pruned
426 // mode or not. The assumption is truncate() is always immediately followed up with an append to the log thus
427 // restoring the prune trailer if required
428 }
429 log.seek_end(0);
430 ilog("fork or replay: removed ${n} blocks from ${name}.log", ("n", num_removed)("name", name));
431 }
432
433 void vacuum() {
434 //a completely empty log should have nothing on disk; don't touch anything
435 if(_begin_block == _end_block)
436 return;
437
438 log.seek(0);
440 fc::raw::unpack(log, magic);
441 SYS_ASSERT(is_ship_log_pruned(magic), chain::plugin_exception, "vacuum can only be performed on pruned logs");
442
443 //may happen if _begin_block is still first block on-disk of log. clear the pruned feature flag & erase
444 // the 4 byte trailer. The pruned flag is only set on the first header in the log, so it does not need
445 // to be touched up if we actually vacuum up any other blocks to the front.
446 if(_begin_block == _index_begin_block) {
447 log.seek(0);
449 log.flush();
450 fc::resize_file(log.get_file_path(), fc::file_size(log.get_file_path()) - sizeof(uint32_t));
451 return;
452 }
453
454 ilog("Vacuuming pruned log ${n}", ("n", name));
455
456 size_t copy_from_pos = get_pos(_begin_block);
457 size_t copy_to_pos = 0;
458
459 const size_t offset_bytes = copy_from_pos - copy_to_pos;
460 const size_t offset_blocks = _begin_block - _index_begin_block;
461 log.seek_end(0);
462 size_t copy_sz = log.tellp() - copy_from_pos - sizeof(uint32_t); //don't copy trailer in to new unpruned log
463 const uint32_t num_blocks_in_log = _end_block - _begin_block;
464
465 std::vector<char> buff;
466 buff.resize(4*1024*1024);
467
468 auto tick = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now());
469 while(copy_sz) {
470 const size_t copy_this_round = std::min(buff.size(), copy_sz);
471 log.seek(copy_from_pos);
472 log.read(buff.data(), copy_this_round);
473 log.punch_hole(copy_to_pos, copy_from_pos+copy_this_round);
474 log.seek(copy_to_pos);
475 log.write(buff.data(), copy_this_round);
476
477 copy_from_pos += copy_this_round;
478 copy_to_pos += copy_this_round;
479 copy_sz -= copy_this_round;
480
481 const auto tock = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now());
482 if(tick < tock - std::chrono::seconds(5)) {
483 ilog("Vacuuming pruned log ${n}, ${b} bytes remaining", ("b", copy_sz)("n", name));
484 tick = tock;
485 }
486 }
487 log.flush();
488 fc::resize_file(log.get_file_path(), log.tellp());
489
490 index.flush();
491 {
492 boost::interprocess::mapped_region index_mapped(index, boost::interprocess::read_write);
493 uint64_t* index_ptr = (uint64_t*)index_mapped.get_address();
494
495 for(uint32_t new_block_num = 0; new_block_num < num_blocks_in_log; ++new_block_num) {
496 const uint64_t new_pos = index_ptr[new_block_num + offset_blocks] - offset_bytes;
497 index_ptr[new_block_num] = new_pos;
498
499 if(new_block_num + 1 != num_blocks_in_log)
500 log.seek(index_ptr[new_block_num + offset_blocks + 1] - offset_bytes - sizeof(uint64_t));
501 else
502 log.seek_end(-sizeof(uint64_t));
503 log.write((char*)&new_pos, sizeof(new_pos));
504 }
505 }
506 fc::resize_file(index.get_file_path(), num_blocks_in_log*sizeof(uint64_t));
507
508 _index_begin_block = _begin_block;
509 ilog("Vacuum of pruned log ${n} complete",("n", name));
510 }
511}; // state_history_log
512
513} // namespace sysio
514
// Reflect state_history_log_header's fields so fc::raw::pack/unpack can (de)serialize it, as
// read_header()/write_header() do. The packed size is checked against state_history_log_header_serial_size.
515FC_REFLECT(sysio::state_history_log_header, (magic)(block_id)(payload_size))
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
provides information about where and when a log message was generated.
aggregates a message along with the context and associated meta-information.
static logger get(const fc::string &name=DEFAULT_LOGGER)
Definition logger.cpp:88
bool is_enabled(log_level e) const
Definition logger.cpp:58
An order-preserving dictionary of variants.
uint32_t end_block() const
Definition log.hpp:138
chain::block_id_type get_block_id(uint32_t block_num)
Definition log.hpp:219
void read_header(state_history_log_header &header, bool assert_version=true)
Definition log.hpp:140
state_history_log(const char *const name, std::string log_filename, std::string index_filename, std::optional< state_history_log_prune_config > prune_conf=std::optional< state_history_log_prune_config >())
Definition log.hpp:82
void write_entry(state_history_log_header header, const chain::block_id_type &prev_id, F write_payload)
Definition log.hpp:160
void write_header(const state_history_log_header &header)
Definition log.hpp:151
uint32_t begin_block() const
Definition log.hpp:137
fc::cfile & get_entry(uint32_t block_num, state_history_log_header &header)
Definition log.hpp:211
int * count
#define __func__
#define ilog(FORMAT,...)
Definition logger.hpp:118
#define elog(FORMAT,...)
Definition logger.hpp:130
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
void pack(Stream &s, const std::deque< T > &value)
Definition raw.hpp:531
uint64_t file_size(const path &p)
void resize_file(const path &file, size_t s)
Definition name.hpp:106
uint64_t clear_ship_log_pruned_feature(uint64_t magic)
Definition log.hpp:52
uint16_t get_ship_features(uint64_t magic)
Definition log.hpp:47
uint16_t get_ship_version(uint64_t magic)
Definition log.hpp:46
bool is_ship_supported_version(uint64_t magic)
Definition log.hpp:48
bool is_ship_log_pruned(uint64_t magic)
Definition log.hpp:51
bool is_ship(uint64_t magic)
Definition log.hpp:42
uint64_t ship_magic(uint16_t version, uint16_t features=0)
Definition log.hpp:38
#define FC_REFLECT(TYPE, MEMBERS)
Specializes fc::reflector for TYPE.
Definition reflect.hpp:311
unsigned short uint16_t
Definition stdint.h:125
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
static uint32_t num_from_id(const block_id_type &id)
Immutable except for fc::from_variant.
Definition name.hpp:43
chain::block_id_type block_id
Definition log.hpp:56
std::optional< size_t > vacuum_on_close
Definition log.hpp:65
int l