Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
snapshot.cpp
Go to the documentation of this file.
3#include <fc/scoped_exit.hpp>
4
5namespace sysio { namespace chain {
6
8: snapshot(snapshot)
9{
10 snapshot.set("sections", fc::variants());
11 snapshot.set("version", current_snapshot_version );
12}
13
14void variant_snapshot_writer::write_start_section( const std::string& section_name ) {
15 current_rows.clear();
16 current_section_name = section_name;
17}
18
20 current_rows.emplace_back(row_writer.to_variant());
21}
22
24 snapshot["sections"].get_array().emplace_back(fc::mutable_variant_object()("name", std::move(current_section_name))("rows", std::move(current_rows)));
25}
26
30
32:snapshot(snapshot)
33,cur_section(nullptr)
34,cur_row(0)
35{
36}
37
39 SYS_ASSERT(snapshot.is_object(), snapshot_validation_exception,
40 "Variant snapshot is not an object");
41 const fc::variant_object& o = snapshot.get_object();
42
43 SYS_ASSERT(o.contains("version"), snapshot_validation_exception,
44 "Variant snapshot has no version");
45
46 const auto& version = o["version"];
47 SYS_ASSERT(version.is_integer(), snapshot_validation_exception,
48 "Variant snapshot version is not an integer");
49
50 SYS_ASSERT(version.as_uint64() == (uint64_t)current_snapshot_version, snapshot_validation_exception,
51 "Variant snapshot is an unsuppored version. Expected : ${expected}, Got: ${actual}",
52 ("expected", current_snapshot_version)("actual",o["version"].as_uint64()));
53
54 SYS_ASSERT(o.contains("sections"), snapshot_validation_exception,
55 "Variant snapshot has no sections");
56
57 const auto& sections = o["sections"];
58 SYS_ASSERT(sections.is_array(), snapshot_validation_exception, "Variant snapshot sections is not an array");
59
60 const auto& section_array = sections.get_array();
61 for( const auto& section: section_array ) {
62 SYS_ASSERT(section.is_object(), snapshot_validation_exception, "Variant snapshot section is not an object");
63
64 const auto& so = section.get_object();
65 SYS_ASSERT(so.contains("name"), snapshot_validation_exception,
66 "Variant snapshot section has no name");
67
68 SYS_ASSERT(so["name"].is_string(), snapshot_validation_exception,
69 "Variant snapshot section name is not a string");
70
71 SYS_ASSERT(so.contains("rows"), snapshot_validation_exception,
72 "Variant snapshot section has no rows");
73
74 SYS_ASSERT(so["rows"].is_array(), snapshot_validation_exception,
75 "Variant snapshot section rows is not an array");
76 }
77}
78
79bool variant_snapshot_reader::has_section( const string& section_name ) {
80 const auto& sections = snapshot["sections"].get_array();
81 for( const auto& section: sections ) {
82 if (section["name"].as_string() == section_name) {
83 return true;
84 }
85 }
86
87 return false;
88}
89
90void variant_snapshot_reader::set_section( const string& section_name ) {
91 const auto& sections = snapshot["sections"].get_array();
92 for( const auto& section: sections ) {
93 if (section["name"].as_string() == section_name) {
94 cur_section = &section.get_object();
95 return;
96 }
97 }
98
99 SYS_THROW(snapshot_exception, "Variant snapshot has no section named ${n}", ("n", section_name));
100}
101
103 const auto& rows = (*cur_section)["rows"].get_array();
104 row_reader.provide(rows.at(cur_row++));
105 return cur_row < rows.size();
106}
107
109 const auto& rows = (*cur_section)["rows"].get_array();
110 return rows.empty();
111}
112
114 cur_section = nullptr;
115 cur_row = 0;
116}
117
121
123:snapshot(snapshot)
124,header_pos(snapshot.tellp())
125,section_pos(-1)
126,row_count(0)
127{
128 // write magic number
129 auto totem = magic_number;
130 snapshot.write((char*)&totem, sizeof(totem));
131
132 // write version
133 auto version = current_snapshot_version;
134 snapshot.write((char*)&version, sizeof(version));
135}
136
137void ostream_snapshot_writer::write_start_section( const std::string& section_name )
138{
139 SYS_ASSERT(section_pos == std::streampos(-1), snapshot_exception, "Attempting to write a new section without closing the previous section");
140 section_pos = snapshot.tellp();
141 row_count = 0;
142
143 uint64_t placeholder = std::numeric_limits<uint64_t>::max();
144
145 // write a placeholder for the section size
146 snapshot.write((char*)&placeholder, sizeof(placeholder));
147
148 // write placeholder for row count
149 snapshot.write((char*)&placeholder, sizeof(placeholder));
150
151 // write the section name (null terminated)
152 snapshot.write(section_name.data(), section_name.size());
153 snapshot.put(0);
154}
155
157 auto restore = snapshot.tellp();
158 try {
159 row_writer.write(snapshot);
160 } catch (...) {
161 snapshot.seekp(restore);
162 throw;
163 }
164 row_count++;
165}
166
168 auto restore = snapshot.tellp();
169
170 uint64_t section_size = restore - section_pos - sizeof(uint64_t);
171
172 snapshot.seekp(section_pos);
173
174 // write the section size
175 snapshot.write((char*)&section_size, sizeof(section_size));
176
177 // write the row count
178 snapshot.write((char*)&row_count, sizeof(row_count));
179
180 snapshot.seekp(restore);
181
182 section_pos = std::streampos(-1);
183 row_count = 0;
184}
185
187 uint64_t end_marker = std::numeric_limits<uint64_t>::max();
188
189 // write the end marker (an all-ones value where the next section size would be)
190 snapshot.write((char*)&end_marker, sizeof(end_marker));
191}
192
194:snapshot(snapshot)
195,header_pos(snapshot.tellg())
196,num_rows(0)
197,cur_row(0)
198{
199
200}
201
203 // make sure to restore the read pos
204 auto restore_pos = fc::make_scoped_exit([this,pos=snapshot.tellg(),ex=snapshot.exceptions()](){
205 snapshot.seekg(pos);
206 snapshot.exceptions(ex);
207 });
208
209 snapshot.exceptions(std::istream::failbit|std::istream::eofbit);
210
211 try {
212 // validate totem
213 auto expected_totem = ostream_snapshot_writer::magic_number;
214 decltype(expected_totem) actual_totem;
215 snapshot.read((char*)&actual_totem, sizeof(actual_totem));
216 SYS_ASSERT(actual_totem == expected_totem, snapshot_exception,
217 "Binary snapshot has unexpected magic number!");
218
219 // validate version
220 auto expected_version = current_snapshot_version;
221 decltype(expected_version) actual_version;
222 snapshot.read((char*)&actual_version, sizeof(actual_version));
223 SYS_ASSERT(actual_version == expected_version, snapshot_exception,
224 "Binary snapshot is an unsuppored version. Expected : ${expected}, Got: ${actual}",
225 ("expected", expected_version)("actual", actual_version));
226
227 while (validate_section()) {}
228 } catch( const std::exception& e ) { \
229 snapshot_exception fce(FC_LOG_MESSAGE( warn, "Binary snapshot validation threw IO exception (${what})",("what",e.what())));
230 throw fce;
231 }
232}
233
234bool istream_snapshot_reader::validate_section() const {
235 uint64_t section_size = 0;
236 snapshot.read((char*)&section_size,sizeof(section_size));
237
238 // stop when we see the end marker
239 if (section_size == std::numeric_limits<uint64_t>::max()) {
240 return false;
241 }
242
243 // seek past the section
244 snapshot.seekg(snapshot.tellg() + std::streamoff(section_size));
245
246 return true;
247}
248
249bool istream_snapshot_reader::has_section( const string& section_name ) {
250 auto restore_pos = fc::make_scoped_exit([this,pos=snapshot.tellg()](){
251 snapshot.seekg(pos);
252 });
253
254 const std::streamoff header_size = sizeof(ostream_snapshot_writer::magic_number) + sizeof(current_snapshot_version);
255
256 auto next_section_pos = header_pos + header_size;
257
258 while (true) {
259 snapshot.seekg(next_section_pos);
260 uint64_t section_size = 0;
261 snapshot.read((char*)&section_size,sizeof(section_size));
262 if (section_size == std::numeric_limits<uint64_t>::max()) {
263 break;
264 }
265
266 next_section_pos = snapshot.tellg() + std::streamoff(section_size);
267
268 uint64_t ignore = 0;
269 snapshot.read((char*)&ignore,sizeof(ignore));
270
271 bool match = true;
272 for(auto c : section_name) {
273 if(snapshot.get() != c) {
274 match = false;
275 break;
276 }
277 }
278
279 if (match && snapshot.get() == 0) {
280 return true;
281 }
282 }
283
284 return false;
285}
286
287void istream_snapshot_reader::set_section( const string& section_name ) {
288 auto restore_pos = fc::make_scoped_exit([this,pos=snapshot.tellg()](){
289 snapshot.seekg(pos);
290 });
291
292 const std::streamoff header_size = sizeof(ostream_snapshot_writer::magic_number) + sizeof(current_snapshot_version);
293
294 auto next_section_pos = header_pos + header_size;
295
296 while (true) {
297 snapshot.seekg(next_section_pos);
298 uint64_t section_size = 0;
299 snapshot.read((char*)&section_size,sizeof(section_size));
300 if (section_size == std::numeric_limits<uint64_t>::max()) {
301 break;
302 }
303
304 next_section_pos = snapshot.tellg() + std::streamoff(section_size);
305
306 uint64_t row_count = 0;
307 snapshot.read((char*)&row_count,sizeof(row_count));
308
309 bool match = true;
310 for(auto c : section_name) {
311 if(snapshot.get() != c) {
312 match = false;
313 break;
314 }
315 }
316
317 if (match && snapshot.get() == 0) {
318 cur_row = 0;
319 num_rows = row_count;
320
321 // leave the stream at the right point
322 restore_pos.cancel();
323 return;
324 }
325 }
326
327 SYS_THROW(snapshot_exception, "Binary snapshot has no section named ${n}", ("n", section_name));
328}
329
331 row_reader.provide(snapshot);
332 return ++cur_row < num_rows;
333}
334
336 return num_rows == 0;
337}
338
340 num_rows = 0;
341 cur_row = 0;
342}
343
345 snapshot.seekg( header_pos );
347}
348
353
355{
356 // no-op for structural details
357}
358
362
364 // no-op for structural details
365}
366
368 // no-op for structural details
369}
370
371}}
#define SYS_THROW(exc_type, FORMAT,...)
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
An order-preserving dictionary of variants.
An order-preserving dictionary of variants.
bool contains(const char *key) const
stores null, int64, uint64, double, bool, string, std::vector<variant>, and variant_object's.
Definition variant.hpp:191
void write_row(const detail::abstract_snapshot_row_writer &row_writer) override
Definition snapshot.cpp:359
integrity_hash_snapshot_writer(fc::sha256::encoder &enc)
Definition snapshot.cpp:349
void write_start_section(const std::string &section_name) override
Definition snapshot.cpp:354
void set_section(const string &section_name) override
Definition snapshot.cpp:287
istream_snapshot_reader(std::istream &snapshot)
Definition snapshot.cpp:193
bool read_row(detail::abstract_snapshot_row_reader &row_reader) override
Definition snapshot.cpp:330
bool has_section(const string &section_name) override
Definition snapshot.cpp:249
static const uint32_t magic_number
Definition snapshot.hpp:338
void write_row(const detail::abstract_snapshot_row_writer &row_writer) override
Definition snapshot.cpp:156
void write_start_section(const std::string &section_name) override
Definition snapshot.cpp:137
ostream_snapshot_writer(std::ostream &snapshot)
Definition snapshot.cpp:122
bool has_section(const string &section_name) override
Definition snapshot.cpp:79
void set_section(const string &section_name) override
Definition snapshot.cpp:90
variant_snapshot_reader(const fc::variant &snapshot)
Definition snapshot.cpp:31
bool read_row(detail::abstract_snapshot_row_reader &row_reader) override
Definition snapshot.cpp:102
void write_start_section(const std::string &section_name) override
Definition snapshot.cpp:14
void write_row(const detail::abstract_snapshot_row_writer &row_writer) override
Definition snapshot.cpp:19
variant_snapshot_writer(fc::mutable_variant_object &snapshot)
Definition snapshot.cpp:7
#define FC_LOG_MESSAGE(LOG_LEVEL, FORMAT,...)
A helper method for generating log messages.
scoped_exit< Callback > make_scoped_exit(Callback &&c)
std::vector< fc::variant > variants
Definition variant.hpp:173
unsigned __int64 uint64_t
Definition stdint.h:136
virtual void provide(std::istream &in) const =0
virtual void write(ostream_wrapper &out) const =0