27 static constexpr int backlog = 128;
28 static constexpr size_t max_read_size = 16384;
31 static constexpr auto max_read_quota = max_read_size * 4;
32 static size_t remaining_read_quota;
55 std::unique_ptr<SocketBehaviour<UDP>> behaviour;
57 using PendingWrites = std::vector<PendingIO<uv_udp_send_t>>;
59 PendingWrites pending_writes;
66 std::optional<std::string> listen_name = std::nullopt;
69 addrinfo* addr_base =
nullptr;
71 addrinfo* addr_current =
nullptr;
73 bool port_assigned()
const
78 std::string get_address_name()
const
80 const std::string port_suffix =
81 port_assigned() ? fmt::format(
":{}", port) :
"";
83 if (addr_current !=
nullptr && addr_current->ai_family == AF_INET6)
85 return fmt::format(
"[{}]{}",
host, port_suffix);
89 return fmt::format(
"{}{}",
host, port_suffix);
97 throw std::logic_error(
"uv UDP initialization failed");
106 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
107 for (
auto& req : pending_resolve_requests)
111 if (req->data ==
this)
117 if (addr_base !=
nullptr)
119 uv_freeaddrinfo(addr_base);
126 remaining_read_quota = max_read_quota;
131 behaviour = std::move(b);
151 const std::string& host_,
152 const std::string& port_,
153 const std::optional<std::string>& name = std::nullopt)
156 auto name_str = name.has_value() ? name.value() :
"";
157 LOG_TRACE_FMT(
"UDP listen on {}:{} [{}]", host_, port_, name_str);
158 return resolve(host_, port_,
false);
164 behaviour->on_start(
id);
167 bool connect(
const std::string& host_,
const std::string& port_)
173 bool write(
size_t len,
const uint8_t* data, sockaddr addr)
175 auto req =
new uv_udp_send_t;
176 char* copy =
new char[len];
178 memcpy(copy, data, len);
185 case RESOLVING_FAILED:
189 pending_writes.emplace_back(req, len, addr, free_write);
198 return send_write(req, len, &addr);
205 throw std::logic_error(
206 fmt::format(
"Unexpected status during write: {}", status));
217 assert_status(FRESH, FRESH);
221 if ((rc = uv_udp_init(uv_default_loop(), &
uv_handle)) < 0)
223 LOG_FAIL_FMT(
"uv_udp_init failed on recv handle: {}", uv_strerror(rc));
230 bool send_write(uv_udp_send_t* req,
size_t len,
const struct sockaddr* addr)
232 char* copy = (
char*)req->data;
242 std::string data(copy, len);
244 if ((rc = uv_udp_send(req, &
uv_handle, &buf, 1, addr, on_write)) < 0)
248 status = WRITING_FAILED;
249 behaviour->on_disconnect();
256 void update_resolved_address(
int address_family, sockaddr* sa)
269 while (addr_current !=
nullptr)
271 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
273 if ((rc = uv_udp_bind(&
uv_handle, addr_current->ai_addr, 0)) < 0)
275 addr_current = addr_current->ai_next;
277 "uv_udp_bind failed on {}: {}",
286 if (!port_assigned())
288 sockaddr_storage sa_storage;
289 const auto sa = (sockaddr*)&sa_storage;
290 int sa_len =
sizeof(sa_storage);
291 if ((rc = uv_udp_getsockname(&
uv_handle, sa, &sa_len)) != 0)
293 LOG_FAIL_FMT(
"uv_udp_getsockname failed: {}", uv_strerror(rc));
295 update_resolved_address(addr_current->ai_family, sa);
300 behaviour->on_listening(
host, port);
302 assert_status(RESOLVING, READING);
307 status = RESOLVING_FAILED;
311 "Unable to connect: all resolved addresses failed: {}:{}",
host, port);
314 void assert_status(Status from, Status to)
318 throw std::logic_error(fmt::format(
319 "Trying to transition from {} to {} but current status is {}",
329 const std::string& host_,
const std::string& port_,
bool async =
true)
335 if (addr_base !=
nullptr)
337 uv_freeaddrinfo(addr_base);
339 addr_current =
nullptr;
342 assert_status(FRESH, RESOLVING);
347 status = RESOLVING_FAILED;
354 static void on_resolved(uv_getaddrinfo_t* req,
int rc,
struct addrinfo* res)
356 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
357 pending_resolve_requests.erase(req);
362 static_cast<UDPImpl*
>(req->data)->on_resolved(req, rc);
368 uv_freeaddrinfo(res);
373 void on_resolved(uv_getaddrinfo_t* req,
int rc)
380 if (uv_is_closing((uv_handle_t*)&
uv_handle))
383 uv_freeaddrinfo(req->addrinfo);
390 status = RESOLVING_FAILED;
392 behaviour->on_resolve_failed();
396 addr_base = req->addrinfo;
397 addr_current = addr_base;
406 void push_pending_writes()
408 for (
auto& w : pending_writes)
412 send_write(w.req, w.len, &w.addr);
416 PendingWrites().swap(pending_writes);
424 if ((rc = uv_udp_recv_start(&
uv_handle, on_alloc, on_read)) < 0)
426 status = READING_FAILED;
427 LOG_FAIL_FMT(
"uv_udp_read_start failed: {}", uv_strerror(rc));
428 behaviour->on_disconnect();
432 static void on_alloc(
433 uv_handle_t* handle,
size_t suggested_size, uv_buf_t* buf)
435 static_cast<UDPImpl*
>(
handle->data)->on_alloc(suggested_size, buf);
438 void on_alloc(
size_t suggested_size, uv_buf_t* buf)
440 auto alloc_size = std::min(suggested_size, max_read_size);
442 alloc_size = std::min(alloc_size, remaining_read_quota);
443 remaining_read_quota -= alloc_size;
445 "Allocating {} bytes for UDP read ({} of quota remaining)",
447 remaining_read_quota);
449 buf->base =
new char[alloc_size];
450 buf->len = alloc_size;
453 void on_free(
const uv_buf_t* buf)
462 const struct sockaddr* addr,
465 static_cast<UDPImpl*
>(
handle->data)->on_read(sz, buf, addr, flags);
471 const struct sockaddr* addr,
480 if (sz == UV_ENOBUFS)
491 behaviour->on_disconnect();
498 uint8_t* b = (uint8_t*)buf->base;
499 std::string data((
char*)b, sz);
501 behaviour->on_read((
size_t)sz, b, *addr);
507 static void on_write(uv_udp_send_t* req,
int)
512 static void free_write(uv_udp_send_t* req)
517 char* copy = (
char*)req->data;