Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
store_provider.cpp
Go to the documentation of this file.
2
5
namespace {
   // Version stamped into index slice headers and required when reading them back.
   static constexpr uint32_t _current_version = 1;

   // Filename prefixes for the three kinds of slice files, and the two extensions in use.
   static constexpr const char* _trace_prefix = "trace_";
   static constexpr const char* _trace_index_prefix = "trace_index_";
   static constexpr const char* _trace_trx_id_prefix = "trace_trx_id_";
   static constexpr const char* _trace_ext = ".log";
   static constexpr const char* _compressed_trace_ext = ".clog";

   // Length of the longest prefix above. "trace_trx_id_" is one character longer than "trace_index_", so the
   // bound is computed from both rather than assuming which one is longest.
   static constexpr auto _longest_prefix_size =
      std::char_traits<char>::length(_trace_trx_id_prefix) > std::char_traits<char>::length(_trace_index_prefix)
         ? std::char_traits<char>::length(_trace_trx_id_prefix)
         : std::char_traits<char>::length(_trace_index_prefix);

   // longest prefix + 10-digits + '-' + 10-digits + ".clog" + null-char
   static constexpr int _max_filename_size = _longest_prefix_size + 10 + 1 + 10 + std::char_traits<char>::length(_compressed_trace_ext) + 1;

   /**
    * Builds the name of a slice file: "<prefix><start>-<end><ext>", where start and end are zero padded to
    * 10 digits, start = slice_number * slice_width and end = start + slice_width.
    *
    * Both bounds are computed in uint32_t arithmetic (so they wrap on overflow) and are printed as unsigned
    * values; printing them with a signed conversion would render block numbers above INT_MAX as negative.
    *
    * @param slice_prefix null terminated prefix of the file name
    * @param slice_ext null terminated extension, including the leading '.'
    * @param slice_number index of the slice
    * @param slice_width number of blocks covered by one slice
    * @return the formatted file name
    * @throws std::runtime_error if formatting fails or the name does not fit in _max_filename_size
    */
   std::string make_filename(const char* slice_prefix, const char* slice_ext, uint32_t slice_number, uint32_t slice_width) {
      char filename[_max_filename_size] = {};
      const uint32_t slice_start = slice_number * slice_width;
      const uint32_t slice_end = slice_start + slice_width;
      // the explicit casts make the arguments match the %u conversions exactly
      const int size_written = snprintf(filename, _max_filename_size, "%s%010u-%010u%s", slice_prefix,
                                        static_cast<unsigned int>(slice_start), static_cast<unsigned int>(slice_end), slice_ext);
      // snprintf reports an output error with a negative value
      if ( size_written < 0 ) {
         throw std::runtime_error("Could not format the slice filename: snprintf reported an error.");
      }
      // assert that _max_filename_size is correct
      if ( size_written >= _max_filename_size ) {
         const std::string max_size_str = std::to_string(_max_filename_size - 1); // dropping null character from size
         const std::string size_written_str = std::to_string(size_written);
         throw std::runtime_error("Could not write the complete filename.  Anticipated the max filename characters to be: " +
            max_size_str + " or less, but wrote: " + size_written_str + " characters.  This is likely because the file "
            "format was changed and the code was not updated accordingly. Filename created: " + filename);
      }

      return std::string(filename);
   }
}
31
32namespace sysio::trace_api {
33 namespace bfs = boost::filesystem;
34 store_provider::store_provider(const bfs::path& slice_dir, uint32_t stride_width, std::optional<uint32_t> minimum_irreversible_history_blocks,
35 std::optional<uint32_t> minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
36 : _slice_directory(slice_dir, stride_width, minimum_irreversible_history_blocks, minimum_uncompressed_irreversible_history_blocks, compression_seek_point_stride) {
37 }
38
39 template<typename BlockTrace>
40 void store_provider::append(const BlockTrace& bt) {
41 fc::cfile trace;
42 fc::cfile index;
43 const uint32_t slice_number = _slice_directory.slice_number(bt.number);
44 _slice_directory.find_or_create_slice_pair(slice_number, open_state::write, trace, index);
45 // storing as static_variant to allow adding other data types to the trace file in the future
46 const uint64_t offset = append_store(data_log_entry { bt }, trace);
47
48 auto be = metadata_log_entry { block_entry_v0 { .id = bt.id, .number = bt.number, .offset = offset }};
49 append_store(be, index);
50 }
51
54
56 fc::cfile index, trx_id;
57 const uint32_t slice_number = _slice_directory.slice_number(lib);
58 _slice_directory.find_or_create_index_slice(slice_number, open_state::write, index);
59 auto le = metadata_log_entry { lib_entry_v0 { .lib = lib }};
60 append_store(le, index);
61 _slice_directory.find_or_create_trx_id_slice(slice_number, open_state::write, trx_id);
62 append_store(le, trx_id);
64 }
65
67 fc::cfile trx_id_file;
68 const uint32_t slice_number = _slice_directory.slice_number(tt.block_num);
69 _slice_directory.find_or_create_trx_id_slice(slice_number, open_state::write, trx_id_file);
70 auto entry = metadata_log_entry { std::move(tt) };
71 append_store(entry, trx_id_file);
72 }
73
75 std::optional<uint64_t> trace_offset;
76 bool irreversible = false;
77 scan_metadata_log_from(block_height, 0, [&block_height, &trace_offset, &irreversible](const metadata_log_entry& e) -> bool {
78 if (std::holds_alternative<block_entry_v0>(e)) {
79 const auto& block = std::get<block_entry_v0>(e);
80 if (block.number == block_height) {
81 trace_offset = block.offset;
82 }
83 } else if (std::holds_alternative<lib_entry_v0>(e)) {
84 auto lib = std::get<lib_entry_v0>(e).lib;
85 if (lib >= block_height) {
86 irreversible = true;
87 return false;
88 }
89 }
90 return true;
91 }, yield);
92 if (!trace_offset) {
93 return get_block_t{};
94 }
95 std::optional<data_log_entry> entry = read_data_log(block_height, *trace_offset);
96 if (!entry) {
97 return get_block_t{};
98 }
99 return std::make_tuple( entry.value(), irreversible );
100 }
101
102 get_block_n store_provider::get_trx_block_number(const chain::transaction_id_type& trx_id, std::optional<uint32_t> minimum_irreversible_history_blocks, const yield_function& yield) {
103 fc::cfile trx_id_file;
104 int32_t slice_number;
105 if (minimum_irreversible_history_blocks) {
106 slice_number = _slice_directory.slice_number(*minimum_irreversible_history_blocks);
107 } else {
108 slice_number = 0;
109 }
110
111 uint32_t trx_block_num = 0; // number of the block that contains the target trx
112 uint32_t trx_entries = 0; // number of entries that contain the target trx
113 while (true){
114 const bool found = _slice_directory.find_trx_id_slice(slice_number, open_state::read, trx_id_file);
115 if( !found )
116 break; // traversed all slices
117
118 metadata_log_entry entry;
119 auto ds = trx_id_file.create_datastream();
120 const uint64_t end = file_size(trx_id_file.get_file_path());
121 uint64_t offset = trx_id_file.tellp();
122 while (offset < end) {
123 yield();
124 fc::raw::unpack(ds, entry);
125 if (std::holds_alternative<block_trxs_entry>(entry)) {
126 const auto& trxs_entry = std::get<block_trxs_entry>(entry);
127 for (auto i = 0U; i < trxs_entry.ids.size(); ++i) {
128 if (trxs_entry.ids[i] == trx_id) {
129 trx_entries++;
130 trx_block_num = trxs_entry.block_num;
131 }
132 }
133 } else if (std::holds_alternative<lib_entry_v0>(entry)) {
134 auto lib = std::get<lib_entry_v0>(entry).lib;
135 if (trx_entries > 0 && lib >= trx_block_num) {
136 return trx_block_num;
137 }
138 } else {
139 FC_ASSERT( false, "unpacked data should be a block_trxs_entry or a lib_entry_v0" );;
140 }
141 offset = trx_id_file.tellp();
142 }
143 slice_number++;
144 }
145
146 // transaction's block is not irreversible
147 if (trx_entries > 0)
148 return trx_block_num;
149
150 return get_block_n{};
151 }
152
153 slice_directory::slice_directory(const bfs::path& slice_dir, uint32_t width, std::optional<uint32_t> minimum_irreversible_history_blocks, std::optional<uint32_t> minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
154 : _slice_dir(slice_dir)
155 , _width(width)
156 , _minimum_irreversible_history_blocks(minimum_irreversible_history_blocks)
157 , _minimum_uncompressed_irreversible_history_blocks(minimum_uncompressed_irreversible_history_blocks)
158 , _compression_seek_point_stride(compression_seek_point_stride)
159 , _best_known_lib(0) {
160 if (!exists(_slice_dir)) {
161 bfs::create_directories(slice_dir);
162 }
163 }
164
166 const bool found = find_index_slice(slice_number, state, index_file);
167 if( !found ) {
168 create_new_index_slice_file(index_file);
169 }
170 return found;
171 }
172
173 bool slice_directory::find_index_slice(uint32_t slice_number, open_state state, fc::cfile& index_file, bool open_file) const {
174 const bool found = find_slice(_trace_index_prefix, slice_number, index_file, open_file);
175 if( !found || !open_file ) {
176 return found;
177 }
178
179 validate_existing_index_slice_file(index_file, state);
180 return true;
181 }
182
183 void slice_directory::create_new_index_slice_file(fc::cfile& index_file) const {
185 index_header h { .version = _current_version };
186 append_store(h, index_file);
187 }
188
189 void slice_directory::validate_existing_index_slice_file(fc::cfile& index_file, open_state state) const {
190 const auto header = extract_store<index_header>(index_file);
191 if (header.version != _current_version) {
192 throw old_slice_version("Old slice file with version: " + std::to_string(header.version) +
193 " is in directory, only supporting version: " + std::to_string(_current_version));
194 }
195
196 if( state == open_state::write ) {
197 index_file.seek_end(0);
198 }
199 }
200
202 const bool found = find_trace_slice(slice_number, state, trace_file);
203
204 if( !found ) {
206 }
207
208 return found;
209 }
210
211 bool slice_directory::find_trace_slice(uint32_t slice_number, open_state state, fc::cfile& trace_file, bool open_file) const {
212 const bool found = find_slice(_trace_prefix, slice_number, trace_file, open_file);
213
214 if( !found || !open_file ) {
215 return found;
216 }
217
218 if( state == open_state::write ) {
219 trace_file.seek_end(0);
220 }
221 else {
222 trace_file.seek(0); // ensure we are at the start of the file
223 }
224 return true;
225 }
226
227 std::optional<compressed_file> slice_directory::find_compressed_trace_slice(uint32_t slice_number, bool open_file ) const {
228 auto filename = make_filename(_trace_prefix, _compressed_trace_ext, slice_number, _width);
229 const path slice_path = _slice_dir / filename;
230 const bool file_exists = exists(slice_path);
231
232 if (file_exists) {
233 auto result = compressed_file(slice_path);
234 if (open_file) {
235 result.open();
236 }
237
238 return std::move(result);
239 } else {
240 return {};
241 }
242 }
243
244 bool slice_directory::find_slice(const char* slice_prefix, uint32_t slice_number, fc::cfile& slice_file, bool open_file) const {
245 auto filename = make_filename(slice_prefix, _trace_ext, slice_number, _width);
246 const path slice_path = _slice_dir / filename;
247 slice_file.set_file_path(slice_path);
248
249 const bool file_exists = exists(slice_path);
250 if( !file_exists || !open_file ) {
251 return file_exists;
252 }
253
255 // TODO: this is a temporary fix until fc::cfile handles it internally. OSX and Linux differ on the read offset
256 // when opening in "ab+" mode
257 slice_file.seek(0);
258 return true;
259 }
260
261
263 const bool trace_found = find_or_create_trace_slice(slice_number, state, trace);
264 const bool index_found = find_or_create_index_slice(slice_number, state, index);
265 if (trace_found != index_found) {
266 const std::string trace_status = trace_found ? "existing" : "new";
267 const std::string index_status = index_found ? "existing" : "new";
268 elog("Trace file is ${ts}, but it's metadata file is ${is}. This means the files are not consistent.", ("ts", trace_status)("is", index_status));
269 }
270 }
271
273 const bool found = find_trx_id_slice(slice_number, state, trx_id_file);
274 if( !found ) {
276 }
277 return found;
278 }
279
280 bool slice_directory::find_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile& trx_id_file, bool open_file) const {
281 const bool found = find_slice(_trace_trx_id_prefix, slice_number, trx_id_file, open_file);
282 if( !found || !open_file ) {
283 return found;
284 }
285 if( state == open_state::write ) {
286 trx_id_file.seek_end(0);
287 }
288 return true;
289 }
290
292 {
293 std::scoped_lock lock(_maintenance_mtx);
294 _best_known_lib = lib;
295 }
296 _maintenance_condition.notify_one();
297 }
298
300 _maintenance_thread = std::thread([this, log=std::move(log)](){
301 fc::set_os_thread_name( "trace-mx" );
302 uint32_t last_lib = 0;
303
304 while(true) {
305 std::unique_lock<std::mutex> lock(_maintenance_mtx);
306 while ( last_lib >= _best_known_lib && !_maintenance_shutdown ) {
307 _maintenance_condition.wait(lock);
308 }
309
310 if (_maintenance_shutdown) {
311 break;
312 }
313
314 uint32_t best_known_lib = _best_known_lib;
315 lock.unlock();
316
317 log(std::string("Waking up to handle lib: ") + std::to_string(best_known_lib));
318
319 if (last_lib < best_known_lib) {
320 try {
321 run_maintenance_tasks(best_known_lib, log);
322 last_lib = best_known_lib;
323 } FC_LOG_AND_DROP();
324 }
325 }
326 });
327 }
328
330 {
331 std::scoped_lock lock(_maintenance_mtx);
332 _maintenance_shutdown = true;
333 }
334 _maintenance_condition.notify_one();
335 _maintenance_thread.join();
336 }
337
338 template<typename F>
339 void slice_directory::process_irreversible_slice_range(uint32_t lib, uint32_t min_irreversible, std::optional<uint32_t>& lower_bound_slice, F&& f) {
340 const uint32_t lib_slice_number = slice_number( lib );
341 if (lib_slice_number < 1 || (lower_bound_slice && *lower_bound_slice >= lib_slice_number - 1))
342 return;
343
344 const int64_t upper_bound_block_number = static_cast<int64_t>(lib) - static_cast<int64_t>(min_irreversible) - _width;
345 if (upper_bound_block_number >= 0) {
346 uint32_t upper_bound_slice_num = slice_number(static_cast<uint32_t>(upper_bound_block_number));
347 while (!lower_bound_slice || *lower_bound_slice < upper_bound_slice_num) {
348 const uint32_t slice_to_process = lower_bound_slice ? *lower_bound_slice + 1 : 0;
349 f(slice_to_process);
350 lower_bound_slice = slice_to_process;
351 }
352 }
353 }
354
356 if (_minimum_irreversible_history_blocks) {
357 process_irreversible_slice_range(lib, *_minimum_irreversible_history_blocks, _last_cleaned_up_slice, [this, &log](uint32_t slice_to_clean){
358 fc::cfile trace;
359 fc::cfile index;
360 fc::cfile trx_id;
361
362 log(std::string("Attempting Prune of slice: ") + std::to_string(slice_to_clean));
363
364 // cleanup index first to reduce the likelihood of reader finding index, but not finding trace
365 const bool dont_open_file = false;
366 const bool index_found = find_index_slice(slice_to_clean, open_state::read, index, dont_open_file);
367 if (index_found) {
368 log(std::string("Removing: ") + index.get_file_path().generic_string());
369 bfs::remove(index.get_file_path());
370 }
371 const bool trace_found = find_trace_slice(slice_to_clean, open_state::read, trace, dont_open_file);
372 if (trace_found) {
373 log(std::string("Removing: ") + trace.get_file_path().generic_string());
374 bfs::remove(trace.get_file_path());
375 }
376 const bool trx_id_found = find_trx_id_slice(slice_to_clean, open_state::read, trx_id, dont_open_file);
377 if (trx_id_found) {
378 log(std::string("Removing: ") + trx_id.get_file_path().generic_string());
379 bfs::remove(trx_id.get_file_path());
380 }
381
382 auto ctrace = find_compressed_trace_slice(slice_to_clean, dont_open_file);
383 if (ctrace) {
384 log(std::string("Removing: ") + ctrace->get_file_path().generic_string());
385 bfs::remove(ctrace->get_file_path());
386 }
387 });
388 }
389
390 // Only process compression if its configured AND there is a range of irreversible blocks which would not also
391 // be deleted
392 if (_minimum_uncompressed_irreversible_history_blocks &&
393 (!_minimum_irreversible_history_blocks || *_minimum_uncompressed_irreversible_history_blocks < *_minimum_irreversible_history_blocks) )
394 {
395 process_irreversible_slice_range(lib, *_minimum_uncompressed_irreversible_history_blocks, _last_compressed_slice, [this, &log](uint32_t slice_to_compress){
396 fc::cfile trace;
397 const bool dont_open_file = false;
398 const bool trace_found = find_trace_slice(slice_to_compress, open_state::read, trace, dont_open_file);
399
400 log(std::string("Attempting compression of slice: ") + std::to_string(slice_to_compress));
401
402 if (trace_found) {
403 auto compressed_path = trace.get_file_path();
404 compressed_path.replace_extension(_compressed_trace_ext);
405
406 log(std::string("Compressing: ") + trace.get_file_path().generic_string());
407 compressed_file::process(trace.get_file_path(), compressed_path.generic_string(), _compression_seek_point_stride);
408
409 // after compression is complete, delete the old uncompressed file
410 log(std::string("Removing: ") + trace.get_file_path().generic_string());
411 bfs::remove(trace.get_file_path());
412 }
413 });
414 }
415 }
416}
void seek_end(long loc)
Definition cfile.hpp:96
void seek(long loc)
Definition cfile.hpp:87
static constexpr const char * create_or_update_rw_mode
Definition cfile.hpp:57
cfile_datastream create_datastream()
Definition cfile.hpp:249
void open(const char *mode)
Definition cfile.hpp:65
fc::path get_file_path() const
Definition cfile.hpp:41
size_t tellp() const
Definition cfile.hpp:79
void set_file_path(fc::path file_path)
Definition cfile.hpp:37
wraps boost::filesystem::path to provide platform independent path manipulation.
std::string generic_string() const
void replace_extension(const fc::path &e)
static bool process(const fc::path &input_path, const fc::path &output_path, size_t seek_point_stride)
bool find_or_create_index_slice(uint32_t slice_number, open_state state, fc::cfile &index_file) const
bool find_trace_slice(uint32_t slice_number, open_state state, fc::cfile &trace_file, bool open_file=true) const
bool find_or_create_trace_slice(uint32_t slice_number, open_state state, fc::cfile &trace_file) const
uint32_t slice_number(uint32_t block_height) const
std::optional< compressed_file > find_compressed_trace_slice(uint32_t slice_number, bool open_file=true) const
void start_maintenance_thread(log_handler log)
void run_maintenance_tasks(uint32_t lib, const log_handler &log)
slice_directory(const boost::filesystem::path &slice_dir, uint32_t width, std::optional< uint32_t > minimum_irreversible_history_blocks, std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
bool find_index_slice(uint32_t slice_number, open_state state, fc::cfile &index_file, bool open_file=true) const
bool find_or_create_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile &trx_id_file) const
void find_or_create_slice_pair(uint32_t slice_number, open_state state, fc::cfile &trace, fc::cfile &index)
bool find_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile &trx_id_file, bool open_file=true) const
void append_trx_ids(block_trxs_entry tt)
uint64_t scan_metadata_log_from(uint32_t block_height, uint64_t offset, Fn &&fn, const yield_function &yield)
get_block_n get_trx_block_number(const chain::transaction_id_type &trx_id, std::optional< uint32_t > minimum_irreversible_history_blocks, const yield_function &yield={})
void append(const BlockTrace &bt)
get_block_t get_block(uint32_t block_height, const yield_function &yield={})
store_provider(const boost::filesystem::path &slice_dir, uint32_t stride_width, std::optional< uint32_t > minimum_irreversible_history_blocks, std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
std::optional< data_log_entry > read_data_log(uint32_t block_height, uint64_t offset)
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
#define FC_LOG_AND_DROP(...)
#define elog(FORMAT,...)
Definition logger.hpp:130
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
bool exists(const path &p)
uint64_t file_size(const path &p)
void set_os_thread_name(const string &name)
std::optional< std::tuple< data_log_entry, bool > > get_block_t
Definition common.hpp:49
std::variant< block_entry_v0, lib_entry_v0, block_trxs_entry > metadata_log_entry
std::variant< block_trace_v0, block_trace_v1, block_trace_v2 > data_log_entry
Definition data_log.hpp:9
std::optional< uint32_t > get_block_n
Definition common.hpp:51
signed __int64 int64_t
Definition stdint.h:135
unsigned int uint32_t
Definition stdint.h:126
signed int int32_t
Definition stdint.h:123
unsigned __int64 uint64_t
Definition stdint.h:136
Definition trace.hpp:93
uint32_t block_num
Definition trace.hpp:95
slice_directory::open_state open_state
Definition dtoa.c:306
void bt(const Operand &op, const Reg &reg)
void lock()