CCF
Loading...
Searching...
No Matches
udp.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "before_io.h"
6#include "ccf/pal/locking.h"
7#include "dns.h"
9#include "ds/pending_io.h"
10#include "proxy.h"
11#include "socket.h"
12
13#include <optional>
14
15namespace asynchost
16{
17 // NOLINTBEGIN(cppcoreguidelines-virtual-class-destructor)
18 class UDPImpl;
20
23 class UDPImpl : public with_uv_handle<uv_udp_t>
24 {
25 private:
26 friend class close_ptr<UDPImpl>;
27
28 static constexpr int backlog = 128;
29 static constexpr size_t max_read_size = 16384;
30
31 // Each uv iteration, read only a capped amount from all sockets.
32 static constexpr auto max_read_quota = max_read_size * 4;
33 static size_t remaining_read_quota;
34
35 // This is a simplified version of the state machine for QUIC that
36 // mostly follows plain UDP state. We should add more when we need
37 // for QUIC, not predict complexity prematurely.
38 enum Status : uint8_t
39 {
40 // Starting state + failure recovery (if any)
41 FRESH,
42 // DNS::resolve
43 RESOLVING,
44 RESOLVING_FAILED,
45 // uv_udp_recv_start <-> on_read
46 READING,
47 READING_FAILED,
48 // uv_udp_send has no state (it's synchronous)
49 WRITING_FAILED,
50 // There is no connected/reconnect/disconnect
51 };
52
54 Status status{FRESH};
56 std::unique_ptr<SocketBehaviour<UDP>> behaviour;
57
58 using PendingWrites = std::vector<PendingIO<uv_udp_send_t>>;
60 PendingWrites pending_writes;
61
63 std::string host;
65 std::string port;
67 std::optional<std::string> listen_name = std::nullopt;
68
70 addrinfo* addr_base = nullptr;
72 addrinfo* addr_current = nullptr;
73
74 [[nodiscard]] bool port_assigned() const
75 {
76 return port != "0";
77 }
78
79 [[nodiscard]] std::string get_address_name() const
80 {
81 const std::string port_suffix =
82 port_assigned() ? fmt::format(":{}", port) : "";
83
84 if (addr_current != nullptr && addr_current->ai_family == AF_INET6)
85 {
86 return fmt::format("[{}]{}", host, port_suffix);
87 }
88
89 return fmt::format("{}{}", host, port_suffix);
90 }
91
92 UDPImpl()
93 {
94 if (!init())
95 {
96 throw std::logic_error("uv UDP initialization failed");
97 }
98
99 uv_handle.data = this;
100 }
101
102 ~UDPImpl() override
103 {
104 {
105 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
106 for (const auto& req : pending_resolve_requests)
107 {
108 // The UV request objects can stay, but if there are any references
109 // to `this` left, we need to remove them.
110 if (req->data == this)
111 {
112 req->data = nullptr;
113 }
114 }
115 }
116 if (addr_base != nullptr)
117 {
118 uv_freeaddrinfo(addr_base);
119 }
120 }
121
122 public:
123 static void reset_read_quota()
124 {
125 remaining_read_quota = max_read_quota;
126 }
127
128 void set_behaviour(std::unique_ptr<SocketBehaviour<UDP>> b)
129 {
130 behaviour = std::move(b);
131 }
132
133 [[nodiscard]] std::string get_host() const
134 {
135 return host;
136 }
137
138 [[nodiscard]] std::string get_port() const
139 {
140 return port;
141 }
142
143 [[nodiscard]] std::optional<std::string> get_listen_name() const
144 {
145 return listen_name;
146 }
147
149 bool listen(
150 const std::string& host_,
151 const std::string& port_,
152 const std::optional<std::string>& name = std::nullopt)
153 {
154 listen_name = name;
155 auto name_str = name.has_value() ? name.value() : "";
156 LOG_TRACE_FMT("UDP listen on {}:{} [{}]", host_, port_, name_str);
157 return resolve(host_, port_, false);
158 }
159
161 void start(int64_t id)
162 {
163 behaviour->on_start(id);
164 }
165
166 bool connect(const std::string& /*host_*/, const std::string& /*port_*/)
167 {
168 LOG_TRACE_FMT("UDP dummy connect to {}:{}", host, port);
169 return true;
170 }
171
172 bool write(size_t len, const uint8_t* data, sockaddr addr)
173 {
174 auto* req = new uv_udp_send_t; // NOLINT(cppcoreguidelines-owning-memory)
175 auto* copy = new char[len]; // NOLINT(cppcoreguidelines-owning-memory)
176 if (data != nullptr)
177 {
178 memcpy(copy, data, len);
179 }
180 req->data = copy;
181
182 switch (status)
183 {
184 // Handles unbound or in unknown state
185 case RESOLVING:
186 case RESOLVING_FAILED:
187 case READING_FAILED:
188 case WRITING_FAILED:
189 {
190 pending_writes.emplace_back(req, len, addr, free_write);
191 break;
192 }
193
194 // Both read and write handles have been bound here
195 case READING:
196 {
197 auto [h, p] = addr_to_str(&addr);
198 LOG_TRACE_FMT("UDP write addr: {}:{}", h, p);
199 return send_write(req, len, &addr);
200 }
201
202 // This shouldn't happen, but the only state is FRESH
203 default:
204 {
205 free_write(req);
206 throw std::logic_error(
207 fmt::format("Unexpected status during write: {}", status));
208 }
209 }
210
211 return true;
212 }
213
214 private:
216 bool init()
217 {
218 assert_status(FRESH, FRESH);
219
220 int rc = 0;
221 LOG_TRACE_FMT("UDP init");
222 if ((rc = uv_udp_init(uv_default_loop(), &uv_handle)) < 0)
223 {
224 LOG_FAIL_FMT("uv_udp_init failed on recv handle: {}", uv_strerror(rc));
225 return false;
226 }
227
228 return true;
229 }
230
231 bool send_write(uv_udp_send_t* req, size_t len, const struct sockaddr* addr)
232 {
233 auto* copy = static_cast<char*>(req->data);
234
235 uv_buf_t buf;
236 buf.base = copy;
237 buf.len = len;
238
239 int rc = 0;
240
241 auto [h, p] = addr_to_str(addr);
242 LOG_TRACE_FMT("UDP send_write addr: {}:{}", h, p);
243 std::string data(copy, len);
244 LOG_TRACE_FMT("UDP send_write [{}]", data);
245 if ((rc = uv_udp_send(req, &uv_handle, &buf, 1, addr, on_write)) < 0)
246 {
247 free_write(req);
248 LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
249 status = WRITING_FAILED;
250 behaviour->on_disconnect();
251 return false;
252 }
253
254 return true;
255 }
256
257 void update_resolved_address(int address_family, sockaddr* sa)
258 {
259 auto [h, p] = addr_to_str(sa, address_family);
260 host = h;
261 port = p;
262 LOG_TRACE_FMT("UDP update address to {}:{}", host, port);
263 }
264
265 void resolved()
266 {
267 int rc = 0;
268
269 LOG_TRACE_FMT("UDP bind to {}:{}", host, port);
270 while (addr_current != nullptr)
271 {
272 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
273
274 if ((rc = uv_udp_bind(&uv_handle, addr_current->ai_addr, 0)) < 0)
275 {
276 addr_current = addr_current->ai_next;
278 "uv_udp_bind failed on {}: {}",
279 get_address_name(),
280 uv_strerror(rc));
281 continue;
282 }
283
284 // If bound on port 0 (ie - asking the OS to assign a port), then we
285 // need to call uv_udp_getsockname to retrieve the bound port
286 // (addr_current will not contain it)
287 if (!port_assigned())
288 {
289 sockaddr_storage sa_storage{};
290 auto* const sa = reinterpret_cast<sockaddr*>(&sa_storage);
291 int sa_len = sizeof(sa_storage);
292 if ((rc = uv_udp_getsockname(&uv_handle, sa, &sa_len)) != 0)
293 {
294 LOG_FAIL_FMT("uv_udp_getsockname failed: {}", uv_strerror(rc));
295 }
296 update_resolved_address(addr_current->ai_family, sa);
297 }
298
299 LOG_TRACE_FMT("UDP to call on_listening");
300
301 behaviour->on_listening(host, port);
302
303 assert_status(RESOLVING, READING);
304 read_start();
305 return;
306 }
307
308 status = RESOLVING_FAILED;
309
310 // This should show even when verbose logs are off
312 "Unable to connect: all resolved addresses failed: {}:{}", host, port);
313 }
314
315 void assert_status(Status from, Status to)
316 {
317 if (status != from)
318 {
319 throw std::logic_error(fmt::format(
320 "Trying to transition from {} to {} but current status is {}",
321 from,
322 to,
323 status));
324 }
325
326 status = to;
327 }
328
329 bool resolve(
330 const std::string& host_, const std::string& port_, bool async = true)
331 {
332 host = host_;
333 port = port_;
334
335 LOG_TRACE_FMT("UDP resolve {}:{}", host, port);
336 if (addr_base != nullptr)
337 {
338 uv_freeaddrinfo(addr_base);
339 addr_base = nullptr;
340 addr_current = nullptr;
341 }
342
343 assert_status(FRESH, RESOLVING);
344
345 if (!DNS::resolve(host, port, this, on_resolved, async))
346 {
347 LOG_DEBUG_FMT("Resolving '{}' failed", host);
348 status = RESOLVING_FAILED;
349 return false;
350 }
351
352 return true;
353 }
354
355 static void on_resolved(uv_getaddrinfo_t* req, int rc, struct addrinfo* res)
356 {
357 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
358 pending_resolve_requests.erase(req);
359
360 LOG_TRACE_FMT("UDP on_resolve static");
361 if (req->data != nullptr)
362 {
363 static_cast<UDPImpl*>(req->data)->on_resolved(req, rc);
364 }
365 else
366 {
367 // The UDPImpl that submitted the request has been destroyed, but we
368 // need to clean up the request object.
369 uv_freeaddrinfo(res);
370 delete req; // NOLINT(cppcoreguidelines-owning-memory)
371 }
372 }
373
374 void on_resolved(uv_getaddrinfo_t* req, int rc)
375 {
376 LOG_TRACE_FMT("UDP on_resolve dynamic");
377 // It is possible that on_resolved is triggered after there has been a
378 // request to close uv_handle. In this scenario, we should not try to
379 // do anything with the handle and return immediately (otherwise,
380 // uv_close cb will abort).
381 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
382 {
383 LOG_DEBUG_FMT("on_resolved: closing");
384 uv_freeaddrinfo(req->addrinfo);
385 delete req; // NOLINT(cppcoreguidelines-owning-memory)
386 return;
387 }
388
389 if (rc < 0)
390 {
391 status = RESOLVING_FAILED;
392 LOG_DEBUG_FMT("UDP resolve failed: {}", uv_strerror(rc));
393 behaviour->on_resolve_failed();
394 }
395 else
396 {
397 addr_base = req->addrinfo;
398 addr_current = addr_base;
399
400 LOG_TRACE_FMT("UDP to call resolved");
401 resolved();
402 }
403
404 delete req; // NOLINT(cppcoreguidelines-owning-memory)
405 }
406
407 void push_pending_writes()
408 {
409 for (auto& w : pending_writes)
410 {
411 auto [h, p] = addr_to_str(&w.addr);
412 LOG_TRACE_FMT("UDP pending_writes addr: {}:{}", h, p);
413 send_write(w.req, w.len, &w.addr);
414 w.req = nullptr;
415 }
416
417 PendingWrites().swap(pending_writes);
418 }
419
420 void read_start()
421 {
422 int rc = 0;
423
424 LOG_TRACE_FMT("UDP read start");
425 if ((rc = uv_udp_recv_start(&uv_handle, on_alloc, on_read)) < 0)
426 {
427 status = READING_FAILED;
428 LOG_FAIL_FMT("uv_udp_read_start failed: {}", uv_strerror(rc));
429 behaviour->on_disconnect();
430 }
431 }
432
433 static void on_alloc(
434 uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
435 {
436 static_cast<UDPImpl*>(handle->data)->on_alloc(suggested_size, buf);
437 }
438
439 void on_alloc(size_t suggested_size, uv_buf_t* buf)
440 {
441 auto alloc_size = std::min(suggested_size, max_read_size);
442
443 alloc_size = std::min(alloc_size, remaining_read_quota);
444 remaining_read_quota -= alloc_size;
446 "Allocating {} bytes for UDP read ({} of quota remaining)",
447 alloc_size,
448 remaining_read_quota);
449
450 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
451 buf->base = new char[alloc_size];
452 buf->len = alloc_size;
453 }
454
455 void on_free(const uv_buf_t* buf)
456 {
457 delete[] buf->base; // NOLINT(cppcoreguidelines-owning-memory)
458 }
459
460 static void on_read(
461 uv_udp_t* handle,
462 ssize_t sz,
463 const uv_buf_t* buf,
464 const struct sockaddr* addr,
465 unsigned flags)
466 {
467 static_cast<UDPImpl*>(handle->data)->on_read(sz, buf, addr, flags);
468 }
469
470 void on_read(
471 ssize_t sz,
472 const uv_buf_t* buf,
473 const struct sockaddr* addr,
474 unsigned /*flags*/)
475 {
476 if (sz == 0)
477 {
478 on_free(buf);
479 return;
480 }
481
482 if (sz == UV_ENOBUFS)
483 {
484 LOG_DEBUG_FMT("UDP on_read reached allocation quota");
485 on_free(buf);
486 return;
487 }
488
489 if (sz < 0)
490 {
491 on_free(buf);
492 LOG_DEBUG_FMT("UDP on_read: {}", uv_strerror(static_cast<int>(sz)));
493 behaviour->on_disconnect();
494 return;
495 }
496
497 auto [h, p] = addr_to_str(addr);
498 LOG_TRACE_FMT("UDP on_read addr: {}:{}", h, p);
499
500 auto* b = reinterpret_cast<uint8_t*>(buf->base);
501 std::string data(reinterpret_cast<char*>(b), sz);
502 LOG_TRACE_FMT("UDP on_read [{}]", data);
503 behaviour->on_read(static_cast<size_t>(sz), b, *addr);
504
505 if (b != nullptr)
506 {
507 on_free(buf);
508 }
509 }
510
511 static void on_write(uv_udp_send_t* req, int /*status*/)
512 {
513 free_write(req);
514 }
515
516 static void free_write(uv_udp_send_t* req)
517 {
518 if (req == nullptr)
519 {
520 return;
521 }
522
523 auto* copy = static_cast<char*>(req->data);
524 delete[] copy; // NOLINT(cppcoreguidelines-owning-memory)
525 delete req; // NOLINT(cppcoreguidelines-owning-memory)
526 }
527 };
528
529 // NOLINTEND(cppcoreguidelines-virtual-class-destructor)
530
532 {
533 public:
535
537 {
539 }
540 };
541
543}
static bool resolve(const std::string &host_, const std::string &service, void *ud, uv_getaddrinfo_cb cb, bool async)
Definition dns.h:19
void before_io()
Definition udp.h:536
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
Definition udp.h:24
std::optional< std::string > get_listen_name() const
Definition udp.h:143
bool connect(const std::string &, const std::string &)
Definition udp.h:166
std::string get_port() const
Definition udp.h:138
void start(int64_t id)
Start the service via behaviour (register on ringbuffer, etc)
Definition udp.h:161
std::string get_host() const
Definition udp.h:133
bool listen(const std::string &host_, const std::string &port_, const std::optional< std::string > &name=std::nullopt)
Listen to packets on host:port.
Definition udp.h:149
void set_behaviour(std::unique_ptr< SocketBehaviour< UDP > > b)
Definition udp.h:128
bool write(size_t len, const uint8_t *data, sockaddr addr)
Definition udp.h:172
static void reset_read_quota()
Definition udp.h:123
Definition proxy.h:15
Definition proxy.h:51
Definition proxy.h:84
uv_udp_t uv_handle
Definition proxy.h:90
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition after_io.h:8
std::pair< std::string, std::string > addr_to_str(const sockaddr *addr, int address_family=AF_INET)
Definition socket.h:84
auto * handle
Definition kv_helpers.h:87
Definition configuration.h:14