7 static constexpr uint32_t _current_version = 1;
8 static constexpr const char* _trace_prefix =
"trace_";
9 static constexpr const char* _trace_index_prefix =
"trace_index_";
10 static constexpr const char* _trace_trx_id_prefix =
"trace_trx_id_";
11 static constexpr const char* _trace_ext =
".log";
12 static constexpr const char* _compressed_trace_ext =
".clog";
13 static constexpr int _max_filename_size = std::char_traits<char>::length(_trace_index_prefix) + 10 + 1 + 10 + std::char_traits<char>::length(_compressed_trace_ext) + 1;
15 std::string make_filename(
const char* slice_prefix,
const char* slice_ext,
uint32_t slice_number,
uint32_t slice_width) {
16 char filename[_max_filename_size] = {};
17 const uint32_t slice_start = slice_number * slice_width;
18 const int size_written = snprintf(filename, _max_filename_size,
"%s%010d-%010d%s", slice_prefix, slice_start, (slice_start + slice_width), slice_ext);
20 if ( size_written >= _max_filename_size ) {
21 const std::string max_size_str = std::to_string(_max_filename_size - 1);
22 const std::string size_written_str = std::to_string(size_written);
23 throw std::runtime_error(
"Could not write the complete filename. Anticipated the max filename characters to be: " +
24 max_size_str +
" or less, but wrote: " + size_written_str +
" characters. This is likely because the file "
25 "format was changed and the code was not updated accordingly. Filename created: " + filename);
28 return std::string(filename);
35 std::optional<uint32_t> minimum_uncompressed_irreversible_history_blocks,
size_t compression_seek_point_stride)
36 : _slice_directory(slice_dir, stride_width, minimum_irreversible_history_blocks, minimum_uncompressed_irreversible_history_blocks, compression_seek_point_stride) {
39 template<
typename BlockTrace>
49 append_store(be, index);
60 append_store(le, index);
62 append_store(le, trx_id);
71 append_store(entry, trx_id_file);
75 std::optional<uint64_t> trace_offset;
76 bool irreversible =
false;
78 if (std::holds_alternative<block_entry_v0>(e)) {
79 const auto& block = std::get<block_entry_v0>(e);
80 if (block.number == block_height) {
81 trace_offset = block.offset;
83 }
else if (std::holds_alternative<lib_entry_v0>(e)) {
84 auto lib = std::get<lib_entry_v0>(e).lib;
85 if (lib >= block_height) {
95 std::optional<data_log_entry> entry =
read_data_log(block_height, *trace_offset);
99 return std::make_tuple( entry.value(), irreversible );
105 if (minimum_irreversible_history_blocks) {
122 while (offset < end) {
125 if (std::holds_alternative<block_trxs_entry>(entry)) {
126 const auto& trxs_entry = std::get<block_trxs_entry>(entry);
127 for (
auto i = 0
U; i < trxs_entry.ids.size(); ++i) {
128 if (trxs_entry.ids[i] == trx_id) {
130 trx_block_num = trxs_entry.block_num;
133 }
else if (std::holds_alternative<lib_entry_v0>(entry)) {
134 auto lib = std::get<lib_entry_v0>(entry).lib;
135 if (trx_entries > 0 && lib >= trx_block_num) {
136 return trx_block_num;
139 FC_ASSERT(
false,
"unpacked data should be a block_trxs_entry or a lib_entry_v0" );;
141 offset = trx_id_file.
tellp();
148 return trx_block_num;
153 slice_directory::slice_directory(
const bfs::path& slice_dir,
uint32_t width, std::optional<uint32_t> minimum_irreversible_history_blocks, std::optional<uint32_t> minimum_uncompressed_irreversible_history_blocks,
size_t compression_seek_point_stride)
154 : _slice_dir(slice_dir)
156 , _minimum_irreversible_history_blocks(minimum_irreversible_history_blocks)
157 , _minimum_uncompressed_irreversible_history_blocks(minimum_uncompressed_irreversible_history_blocks)
158 , _compression_seek_point_stride(compression_seek_point_stride)
159 , _best_known_lib(0) {
160 if (!
exists(_slice_dir)) {
161 bfs::create_directories(slice_dir);
168 create_new_index_slice_file(index_file);
174 const bool found = find_slice(_trace_index_prefix,
slice_number, index_file, open_file);
175 if( !found || !open_file ) {
179 validate_existing_index_slice_file(index_file,
state);
183 void slice_directory::create_new_index_slice_file(
fc::cfile& index_file)
const {
185 index_header h { .version = _current_version };
186 append_store(h, index_file);
190 const auto header = extract_store<index_header>(index_file);
191 if (header.version != _current_version) {
192 throw old_slice_version(
"Old slice file with version: " + std::to_string(header.version) +
193 " is in directory, only supporting version: " + std::to_string(_current_version));
196 if(
state == open_state::write ) {
212 const bool found = find_slice(_trace_prefix,
slice_number, trace_file, open_file);
214 if( !found || !open_file ) {
218 if(
state == open_state::write ) {
228 auto filename = make_filename(_trace_prefix, _compressed_trace_ext,
slice_number, _width);
229 const path slice_path = _slice_dir / filename;
230 const bool file_exists =
exists(slice_path);
238 return std::move(result);
244 bool slice_directory::find_slice(
const char* slice_prefix,
uint32_t slice_number,
fc::cfile& slice_file,
bool open_file)
const {
245 auto filename = make_filename(slice_prefix, _trace_ext,
slice_number, _width);
246 const path slice_path = _slice_dir / filename;
249 const bool file_exists =
exists(slice_path);
250 if( !file_exists || !open_file ) {
265 if (trace_found != index_found) {
266 const std::string trace_status = trace_found ?
"existing" :
"new";
267 const std::string index_status = index_found ?
"existing" :
"new";
268 elog(
"Trace file is ${ts}, but it's metadata file is ${is}. This means the files are not consistent.", (
"ts", trace_status)(
"is", index_status));
281 const bool found = find_slice(_trace_trx_id_prefix,
slice_number, trx_id_file, open_file);
282 if( !found || !open_file ) {
285 if(
state == open_state::write ) {
293 std::scoped_lock
lock(_maintenance_mtx);
294 _best_known_lib = lib;
296 _maintenance_condition.notify_one();
300 _maintenance_thread = std::thread([
this, log=std::move(log)](){
305 std::unique_lock<std::mutex>
lock(_maintenance_mtx);
306 while ( last_lib >= _best_known_lib && !_maintenance_shutdown ) {
307 _maintenance_condition.wait(lock);
310 if (_maintenance_shutdown) {
314 uint32_t best_known_lib = _best_known_lib;
317 log(std::string(
"Waking up to handle lib: ") + std::to_string(best_known_lib));
319 if (last_lib < best_known_lib) {
322 last_lib = best_known_lib;
331 std::scoped_lock
lock(_maintenance_mtx);
332 _maintenance_shutdown =
true;
334 _maintenance_condition.notify_one();
335 _maintenance_thread.join();
339 void slice_directory::process_irreversible_slice_range(
uint32_t lib,
uint32_t min_irreversible, std::optional<uint32_t>& lower_bound_slice, F&&
f) {
341 if (lib_slice_number < 1 || (lower_bound_slice && *lower_bound_slice >= lib_slice_number - 1))
344 const int64_t upper_bound_block_number =
static_cast<int64_t>(lib) -
static_cast<int64_t>(min_irreversible) - _width;
345 if (upper_bound_block_number >= 0) {
347 while (!lower_bound_slice || *lower_bound_slice < upper_bound_slice_num) {
348 const uint32_t slice_to_process = lower_bound_slice ? *lower_bound_slice + 1 : 0;
350 lower_bound_slice = slice_to_process;
356 if (_minimum_irreversible_history_blocks) {
357 process_irreversible_slice_range(lib, *_minimum_irreversible_history_blocks, _last_cleaned_up_slice, [
this, &log](
uint32_t slice_to_clean){
362 log(std::string(
"Attempting Prune of slice: ") + std::to_string(slice_to_clean));
365 const bool dont_open_file =
false;
366 const bool index_found =
find_index_slice(slice_to_clean, open_state::read, index, dont_open_file);
368 log(std::string(
"Removing: ") + index.get_file_path().generic_string());
369 bfs::remove(index.get_file_path());
371 const bool trace_found =
find_trace_slice(slice_to_clean, open_state::read, trace, dont_open_file);
376 const bool trx_id_found =
find_trx_id_slice(slice_to_clean, open_state::read, trx_id, dont_open_file);
384 log(std::string(
"Removing: ") + ctrace->get_file_path().generic_string());
385 bfs::remove(ctrace->get_file_path());
392 if (_minimum_uncompressed_irreversible_history_blocks &&
393 (!_minimum_irreversible_history_blocks || *_minimum_uncompressed_irreversible_history_blocks < *_minimum_irreversible_history_blocks) )
395 process_irreversible_slice_range(lib, *_minimum_uncompressed_irreversible_history_blocks, _last_compressed_slice, [
this, &log](
uint32_t slice_to_compress){
397 const bool dont_open_file =
false;
398 const bool trace_found =
find_trace_slice(slice_to_compress, open_state::read, trace, dont_open_file);
400 log(std::string(
"Attempting compression of slice: ") + std::to_string(slice_to_compress));
static constexpr const char * create_or_update_rw_mode
cfile_datastream create_datastream()
void open(const char *mode)
fc::path get_file_path() const
void set_file_path(fc::path file_path)
wraps boost::filesystem::path to provide platform independent path manipulation.
std::string generic_string() const
void replace_extension(const fc::path &e)
static bool process(const fc::path &input_path, const fc::path &output_path, size_t seek_point_stride)
bool find_or_create_index_slice(uint32_t slice_number, open_state state, fc::cfile &index_file) const
bool find_trace_slice(uint32_t slice_number, open_state state, fc::cfile &trace_file, bool open_file=true) const
bool find_or_create_trace_slice(uint32_t slice_number, open_state state, fc::cfile &trace_file) const
uint32_t slice_number(uint32_t block_height) const
std::optional< compressed_file > find_compressed_trace_slice(uint32_t slice_number, bool open_file=true) const
void start_maintenance_thread(log_handler log)
void run_maintenance_tasks(uint32_t lib, const log_handler &log)
slice_directory(const boost::filesystem::path &slice_dir, uint32_t width, std::optional< uint32_t > minimum_irreversible_history_blocks, std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
bool find_index_slice(uint32_t slice_number, open_state state, fc::cfile &index_file, bool open_file=true) const
bool find_or_create_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile &trx_id_file) const
void find_or_create_slice_pair(uint32_t slice_number, open_state state, fc::cfile &trace, fc::cfile &index)
void set_lib(uint32_t lib)
void stop_maintenance_thread()
bool find_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile &trx_id_file, bool open_file=true) const
void append_trx_ids(block_trxs_entry tt)
uint64_t scan_metadata_log_from(uint32_t block_height, uint64_t offset, Fn &&fn, const yield_function &yield)
get_block_n get_trx_block_number(const chain::transaction_id_type &trx_id, std::optional< uint32_t > minimum_irreversible_history_blocks, const yield_function &yield={})
void append(const BlockTrace &bt)
get_block_t get_block(uint32_t block_height, const yield_function &yield={})
void append_lib(uint32_t lib)
store_provider(const boost::filesystem::path &slice_dir, uint32_t stride_width, std::optional< uint32_t > minimum_irreversible_history_blocks, std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
slice_directory _slice_directory
std::optional< data_log_entry > read_data_log(uint32_t block_height, uint64_t offset)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
#define FC_LOG_AND_DROP(...)
void unpack(Stream &s, std::deque< T > &value)
bool exists(const path &p)
uint64_t file_size(const path &p)
void set_os_thread_name(const string &name)
std::optional< std::tuple< data_log_entry, bool > > get_block_t
std::variant< block_entry_v0, lib_entry_v0, block_trxs_entry > metadata_log_entry
std::variant< block_trace_v0, block_trace_v1, block_trace_v2 > data_log_entry
std::optional< uint32_t > get_block_n
unsigned __int64 uint64_t
slice_directory::open_state open_state
void bt(const Operand &op, const Reg ®)