70 const char*
const name =
"";
71 std::string log_filename;
72 std::string index_filename;
73 std::optional<state_history_log_prune_config> prune_config;
83 std::optional<state_history_log_prune_config> prune_conf = std::optional<state_history_log_prune_config>())
85 , log_filename(
std::move(log_filename))
86 , index_filename(
std::move(index_filename))
87 , prune_config(prune_conf) {
92 SYS_ASSERT(prune_config->prune_blocks, chain::plugin_exception,
"state history log prune configuration requires at least one block");
93 SYS_ASSERT(__builtin_popcount(prune_config->prune_threshold) == 1, chain::plugin_exception,
"state history prune threshold must be power of 2");
95 prune_config->prune_threshold = ~(prune_config->prune_threshold-1);
99 if(_begin_block != _end_block) {
115 const uint32_t num_blocks_in_log = _end_block - _begin_block;
126 if(_begin_block == _end_block)
128 if(!prune_config || !prune_config->vacuum_on_close)
131 const size_t first_data_pos = get_pos(_begin_block);
132 const size_t last_data_pos =
fc::file_size(log.get_file_path());
133 if(last_data_pos - first_data_pos < *prune_config->vacuum_on_close)
141 char bytes[state_history_log_header_serial_size];
145 SYS_ASSERT(!ds.remaining(), chain::plugin_exception,
"state_history_log_header_serial_size mismatch");
148 "corrupt ${name}.log (0)", (
"name",
name));
152 char bytes[state_history_log_header_serial_size];
155 SYS_ASSERT(!ds.remaining(), chain::plugin_exception,
"state_history_log_header_serial_size mismatch");
159 template <
typename F>
162 SYS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception,
163 "missed a block in ${name}.log", (
"name",
name));
165 if (_begin_block != _end_block && block_num > _begin_block) {
166 if (block_num == _end_block) {
167 SYS_ASSERT(prev_id == last_block_id, chain::plugin_exception,
"missed a fork change in ${name}.log",
172 SYS_ASSERT(prev_id == prev.
block_id, chain::plugin_exception,
"missed a fork change in ${name}.log",
177 if (block_num < _end_block)
179 else if (!prune_config)
181 else if (prune_config && _begin_block != _end_block)
185 if(prune_config && _begin_block == _end_block)
191 SYS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.
payload_size, chain::plugin_exception,
192 "wrote payload with incorrect size to ${name}.log", (
"name",
name));
196 if (_begin_block == _end_block)
197 _index_begin_block = _begin_block = block_num;
198 _end_block = block_num + 1;
202 if((pos&prune_config->prune_threshold) != (log.tellp()&prune_config->prune_threshold))
205 const uint32_t num_blocks_in_log = _end_block - _begin_block;
212 SYS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception,
213 "read non-existing block in ${name}.log", (
"name",
name));
214 log.seek(get_pos(block_num));
227 bool get_last_block() {
232 const size_t after_suffix_pos = log.tellp();
233 if (suffix > after_suffix_pos || suffix + state_history_log_header_serial_size > after_suffix_pos) {
234 elog(
"corrupt ${name}.log (2)", (
"name",
name));
240 suffix + state_history_log_header_serial_size + header.
payload_size +
sizeof(suffix) != after_suffix_pos) {
241 elog(
"corrupt ${name}.log (3)", (
"name",
name));
246 if (_begin_block >= _end_block) {
247 elog(
"corrupt ${name}.log (4)", (
"name",
name));
256 if(_end_block - _begin_block <= prune_config->prune_blocks)
259 const uint32_t prune_to_num = _end_block - prune_config->prune_blocks;
260 uint64_t prune_to_pos = get_pos(prune_to_num);
262 log.punch_hole(state_history_log_header_serial_size, prune_to_pos);
264 _begin_block = prune_to_num;
273 void recover_blocks() {
274 ilog(
"recover ${name}.log", (
"name",
name));
278 const size_t size = log.tellp();
282 if (pos + state_history_log_header_serial_size > size)
288 pos + state_history_log_header_serial_size + header.
payload_size +
sizeof(suffix) > size) {
290 "${name}.log has an unsupported version", (
"name",
name));
293 log.seek(pos + state_history_log_header_serial_size + header.
payload_size);
294 log.read((
char*)&suffix,
sizeof(suffix));
297 pos = pos + state_history_log_header_serial_size + header.
payload_size +
sizeof(suffix);
298 if (!(++num_found % 10000)) {
299 ilog(
"${num_found} blocks found, log pos = ${pos}", (
"num_found", num_found)(
"pos", pos));
303 boost::filesystem::resize_file(log_filename, pos);
306 log.seek_end(-
sizeof(pos));
307 SYS_ASSERT(get_last_block(), chain::plugin_exception,
"recover ${name}.log failed", (
"name",
name));
311 log.set_file_path(log_filename);
318 if (size >= state_history_log_header_serial_size) {
324 chain::plugin_exception,
"corrupt ${name}.log (1)", (
"name",
name));
328 std::optional<uint32_t> pruned_count;
334 pruned_count =
count;
341 if(!get_last_block()) {
347 _begin_block = _end_block - *pruned_count;
349 ilog(
"${name}.log has blocks ${b}-${e}", (
"name",
name)(
"b", _begin_block)(
"e", _end_block - 1));
351 SYS_ASSERT(!size, chain::plugin_exception,
"corrupt ${name}.log (5)", (
"name",
name));
352 ilog(
"${name}.log is empty", (
"name",
name));
357 index.set_file_path(index_filename);
360 if (index.tellp() == (
static_cast<int>(_end_block) - _index_begin_block) *
sizeof(
uint64_t))
362 ilog(
"Regenerate ${name}.index", (
"name",
name));
368 uint32_t remaining = _end_block - _begin_block;
369 index.seek((_end_block - _index_begin_block)*
sizeof(
uint64_t));
381 log.skip(-
sizeof(pos));
392 if (!(remaining % 10000))
393 ilog(
"${r} blocks remaining, log pos = ${pos}", (
"r", remaining)(
"pos", pos));
403 index.seek((block_num - _index_begin_block) *
sizeof(pos));
404 index.read((
char*)&pos,
sizeof(pos));
412 if (block_num <= _begin_block) {
413 num_removed = _end_block - _begin_block;
415 boost::filesystem::resize_file(log_filename, 0);
416 boost::filesystem::resize_file(index_filename, 0);
417 _begin_block = _end_block = 0;
419 num_removed = _end_block - block_num;
422 boost::filesystem::resize_file(log_filename, pos);
423 boost::filesystem::resize_file(index_filename, (block_num - _index_begin_block) *
sizeof(
uint64_t));
424 _end_block = block_num;
430 ilog(
"fork or replay: removed ${n} blocks from ${name}.log", (
"n", num_removed)(
"name",
name));
435 if(_begin_block == _end_block)
446 if(_begin_block == _index_begin_block) {
454 ilog(
"Vacuuming pruned log ${n}", (
"n",
name));
456 size_t copy_from_pos = get_pos(_begin_block);
457 size_t copy_to_pos = 0;
459 const size_t offset_bytes = copy_from_pos - copy_to_pos;
460 const size_t offset_blocks = _begin_block - _index_begin_block;
462 size_t copy_sz = log.tellp() - copy_from_pos -
sizeof(
uint32_t);
463 const uint32_t num_blocks_in_log = _end_block - _begin_block;
465 std::vector<char> buff;
466 buff.resize(4*1024*1024);
468 auto tick = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now());
470 const size_t copy_this_round = std::min(buff.size(), copy_sz);
471 log.seek(copy_from_pos);
472 log.read(buff.data(), copy_this_round);
473 log.punch_hole(copy_to_pos, copy_from_pos+copy_this_round);
474 log.seek(copy_to_pos);
475 log.write(buff.data(), copy_this_round);
477 copy_from_pos += copy_this_round;
478 copy_to_pos += copy_this_round;
479 copy_sz -= copy_this_round;
481 const auto tock = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now());
482 if(tick < tock - std::chrono::seconds(5)) {
483 ilog(
"Vacuuming pruned log ${n}, ${b} bytes remaining", (
"b", copy_sz)(
"n",
name));
492 boost::interprocess::mapped_region index_mapped(index, boost::interprocess::read_write);
495 for(
uint32_t new_block_num = 0; new_block_num < num_blocks_in_log; ++new_block_num) {
496 const uint64_t new_pos = index_ptr[new_block_num + offset_blocks] - offset_bytes;
497 index_ptr[new_block_num] = new_pos;
499 if(new_block_num + 1 != num_blocks_in_log)
500 log.seek(index_ptr[new_block_num + offset_blocks + 1] - offset_bytes -
sizeof(
uint64_t));
503 log.write((
char*)&new_pos,
sizeof(new_pos));
508 _index_begin_block = _begin_block;
509 ilog(
"Vacuum of pruned log ${n} complete",(
"n",
name));
provides information about where and when a log message was generated.
chain::block_id_type get_block_id(uint32_t block_num)
void read_header(state_history_log_header &header, bool assert_version=true)
state_history_log(const char *const name, std::string log_filename, std::string index_filename, std::optional< state_history_log_prune_config > prune_conf=std::optional< state_history_log_prune_config >())
void write_entry(state_history_log_header header, const chain::block_id_type &prev_id, F write_payload)
void write_header(const state_history_log_header &header)
fc::cfile & get_entry(uint32_t block_num, state_history_log_header &header)