CCF
Loading...
Searching...
No Matches
process_launcher.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ds/messaging.h"
6#include "enclave/interface.h"
7
8#include <chrono>
9#include <queue>
10#include <unordered_map>
11#include <uv.h>
12
13namespace asynchost
14{
15 struct ProcessPipe : public with_uv_handle<uv_pipe_t>
16 {
17 public:
19 {
20 uv_handle.data = this;
21 uv_pipe_init(uv_default_loop(), &uv_handle, 0);
22 }
23 virtual ~ProcessPipe() = default;
24
25 uv_stream_t* stream()
26 {
27 return (uv_stream_t*)&uv_handle;
28 }
29
30 protected:
31 pid_t pid = 0;
32 };
33
38 {
39 static constexpr size_t max_read_size = 16384;
40
41 public:
42 ProcessReader(std::string name) : name(name) {}
43
44 void start(pid_t pid)
45 {
46 this->pid = pid;
47
48 int rc = uv_read_start((uv_stream_t*)&uv_handle, on_alloc_cb, on_read_cb);
49 if (rc < 0)
50 {
51 LOG_FAIL_FMT("uv_read_start failed: {}", uv_strerror(rc));
52 close();
53 }
54 }
55
56 private:
57 static void on_alloc_cb(
58 uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
59 {
60 static_cast<ProcessReader*>(handle->data)->on_alloc(suggested_size, buf);
61 }
62
63 static void on_read_cb(
64 uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
65 {
66 static_cast<ProcessReader*>(handle->data)->on_read(nread, buf);
67 }
68
// Supplies the buffer for the next read: a fresh char array whose size is the
// suggested size capped at max_read_size.
// NOTE(review): the line that opens the logging call (original line 72) is
// missing from this listing; the three lines after the size computation are
// that call's arguments.
69 void on_alloc(size_t suggested_size, uv_buf_t* buf)
70 {
// Never hand out more than max_read_size bytes, whatever was suggested.
71 auto alloc_size = std::min<size_t>(suggested_size, max_read_size);
73 "Allocating {} bytes for reading from host process pid={}",
74 alloc_size,
75 pid);
76
// Freed with delete[] by on_free, which on_read calls after each read.
77 buf->base = new char[alloc_size];
78 buf->len = alloc_size;
79 }
80
// Handles the outcome of one read from the pipe.
//  - nread < 0: logs the status, flushes any buffered text that never got a
//    terminating newline, and closes the pipe.
//  - nread > 0: appends the bytes to `buffer` and logs every complete line.
//  - nread == 0: nothing to do.
// In every case the read buffer is released through on_free.
// NOTE(review): the lines opening the two multi-line logging calls (original
// lines 85 and 100) are missing from this listing; only their arguments are
// visible.
81 void on_read(ssize_t nread, const uv_buf_t* buf)
82 {
83 if (nread < 0)
84 {
86 "ProcessReader on_read: status={} pid={} file={}",
87 uv_strerror(nread),
88 pid,
89 name);
90 // Print any trailing text which didn't have a newline
91 if (!buffer.empty())
92 {
93 LOG_INFO_FMT("{} from process {}: {}", name, pid, buffer);
94 }
95 close();
96 }
97 else if (nread > 0)
98 {
// Accumulate the new bytes; print_lines consumes whole lines and leaves any
// partial trailing line in `buffer` for the next read.
99 buffer.insert(buffer.end(), buf->base, buf->base + nread);
101 "Read {} bytes from host process, total={} file={}",
102 nread,
103 buffer.size(),
104 name);
105 print_lines();
106 }
// Runs on every path, including the error path where close() was requested.
107 on_free(buf);
108 }
109
110 void on_free(const uv_buf_t* buf)
111 {
112 delete[] buf->base;
113 }
114
118 void print_lines()
119 {
120 auto start = buffer.begin();
121 while (true)
122 {
123 auto newline = std::find(start, buffer.end(), '\n');
124 if (newline == buffer.end())
125 {
126 break;
127 }
128
129 size_t count = newline - start;
130 std::string_view line(&*start, count);
131 LOG_INFO_FMT("{} from process {}: {}", name, pid, line);
132
133 // Move past the newline character so we can look for the next one.
134 start = newline + 1;
135 }
136 buffer.erase(buffer.begin(), start);
137 }
138
139 std::string name;
140 std::string buffer;
141 };
142
147 {
148 public:
149 ProcessWriter(std::vector<uint8_t>&& data) : buffer(std::move(data))
150 {
151 request.data = this;
152 }
153
// Remembers which process this pipe belongs to and submits the whole of
// `buffer` as a single write on the pipe. If the write cannot be submitted,
// the error is logged and the pipe is closed.
// NOTE(review): the line that opens the logging call (original line 158) is
// missing from this listing; only its arguments are visible.
154 void start(pid_t pid)
155 {
156 this->pid = pid;
157
159 "Writing {} bytes to host process pid={}", buffer.size(), pid);
160
// The uv_buf_t only points at `buffer`; the bytes themselves stay owned by
// this object.
161 uv_buf_t buf = {(char*)buffer.data(), buffer.size()};
162 int rc =
163 uv_write(&request, (uv_stream_t*)&uv_handle, &buf, 1, on_write_done_cb);
164
165 if (rc < 0)
166 {
167 LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
168 close();
169 }
170 }
171
172 private:
173 static void on_write_done_cb(uv_write_t* req, int status)
174 {
175 static_cast<ProcessWriter*>(req->data)->on_write_done(req, status);
176 }
177
// Called once the write submitted by start() has completed: logs the status
// and closes the pipe regardless of whether the write succeeded.
// `req` is not used here.
// NOTE(review): the line that opens the logging call (original line 180) is
// missing from this listing; only its arguments are visible.
178 void on_write_done(uv_write_t* req, int status)
179 {
181 "Write to host process completed: status={} pid={}", status, pid);
182 close();
183 }
184
185 uv_write_t request;
186 std::vector<uint8_t> buffer;
187 };
188
190 {
191 static constexpr size_t max_processes = 8;
192
193 bool stopping = false;
194
// A launch request waiting for a free process slot.
struct QueueEntry
{
  // Command line; the first element is used as the executable to spawn.
  std::vector<std::string> args;
  // Bytes written to the process's stdin once it has been spawned.
  std::vector<uint8_t> input;
  // When the request was queued; used to report the time spent waiting.
  std::chrono::steady_clock::time_point queued_at;
};
201
202 std::queue<QueueEntry> queued;
203
// Bookkeeping for a process that has been spawned and has not yet exited.
struct ProcessEntry
{
  // Command line the process was started with (kept for logging on exit).
  std::vector<std::string> args;
  // When the process was spawned; used to report its runtime on exit.
  std::chrono::steady_clock::time_point started_at;
};
209
210 std::unordered_map<pid_t, ProcessEntry> running;
211
212 void maybe_process_next_entry()
213 {
214 if (stopping || queued.empty() || running.size() >= max_processes)
215 {
216 return;
217 }
218 auto entry = std::move(queued.front());
219 queued.pop();
220 handle_entry(std::move(entry));
221 }
222
// Spawns the process described by `entry` on the default loop, with its
// stdin, stdout and stderr each connected to a newly created pipe, and
// records it in `running` under its pid. On spawn failure the error is logged
// and the entry is dropped.
// NOTE(review): on the failure path `handle` (allocated with new below) is
// neither deleted nor closed - looks like a leak; confirm against libuv's
// guidance for handles passed to a failed uv_spawn.
// NOTE(review): if entry.args is empty, argv holds only the terminating
// nullptr and options.file becomes nullptr - confirm callers never send an
// empty command line.
// NOTE(review): the line that opens the "Launching host process" logging call
// (original line 271) is missing from this listing.
223 void handle_entry(QueueEntry&& entry)
224 {
// Time the request spent in the queue, reported in the launch log message.
225 auto now = std::chrono::steady_clock::now();
226 auto queue_time_ms =
227 std::chrono::duration_cast<std::chrono::milliseconds>(
228 now - entry.queued_at)
229 .count();
230
231 const auto& args = entry.args;
232
// Build the nullptr-terminated argument array for uv_spawn. The pointers
// refer into entry.args, which is only moved from after the spawn.
233 std::vector<const char*> argv;
234 for (size_t i = 0; i < args.size(); i++)
235 {
236 argv.push_back(args.at(i).c_str());
237 }
238 argv.push_back(nullptr);
239
// presumably close_ptr closes the pipe if release() is never called (the
// early-return path below) - confirm in proxy.h
240 close_ptr<ProcessReader> stdout_reader("stdout");
241 close_ptr<ProcessReader> stderr_reader("stderr");
242 close_ptr<ProcessWriter> stdin_writer(std::move(entry.input));
243
// Freed in on_close, after on_process_exit has closed the handle.
244 auto handle = new uv_process_t;
245 handle->data = this;
246
// Pipe directions are from the child's point of view: it reads from stdin and
// writes to stdout/stderr.
247 uv_stdio_container_t stdio[3];
248 stdio[0].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_READABLE_PIPE);
249 stdio[0].data.stream = stdin_writer->stream();
250
251 stdio[1].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
252 stdio[1].data.stream = stdout_reader->stream();
253
254 stdio[2].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
255 stdio[2].data.stream = stderr_reader->stream();
256
257 uv_process_options_t options = {};
258 options.file = argv.at(0);
259 options.args = const_cast<char**>(argv.data());
260 options.exit_cb = ProcessLauncher::on_process_exit;
261 options.stdio = stdio;
262 options.stdio_count = 3;
263
264 auto rc = uv_spawn(uv_default_loop(), handle, &options);
265 if (rc != 0)
266 {
267 LOG_FAIL_FMT("Error starting host process: {}", uv_strerror(rc));
268 return;
269 }
270
272 "Launching host process: pid={} queuetime={}ms cmd={}",
273 handle->pid,
274 queue_time_ms,
275 fmt::join(args, " "));
276
// The spawn succeeded: take the pipes out of their close_ptr wrappers and
// start the stdin write and the stdout/stderr reads.
277 stdin_writer.release()->start(handle->pid);
278 stdout_reader.release()->start(handle->pid);
279 stderr_reader.release()->start(handle->pid);
280
// Track the process so on_process_exit can report its runtime and command.
281 auto started_at = std::chrono::steady_clock::now();
282 ProcessEntry process_entry{std::move(entry.args), started_at};
283 running.insert({handle->pid, std::move(process_entry)});
284 }
285
286 static void on_process_exit(
287 uv_process_t* handle, int64_t exit_status, int term_signal)
288 {
289 static_cast<ProcessLauncher*>(handle->data)
290 ->on_process_exit(handle, exit_status);
291 }
292
// Handles the exit of a tracked process: logs its pid, exit status, runtime
// and command line, removes it from `running`, gives the next queued entry a
// chance to start, and finally closes the process handle (which is freed in
// on_close).
// running.at() throws std::out_of_range if the pid is not being tracked.
// NOTE(review): the lines opening the two logging calls (original lines 304
// and 313) are missing from this listing, so the log level used for the
// zero and non-zero exit status branches cannot be seen here; the message
// format is the same in both.
293 void on_process_exit(uv_process_t* handle, int64_t exit_status)
294 {
295 auto& process = running.at(handle->pid);
296
297 auto t_end = std::chrono::steady_clock::now();
298 auto runtime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
299 t_end - process.started_at)
300 .count();
301
302 if (exit_status == 0)
303 {
305 "Host process exited: pid={} status={} runtime={}ms cmd={}",
306 handle->pid,
307 exit_status,
308 runtime_ms,
309 fmt::join(process.args, " "));
310 }
311 else
312 {
314 "Host process exited: pid={} status={} runtime={}ms cmd={}",
315 handle->pid,
316 exit_status,
317 runtime_ms,
318 fmt::join(process.args, " "));
319 }
320
// `process` refers into `running`, so it must not be used after this erase.
321 running.erase(handle->pid);
322
// A slot has just been freed; try to start a waiting entry.
323 maybe_process_next_entry();
324
325 uv_close((uv_handle_t*)handle, ProcessLauncher::on_close);
326 }
327
328 static void on_close(uv_handle_t* handle)
329 {
330 delete handle;
331 }
332
333 public:
// Body of register_message_handlers: installs the handler for
// AppMessage::launch_host_process on `disp`. The handler decodes the message
// into a JSON description plus input bytes, turns it into a QueueEntry
// stamped with the current time, queues it, and tries to launch it.
// NOTE(review): the function's signature (original lines 334-335) and the
// line opening the handler-registration macro (original line 337) are missing
// from this listing; the symbol index names the function as
// register_message_handlers(messaging::Dispatcher<ringbuffer::Message>& disp).
// NOTE(review): nothing visible here catches a failure of json::parse or
// get<HostProcessArguments>() on a malformed message - confirm the dispatcher
// or the sender guarantees well-formed input.
336 {
338 disp,
339 AppMessage::launch_host_process,
340 [this](const uint8_t* data, size_t size) {
341 auto [json, input] =
342 ringbuffer::read_message<AppMessage::launch_host_process>(
343 data, size);
344
345 auto obj = nlohmann::json::parse(json);
346 auto msg = obj.get<HostProcessArguments>();
347
348 auto queued_at = std::chrono::steady_clock::now();
349 QueueEntry entry{
350 std::move(msg.args),
351 std::move(input),
352 queued_at,
353 };
354
355 LOG_DEBUG_FMT("Queueing host process launch: {}", json);
356
357 queued.push(std::move(entry));
358
// Launches immediately if a slot is free; otherwise the entry waits until a
// running process exits.
359 maybe_process_next_entry();
360 });
361 }
362
363 void stop()
364 {
365 stopping = true;
366 }
367 };
368}
Definition process_launcher.h:190
void stop()
Definition process_launcher.h:363
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition process_launcher.h:334
Definition process_launcher.h:38
void start(pid_t pid)
Definition process_launcher.h:44
ProcessReader(std::string name)
Definition process_launcher.h:42
Definition process_launcher.h:147
ProcessWriter(std::vector< uint8_t > &&data)
Definition process_launcher.h:149
void start(pid_t pid)
Definition process_launcher.h:154
Definition proxy.h:15
T * release()
Definition proxy.h:43
Definition proxy.h:82
void close()
Definition proxy.h:96
uv_pipe_t uv_handle
Definition proxy.h:84
Definition messaging.h:38
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition after_io.h:8
Definition json_schema.h:15
STL namespace.
Definition interface.h:51
Definition process_launcher.h:16
ProcessPipe()
Definition process_launcher.h:18
virtual ~ProcessPipe()=default
uv_stream_t * stream()
Definition process_launcher.h:25
pid_t pid
Definition process_launcher.h:31