// Tunable limits for the UDP implementation.
// NOTE(review): `backlog` is not referenced in any of the lines visible here —
// confirm it is still needed.
28 static constexpr int backlog = 128;
// Upper bound applied to the size of each read buffer handed out by on_alloc().
29 static constexpr size_t max_read_size = 16384;
// Total read quota: four times max_read_size.
32 static constexpr auto max_read_quota = max_read_size * 4;
// Quota still available. on_alloc() subtracts every allocation from it, and it
// is reset to max_read_quota elsewhere in this file. Declared static, so it is
// not per-instance state.
33 static size_t remaining_read_quota;
// Callback object that is told about socket events; the visible code calls
// on_start, on_listening, on_read, on_disconnect and on_resolve_failed on it.
56 std::unique_ptr<SocketBehaviour<UDP>> behaviour;
// Send requests queued by write() until push_pending_writes() hands them to
// send_write().
58 using PendingWrites = std::vector<PendingIO<uv_udp_send_t>>;
60 PendingWrites pending_writes;
// Optional listen name, empty by default.
// NOTE(review): no assignment to this member is visible in the lines shown.
67 std::optional<std::string> listen_name = std::nullopt;
// Head of the resolved address list. Set from req->addrinfo in on_resolved()
// and released with uv_freeaddrinfo().
70 addrinfo* addr_base =
nullptr;
// Cursor into that list: the entry currently being tried; advanced via ai_next
// when a bind attempt fails.
72 addrinfo* addr_current =
nullptr;
// Predicate used by get_address_name() to decide whether to append a port
// suffix, and after binding to decide whether the socket name must be queried.
// The body is not visible here, so the exact criterion is not documented.
74 [[nodiscard]]
bool port_assigned()
const
// Returns a printable form of the current address: the host followed by
// ":<port>" when port_assigned() is true, with the host wrapped in square
// brackets when the current resolved entry is IPv6.
79 [[nodiscard]] std::string get_address_name()
const
// Empty suffix when no port has been assigned yet.
81 const std::string port_suffix =
82 port_assigned() ? fmt::format(
":{}", port) :
"";
// IPv6 hosts are bracketed so the port separator stays unambiguous.
84 if (addr_current !=
nullptr && addr_current->ai_family == AF_INET6)
86 return fmt::format(
"[{}]{}",
host, port_suffix);
// Every other case (including no resolved entry yet): plain host + suffix.
89 return fmt::format(
"{}{}",
host, port_suffix);
// Fragment (enclosing function header not shown): a failure to initialise the
// uv UDP handle is reported to the caller as std::logic_error.
96 throw std::logic_error(
"uv UDP initialization failed");
// Fragment (enclosing function header not shown — presumably teardown; confirm):
// while holding pending_resolve_requests_mtx, scan the outstanding resolve
// requests for those whose data pointer refers to this object. What is done
// with a matching request is not visible in the lines shown.
105 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
106 for (
const auto& req : pending_resolve_requests)
110 if (req->data ==
this)
// Release the resolved address list if one is currently held.
116 if (addr_base !=
nullptr)
118 uv_freeaddrinfo(addr_base);
// Fragment: restore the full read quota.
125 remaining_read_quota = max_read_quota;
// Fragment: take ownership of the behaviour object supplied as `b`.
130 behaviour = std::move(b);
// Parameter list and body of a listen-style entry point (the line carrying its
// name is not shown; the trace message below reads "UDP listen on").
//   host_ / port_ : address to resolve.
//   name          : optional label; in the lines shown it is only used for the
//                   trace message.
// Returns the result of resolve().
150 const std::string& host_,
151 const std::string& port_,
152 const std::optional<std::string>& name = std::nullopt)
// An absent name is traced as an empty string.
155 auto name_str = name.has_value() ? name.value() :
"";
156 LOG_TRACE_FMT(
"UDP listen on {}:{} [{}]", host_, port_, name_str);
// The third argument is resolve()'s `async` flag: resolution is requested
// synchronously here.
157 return resolve(host_, port_,
false);
// Fragment (enclosing function header not shown): notify the behaviour object
// that this socket, identified by `id`, has started.
163 behaviour->on_start(
id);
// connect() takes two string arguments that are deliberately left unnamed, so
// the body cannot refer to them. The body itself is not visible here.
166 bool connect(
const std::string& ,
const std::string& )
// Sends `len` bytes from `data` to `addr`, or queues the send for later.
//   len  : number of bytes to send.
//   data : payload; copied, so the caller's buffer is not referenced after return.
//   addr : destination address (taken by value).
// Throws std::logic_error if the socket is in a status this function does not
// expect.
172 bool write(
size_t len,
const uint8_t* data, sockaddr addr)
// Heap-allocate the libuv request and a private copy of the payload.
// NOTE(review): free_write() reads the copy back from req->data, so a line not
// shown here presumably stores `copy` there — confirm.
174 auto* req =
new uv_udp_send_t;
175 auto* copy =
new char[len];
178 memcpy(copy, data, len);
// RESOLVING_FAILED is one of the case labels that precede the queueing
// statement; other labels may exist on lines not shown.
186 case RESOLVING_FAILED:
// Not ready to send: park the request, with free_write as its cleanup function.
190 pending_writes.emplace_back(req, len, addr, free_write);
// Ready to send: submit immediately.
199 return send_write(req, len, &addr);
// Any remaining status is treated as a programming error.
206 throw std::logic_error(
207 fmt::format(
"Unexpected status during write: {}", status));
// Fragment (enclosing function header not shown): handle initialisation.
// The status must be FRESH and remains FRESH.
218 assert_status(FRESH, FRESH);
// Register the UDP handle on libuv's default loop; a negative return code is
// an error and is logged.
222 if ((rc = uv_udp_init(uv_default_loop(), &
uv_handle)) < 0)
224 LOG_FAIL_FMT(
"uv_udp_init failed on recv handle: {}", uv_strerror(rc));
// Submits a prepared send request to libuv.
//   req  : request whose data pointer refers to the payload copy.
//   len  : payload length in bytes.
//   addr : destination address.
// On submission failure the status becomes WRITING_FAILED and the behaviour
// object is told the socket disconnected. The returned value is set on lines
// not shown here.
231 bool send_write(uv_udp_send_t* req,
size_t len,
const struct sockaddr* addr)
// The payload copy travels with the request in req->data.
233 auto* copy =
static_cast<char*
>(req->data);
// NOTE(review): this copies the whole payload into a std::string; its use is
// not visible in the lines shown. If it only feeds trace logging it is an
// extra allocation and copy on every send — confirm.
243 std::string data(copy, len);
// on_write is the completion callback for the request.
245 if ((rc = uv_udp_send(req, &
uv_handle, &buf, 1, addr, on_write)) < 0)
249 status = WRITING_FAILED;
250 behaviour->on_disconnect();
// Called with an address family and socket address: once for each candidate
// address before binding, and once more with the address reported by
// uv_udp_getsockname(). The body is not visible here.
257 void update_resolved_address(
int address_family, sockaddr* sa)
// Fragment (enclosing function header not shown): try to bind to each resolved
// address in turn. Control flow between the visible statements (breaks,
// returns, braces) is not shown, so the notes below describe each statement
// rather than the exact ordering.
270 while (addr_current !=
nullptr)
// Record the candidate address before attempting to bind to it.
272 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
274 if ((rc = uv_udp_bind(&
uv_handle, addr_current->ai_addr, 0)) < 0)
// Bind failed: move on to the next resolved entry and log the failure.
276 addr_current = addr_current->ai_next;
278 "uv_udp_bind failed on {}: {}",
// No port assigned yet: ask the socket which address it was actually bound to
// and record that.
287 if (!port_assigned())
289 sockaddr_storage sa_storage{};
290 auto*
const sa =
reinterpret_cast<sockaddr*
>(&sa_storage);
291 int sa_len =
sizeof(sa_storage);
292 if ((rc = uv_udp_getsockname(&
uv_handle, sa, &sa_len)) != 0)
294 LOG_FAIL_FMT(
"uv_udp_getsockname failed: {}", uv_strerror(rc));
296 update_resolved_address(addr_current->ai_family, sa);
// Tell the behaviour object where the socket is listening, then move the
// state machine from RESOLVING to READING.
301 behaviour->on_listening(
host, port);
303 assert_status(RESOLVING, READING);
// Failure path: mark the socket as RESOLVING_FAILED.
308 status = RESOLVING_FAILED;
// NOTE(review): the message says "connect" although the visible code binds;
// consider rewording if this path is only reached when listening.
312 "Unable to connect: all resolved addresses failed: {}:{}",
host, port);
// Guards a status transition from `from` to `to`. Throws std::logic_error whose
// message names both statuses and the current one. The condition that triggers
// the throw and the status assignment itself are on lines not shown —
// presumably the throw fires when the current status differs from `from`.
315 void assert_status(Status from, Status to)
319 throw std::logic_error(fmt::format(
320 "Trying to transition from {} to {} but current status is {}",
// Parameter list and body of resolve() (the line carrying the name is not
// shown; callers invoke it as resolve(host_, port_, async)).
//   host_ / port_ : address to resolve.
//   async         : defaults to true; the listen path passes false.
330 const std::string& host_,
const std::string& port_,
bool async =
true)
// Drop any address list left over from a previous resolution.
336 if (addr_base !=
nullptr)
338 uv_freeaddrinfo(addr_base);
340 addr_current =
nullptr;
// Resolution may only start from FRESH.
343 assert_status(FRESH, RESOLVING);
// Failure path: mark resolution as failed (the condition is on lines not shown).
348 status = RESOLVING_FAILED;
// Static resolve-completion callback: forwards to the owning instance.
//   req : the completed request.
//   rc  : result code.
//   res : resolved address list.
355 static void on_resolved(uv_getaddrinfo_t* req,
int rc,
struct addrinfo* res)
// The request is no longer pending; remove it under the mutex.
357 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
358 pending_resolve_requests.erase(req);
// A non-null data pointer identifies the instance that should handle the result.
361 if (req->data !=
nullptr)
363 static_cast<UDPImpl*
>(req->data)->on_resolved(req, rc);
// Frees the result list here; the branch this belongs to is not visible —
// presumably the case where no instance is left to take it. Confirm.
369 uv_freeaddrinfo(res);
// Instance-side handling of a completed resolve request.
//   req : completed request; req->addrinfo holds the result list.
//   rc  : result code.
374 void on_resolved(uv_getaddrinfo_t* req,
int rc)
// If the handle is already closing, the result is discarded and freed.
381 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
384 uv_freeaddrinfo(req->addrinfo);
// Failure path (condition on lines not shown): record the failure and notify
// the behaviour object.
391 status = RESOLVING_FAILED;
393 behaviour->on_resolve_failed();
// Success path: keep the result list and start from its first entry.
397 addr_base = req->addrinfo;
398 addr_current = addr_base;
// Submits every queued write through send_write(), then empties the queue.
407 void push_pending_writes()
409 for (
auto& w : pending_writes)
413 send_write(w.req, w.len, &w.addr);
// Swapping with an empty temporary clears the vector and also gives up its
// capacity.
417 PendingWrites().swap(pending_writes);
// Fragment (enclosing function header not shown): start receiving datagrams,
// with on_alloc supplying buffers and on_read consuming them. On failure the
// status becomes READING_FAILED and the behaviour object is told the socket
// disconnected.
425 if ((rc = uv_udp_recv_start(&
uv_handle, on_alloc, on_read)) < 0)
427 status = READING_FAILED;
// NOTE(review): the message names uv_udp_read_start, but the function called
// above is uv_udp_recv_start; the log text looks stale.
428 LOG_FAIL_FMT(
"uv_udp_read_start failed: {}", uv_strerror(rc));
429 behaviour->on_disconnect();
// Static allocation callback: recovers the instance from handle->data and
// forwards to its on_alloc().
433 static void on_alloc(
434 uv_handle_t* handle,
size_t suggested_size, uv_buf_t* buf)
436 static_cast<UDPImpl*
>(
handle->data)->on_alloc(suggested_size, buf);
// Allocates a read buffer for libuv.
//   suggested_size : size proposed by libuv.
//   buf            : out-parameter; receives the buffer pointer and length.
// The size is capped first by max_read_size and then by the remaining read
// quota, and the quota is reduced by the amount allocated.
439 void on_alloc(
size_t suggested_size, uv_buf_t* buf)
441 auto alloc_size = std::min(suggested_size, max_read_size);
// Once the quota is exhausted this yields a size of 0.
443 alloc_size = std::min(alloc_size, remaining_read_quota);
444 remaining_read_quota -= alloc_size;
446 "Allocating {} bytes for UDP read ({} of quota remaining)",
448 remaining_read_quota);
// A zero-length buffer is still allocated and handed over. libuv documents
// that a zero length makes the receive callback report UV_ENOBUFS, which
// on_read() checks for.
451 buf->base =
new char[alloc_size];
452 buf->len = alloc_size;
// Takes a buffer descriptor; presumably releases the memory allocated by
// on_alloc() — the body is not visible here, confirm.
455 void on_free(
const uv_buf_t* buf)
// Fragment of the static receive callback (most of its signature is on lines
// not shown): recovers the instance from handle->data and forwards the size,
// buffer, sender address and flags to its on_read().
464 const struct sockaddr* addr,
467 static_cast<UDPImpl*
>(
handle->data)->on_read(sz, buf, addr, flags);
// Fragment of the instance receive handler (most of its signature and several
// branches are on lines not shown).
473 const struct sockaddr* addr,
// UV_ENOBUFS gets its own branch; how it is handled is not visible here.
482 if (sz == UV_ENOBUFS)
// Error path: log the libuv error text for `sz` and report a disconnect. The
// condition guarding this path is not shown — presumably sz < 0.
492 LOG_DEBUG_FMT(
"UDP on_read: {}", uv_strerror(
static_cast<int>(sz)));
493 behaviour->on_disconnect();
// Data path: view the received bytes as uint8_t.
500 auto* b =
reinterpret_cast<uint8_t*
>(buf->base);
// NOTE(review): copies the payload into a std::string whose use is not visible
// in the lines shown — confirm it is needed on every read.
501 std::string data(
reinterpret_cast<char*
>(b), sz);
// NOTE(review): `addr` is dereferenced here. libuv documents that the sender
// address passed to the receive callback can be NULL; confirm that a guard on
// lines not shown precedes this call.
503 behaviour->on_read(
static_cast<size_t>(sz), b, *addr);
// Completion callback passed to uv_udp_send(). The status argument is left
// unnamed, so the body cannot inspect it. The body is not visible here.
511 static void on_write(uv_udp_send_t* req,
int )
// Cleanup function for a send request; also registered with queued writes in
// write(). Recovers the payload copy from req->data. The statements that
// release it and the request are on lines not shown.
516 static void free_write(uv_udp_send_t* req)
523 auto* copy =
static_cast<char*
>(req->data);