code_cache.cpp
#include <fc/log/logger_config.hpp> //set_thread_name

#include <unistd.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <linux/memfd.h>

#include "IR/Module.h"
#include "IR/Validate.h"
#include "WASM/WASM.h"
#include "LLVMJIT.h"

using namespace IR;
using namespace Runtime;

namespace sysio { namespace chain { namespace eosvmoc {

static constexpr size_t header_offset = 512u;
static constexpr size_t header_size = 512u;
static constexpr size_t total_header_size = header_offset + header_size;
static constexpr uint64_t header_id = 0x32434f4d56534f45ULL; //"EOSVMOC2" little endian

struct code_cache_header {
   uint64_t id = header_id;
   bool dirty = false;
   uintptr_t serialized_descriptor_index = 0;
} __attribute__ ((packed));
static constexpr size_t header_dirty_bit_offset_from_file_start = header_offset + offsetof(code_cache_header, dirty);
static constexpr size_t descriptor_ptr_from_file_start = header_offset + offsetof(code_cache_header, serialized_descriptor_index);

static_assert(sizeof(code_cache_header) <= header_size, "code_cache_header too big");
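
//Layout implied by the constants above: bytes [0, header_offset) hold the allocator_t bookkeeping (the
//constructor static_asserts that it fits), bytes [header_offset, total_header_size) hold the packed
//code_cache_header, and everything after total_header_size is managed by allocator_t for compiled code
//and initdata blobs.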

code_cache_async::code_cache_async(const bfs::path data_dir, const eosvmoc::config& eosvmoc_config, const chainbase::database& db) :
   code_cache_base(data_dir, eosvmoc_config, db),
   _result_queue(eosvmoc_config.threads * 2),
   _threads(eosvmoc_config.threads)
{
   FC_ASSERT(_threads, "SYS VM OC requires at least 1 compile thread");

   wait_on_compile_monitor_message();

   _monitor_reply_thread = std::thread([this]() {
      fc::set_os_thread_name("oc-monitor");
      _ctx.run();
   });
}
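
//Note: _ctx appears to be a boost::asio::io_context that is run only by the "oc-monitor" reply thread above,
//so the async_wait completion handlers registered in wait_on_compile_monitor_message() execute on that
//thread, never on the main thread.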

code_cache_async::~code_cache_async() {
   _compile_monitor_write_socket.shutdown(local::datagram_protocol::socket::shutdown_send);
   _monitor_reply_thread.join();
   consume_compile_thread_queue();
}

//remember again: wait_on_compile_monitor_message()'s callback runs on a non-main thread!
void code_cache_async::wait_on_compile_monitor_message() {
   _compile_monitor_read_socket.async_wait(local::datagram_protocol::socket::wait_read, [this](auto ec) {
      if(ec) {
         _ctx.stop();
         return;
      }

      auto [success, message, fds] = read_message_with_fds(_compile_monitor_read_socket);
      if(!success || !std::holds_alternative<wasm_compilation_result_message>(message)) {
         _ctx.stop();
         return;
      }

      _result_queue.push(std::get<wasm_compilation_result_message>(message));

      wait_on_compile_monitor_message();
   });
}
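
//Note: the handler above only re-arms the wait and pushes results into _result_queue (sized threads*2 in
//the constructor); the main thread drains that queue in consume_compile_thread_queue(), so the cache index
//and the bookkeeping maps are never touched from the reply thread.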

//returns: number of results processed, and the cache's free bytes (only meaningful if number processed > 0)
std::tuple<size_t, size_t> code_cache_async::consume_compile_thread_queue() {
   size_t bytes_remaining = 0;
   size_t gotsome = _result_queue.consume_all([&](const wasm_compilation_result_message& result) {
      if(_outstanding_compiles_and_poison[result.code] == false) {
         std::visit(overloaded {
            [&](const code_descriptor& cd) {
               _cache_index.push_front(cd);
            },
            [&](const compilation_result_unknownfailure&) {
               wlog("code ${c} failed to tier-up with SYS VM OC", ("c", result.code.code_id));
               _blacklist.emplace(result.code);
            },
            [&](const compilation_result_toofull&) {
               run_eviction_round(); //free up some room so a future compile of this code can fit
            }
         }, result.result);
      }
      _outstanding_compiles_and_poison.erase(result.code);
      bytes_remaining = result.cache_free_bytes;
   });

   return {gotsome, bytes_remaining};
}

const code_descriptor* const code_cache_async::get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version) {
   //if there are any outstanding compiles, process the result queue now
   if(_outstanding_compiles_and_poison.size()) {
      auto [count_processed, bytes_remaining] = consume_compile_thread_queue();

      if(count_processed)
         check_eviction_threshold(bytes_remaining);

      while(count_processed && _queued_compiles.size()) {
         auto nextup = _queued_compiles.begin();

         //it's not clear this check is required: if apply() was called for the code then it existed in the code_index; and then
         // if we got notification of it no longer existing we would have removed it from queued_compiles
         const code_object* const codeobject = _db.find<code_object,by_code_hash>(boost::make_tuple(nextup->code_id, 0, nextup->vm_version));
         if(codeobject) {
            _outstanding_compiles_and_poison.emplace(*nextup, false);
            std::vector<wrapped_fd> fds_to_pass;
            fds_to_pass.emplace_back(memfd_for_bytearray(codeobject->code));
            FC_ASSERT(write_message_with_fds(_compile_monitor_write_socket, compile_wasm_message{ *nextup }, fds_to_pass), "SYS VM failed to communicate to OOP manager");
            --count_processed;
         }
         _queued_compiles.erase(nextup);
      }
   }

   //check for entry in cache
   code_cache_index::index<by_hash>::type::iterator it = _cache_index.get<by_hash>().find(boost::make_tuple(code_id, vm_version));
   if(it != _cache_index.get<by_hash>().end()) {
      _cache_index.relocate(_cache_index.begin(), _cache_index.project<0>(it));
      return &*it;
   }

   const code_tuple ct = code_tuple{code_id, vm_version};

   if(_blacklist.find(ct) != _blacklist.end())
      return nullptr;
   if(auto it = _outstanding_compiles_and_poison.find(ct); it != _outstanding_compiles_and_poison.end()) {
      it->second = false;
      return nullptr;
   }
   if(_queued_compiles.find(ct) != _queued_compiles.end())
      return nullptr;

   if(_outstanding_compiles_and_poison.size() >= _threads) {
      _queued_compiles.emplace(ct);
      return nullptr;
   }

   const code_object* const codeobject = _db.find<code_object,by_code_hash>(boost::make_tuple(code_id, 0, vm_version));
   if(!codeobject) //should be impossible right?
      return nullptr;

   _outstanding_compiles_and_poison.emplace(ct, false);
   std::vector<wrapped_fd> fds_to_pass;
   fds_to_pass.emplace_back(memfd_for_bytearray(codeobject->code));
   FC_ASSERT(write_message_with_fds(_compile_monitor_write_socket, compile_wasm_message{ ct }, fds_to_pass), "SYS VM failed to communicate to OOP manager");
   return nullptr;
}
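
//Hypothetical caller sketch (not part of this file): a nullptr from get_descriptor_for_code() just means the
//code is not tiered-up yet (queued, still compiling, or blacklisted), so a caller would fall back to the
//baseline VM while compilation continues in the background, e.g.:
//
//   if(const code_descriptor* desc = cache.get_descriptor_for_code(code_id, vm_version))
//      execute_with_oc(*desc);         //hypothetical fast path using the OC-compiled code
//   else
//      execute_with_baseline_vm();     //hypothetical fallback; a later call may find the descriptor ready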

code_cache_sync::~code_cache_sync() {
   //it's exceedingly critical that we wait for the compile monitor to be done with all its work
   //This is easy in the sync case
   _compile_monitor_write_socket.shutdown(local::datagram_protocol::socket::shutdown_send);
   auto [success, message, fds] = read_message_with_fds(_compile_monitor_read_socket);
   if(success)
      elog("unexpected response from SYS VM OC compile monitor during shutdown");
}

const code_descriptor* const code_cache_sync::get_descriptor_for_code_sync(const digest_type& code_id, const uint8_t& vm_version) {
   //check for entry in cache
   code_cache_index::index<by_hash>::type::iterator it = _cache_index.get<by_hash>().find(boost::make_tuple(code_id, vm_version));
   if(it != _cache_index.get<by_hash>().end()) {
      _cache_index.relocate(_cache_index.begin(), _cache_index.project<0>(it));
      return &*it;
   }

   const code_object* const codeobject = _db.find<code_object,by_code_hash>(boost::make_tuple(code_id, 0, vm_version));
   if(!codeobject) //should be impossible right?
      return nullptr;

   std::vector<wrapped_fd> fds_to_pass;
   fds_to_pass.emplace_back(memfd_for_bytearray(codeobject->code));

   write_message_with_fds(_compile_monitor_write_socket, compile_wasm_message{ {code_id, vm_version} }, fds_to_pass);
   auto [success, message, fds] = read_message_with_fds(_compile_monitor_read_socket);
   SYS_ASSERT(success, wasm_execution_error, "failed to read response from monitor process");
   SYS_ASSERT(std::holds_alternative<wasm_compilation_result_message>(message), wasm_execution_error, "unexpected response from monitor process");

   wasm_compilation_result_message result = std::get<wasm_compilation_result_message>(message);
   SYS_ASSERT(std::holds_alternative<code_descriptor>(result.result), wasm_execution_error, "failed to compile wasm");

   check_eviction_threshold(result.cache_free_bytes);

   return &*_cache_index.push_front(std::move(std::get<code_descriptor>(result.result))).first;
}
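
//Unlike the async variant above, this sync path never queues work: it blocks on a full round trip to the
//compile monitor and raises wasm_execution_error if compilation fails, rather than returning nullptr and
//letting the caller fall back while the compile proceeds.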

code_cache_base::code_cache_base(const boost::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, const chainbase::database& db) :
   _db(db),
   _cache_file_path(data_dir/"code_cache.bin")
{
   static_assert(sizeof(allocator_t) <= header_offset, "header offset intersects with allocator");

   bfs::create_directories(data_dir);

   if(!bfs::exists(_cache_file_path)) {
      SYS_ASSERT(eosvmoc_config.cache_size >= allocator_t::get_min_size(total_header_size), database_exception, "configured code cache size is too small");
      std::ofstream ofs(_cache_file_path.generic_string(), std::ofstream::trunc);
      SYS_ASSERT(ofs.good(), database_exception, "unable to create SYS VM Optimized Compiler code cache");
      bfs::resize_file(_cache_file_path, eosvmoc_config.cache_size);
      bip::file_mapping creation_mapping(_cache_file_path.generic_string().c_str(), bip::read_write);
      bip::mapped_region creation_region(creation_mapping, bip::read_write);
      new (creation_region.get_address()) allocator_t(eosvmoc_config.cache_size, total_header_size);
      new ((char*)creation_region.get_address() + header_offset) code_cache_header;
   }

   code_cache_header cache_header;
   {
      char header_buff[total_header_size];
      std::ifstream hs(_cache_file_path.generic_string(), std::ifstream::binary);
      hs.read(header_buff, sizeof(header_buff));
      SYS_ASSERT(!hs.fail(), bad_database_version_exception, "failed to read code cache header");
      memcpy((char*)&cache_header, header_buff + header_offset, sizeof(cache_header));
   }

   SYS_ASSERT(cache_header.id == header_id, bad_database_version_exception, "existing SYS VM OC code cache not compatible with this version");
   SYS_ASSERT(!cache_header.dirty, database_exception, "code cache is dirty");

   set_on_disk_region_dirty(true);

   auto existing_file_size = bfs::file_size(_cache_file_path);
   if(eosvmoc_config.cache_size > existing_file_size) {
      bfs::resize_file(_cache_file_path, eosvmoc_config.cache_size);

      bip::file_mapping resize_mapping(_cache_file_path.generic_string().c_str(), bip::read_write);
      bip::mapped_region resize_region(resize_mapping, bip::read_write);

      allocator_t* resize_allocator = reinterpret_cast<allocator_t*>(resize_region.get_address());
      resize_allocator->grow(eosvmoc_config.cache_size - existing_file_size);
   }

   _cache_fd = ::open(_cache_file_path.generic_string().c_str(), O_RDWR | O_CLOEXEC);
   SYS_ASSERT(_cache_fd >= 0, database_exception, "failure to open code cache");

   //load up the previous cache index
   char* code_mapping = (char*)mmap(nullptr, eosvmoc_config.cache_size, PROT_READ|PROT_WRITE, MAP_SHARED, _cache_fd, 0);
   SYS_ASSERT(code_mapping != MAP_FAILED, database_exception, "failure to mmap code cache");

   allocator_t* allocator = reinterpret_cast<allocator_t*>(code_mapping);

   if(cache_header.serialized_descriptor_index) {
      fc::datastream<const char*> ds(code_mapping + cache_header.serialized_descriptor_index, eosvmoc_config.cache_size - cache_header.serialized_descriptor_index);
      unsigned number_entries;
      fc::raw::unpack(ds, number_entries);
      for(unsigned i = 0; i < number_entries; ++i) {
         code_descriptor cd;
         fc::raw::unpack(ds, cd);
         if(cd.codegen_version != current_codegen_version) {
            allocator->deallocate(code_mapping + cd.code_begin);
            allocator->deallocate(code_mapping + cd.initdata_begin);
            continue;
         }
         _cache_index.push_back(std::move(cd));
      }
      allocator->deallocate(code_mapping + cache_header.serialized_descriptor_index);

      ilog("SYS VM Optimized Compiler code cache loaded with ${c} entries; ${f} of ${t} bytes free", ("c", number_entries)("f", allocator->get_free_memory())("t", allocator->get_size()));
   }
   munmap(code_mapping, eosvmoc_config.cache_size);

   _free_bytes_eviction_threshold = eosvmoc_config.cache_size * .1;

   wrapped_fd compile_monitor_conn = get_connection_to_compile_monitor(_cache_fd);

   //okay, let's do this by the book: we're not allowed to write & read on different threads to the same asio socket. So create two fds
   //representing the same unix socket. we'll read on one and write on the other
   int duped = dup(compile_monitor_conn);
   _compile_monitor_write_socket.assign(local::datagram_protocol(), duped);
   _compile_monitor_read_socket.assign(local::datagram_protocol(), compile_monitor_conn.release());
}
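
//Constructor recap: the cache file is created and sized on first use, the header magic and dirty flag are
//validated (a dirty flag means the cache was not shut down cleanly and cannot be trusted), the allocator is
//grown if the configured cache_size increased (the file is never shrunk here), descriptors serialized by a
//previous run are reloaded while entries from an older codegen_version are discarded, the eviction threshold
//is set to 10% of the cache size, and finally two fds over the same unix socket connect this process to the
//out-of-process compile monitor.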

void code_cache_base::set_on_disk_region_dirty(bool dirty) {
   bip::file_mapping dirty_mapping(_cache_file_path.generic_string().c_str(), bip::read_write);
   bip::mapped_region dirty_region(dirty_mapping, bip::read_write);

   *((char*)dirty_region.get_address()+header_dirty_bit_offset_from_file_start) = dirty;
   if(dirty_region.flush(0, 0, false) == false)
      elog("Syncing code cache failed");
}

template <typename T>
void code_cache_base::serialize_cache_index(fc::datastream<T>& ds) {
   unsigned entries = _cache_index.size();
   fc::raw::pack(ds, entries);
   for(const code_descriptor& cd : _cache_index)
      fc::raw::pack(ds, cd);
}

code_cache_base::~code_cache_base() {
   //reopen the code cache in our process
   struct stat st;
   if(fstat(_cache_fd, &st))
      return;
   char* code_mapping = (char*)mmap(nullptr, st.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, _cache_fd, 0);
   if(code_mapping == MAP_FAILED)
      return;

   allocator_t* allocator = reinterpret_cast<allocator_t*>(code_mapping);

   //serialize out the cache index
   fc::datastream<size_t> dssz;
   serialize_cache_index(dssz);
   size_t sz = dssz.tellp();

   char* p = nullptr;
   while(_cache_index.size()) {
      p = (char*)allocator->allocate(sz);
      if(p != nullptr)
         break;
      //in theory, there could be too little free space available to store the cache index
      //try to free up some space
      for(unsigned int i = 0; i < 25 && _cache_index.size(); ++i) {
         allocator->deallocate(code_mapping + _cache_index.back().code_begin);
         allocator->deallocate(code_mapping + _cache_index.back().initdata_begin);
         _cache_index.pop_back();
      }
   }

   if(p) {
      fc::datastream<char*> ds(p, sz);
      serialize_cache_index(ds);

      uintptr_t ptr_offset = p-code_mapping;
      *((uintptr_t*)(code_mapping+descriptor_ptr_from_file_start)) = ptr_offset;
   }
   else
      *((uintptr_t*)(code_mapping+descriptor_ptr_from_file_start)) = 0;

   msync(code_mapping, allocator->get_size(), MS_SYNC);
   munmap(code_mapping, allocator->get_size());
   close(_cache_fd);
   set_on_disk_region_dirty(false);
}

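//Teardown note: the destructor serializes the index into space allocated inside the cache itself and records
//its offset at descriptor_ptr_from_file_start; if no block large enough can be found, least-recently-used
//entries are dropped 25 at a time until it fits (or the index is empty and a zero offset is written), and the
//dirty flag is presumably cleared last, after the mapping has been synced.
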
void code_cache_base::free_code(const digest_type& code_id, const uint8_t& vm_version) {
   code_cache_index::index<by_hash>::type::iterator it = _cache_index.get<by_hash>().find(boost::make_tuple(code_id, vm_version));
   if(it != _cache_index.get<by_hash>().end()) {
      write_message_with_fds(_compile_monitor_write_socket, evict_wasms_message{ {*it} });
      _cache_index.get<by_hash>().erase(it);
   }

   //if it's in the queued list, erase it
   _queued_compiles.erase({code_id, vm_version});

   //however, if it's currently being compiled there is no way to cancel the compile,
   //so instead set a poison boolean that indicates not to insert the code into the cache
   //once the compile is complete
   const std::unordered_map<code_tuple, bool>::iterator compiling_it = _outstanding_compiles_and_poison.find({code_id, vm_version});
   if(compiling_it != _outstanding_compiles_and_poison.end())
      compiling_it->second = true;
}

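//Eviction sketch: run_eviction_round() below drops up to 25 least-recently-used entries per call (always
//keeping at least one) and, presumably, hands them to the compile monitor via evict_wasms_message so their
//memory can be reclaimed; check_eviction_threshold() triggers a round whenever the reported free bytes fall
//below _free_bytes_eviction_threshold (10% of the cache size, per the constructor).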
void code_cache_base::run_eviction_round() {
   evict_wasms_message evict_msg;
   for(unsigned int i = 0; i < 25 && _cache_index.size() > 1; ++i) {
      evict_msg.codes.emplace_back(_cache_index.back());
      _cache_index.pop_back();
   }
   write_message_with_fds(_compile_monitor_write_socket, evict_msg);
}

void code_cache_base::check_eviction_threshold(size_t free_bytes) {
   if(free_bytes < _free_bytes_eviction_threshold)
      run_eviction_round();
}

}}}