CCF
Loading...
Searching...
No Matches
udp.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "before_io.h"
6#include "ccf/ds/logger.h"
7#include "ccf/pal/locking.h"
8#include "dns.h"
9#include "ds/pending_io.h"
10#include "proxy.h"
11#include "socket.h"
12
13#include <optional>
14
15namespace asynchost
16{
17 class UDPImpl;
19
22 class UDPImpl : public with_uv_handle<uv_udp_t>
23 {
24 private:
25 friend class close_ptr<UDPImpl>;
26
27 static constexpr int backlog = 128;
28 static constexpr size_t max_read_size = 16384;
29
30 // Each uv iteration, read only a capped amount from all sockets.
31 static constexpr auto max_read_quota = max_read_size * 4;
32 static size_t remaining_read_quota;
33
34 // This is a simplified version of the state machine for QUIC that
35 // mostly follows plain UDP state. We should add more when we need
36 // for QUIC, not predict complexity prematurely.
37 enum Status
38 {
39 // Starting state + failure recovery (if any)
40 FRESH,
41 // DNS::resolve
42 RESOLVING,
43 RESOLVING_FAILED,
44 // uv_udp_recv_start <-> on_read
45 READING,
46 READING_FAILED,
47 // uv_udp_send has no state (it's synchronous)
48 WRITING_FAILED,
49 // There is no connected/reconnect/disconnect
50 };
51
53 Status status;
55 std::unique_ptr<SocketBehaviour<UDP>> behaviour;
56
57 using PendingWrites = std::vector<PendingIO<uv_udp_send_t>>;
59 PendingWrites pending_writes;
60
62 std::string host;
64 std::string port;
66 std::optional<std::string> listen_name = std::nullopt;
67
69 addrinfo* addr_base = nullptr;
71 addrinfo* addr_current = nullptr;
72
73 bool port_assigned() const
74 {
75 return port != "0";
76 }
77
78 std::string get_address_name() const
79 {
80 const std::string port_suffix =
81 port_assigned() ? fmt::format(":{}", port) : "";
82
83 if (addr_current != nullptr && addr_current->ai_family == AF_INET6)
84 {
85 return fmt::format("[{}]{}", host, port_suffix);
86 }
87 else
88 {
89 return fmt::format("{}{}", host, port_suffix);
90 }
91 }
92
93 UDPImpl() : status(FRESH)
94 {
95 if (!init())
96 {
97 throw std::logic_error("uv UDP initialization failed");
98 }
99
100 uv_handle.data = this;
101 }
102
103 ~UDPImpl()
104 {
105 {
106 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
107 for (auto& req : pending_resolve_requests)
108 {
109 // The UV request objects can stay, but if there are any references
110 // to `this` left, we need to remove them.
111 if (req->data == this)
112 {
113 req->data = nullptr;
114 }
115 }
116 }
117 if (addr_base != nullptr)
118 {
119 uv_freeaddrinfo(addr_base);
120 }
121 }
122
123 public:
124 static void reset_read_quota()
125 {
126 remaining_read_quota = max_read_quota;
127 }
128
129 void set_behaviour(std::unique_ptr<SocketBehaviour<UDP>> b)
130 {
131 behaviour = std::move(b);
132 }
133
134 std::string get_host() const
135 {
136 return host;
137 }
138
139 std::string get_port() const
140 {
141 return port;
142 }
143
144 std::optional<std::string> get_listen_name() const
145 {
146 return listen_name;
147 }
148
150 bool listen(
151 const std::string& host_,
152 const std::string& port_,
153 const std::optional<std::string>& name = std::nullopt)
154 {
155 listen_name = name;
156 auto name_str = name.has_value() ? name.value() : "";
157 LOG_TRACE_FMT("UDP listen on {}:{} [{}]", host_, port_, name_str);
158 return resolve(host_, port_, false);
159 }
160
162 void start(int64_t id)
163 {
164 behaviour->on_start(id);
165 }
166
167 bool connect(const std::string& host_, const std::string& port_)
168 {
169 LOG_TRACE_FMT("UDP dummy connect to {}:{}", host, port);
170 return true;
171 }
172
173 bool write(size_t len, const uint8_t* data, sockaddr addr)
174 {
175 auto req = new uv_udp_send_t;
176 char* copy = new char[len];
177 if (data)
178 memcpy(copy, data, len);
179 req->data = copy;
180
181 switch (status)
182 {
183 // Handles unbound or in unknown state
184 case RESOLVING:
185 case RESOLVING_FAILED:
186 case READING_FAILED:
187 case WRITING_FAILED:
188 {
189 pending_writes.emplace_back(req, len, addr, free_write);
190 break;
191 }
192
193 // Both read and write handles have been bound here
194 case READING:
195 {
196 auto [h, p] = addr_to_str(&addr);
197 LOG_TRACE_FMT("UDP write addr: {}:{}", h, p);
198 return send_write(req, len, &addr);
199 }
200
201 // This shouldn't happen, but the only state is FRESH
202 default:
203 {
204 free_write(req);
205 throw std::logic_error(
206 fmt::format("Unexpected status during write: {}", status));
207 }
208 }
209
210 return true;
211 }
212
213 private:
215 bool init()
216 {
217 assert_status(FRESH, FRESH);
218
219 int rc;
220 LOG_TRACE_FMT("UDP init");
221 if ((rc = uv_udp_init(uv_default_loop(), &uv_handle)) < 0)
222 {
223 LOG_FAIL_FMT("uv_udp_init failed on recv handle: {}", uv_strerror(rc));
224 return false;
225 }
226
227 return true;
228 }
229
230 bool send_write(uv_udp_send_t* req, size_t len, const struct sockaddr* addr)
231 {
232 char* copy = (char*)req->data;
233
234 uv_buf_t buf;
235 buf.base = copy;
236 buf.len = len;
237
238 int rc;
239
240 auto [h, p] = addr_to_str(addr);
241 LOG_TRACE_FMT("UDP send_write addr: {}:{}", h, p);
242 std::string data(copy, len);
243 LOG_TRACE_FMT("UDP send_write [{}]", data);
244 if ((rc = uv_udp_send(req, &uv_handle, &buf, 1, addr, on_write)) < 0)
245 {
246 free_write(req);
247 LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
248 status = WRITING_FAILED;
249 behaviour->on_disconnect();
250 return false;
251 }
252
253 return true;
254 }
255
256 void update_resolved_address(int address_family, sockaddr* sa)
257 {
258 auto [h, p] = addr_to_str(sa, address_family);
259 host = h;
260 port = p;
261 LOG_TRACE_FMT("UDP update address to {}:{}", host, port);
262 }
263
264 void resolved()
265 {
266 int rc;
267
268 LOG_TRACE_FMT("UDP bind to {}:{}", host, port);
269 while (addr_current != nullptr)
270 {
271 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
272
273 if ((rc = uv_udp_bind(&uv_handle, addr_current->ai_addr, 0)) < 0)
274 {
275 addr_current = addr_current->ai_next;
277 "uv_udp_bind failed on {}: {}",
278 get_address_name(),
279 uv_strerror(rc));
280 continue;
281 }
282
283 // If bound on port 0 (ie - asking the OS to assign a port), then we
284 // need to call uv_udp_getsockname to retrieve the bound port
285 // (addr_current will not contain it)
286 if (!port_assigned())
287 {
288 sockaddr_storage sa_storage;
289 const auto sa = (sockaddr*)&sa_storage;
290 int sa_len = sizeof(sa_storage);
291 if ((rc = uv_udp_getsockname(&uv_handle, sa, &sa_len)) != 0)
292 {
293 LOG_FAIL_FMT("uv_udp_getsockname failed: {}", uv_strerror(rc));
294 }
295 update_resolved_address(addr_current->ai_family, sa);
296 }
297
298 LOG_TRACE_FMT("UDP to call on_listening");
299
300 behaviour->on_listening(host, port);
301
302 assert_status(RESOLVING, READING);
303 read_start();
304 return;
305 }
306
307 status = RESOLVING_FAILED;
308
309 // This should show even when verbose logs are off
311 "Unable to connect: all resolved addresses failed: {}:{}", host, port);
312 }
313
314 void assert_status(Status from, Status to)
315 {
316 if (status != from)
317 {
318 throw std::logic_error(fmt::format(
319 "Trying to transition from {} to {} but current status is {}",
320 from,
321 to,
322 status));
323 }
324
325 status = to;
326 }
327
328 bool resolve(
329 const std::string& host_, const std::string& port_, bool async = true)
330 {
331 host = host_;
332 port = port_;
333
334 LOG_TRACE_FMT("UDP resolve {}:{}", host, port);
335 if (addr_base != nullptr)
336 {
337 uv_freeaddrinfo(addr_base);
338 addr_base = nullptr;
339 addr_current = nullptr;
340 }
341
342 assert_status(FRESH, RESOLVING);
343
344 if (!DNS::resolve(host, port, this, on_resolved, async))
345 {
346 LOG_DEBUG_FMT("Resolving '{}' failed", host);
347 status = RESOLVING_FAILED;
348 return false;
349 }
350
351 return true;
352 }
353
354 static void on_resolved(uv_getaddrinfo_t* req, int rc, struct addrinfo* res)
355 {
356 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
357 pending_resolve_requests.erase(req);
358
359 LOG_TRACE_FMT("UDP on_resolve static");
360 if (req->data)
361 {
362 static_cast<UDPImpl*>(req->data)->on_resolved(req, rc);
363 }
364 else
365 {
366 // The UDPImpl that submitted the request has been destroyed, but we
367 // need to clean up the request object.
368 uv_freeaddrinfo(res);
369 delete req;
370 }
371 }
372
373 void on_resolved(uv_getaddrinfo_t* req, int rc)
374 {
375 LOG_TRACE_FMT("UDP on_resolve dynamic");
376 // It is possible that on_resolved is triggered after there has been a
377 // request to close uv_handle. In this scenario, we should not try to
378 // do anything with the handle and return immediately (otherwise,
379 // uv_close cb will abort).
380 if (uv_is_closing((uv_handle_t*)&uv_handle))
381 {
382 LOG_DEBUG_FMT("on_resolved: closing");
383 uv_freeaddrinfo(req->addrinfo);
384 delete req;
385 return;
386 }
387
388 if (rc < 0)
389 {
390 status = RESOLVING_FAILED;
391 LOG_DEBUG_FMT("UDP resolve failed: {}", uv_strerror(rc));
392 behaviour->on_resolve_failed();
393 }
394 else
395 {
396 addr_base = req->addrinfo;
397 addr_current = addr_base;
398
399 LOG_TRACE_FMT("UDP to call resolved");
400 resolved();
401 }
402
403 delete req;
404 }
405
406 void push_pending_writes()
407 {
408 for (auto& w : pending_writes)
409 {
410 auto [h, p] = addr_to_str(&w.addr);
411 LOG_TRACE_FMT("UDP pending_writes addr: {}:{}", h, p);
412 send_write(w.req, w.len, &w.addr);
413 w.req = nullptr;
414 }
415
416 PendingWrites().swap(pending_writes);
417 }
418
419 void read_start()
420 {
421 int rc;
422
423 LOG_TRACE_FMT("UDP read start");
424 if ((rc = uv_udp_recv_start(&uv_handle, on_alloc, on_read)) < 0)
425 {
426 status = READING_FAILED;
427 LOG_FAIL_FMT("uv_udp_read_start failed: {}", uv_strerror(rc));
428 behaviour->on_disconnect();
429 }
430 }
431
432 static void on_alloc(
433 uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
434 {
435 static_cast<UDPImpl*>(handle->data)->on_alloc(suggested_size, buf);
436 }
437
438 void on_alloc(size_t suggested_size, uv_buf_t* buf)
439 {
440 auto alloc_size = std::min(suggested_size, max_read_size);
441
442 alloc_size = std::min(alloc_size, remaining_read_quota);
443 remaining_read_quota -= alloc_size;
445 "Allocating {} bytes for UDP read ({} of quota remaining)",
446 alloc_size,
447 remaining_read_quota);
448
449 buf->base = new char[alloc_size];
450 buf->len = alloc_size;
451 }
452
453 void on_free(const uv_buf_t* buf)
454 {
455 delete[] buf->base;
456 }
457
458 static void on_read(
459 uv_udp_t* handle,
460 ssize_t sz,
461 const uv_buf_t* buf,
462 const struct sockaddr* addr,
463 unsigned flags)
464 {
465 static_cast<UDPImpl*>(handle->data)->on_read(sz, buf, addr, flags);
466 }
467
468 void on_read(
469 ssize_t sz,
470 const uv_buf_t* buf,
471 const struct sockaddr* addr,
472 unsigned flags)
473 {
474 if (sz == 0)
475 {
476 on_free(buf);
477 return;
478 }
479
480 if (sz == UV_ENOBUFS)
481 {
482 LOG_DEBUG_FMT("UDP on_read reached allocation quota");
483 on_free(buf);
484 return;
485 }
486
487 if (sz < 0)
488 {
489 on_free(buf);
490 LOG_DEBUG_FMT("UDP on_read: {}", uv_strerror(sz));
491 behaviour->on_disconnect();
492 return;
493 }
494
495 auto [h, p] = addr_to_str(addr);
496 LOG_TRACE_FMT("UDP on_read addr: {}:{}", h, p);
497
498 uint8_t* b = (uint8_t*)buf->base;
499 std::string data((char*)b, sz);
500 LOG_TRACE_FMT("UDP on_read [{}]", data);
501 behaviour->on_read((size_t)sz, b, *addr);
502
503 if (b != nullptr)
504 on_free(buf);
505 }
506
507 static void on_write(uv_udp_send_t* req, int)
508 {
509 free_write(req);
510 }
511
512 static void free_write(uv_udp_send_t* req)
513 {
514 if (req == nullptr)
515 return;
516
517 char* copy = (char*)req->data;
518 delete[] copy;
519 delete req;
520 }
521 };
522
524 {
525 public:
527
529 {
531 }
532 };
533
535}
static bool resolve(const std::string &host_, const std::string &service, void *ud, uv_getaddrinfo_cb cb, bool async)
Definition dns.h:19
ResetUDPReadQuotaImpl()
Definition udp.h:526
void before_io()
Definition udp.h:528
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
Definition udp.h:23
bool connect(const std::string &host_, const std::string &port_)
Definition udp.h:167
std::optional< std::string > get_listen_name() const
Definition udp.h:144
std::string get_port() const
Definition udp.h:139
void start(int64_t id)
Start the service via behaviour (register on ringbuffer, etc)
Definition udp.h:162
std::string get_host() const
Definition udp.h:134
bool listen(const std::string &host_, const std::string &port_, const std::optional< std::string > &name=std::nullopt)
Listen to packets on host:port.
Definition udp.h:150
void set_behaviour(std::unique_ptr< SocketBehaviour< UDP > > b)
Definition udp.h:129
bool write(size_t len, const uint8_t *data, sockaddr addr)
Definition udp.h:173
static void reset_read_quota()
Definition udp.h:124
Definition proxy.h:15
Definition proxy.h:51
Definition proxy.h:82
uv_udp_t uv_handle
Definition proxy.h:84
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition after_io.h:8
std::pair< std::string, std::string > addr_to_str(const sockaddr *addr, int address_family=AF_INET)
Definition socket.h:83
auto handle
Definition kv_helpers.h:85
Definition configuration.h:14