Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
store_provider.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <ios>
4#include <thread>
5#include <mutex>
6#include <condition_variable>
7#include <fc/io/cfile.hpp>
8#include <boost/filesystem.hpp>
9#include <fc/variant.hpp>
14
15namespace sysio::trace_api {
16 using namespace boost::filesystem;
17
18 class path_does_not_exist : public std::runtime_error {
19 public:
20 explicit path_does_not_exist(const char* what_arg)
21 :std::runtime_error(what_arg)
22 {}
23 explicit path_does_not_exist(const std::string& what_arg)
24 :std::runtime_error(what_arg)
25 {}
26 };
27
28 class old_slice_version : public std::runtime_error {
29 public:
30 explicit old_slice_version(const char* what_arg)
31 :std::runtime_error(what_arg)
32 {}
33 explicit old_slice_version(const std::string& what_arg)
34 :std::runtime_error(what_arg)
35 {}
36 };
37
38 class incompatible_slice_files : public std::runtime_error {
39 public:
40 explicit incompatible_slice_files(const char* what_arg)
41 :std::runtime_error(what_arg)
42 {}
43 explicit incompatible_slice_files(const std::string& what_arg)
44 :std::runtime_error(what_arg)
45 {}
46 };
47
48 class malformed_slice_file : public std::runtime_error {
49 public:
50 explicit malformed_slice_file(const char* what_arg)
51 :std::runtime_error(what_arg)
52 {}
53 explicit malformed_slice_file(const std::string& what_arg)
54 :std::runtime_error(what_arg)
55 {}
56 };
57
65 template<typename DataEntry, typename File>
66 static uint64_t append_store(const DataEntry &entry, File &file) {
67 auto data = fc::raw::pack(entry);
68 const auto offset = file.tellp();
69 file.write(data.data(), data.size());
70 file.flush();
71 file.sync();
72 return offset;
73 }
74
81 template<typename DataEntry, typename File>
82 static DataEntry extract_store( File& file ) {
83 DataEntry entry;
84 auto ds = file.create_datastream();
85 fc::raw::unpack(ds, entry);
86 return entry;
87 }
88
89
90 class store_provider;
91
97 public:
101
102 enum class open_state { read /*read from front to back*/, write /*write to end of file*/ };
103 slice_directory(const boost::filesystem::path& slice_dir, uint32_t width, std::optional<uint32_t> minimum_irreversible_history_blocks,
104 std::optional<uint32_t> minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride);
105
112 uint32_t slice_number(uint32_t block_height) const {
113 return block_height / _width;
114 }
115
126
138 bool find_index_slice(uint32_t slice_number, open_state state, fc::cfile& index_file, bool open_file = true) const;
139
150
162 bool find_trace_slice(uint32_t slice_number, open_state state, fc::cfile& trace_file, bool open_file = true) const;
163
173 std::optional<compressed_file> find_compressed_trace_slice(uint32_t slice_number, bool open_file = true) const;
174
184
194
205 bool find_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile& trx_id_file, bool open_file = true) const;
206
211 void set_lib(uint32_t lib);
212
217
222
229 void run_maintenance_tasks(uint32_t lib, const log_handler& log);
230
231 private:
232 // returns true if slice is found, slice_file will always be set to the appropriate path for
233 // the slice_prefix and slice_number, but will only be opened if found
234 bool find_slice(const char* slice_prefix, uint32_t slice_number, fc::cfile& slice_file, bool open_file) const;
235
236 // take an index file that is initialized to a file and open it and write its header
237 void create_new_index_slice_file(fc::cfile& index_file) const;
238
239 // take an open index slice file and verify its header is valid and prepare the file to be appended to (or read from)
240 void validate_existing_index_slice_file(fc::cfile& index_file, open_state state) const;
241
242 // helper for methods that process irreversible slice files
243 template<typename F>
244 void process_irreversible_slice_range(uint32_t lib, uint32_t upper_bound_block, std::optional<uint32_t>& lower_bound_slice, F&& f);
245
246 const boost::filesystem::path _slice_dir;
247 const uint32_t _width;
248 const std::optional<uint32_t> _minimum_irreversible_history_blocks;
249 std::optional<uint32_t> _last_cleaned_up_slice;
250 const std::optional<uint32_t> _minimum_uncompressed_irreversible_history_blocks;
251 std::optional<uint32_t> _last_compressed_slice;
252 const size_t _compression_seek_point_stride;
253
254 std::mutex _maintenance_mtx;
255 std::condition_variable _maintenance_condition;
256 std::thread _maintenance_thread;
257 bool _maintenance_shutdown{false};
258 uint32_t _best_known_lib{0};
259 };
260
265 public:
267
268 store_provider(const boost::filesystem::path& slice_dir, uint32_t stride_width, std::optional<uint32_t> minimum_irreversible_history_blocks,
269 std::optional<uint32_t> minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride);
270
271 template<typename BlockTrace>
272 void append(const BlockTrace& bt);
273 void append_lib(uint32_t lib);
275
282 get_block_t get_block(uint32_t block_height, const yield_function& yield= {});
283
284 get_block_n get_trx_block_number(const chain::transaction_id_type& trx_id, std::optional<uint32_t> minimum_irreversible_history_blocks, const yield_function& yield= {});
285
292
293 protected:
303 template<typename Fn>
304 uint64_t scan_metadata_log_from( uint32_t block_height, uint64_t offset, Fn&& fn, const yield_function& yield ) {
305 // ignoring offset
306 offset = 0;
307 fc::cfile index;
308 const uint32_t slice_number = _slice_directory.slice_number(block_height);
309 const bool found = _slice_directory.find_index_slice(slice_number, open_state::read, index);
310 if( !found ) {
311 return 0;
312 }
313 const uint64_t end = file_size(index.get_file_path());
314 offset = index.tellp();
315 uint64_t last_read_offset = offset;
316 while (offset < end) {
317 yield();
318 const auto metadata = extract_store<metadata_log_entry>(index);
319 if(! fn(metadata)) {
320 break;
321 }
322 last_read_offset = offset;
323 offset = index.tellp();
324 }
325 return last_read_offset;
326 }
327
336 std::optional<data_log_entry> read_data_log( uint32_t block_height, uint64_t offset ) {
337 const uint32_t slice_number = _slice_directory.slice_number(block_height);
338
339 fc::cfile trace;
340 if( !_slice_directory.find_trace_slice(slice_number, open_state::read, trace) ) {
341 // attempt to read a compressed trace if one exists
342 std::optional<compressed_file> ctrace = _slice_directory.find_compressed_trace_slice(slice_number);
343 if (ctrace) {
344 ctrace->seek(offset);
345 return extract_store<data_log_entry>(*ctrace);
346 }
347
348 const std::string offset_str = boost::lexical_cast<std::string>(offset);
349 const std::string bh_str = boost::lexical_cast<std::string>(block_height);
350 throw malformed_slice_file("Requested offset: " + offset_str + " to retrieve block number: " + bh_str + " but this trace file is new, so there are no traces present.");
351 }
352 const uint64_t end = file_size(trace.get_file_path());
353 if( offset >= end ) {
354 const std::string offset_str = boost::lexical_cast<std::string>(offset);
355 const std::string bh_str = boost::lexical_cast<std::string>(block_height);
356 const std::string end_str = boost::lexical_cast<std::string>(end);
357 throw malformed_slice_file("Requested offset: " + offset_str + " to retrieve block number: " + bh_str + " but this trace file only goes to offset: " + end_str);
358 }
359 trace.seek(offset);
360 return extract_store<data_log_entry>(trace);
361 }
362
369
377
379 };
380
381}
382
void seek(long loc)
Definition cfile.hpp:87
fc::path get_file_path() const
Definition cfile.hpp:41
incompatible_slice_files(const std::string &what_arg)
malformed_slice_file(const std::string &what_arg)
old_slice_version(const std::string &what_arg)
old_slice_version(const char *what_arg)
path_does_not_exist(const std::string &what_arg)
bool find_or_create_index_slice(uint32_t slice_number, open_state state, fc::cfile &index_file) const
bool find_trace_slice(uint32_t slice_number, open_state state, fc::cfile &trace_file, bool open_file=true) const
bool find_or_create_trace_slice(uint32_t slice_number, open_state state, fc::cfile &trace_file) const
uint32_t slice_number(uint32_t block_height) const
std::optional< compressed_file > find_compressed_trace_slice(uint32_t slice_number, bool open_file=true) const
void start_maintenance_thread(log_handler log)
void run_maintenance_tasks(uint32_t lib, const log_handler &log)
slice_directory(const boost::filesystem::path &slice_dir, uint32_t width, std::optional< uint32_t > minimum_irreversible_history_blocks, std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
bool find_index_slice(uint32_t slice_number, open_state state, fc::cfile &index_file, bool open_file=true) const
bool find_or_create_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile &trx_id_file) const
void find_or_create_slice_pair(uint32_t slice_number, open_state state, fc::cfile &trace, fc::cfile &index)
bool find_trx_id_slice(uint32_t slice_number, open_state state, fc::cfile &trx_id_file, bool open_file=true) const
void append_trx_ids(block_trxs_entry tt)
uint64_t scan_metadata_log_from(uint32_t block_height, uint64_t offset, Fn &&fn, const yield_function &yield)
get_block_n get_trx_block_number(const chain::transaction_id_type &trx_id, std::optional< uint32_t > minimum_irreversible_history_blocks, const yield_function &yield={})
void append(const BlockTrace &bt)
void initialize_new_index_slice_file(fc::cfile &index)
get_block_t get_block(uint32_t block_height, const yield_function &yield={})
store_provider(const boost::filesystem::path &slice_dir, uint32_t stride_width, std::optional< uint32_t > minimum_irreversible_history_blocks, std::optional< uint32_t > minimum_uncompressed_irreversible_history_blocks, size_t compression_seek_point_stride)
void validate_existing_index_slice_file(fc::cfile &index, open_state state)
void start_maintenance_thread(log_handler log)
std::optional< data_log_entry > read_data_log(uint32_t block_height, uint64_t offset)
void unpack(Stream &s, std::deque< T > &value)
Definition raw.hpp:540
void pack(Stream &s, const std::deque< T > &value)
Definition raw.hpp:531
uint64_t file_size(const path &p)
Definition name.hpp:106
std::optional< std::tuple< data_log_entry, bool > > get_block_t
Definition common.hpp:49
std::optional< uint32_t > get_block_n
Definition common.hpp:51
#define FC_REFLECT(TYPE, MEMBERS)
Specializes fc::reflector for TYPE.
Definition reflect.hpp:311
unsigned int uint32_t
Definition stdint.h:126
unsigned __int64 uint64_t
Definition stdint.h:136
Definition trace.hpp:93
slice_directory::open_state open_state
void bt(const Operand &op, const Reg &reg)