Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
compressed_file.cpp
Go to the documentation of this file.
2
3#include <zlib.h>
4
namespace {
   // On-disk layout of the trailer appended after the deflate stream:
   //   [seek_point_entry x N][seek_point_count_type N]
   // Each entry is a pair of 64-bit offsets; the seek code compares element 0 against the
   // requested (decompressed) location and positions the file at element 1.
   using seek_point_entry      = std::tuple<uint64_t, uint64_t>;
   using seek_point_count_type = uint16_t;

   // Byte sizes the file format hard-codes for the two trailer element types.
   constexpr size_t expected_seek_point_entry_size{16};
   constexpr size_t expected_seek_point_count_size{2};

   // Negative window bits select a raw deflate stream (no zlib/gzip header or trailer).
   constexpr int raw_zlib_window_bits{-15};

   // These are hard-coded expectations in the written file format; fail the build rather than
   // silently produce or consume an incompatible layout.
   static_assert(expected_seek_point_entry_size == sizeof(seek_point_entry), "unexpected size for seek point");
   static_assert(expected_seek_point_count_size == sizeof(seek_point_count_type), "Unexpected size for seek point count");
}
19
20namespace sysio::trace_api {
21
// Capacity in bytes of read_buffer, the staging area inflate() writes decompressed output into.
23 static constexpr size_t read_buffer_size = 4*1024;
// Capacity in bytes of compressed_buffer, the staging area filled from the file and fed to inflate().
24 static constexpr size_t compressed_buffer_size = 4*1024;
25
27 {
// NOTE(review): presumably the destructor body — confirm against the declaration.
// Release the zlib inflate state, but only if inflateInit2() actually succeeded earlier;
// calling inflateEnd() on a stream that was never initialized is not valid.
28 if (initialized) {
29 inflateEnd(&strm);
30 initialized = false;
31 }
32 }
33
34 void read( char* d, size_t n, fc::cfile& file )
35 {
36 if (!initialized) {
37 if (Z_OK != inflateInit2(&strm, raw_zlib_window_bits)) {
38 throw std::runtime_error("failed to initialize decompression");
39 }
40
42 strm.avail_in = 0;
43 initialized = true;
44 }
45
46 size_t written = 0;
47
48 // consume the left over from the last read if there is any
49 if (remaining_read_buffer > 0) {
50 auto to_read = std::min(remaining_read_buffer, n);
51 std::memcpy(d, read_buffer.data(), to_read );
52 remaining_read_buffer -= to_read;
53 written += to_read;
54
55 if ( remaining_read_buffer > 0 ) {
56 std::memmove(read_buffer.data(), read_buffer.data() + to_read, remaining_read_buffer);
57 return;
58 }
59 }
60
61
62 // decompress more chunks
63 while (written < n) {
64 if ( strm.avail_in == 0 ) {
65 size_t remaining = file_size - file.tellp();
66 size_t to_read = std::min((size_t)compressed_buffer.size(), remaining);
67 file.read(reinterpret_cast<char*>(compressed_buffer.data()), to_read);
68 strm.avail_in = to_read;
69 strm.next_in = compressed_buffer.data();
70 }
71
72 do {
73 strm.avail_out = read_buffer.size();
74 strm.next_out = read_buffer.data();
75 auto ret = inflate(&strm, Z_NO_FLUSH);
76
77 if (ret == Z_NEED_DICT || ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) {
78 throw compressed_file_error("Error decompressing: " + std::string(strm.msg));
79 }
80
81
82
83 auto bytes_decompressed = read_buffer.size() - strm.avail_out;
84 if (bytes_decompressed > 0) {
85 auto to_copy = std::min(bytes_decompressed, n - written);
86 std::memcpy(d + written, read_buffer.data(), to_copy);
87 written += to_copy;
88
89 if (bytes_decompressed > to_copy) {
90 // move remaining to the front of the buffer
91 std::memmove(read_buffer.data(), read_buffer.data() + to_copy, bytes_decompressed - to_copy);
92 remaining_read_buffer = bytes_decompressed - to_copy;
93 }
94 }
95
96 if (written < n && ret == Z_STREAM_END) {
97 throw std::ios_base::failure("Attempting to read past the end of a compressed file");
98 }
99
100 if (ret == Z_BUF_ERROR) {
101 // need more input
102 if (strm.avail_in != 0) {
103 throw compressed_file_error("Error decompressing cannot continue processing input");
104 }
105 break;
106 }
107
108 } while (strm.avail_out == 0 && written < n);
109 }
110 }
111
112 void seek( long loc, fc::cfile& file ) {
113 if (initialized) {
114 inflateEnd(&strm);
115 initialized = false;
116 }
117
118 long remaining = loc;
119
120 // read in the seek point map
121 file.seek_end(-expected_seek_point_count_size);
122 seek_point_count_type seek_point_count = 0;
123 file.read(reinterpret_cast<char*>(&seek_point_count), sizeof(seek_point_count));
124
125 if (seek_point_count > 0) {
126 int seek_map_size = sizeof(seek_point_entry) * seek_point_count;
127 file.seek_end(-expected_seek_point_count_size - seek_map_size);
128
129 std::vector<seek_point_entry> seek_point_map(seek_point_count);
130 file.read(reinterpret_cast<char*>(seek_point_map.data()), seek_point_map.size() * sizeof(seek_point_entry));
131
132 // seek to the neareast seek point
133 auto iter = std::lower_bound(seek_point_map.begin(), seek_point_map.end(), (uint64_t)loc, []( const auto& lhs, const auto& rhs ){
134 return std::get<0>(lhs) < rhs;
135 });
136
137 // special case when there is a seek point that is exact
138 if ( iter != seek_point_map.end() && std::get<0>(*iter) == loc ) {
139 file.seek(std::get<1>(*iter));
140 return;
141 }
142
143 // special case when this is before the first seek point
144 if ( iter == seek_point_map.begin() ) {
145 file.seek(0);
146 } else {
147 // if lower bound wasn't exact iter will be one past the seek point we need
148 const auto& seek_pt = *(iter - 1);
149 file.seek(std::get<1>(seek_pt));
150 remaining -= std::get<0>(seek_pt);
151 }
152 } else {
153 file.seek(0);
154 }
155
156 // read up to the expected offset
157 if (remaining > 0) {
158 auto pre_read_buffer = std::vector<char>(remaining);
159 read(pre_read_buffer.data(), pre_read_buffer.size(), file);
160 }
161 }
162
// zlib inflate state; only meaningful while `initialized` is true
163 z_stream strm;
// raw compressed bytes most recently read from the file; inflate() consumes them via strm.next_in
164 std::vector<uint8_t> compressed_buffer = std::vector<uint8_t>(compressed_buffer_size);
// output of the latest inflate() call; bytes the caller did not take yet are kept at its front
165 std::vector<uint8_t> read_buffer = std::vector<uint8_t>(read_buffer_size);
// true while strm holds a live inflate state that must be released with inflateEnd()
167 bool initialized = false;
// size of the compressed file in bytes; bounds how much input read() pulls from the file
168 size_t file_size = 0;
169};
170
172:file_path(std::move(file_path))
173,file_ptr(nullptr)
174,impl(std::make_unique<compressed_file_impl>())
175{
176 impl->file_size = fc::file_size(file_path);
177}
178
181
182void compressed_file::seek( long loc ) {
183 impl->seek(loc, *file_ptr);
184
185}
186
187void compressed_file::read( char* d, size_t n ) {
188 impl->read(d, n, *file_ptr);
189}
190
191// these are defaulted now that the opaque impl type is known
192//
195
196
197bool compressed_file::process( const fc::path& input_path, const fc::path& output_path, size_t seek_point_stride ) {
198 if (!fc::exists(input_path)) {
199 throw std::ios_base::failure(std::string("Attempting to create compressed_file from file that does not exist: ") + input_path.generic_string());
200 }
201
202 const size_t input_size = fc::file_size(input_path);
203 if (input_size == 0) {
204 throw std::ios_base::failure(std::string("Attempting to create compressed_file from file that is empty: ") + input_path.generic_string());
205 }
206
207 // subtract 1 to make sure that the truncated division will only create a seek point if there is at least one byte
208 // in the next stride. So, a file size of N and a stride >= N results in 0 seek points. N + 1 will have a seek
209 // point for the last byte as will XN + 1 which will create X seek points (the last of which is for the last byte)
210 // of the file
211 const auto seek_point_count = (input_size - 1) / seek_point_stride;
212 std::vector<seek_point_entry> seek_point_map(seek_point_count);
213
214 fc::cfile input_file;
215 input_file.set_file_path(input_path);
216 input_file.open("rb");
217
218 fc::cfile output_file;
219 output_file.set_file_path(output_path);
220 output_file.open("wb");
221
222 z_stream strm;
223 strm.zalloc = Z_NULL;
224 strm.zfree = Z_NULL;
225 strm.opaque = Z_NULL;
226
227 if (deflateInit2(&strm, Z_BEST_COMPRESSION, Z_DEFLATED, raw_zlib_window_bits, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
228 return false;
229 }
230
231 constexpr size_t buffer_size = 64*1024;
232 auto input_buffer = std::vector<uint8_t>(buffer_size);
233 auto output_buffer = std::vector<uint8_t>(buffer_size);
234
235 auto bytes_remaining_before_sync = seek_point_stride;
236 int next_sync_point = 0;
237
238 // process a single chunk of input completely,
239 // this may sometime loop multiple times if the compressor state combined with input data creates more than a
240 // single buffer's worth of data
241 //
242 auto process_chunk = [&]( size_t input_size, int mode ) {
243 strm.avail_in = input_size;
244 strm.next_in = input_buffer.data();
245
246 do {
247 strm.avail_out = output_buffer.size();
248 strm.next_out = output_buffer.data();
249 auto ret = deflate(&strm, mode);
250
251 const bool success = ret == Z_OK || (mode == Z_FINISH && ret == Z_STREAM_END);
252 if (!success) {
253 return ret;
254 }
255
256 output_file.write(reinterpret_cast<const char*>(output_buffer.data()), output_buffer.size() - strm.avail_out);
257 } while (strm.avail_out == 0);
258
259 return Z_OK;
260 };
261
262 size_t read_offset = 0;
263 while (read_offset < input_size) {
264 const auto bytes_remaining = input_size - read_offset;
265 const auto read_size = std::min({ buffer_size, bytes_remaining, bytes_remaining_before_sync });
266 input_file.read(reinterpret_cast<char*>(input_buffer.data()), read_size);
267
268 auto ret = process_chunk(read_size, Z_NO_FLUSH);
269 if (ret != Z_OK) {
270 throw compressed_file_error(std::string("deflate failed: ") + std::to_string(ret));
271 }
272 read_offset += read_size;
273
274 if (read_size == bytes_remaining ) {
275 // finish the file out by draining remaining output
276 ret = process_chunk(0, Z_FINISH);
277 if (ret != Z_OK) {
278 throw compressed_file_error(std::string("failed to finalize file compression: ") + std::to_string(ret));
279 }
280 } else if ( read_size == bytes_remaining_before_sync ) {
281 // create a sync point by flushing the compressor so a decompressor can start at this offset
282 ret = process_chunk(0, Z_FULL_FLUSH);
283 if (ret != Z_OK) {
284 throw compressed_file_error(std::string("failed to create sync point: ") + std::to_string(ret));
285 }
286
287 seek_point_map.at(next_sync_point++) = {read_offset, output_file.tellp()};
288
289 if (next_sync_point == seek_point_count) {
290 // if we are out of sync points, set this value one past the end (disabling it)
291 bytes_remaining_before_sync = input_size - read_offset + 1;
292 } else {
293 bytes_remaining_before_sync = seek_point_stride;
294 }
295 } else {
296 bytes_remaining_before_sync -= read_size;
297 }
298 }
299
300 deflateEnd(&strm);
301 input_file.close();
302
303 // write out the seek point table
304 if (seek_point_map.size() > 0) {
305 output_file.write(reinterpret_cast<const char*>(seek_point_map.data()), seek_point_map.size() * sizeof(seek_point_entry));
306 }
307
308 // write out the seek point count
309 output_file.write(reinterpret_cast<const char*>(&seek_point_count), sizeof(seek_point_count_type));
310
311 output_file.close();
312 return true;
313}
314
315}
void read(char *d, size_t n)
Definition cfile.hpp:114
void close()
Definition cfile.hpp:202
void open(const char *mode)
Definition cfile.hpp:65
size_t tellp() const
Definition cfile.hpp:79
void set_file_path(fc::path file_path)
Definition cfile.hpp:37
void write(const char *d, size_t n)
Definition cfile.hpp:127
wraps boost::filesystem::path to provide platform independent path manipulation.
std::string generic_string() const
compressed_file & operator=(compressed_file &&)
static bool process(const fc::path &input_path, const fc::path &output_path, size_t seek_point_stride)
bool exists(const path &p)
uint64_t file_size(const path &p)
Definition name.hpp:106
unsigned short uint16_t
Definition stdint.h:125
unsigned __int64 uint64_t
Definition stdint.h:136
void read(char *d, size_t n, fc::cfile &file)
void seek(long loc, fc::cfile &file)
static constexpr size_t compressed_buffer_size
CK_ULONG d
CK_RV ret