12#include <boost/asio/local/datagram_protocol.hpp>
13#include <boost/signals2.hpp>
15namespace sysio {
namespace chain {
namespace eosvmoc {
19static size_t get_size_of_fd(
int fd) {
21 FC_ASSERT(fstat(fd, &st) == 0,
"failed to get size of fd");
25static void copy_memfd_contents_to_pointer(
void* dst,
int fd) {
27 FC_ASSERT(fstat(fd, &st) == 0,
"failed to get size of fd");
30 void* contents = mmap(
nullptr, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
31 FC_ASSERT(contents != MAP_FAILED,
"failed to map memfd file");
32 memcpy(dst, contents, st.st_size);
33 munmap(contents, st.st_size);
39 _nodeop_instance_socket(
std::move(n)),
40 _cache_fd(
std::move(c)),
41 _trampoline_socket(t) {
44 FC_ASSERT(fstat(_cache_fd, &st) == 0,
"failed to stat cache fd");
45 _code_size = st.st_size;
46 _code_mapping = (
char*)mmap(
nullptr, _code_size, PROT_READ|PROT_WRITE, MAP_SHARED, _cache_fd, 0);
47 FC_ASSERT(_code_mapping != MAP_FAILED,
"failed to mmap cache file");
48 _allocator =
reinterpret_cast<allocator_t*
>(_code_mapping);
54 munmap(_code_mapping, _code_size);
58 _nodeop_instance_socket.async_wait(local::datagram_protocol::socket::wait_read, [
this](
auto ec) {
78 _allocator->deallocate(_code_mapping + cd.
code_begin);
96 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
97 local::datagram_protocol::socket response_socket(_ctx);
98 response_socket.assign(local::datagram_protocol(), socks[0]);
99 std::vector<wrapped_fd> fds_pass_to_trampoline;
100 fds_pass_to_trampoline.emplace_back(socks[1]);
101 fds_pass_to_trampoline.emplace_back(std::move(wasm_code));
104 if(
write_message_with_fds(_trampoline_socket, trampoline_compile_request, fds_pass_to_trampoline) ==
false) {
110 current_compiles.emplace_front(code_id, std::move(response_socket));
115 auto& [code, socket] = *current_compile_it;
116 socket.async_wait(local::datagram_protocol::socket::wait_read, [
this, current_compile_it](
auto ec) {
118 auto& [code, socket] = *current_compile_it;
123 void* code_ptr =
nullptr;
124 void* mem_ptr =
nullptr;
126 if(success && std::holds_alternative<code_compilation_result_message>(message) && fds.size() == 2) {
128 code_ptr = _allocator->allocate(get_size_of_fd(fds[0]));
129 mem_ptr = _allocator->allocate(get_size_of_fd(fds[1]));
131 if(code_ptr ==
nullptr || mem_ptr ==
nullptr) {
132 _allocator->deallocate(code_ptr);
133 _allocator->deallocate(mem_ptr);
137 copy_memfd_contents_to_pointer(code_ptr, fds[0]);
138 copy_memfd_contents_to_pointer(mem_ptr, fds[1]);
143 current_codegen_version,
147 result.starting_memory_pages,
149 (unsigned)get_size_of_fd(fds[1]),
150 result.initdata_prologue_size
156 _allocator->deallocate(code_ptr);
157 _allocator->deallocate(mem_ptr);
163 _ctx.post([
this, current_compile_it]() {
164 current_compiles.erase(current_compile_it);
173 boost::asio::io_context& _ctx;
174 local::datagram_protocol::socket _nodeop_instance_socket;
182 std::list<std::tuple<code_tuple, local::datagram_protocol::socket>> current_compiles;
193 _nodeop_socket.async_wait(boost::asio::local::datagram_protocol::socket::wait_read, [
this, &ctx](
auto ec) {
203 if(!std::holds_alternative<initialize_message>(message) || fds.size() != 2) {
208 local::datagram_protocol::socket _socket_for_comm(ctx);
209 _socket_for_comm.assign(local::datagram_protocol(), fds[0].release());
213 _compile_sessions.erase(it);
218 catch(
const std::exception& e) {
236 prctl(PR_SET_NAME,
"oc-monitor");
237 prctl(PR_SET_PDEATHSIG, SIGKILL);
243 sigaddset(&set, SIGHUP);
244 sigaddset(&set, SIGTERM);
245 sigaddset(&set, SIGPIPE);
246 sigaddset(&set, SIGINT);
247 sigaddset(&set, SIGQUIT);
248 sigprocmask(SIG_BLOCK, &set,
nullptr);
251 sa.sa_handler = SIG_DFL;
252 sa.sa_flags = SA_NOCLDWAIT;
253 sigaction(SIGCHLD, &sa,
nullptr);
256 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
257 pid_t child = fork();
265 boost::asio::io_context ctx;
266 boost::asio::local::datagram_protocol::socket nodeop_socket(ctx);
267 nodeop_socket.assign(boost::asio::local::datagram_protocol(), nodeop_fd);
271 if(
monitor._compile_sessions.size())
272 std::cerr <<
"ERROR: SYS VM OC compiler monitor exiting with active sessions" << std::endl;
282 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
283 compile_manager_fd = socks[0];
285 compile_manager_pid = fork();
286 if(compile_manager_pid == 0) {
293 pid_t compile_manager_pid = -1;
294 int compile_manager_fd = -1;
302 the_compile_monitor_trampoline.
start();
310 socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, socks);
311 wrapped_fd socket_to_monitor_session(socks[0]);
312 wrapped_fd socket_to_hand_to_monitor_session(socks[1]);
316 int dup_of_cache_fd = dup(cache_fd);
317 FC_ASSERT(dup_of_cache_fd != -1,
"failed to dup cache_fd");
320 std::vector<wrapped_fd> fds_to_pass;
321 fds_to_pass.emplace_back(std::move(socket_to_hand_to_monitor_session));
322 fds_to_pass.emplace_back(std::move(dup_cache_fd));
326 SYS_ASSERT(success, misc_exception,
"failed to read response from monitor process");
327 SYS_ASSERT(std::holds_alternative<initalize_response_message>(message), misc_exception,
"unexpected response from monitor process");
328 SYS_ASSERT(!std::get<initalize_response_message>(message).
error_message, misc_exception,
"Error message from monitor process: ${e}", (
"e", *std::get<initalize_response_message>(message).
error_message));
329 return socket_to_monitor_session;
#define SYS_ASSERT(expr, exc_type, FORMAT,...)
int __real_main(int, char *[])
#define FC_ASSERT(TEST,...)
Checks a condition and throws an assert_exception if the test is FALSE.
void close(T *e, websocketpp::connection_hdl hdl)
bip::rbtree_best_fit< bip::null_mutex_family, bip::offset_ptr< void >, alignof(std::max_align_t)> allocator_t
void launch_compile_monitor(int nodeop_fd)
void run_compile_trampoline(int fd)
bool write_message_with_fds(boost::asio::local::datagram_protocol::socket &s, const eosvmoc_message &message, const std::vector< wrapped_fd > &fds=std::vector< wrapped_fd >())
std::tuple< bool, eosvmoc_message, std::vector< wrapped_fd > > read_message_with_fds(boost::asio::local::datagram_protocol::socket &s)
int __wrap_main(int argc, char *argv[])
std::variant< initialize_message, initalize_response_message, compile_wasm_message, evict_wasms_message, code_compilation_result_message, wasm_compilation_result_message > eosvmoc_message
wrapped_fd get_connection_to_compile_monitor(int cache_fd)
_W64 unsigned int uintptr_t
void kick_compile_off(const code_tuple &code_id, wrapped_fd &&wasm_code)
~compile_monitor_session()
void read_message_from_compile_task(std::list< std::tuple< code_tuple, local::datagram_protocol::socket > >::iterator current_compile_it)
void read_message_from_nodeop()
compile_monitor_session(boost::asio::io_context &context, local::datagram_protocol::socket &&n, wrapped_fd &&c, wrapped_fd &t)
boost::signals2::signal< void()> connection_dead_signal
pid_t compile_manager_pid
compile_monitor(boost::asio::io_context &ctx, local::datagram_protocol::socket &&n, wrapped_fd &&t)
wrapped_fd _trampoline_socket
std::list< compile_monitor_session > _compile_sessions
local::datagram_protocol::socket _nodeop_socket
void wait_for_new_incomming_session(boost::asio::io_context &ctx)
memcpy((char *) pInfo->slotDescription, s, l)