Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
compile_monitor.cpp
Go to the documentation of this file.
1#include <signal.h>
2#include <sys/prctl.h>
3
9
11
12#include <boost/asio/local/datagram_protocol.hpp>
13#include <boost/signals2.hpp>
14
15namespace sysio { namespace chain { namespace eosvmoc {
16
17using namespace boost::asio;
18
19static size_t get_size_of_fd(int fd) {
20 struct stat st;
21 FC_ASSERT(fstat(fd, &st) == 0, "failed to get size of fd");
22 return st.st_size;
23}
24
25static void copy_memfd_contents_to_pointer(void* dst, int fd) {
26 struct stat st;
27 FC_ASSERT(fstat(fd, &st) == 0, "failed to get size of fd");
28 if(st.st_size == 0)
29 return;
30 void* contents = mmap(nullptr, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
31 FC_ASSERT(contents != MAP_FAILED, "failed to map memfd file");
32 memcpy(dst, contents, st.st_size);
33 munmap(contents, st.st_size);
34}
35
37 compile_monitor_session(boost::asio::io_context& context, local::datagram_protocol::socket&& n, wrapped_fd&& c, wrapped_fd& t) :
38 _ctx(context),
39 _nodeop_instance_socket(std::move(n)),
40 _cache_fd(std::move(c)),
41 _trampoline_socket(t) {
42
43 struct stat st;
44 FC_ASSERT(fstat(_cache_fd, &st) == 0, "failed to stat cache fd");
45 _code_size = st.st_size;
46 _code_mapping = (char*)mmap(nullptr, _code_size, PROT_READ|PROT_WRITE, MAP_SHARED, _cache_fd, 0);
47 FC_ASSERT(_code_mapping != MAP_FAILED, "failed to mmap cache file");
48 _allocator = reinterpret_cast<allocator_t*>(_code_mapping);
49
51 }
52
54 munmap(_code_mapping, _code_size);
55 }
56
58 _nodeop_instance_socket.async_wait(local::datagram_protocol::socket::wait_read, [this](auto ec) {
59 if(ec) {
61 return;
62 }
63 auto [success, message, fds] = read_message_with_fds(_nodeop_instance_socket);
64 if(!success) {
66 return;
67 }
68 std::visit(overloaded {
69 [&, &fds=fds](const compile_wasm_message& compile) {
70 if(fds.size() != 1) {
72 return;
73 }
74 kick_compile_off(compile.code, std::move(fds[0]));
75 },
76 [&](const evict_wasms_message& evict) {
77 for(const code_descriptor& cd : evict.codes) {
78 _allocator->deallocate(_code_mapping + cd.code_begin);
79 _allocator->deallocate(_code_mapping + cd.initdata_begin);
80 }
81 },
82 [&](const auto&) {
83 //anything else is an error
85 return;
86 }
87 }, message);
88
90 });
91 }
92
93 void kick_compile_off(const code_tuple& code_id, wrapped_fd&& wasm_code) {
94 //prepare a requst to go out to the trampoline
95 int socks[2];
96 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
97 local::datagram_protocol::socket response_socket(_ctx);
98 response_socket.assign(local::datagram_protocol(), socks[0]);
99 std::vector<wrapped_fd> fds_pass_to_trampoline;
100 fds_pass_to_trampoline.emplace_back(socks[1]);
101 fds_pass_to_trampoline.emplace_back(std::move(wasm_code));
102
103 eosvmoc_message trampoline_compile_request = compile_wasm_message{code_id};
104 if(write_message_with_fds(_trampoline_socket, trampoline_compile_request, fds_pass_to_trampoline) == false) {
105 wasm_compilation_result_message reply{code_id, compilation_result_unknownfailure{}, _allocator->get_free_memory()};
106 write_message_with_fds(_nodeop_instance_socket, reply);
107 return;
108 }
109
110 current_compiles.emplace_front(code_id, std::move(response_socket));
111 read_message_from_compile_task(current_compiles.begin());
112 }
113
114 void read_message_from_compile_task(std::list<std::tuple<code_tuple, local::datagram_protocol::socket>>::iterator current_compile_it) {
115 auto& [code, socket] = *current_compile_it;
116 socket.async_wait(local::datagram_protocol::socket::wait_read, [this, current_compile_it](auto ec) {
117 //at this point we only expect 1 of 2 things to happen: we either get a reply (success), or we get no reply (failure)
118 auto& [code, socket] = *current_compile_it;
119 auto [success, message, fds] = read_message_with_fds(socket);
120
121 wasm_compilation_result_message reply{code, compilation_result_unknownfailure{}, _allocator->get_free_memory()};
122
123 void* code_ptr = nullptr;
124 void* mem_ptr = nullptr;
125 try {
126 if(success && std::holds_alternative<code_compilation_result_message>(message) && fds.size() == 2) {
127 code_compilation_result_message& result = std::get<code_compilation_result_message>(message);
128 code_ptr = _allocator->allocate(get_size_of_fd(fds[0]));
129 mem_ptr = _allocator->allocate(get_size_of_fd(fds[1]));
130
131 if(code_ptr == nullptr || mem_ptr == nullptr) {
132 _allocator->deallocate(code_ptr);
133 _allocator->deallocate(mem_ptr);
134 reply.result = compilation_result_toofull();
135 }
136 else {
137 copy_memfd_contents_to_pointer(code_ptr, fds[0]);
138 copy_memfd_contents_to_pointer(mem_ptr, fds[1]);
139
140 reply.result = code_descriptor {
141 code.code_id,
142 code.vm_version,
143 current_codegen_version,
144 (uintptr_t)code_ptr - (uintptr_t)_code_mapping,
145 result.start,
146 result.apply_offset,
147 result.starting_memory_pages,
148 (uintptr_t)mem_ptr - (uintptr_t)_code_mapping,
149 (unsigned)get_size_of_fd(fds[1]),
150 result.initdata_prologue_size
151 };
152 }
153 }
154 }
155 catch(...) {
156 _allocator->deallocate(code_ptr);
157 _allocator->deallocate(mem_ptr);
158 }
159
160 write_message_with_fds(_nodeop_instance_socket, reply);
161
162 //either way, we are done
163 _ctx.post([this, current_compile_it]() {
164 current_compiles.erase(current_compile_it);
165 });
166 });
167
168 }
169
170 boost::signals2::signal<void()> connection_dead_signal;
171
172private:
173 boost::asio::io_context& _ctx;
174 local::datagram_protocol::socket _nodeop_instance_socket;
175 wrapped_fd _cache_fd;
176 wrapped_fd& _trampoline_socket;
177
178 char* _code_mapping;
179 size_t _code_size;
180 allocator_t* _allocator;
181
182 std::list<std::tuple<code_tuple, local::datagram_protocol::socket>> current_compiles;
183};
184
186 compile_monitor(boost::asio::io_context& ctx, local::datagram_protocol::socket&& n, wrapped_fd&& t) : _nodeop_socket(std::move(n)), _trampoline_socket(std::move(t)) {
187 //the only duty of compile_monitor is to create a compile_monitor_session when a code_cache instance
188 // in nodeop wants one
190 }
191
192 void wait_for_new_incomming_session(boost::asio::io_context& ctx) {
193 _nodeop_socket.async_wait(boost::asio::local::datagram_protocol::socket::wait_read, [this, &ctx](auto ec) {
194 if(ec) {
195 ctx.stop();
196 return;
197 }
198 auto [success, message, fds] = read_message_with_fds(_nodeop_socket);
199 if(!success) { //failure reading indicates that nodeop has shut down
200 ctx.stop();
201 return;
202 }
203 if(!std::holds_alternative<initialize_message>(message) || fds.size() != 2) {
204 ctx.stop();
205 return;
206 }
207 try {
208 local::datagram_protocol::socket _socket_for_comm(ctx);
209 _socket_for_comm.assign(local::datagram_protocol(), fds[0].release());
210 _compile_sessions.emplace_front(ctx, std::move(_socket_for_comm), std::move(fds[1]), _trampoline_socket);
211 _compile_sessions.front().connection_dead_signal.connect([&, it = _compile_sessions.begin()]() {
212 ctx.post([&]() {
213 _compile_sessions.erase(it);
214 });
215 });
217 }
218 catch(const std::exception& e) {
220 }
221 catch(...) {
222 write_message_with_fds(_nodeop_socket, initalize_response_message{"Failed to create compile process"});
223 }
224
226 });
227 }
228
229 local::datagram_protocol::socket _nodeop_socket;
231
232 std::list<compile_monitor_session> _compile_sessions;
233};
234
235void launch_compile_monitor(int nodeop_fd) {
236 prctl(PR_SET_NAME, "oc-monitor");
237 prctl(PR_SET_PDEATHSIG, SIGKILL);
238
239 //first off, let's disable shutdown signals to us; we want all shutdown indicators to come from
240 // nodeop shutting us down
241 sigset_t set;
242 sigemptyset(&set);
243 sigaddset(&set, SIGHUP);
244 sigaddset(&set, SIGTERM);
245 sigaddset(&set, SIGPIPE);
246 sigaddset(&set, SIGINT);
247 sigaddset(&set, SIGQUIT);
248 sigprocmask(SIG_BLOCK, &set, nullptr);
249
250 struct sigaction sa;
251 sa.sa_handler = SIG_DFL;
252 sa.sa_flags = SA_NOCLDWAIT;
253 sigaction(SIGCHLD, &sa, nullptr);
254
255 int socks[2]; //0: local trampoline socket, 1: the one we give to trampoline
256 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
257 pid_t child = fork();
258 if(child == 0) {
259 close(socks[0]);
260 run_compile_trampoline(socks[1]);
261 }
262 close(socks[1]);
263
264 {
265 boost::asio::io_context ctx;
266 boost::asio::local::datagram_protocol::socket nodeop_socket(ctx);
267 nodeop_socket.assign(boost::asio::local::datagram_protocol(), nodeop_fd);
268 wrapped_fd trampoline_socket(socks[0]);
269 compile_monitor monitor(ctx, std::move(nodeop_socket), std::move(trampoline_socket));
270 ctx.run();
271 if(monitor._compile_sessions.size())
272 std::cerr << "ERROR: SYS VM OC compiler monitor exiting with active sessions" << std::endl;
273 }
274
275 _exit(0);
276}
277
279 void start() {
280 //create communication socket; let's hold off on asio usage until all forks are done
281 int socks[2];
282 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
283 compile_manager_fd = socks[0];
284
285 compile_manager_pid = fork();
286 if(compile_manager_pid == 0) {
287 close(socks[0]);
288 launch_compile_monitor(socks[1]);
289 }
290 close(socks[1]);
291 }
292
293 pid_t compile_manager_pid = -1;
294 int compile_manager_fd = -1;
295};
296
297static compile_monitor_trampoline the_compile_monitor_trampoline;
298extern "C" int __real_main(int, char*[]);
299extern "C" int __wrap_main(int argc, char* argv[]) {
300
301
302 the_compile_monitor_trampoline.start();
303 return __real_main(argc, argv);
304}
305
307 FC_ASSERT(the_compile_monitor_trampoline.compile_manager_pid >= 0, "SYS VM oop connection doesn't look active");
308
309 int socks[2]; //0: our socket to compile_manager_session, 1: socket we'll give to compile_maanger_session
310 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
311 wrapped_fd socket_to_monitor_session(socks[0]);
312 wrapped_fd socket_to_hand_to_monitor_session(socks[1]);
313
314 //we don't own cache_fd, so try to be extra careful not to accidentally close it: don't stick it in a wrapped_fd
315 // to hand off to write_message_with_fds even temporarily. make a copy of it.
316 int dup_of_cache_fd = dup(cache_fd);
317 FC_ASSERT(dup_of_cache_fd != -1, "failed to dup cache_fd");
318 wrapped_fd dup_cache_fd(dup_of_cache_fd);
319
320 std::vector<wrapped_fd> fds_to_pass;
321 fds_to_pass.emplace_back(std::move(socket_to_hand_to_monitor_session));
322 fds_to_pass.emplace_back(std::move(dup_cache_fd));
323 write_message_with_fds(the_compile_monitor_trampoline.compile_manager_fd, initialize_message(), fds_to_pass);
324
325 auto [success, message, fds] = read_message_with_fds(the_compile_monitor_trampoline.compile_manager_fd);
326 SYS_ASSERT(success, misc_exception, "failed to read response from monitor process");
327 SYS_ASSERT(std::holds_alternative<initalize_response_message>(message), misc_exception, "unexpected response from monitor process");
328 SYS_ASSERT(!std::get<initalize_response_message>(message).error_message, misc_exception, "Error message from monitor process: ${e}", ("e", *std::get<initalize_response_message>(message).error_message));
329 return socket_to_monitor_session;
330}
331
332}}}
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
Definition exceptions.hpp:7
int __real_main(int, char *[])
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
char ** argv
void close(T *e, websocketpp::connection_hdl hdl)
Definition name.hpp:106
bip::rbtree_best_fit< bip::null_mutex_family, bip::offset_ptr< void >, alignof(std::max_align_t)> allocator_t
void launch_compile_monitor(int nodeop_fd)
bool write_message_with_fds(boost::asio::local::datagram_protocol::socket &s, const eosvmoc_message &message, const std::vector< wrapped_fd > &fds=std::vector< wrapped_fd >())
std::tuple< bool, eosvmoc_message, std::vector< wrapped_fd > > read_message_with_fds(boost::asio::local::datagram_protocol::socket &s)
int __wrap_main(int argc, char *argv[])
std::variant< initialize_message, initalize_response_message, compile_wasm_message, evict_wasms_message, code_compilation_result_message, wasm_compilation_result_message > eosvmoc_message
wrapped_fd get_connection_to_compile_monitor(int cache_fd)
_W64 unsigned int uintptr_t
Definition stdint.h:165
void kick_compile_off(const code_tuple &code_id, wrapped_fd &&wasm_code)
void read_message_from_compile_task(std::list< std::tuple< code_tuple, local::datagram_protocol::socket > >::iterator current_compile_it)
compile_monitor_session(boost::asio::io_context &context, local::datagram_protocol::socket &&n, wrapped_fd &&c, wrapped_fd &t)
boost::signals2::signal< void()> connection_dead_signal
compile_monitor(boost::asio::io_context &ctx, local::datagram_protocol::socket &&n, wrapped_fd &&t)
std::list< compile_monitor_session > _compile_sessions
local::datagram_protocol::socket _nodeop_socket
void wait_for_new_incomming_session(boost::asio::io_context &ctx)
void monitor()
bool set
memcpy((char *) pInfo->slotDescription, s, l)