// Upper bound, in bytes, on a single read buffer: on_alloc clamps the
// suggested allocation size to this value.
39 static constexpr size_t max_read_size = 16384;
// Ask libuv to start reading from the stream handle of this object.
// on_alloc_cb supplies each read buffer and on_read_cb receives the outcome
// of each read.
48 int rc = uv_read_start((uv_stream_t*)&
uv_handle, on_alloc_cb, on_read_cb);
// Log the human-readable libuv error text for rc.
// NOTE(review): the condition guarding this log is not visible in this
// excerpt (presumably a check that rc indicates failure) - confirm.
51 LOG_FAIL_FMT(
"uv_read_start failed: {}", uv_strerror(rc));
// Static allocation callback handed to uv_read_start. libuv callbacks are
// plain function pointers, so this trampoline recovers the ProcessReader
// stored in handle->data and forwards to its on_alloc member.
//   handle:         the libuv handle being read from
//   suggested_size: size hint for the buffer
//   buf:            out-parameter describing the buffer to read into
57 static void on_alloc_cb(
58 uv_handle_t* handle,
size_t suggested_size, uv_buf_t* buf)
60 static_cast<ProcessReader*
>(handle->data)->on_alloc(suggested_size, buf);
// Static read callback handed to uv_read_start. Recovers the ProcessReader
// stored in handle->data and forwards to its on_read member.
//   handle: the libuv stream that was read
//   nread:  result of the read, passed through unchanged
//   buf:    buffer associated with this read, passed through unchanged
63 static void on_read_cb(
64 uv_stream_t* handle, ssize_t nread,
const uv_buf_t* buf)
66 static_cast<ProcessReader*
>(handle->data)->on_read(nread, buf);
// Fill in buf with a newly allocated buffer for the next read.
//   suggested_size: size hint (passed through from on_alloc_cb)
//   buf:            out-parameter; its base and len fields are set here
69 void on_alloc(
size_t suggested_size, uv_buf_t* buf)
// Cap the allocation at max_read_size regardless of the suggested size.
71 auto alloc_size = std::min<size_t>(suggested_size, max_read_size);
73 "Allocating {} bytes for reading from host process pid={}",
// Raw array allocation: whoever consumes the buffer must release it with
// delete[]. NOTE(review): presumably done by on_free, whose body is not
// visible in this excerpt - confirm.
77 buf->base =
new char[alloc_size];
78 buf->len = alloc_size;
// Process the outcome of one read on the pipe.
//   nread: number of bytes read. NOTE(review): by libuv convention a
//          negative value is an error code or UV_EOF; the handling of that
//          case is not visible in this excerpt - confirm it happens before
//          the insert below.
//   buf:   buffer holding the data (filled in by on_alloc)
81 void on_read(ssize_t nread,
const uv_buf_t* buf)
86 "ProcessReader on_read: status={} pid={} file={}",
// Append the nread bytes just received to the accumulated output buffer.
99 buffer.insert(buffer.end(), buf->base, buf->base + nread);
101 "Read {} bytes from host process, total={} file={}",
// Takes a buffer descriptor of the kind filled in by on_alloc.
// NOTE(review): the body is not visible in this excerpt; presumably it
// releases buf->base with delete[] to match the new char[] in on_alloc -
// confirm.
110 void on_free(
const uv_buf_t* buf)
// Scan the accumulated buffer for newline-terminated lines, beginning at the
// front of the buffer.
120 auto start = buffer.begin();
// Locate the next newline character at or after start.
123 auto newline = std::find(
start, buffer.end(),
'\n');
// True when no newline remains in the unscanned part of the buffer.
// NOTE(review): the action taken in that case is on lines not shown here.
124 if (newline == buffer.end())
// Number of bytes in the line, excluding the newline character itself.
129 size_t count = newline -
start;
// Non-owning view over the bytes of the line; it is only valid while buffer
// is not modified.
130 std::string_view line(&*
start, count);
// Discard everything before start from the buffer.
// NOTE(review): start is presumably advanced past each consumed line on
// lines not shown in this excerpt - confirm.
136 buffer.erase(buffer.begin(),
start);
// Maximum number of entries allowed in running at once; checked by
// maybe_process_next_entry before starting another queued request.
191 static constexpr size_t max_processes = 8;
// While true, maybe_process_next_entry does not start any further queued
// requests.
193 bool stopping =
false;
// Fields of a pending launch request. NOTE(review): presumably the members
// of QueueEntry; the struct header is not visible in this excerpt.
// args: command line; element 0 is used as the executable by handle_entry.
197 std::vector<std::string> args;
// input: NOTE(review): presumably bytes to feed to the stdin of the child;
// its use is not visible in this excerpt - confirm.
198 std::vector<uint8_t> input;
// queued_at: when the request was enqueued; handle_entry uses it to compute
// the time spent waiting in the queue.
199 std::chrono::steady_clock::time_point queued_at;
// Launch requests waiting to be started; maybe_process_next_entry takes them
// from the front.
202 std::queue<QueueEntry> queued;
// Fields recorded for a launched process. NOTE(review): presumably the
// members of ProcessEntry; the struct header is not visible in this excerpt.
// args: command line, kept so it can be logged when the process exits.
206 std::vector<std::string> args;
// started_at: time recorded by handle_entry after spawning; used to compute
// the runtime when the process exits.
207 std::chrono::steady_clock::time_point started_at;
// Launched processes that have not yet been reported as exited, keyed by pid.
210 std::unordered_map<pid_t, ProcessEntry> running;
// Start the oldest queued launch request, provided that we are not stopping,
// the queue is non-empty and fewer than max_processes are running.
212 void maybe_process_next_entry()
// Guard for the three cases in which nothing should be started.
// NOTE(review): the guarded statement is not visible in this excerpt
// (presumably an early return).
214 if (stopping || queued.empty() || running.size() >= max_processes)
// Move the request out of the front of the queue.
// NOTE(review): the matching queued.pop() is presumably on an elided line -
// confirm.
218 auto entry = std::move(queued.front());
220 handle_entry(std::move(entry));
// Spawn the host process described by entry and start tracking it.
// Computes how long the request waited in the queue, builds argv, wires up
// pipes for stdin, stdout and stderr, calls uv_spawn on the default loop and
// records the new process in running under its pid.
//   entry: the launch request; its args are moved into running on success.
223 void handle_entry(QueueEntry&& entry)
// Time spent in the queue, in milliseconds (reported as queuetime in the
// launch log message below).
225 auto now = std::chrono::steady_clock::now();
227 std::chrono::duration_cast<std::chrono::milliseconds>(
228 now - entry.queued_at)
231 const auto& args = entry.args;
// Build the null-terminated array of C strings expected by uv_spawn. The
// pointers alias the strings owned by entry.args, so those strings must stay
// alive until uv_spawn has been called.
233 std::vector<const char*> argv;
234 for (
size_t i = 0; i < args.size(); i++)
236 argv.push_back(args.at(i).c_str());
238 argv.push_back(
nullptr);
// The process handle is allocated with raw new.
// NOTE(review): presumably it is deleted in on_close after uv_close; confirm
// that the uv_spawn failure path (elided here) also releases it, otherwise
// it leaks.
244 auto handle =
new uv_process_t;
// One stdio container per standard stream of the child. In libuv the
// READABLE/WRITABLE flags describe the direction from the point of view of
// the child: it reads from stdin (index 0) and writes to stdout (index 1)
// and stderr (index 2). Each is backed by the stream of the matching
// writer/reader object.
247 uv_stdio_container_t stdio[3];
248 stdio[0].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_READABLE_PIPE);
249 stdio[0].data.stream = stdin_writer->stream();
251 stdio[1].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
252 stdio[1].data.stream = stdout_reader->stream();
254 stdio[2].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
255 stdio[2].data.stream = stderr_reader->stream();
// Value-initialise so that every option not set below is zero.
257 uv_process_options_t options = {};
// NOTE(review): argv always contains at least the null terminator, so at(0)
// never throws; if args is empty, options.file becomes a null pointer.
// Confirm that empty command lines are rejected before reaching this point.
258 options.file = argv.at(0);
// The args field of the options struct is a non-const char**, hence the
// const_cast.
259 options.args =
const_cast<char**
>(argv.data());
260 options.exit_cb = ProcessLauncher::on_process_exit;
261 options.stdio = stdio;
262 options.stdio_count = 3;
264 auto rc = uv_spawn(uv_default_loop(), handle, &options);
// NOTE(review): the condition guarding this failure log, and whatever
// cleanup follows it, are not visible in this excerpt.
267 LOG_FAIL_FMT(
"Error starting host process: {}", uv_strerror(rc));
272 "Launching host process: pid={} queuetime={}ms cmd={}",
275 fmt::join(args,
" "));
// release() gives up ownership of the three pipe objects before start is
// called on each with the pid of the child.
// NOTE(review): nothing in this function frees them afterwards; presumably
// they manage their own lifetime once started - confirm.
277 stdin_writer.
release()->start(handle->pid);
278 stdout_reader.
release()->start(handle->pid);
279 stderr_reader.
release()->start(handle->pid);
// Record the process so that its exit can be matched by pid. entry.args is
// moved from here, so the args reference above must not be used after this
// point.
281 auto started_at = std::chrono::steady_clock::now();
282 ProcessEntry process_entry{std::move(entry.args), started_at};
283 running.insert({handle->pid, std::move(process_entry)});
// Static exit callback registered through options.exit_cb in handle_entry.
// Forwards handle and exit_status to the member overload of
// on_process_exit; term_signal is not passed on.
286 static void on_process_exit(
287 uv_process_t* handle, int64_t exit_status,
int term_signal)
// NOTE(review): the object expression on the left of this arrow is on an
// elided line - confirm how the launcher instance is obtained.
290 ->on_process_exit(handle, exit_status);
// Handle the exit of a tracked child process: log the outcome and runtime,
// remove it from running, start the next queued request if possible and
// close the process handle.
//   handle:      libuv handle of the exited process; handle->pid is the key
//                into running
//   exit_status: exit status reported for the process
293 void on_process_exit(uv_process_t* handle, int64_t exit_status)
// at() throws std::out_of_range if the pid is not present in running.
295 auto& process = running.at(handle->pid);
// Runtime is measured from the started_at timestamp stored by handle_entry.
297 auto t_end = std::chrono::steady_clock::now();
298 auto runtime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
299 t_end - process.started_at)
// Both branches use the same message text.
// NOTE(review): presumably they differ in log level (success versus
// failure); the macro names are not visible in this excerpt.
302 if (exit_status == 0)
305 "Host process exited: pid={} status={} runtime={}ms cmd={}",
309 fmt::join(process.args,
" "));
314 "Host process exited: pid={} status={} runtime={}ms cmd={}",
318 fmt::join(process.args,
" "));
// process is a reference into running, so it must not be used after this
// erase.
321 running.erase(handle->pid);
// A slot has just been freed, so try to start a queued request.
323 maybe_process_next_entry();
// Request closure of the handle; on_close is passed as the close callback.
325 uv_close((uv_handle_t*)handle, ProcessLauncher::on_close);
// Close callback passed to uv_close for process handles in on_process_exit.
// NOTE(review): the body is not visible in this excerpt; presumably it
// deletes the uv_process_t allocated in handle_entry - confirm.
328 static void on_close(uv_handle_t* handle)
// Handler for AppMessage::launch_host_process messages: decodes the message,
// parses its JSON payload, enqueues a launch request and then tries to start
// it straight away.
// NOTE(review): the registration call that receives these two arguments is
// on an elided line.
339 AppMessage::launch_host_process,
// The lambda captures this so that it can reach queued and
// maybe_process_next_entry.
340 [
this](
const uint8_t* data,
size_t size) {
342 ringbuffer::read_message<AppMessage::launch_host_process>(
// NOTE(review): the json variable is defined on an elided line. With its
// default arguments nlohmann::json::parse throws on malformed input;
// confirm that this is acceptable for this message source.
345 auto obj = nlohmann::json::parse(
json);
// Timestamp taken at enqueue time; presumably stored in the entry as
// queued_at (the construction of entry is not visible in this excerpt).
348 auto queued_at = std::chrono::steady_clock::now();
357 queued.push(std::move(entry));
// Starts the request immediately if the launcher is not stopping and a
// process slot is free; otherwise it stays queued.
359 maybe_process_next_entry();