// Connection backlog passed to uv_listen in listen_resolved.
27 static constexpr int backlog = 128;
// Upper bound on any single read buffer handed to libuv by on_alloc.
28 static constexpr size_t max_read_size = 16384;
// Value remaining_read_quota is reset to; on_alloc deducts every buffer it
// hands out from remaining_read_quota. These statics are shared by every
// instance of the class, not per connection.
31 static constexpr auto max_read_quota = max_read_size * 4;
32 static size_t remaining_read_quota;
// Set in on_read's UV_ENOBUFS branch so that branch's one-time work
// (presumably a log message - the body is not visible here) runs once per reset.
33 static bool alloc_quota_logged;
// Optional timeout applied in init() through the TCP_USER_TIMEOUT socket option.
53 std::optional<std::chrono::milliseconds> connection_timeout = std::nullopt;
// Receiver of connection events (on_connect, on_read, on_disconnect, ...).
55 std::unique_ptr<SocketBehaviour<TCP>> behaviour;
// Writes queued by write() while still resolving/connecting; on_connect sends
// them and then releases the vector.
56 using PendingWrites = std::vector<PendingIO<uv_write_t>>;
57 PendingWrites pending_writes;
// Optional local address to bind before connecting (stored by connect()).
61 std::optional<std::string> client_host = std::nullopt;
// Optional name related to listen(); its use is not visible in this excerpt.
62 std::optional<std::string> listen_name = std::nullopt;
// Resolved addrinfo list for client_host; released with uv_freeaddrinfo.
64 addrinfo* client_addr_base =
nullptr;
// Resolved addrinfo list for host/port (released with uv_freeaddrinfo) and the
// cursor into it. connect_resolved/listen_resolved advance the cursor along
// ai_next after a failed attempt.
65 addrinfo* addr_base =
nullptr;
66 addrinfo* addr_current =
nullptr;
// Presumably reports whether a concrete port is known. The body is not visible
// in this excerpt - confirm. get_address_name uses it to decide whether to
// append ":port", and listen_resolved uses it to decide whether to read back
// the bound address with uv_tcp_getsockname.
68 [[nodiscard]]
bool port_assigned()
const
// Formats the current address for log messages as "host:port", or just "host"
// when no port is assigned. When the entry under addr_current is IPv6, the
// host is wrapped in brackets, e.g. "[::1]:8080".
73 [[nodiscard]] std::string get_address_name()
const
75 const std::string port_suffix =
76 port_assigned() ? fmt::format(
":{}", port) :
"";
// The family is taken from the resolve cursor, so the result depends on where
// addr_current points when this is called.
78 if (addr_current !=
nullptr && addr_current->ai_family == AF_INET6)
80 return fmt::format(
"[{}]{}",
host, port_suffix);
83 return fmt::format(
"{}{}",
host, port_suffix);
// Constructor. Records whether this socket acts as a client and an optional
// connection timeout (init() later applies it as TCP_USER_TIMEOUT). Throws
// std::logic_error("uv tcp initialization failed") on an initialisation-failure
// path whose guarding check is not visible in this excerpt.
87 bool is_client_ =
false,
88 std::optional<std::chrono::milliseconds> connection_timeout_ =
90 is_client(is_client_),
91 connection_timeout(connection_timeout_)
95 throw std::logic_error(
"uv tcp initialization failed");
// Cleanup path (presumably the destructor).
//
// Under pending_resolve_requests_mtx, it scans outstanding resolve requests
// for any whose data points at this object. The action taken on a match is not
// visible here; presumably it detaches the request so the static on_resolved
// (which checks req->data != nullptr) never calls into a dead object - confirm.
//
// It then releases any resolved addrinfo lists this object still owns.
104 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
105 for (
const auto& req : pending_resolve_requests)
109 if (req->data ==
this)
115 if (addr_base !=
nullptr)
117 uv_freeaddrinfo(addr_base);
119 if (client_addr_base !=
nullptr)
121 uv_freeaddrinfo(client_addr_base);
// Refills the shared read quota and re-arms the one-time UV_ENOBUFS handling
// in on_read. The enclosing function's signature is not visible in this excerpt.
128 remaining_read_quota = max_read_quota;
129 alloc_quota_logged =
false;
// Takes ownership of the object that receives this socket's event callbacks.
134 behaviour = std::move(b);
// Formats a socket address of uv_handle as a printable IP string. The libuv
// query call itself is not visible here; presumably it fetches the peer name.
// Dispatch is on the address family: IPv4 uses uv_ip4_name, IPv6 uses
// uv_ip6_name, and any other family yields "unknown family: <n>". The path
// taken when the query returns < 0 is not visible in this excerpt.
// sockaddr_storage is large enough to hold either address family.
149 sockaddr_storage sa = {};
150 int name_len =
sizeof(sa);
153 &
uv_handle,
reinterpret_cast<sockaddr*
>(&sa), &name_len) < 0)
158 switch (sa.ss_family)
162 char tmp[INET_ADDRSTRLEN];
163 auto* sa4 =
reinterpret_cast<sockaddr_in*
>(&sa);
164 uv_ip4_name(sa4, tmp,
sizeof(tmp));
169 char tmp[INET6_ADDRSTRLEN];
170 auto* sa6 =
reinterpret_cast<sockaddr_in6*
>(&sa);
171 uv_ip6_name(sa6, tmp,
sizeof(tmp));
175 return fmt::format(
"unknown family: {}", sa.ss_family);
// Binds uv_handle to the first resolved client (local) address.
//
// On bind failure: BINDING -> BINDING_FAILED, log, and
// behaviour->on_bind_failed().
//
// Otherwise (presumably the success branch): BINDING -> CONNECTING_RESOLVING.
// If the remote address is already resolved (addr_current != nullptr), the
// call made next is not visible here. If not, host/port are resolved with
// async = true.
187 if ((rc = uv_tcp_bind(&
uv_handle, client_addr_base->ai_addr, 0)) < 0)
189 assert_status(BINDING, BINDING_FAILED);
190 LOG_FAIL_FMT(
"uv_tcp_bind failed: {}", uv_strerror(rc));
191 behaviour->on_bind_failed();
195 assert_status(BINDING, CONNECTING_RESOLVING);
196 if (addr_current !=
nullptr)
202 resolve(this->host, this->port,
true);
// Completion handler for resolving the client (local) address. The function
// name and part of its control flow are not visible in this excerpt.
//
// While uv_handle is not closing, a failed resolution moves
// BINDING -> BINDING_FAILED and calls behaviour->on_bind_failed().
//
// On the other visible path, the result list is adopted into client_addr_base.
//
// The third (addrinfo*) parameter is unnamed and unused; the list is read
// from req->addrinfo instead.
210 uv_getaddrinfo_t* req,
int rc,
struct addrinfo* )
217 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) == 0)
221 assert_status(BINDING, BINDING_FAILED);
222 LOG_DEBUG_FMT(
"TCP client resolve failed: {}", uv_strerror(rc));
223 behaviour->on_bind_failed();
227 client_addr_base = req->addrinfo;
// Starts an outgoing connection to host_:port_.
//
// If client_host_ is supplied, it is stored in client_host. Presumably the
// local address is then resolved and bound first; those lines are not visible.
// Any previously resolved client_addr_base is released.
//
// A path not fully visible here sets BINDING_FAILED. The visible tail moves
// FRESH -> CONNECTING_RESOLVING and resolves the remote address with
// async = true, returning resolve()'s result.
239 const std::string& host_,
240 const std::string& port_,
241 const std::optional<std::string>& client_host_ = std::nullopt)
245 if (client_host_.has_value())
247 client_host = client_host_;
251 if (client_addr_base !=
nullptr)
253 uv_freeaddrinfo(client_addr_base);
// Reset after freeing: later code (e.g. the cleanup path) frees
// client_addr_base whenever it is non-null.
254 client_addr_base =
nullptr;
262 status = BINDING_FAILED;
268 assert_status(FRESH, CONNECTING_RESOLVING);
269 return resolve(host_, port_,
true);
// Retries a connection according to the current status:
//   - BINDING_FAILED: moves to BINDING.
//   - RESOLVING_FAILED / CONNECTING_FAILED: re-resolves host/port with
//     async = true.
//   - Other statuses (case labels not visible here): if uv_handle is not
//     already closing, moves to RECONNECTING and closes the handle;
//     on_reconnect continues once the close completes.
//   - Anything else: logged as unexpected and ignored.
283 assert_status(BINDING_FAILED, BINDING);
286 case RESOLVING_FAILED:
287 case CONNECTING_FAILED:
291 status = CONNECTING_RESOLVING;
292 return resolve(
host, port,
true);
299 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) == 0)
304 status = RECONNECTING;
305 uv_close(
reinterpret_cast<uv_handle_t*
>(&
uv_handle), on_reconnect);
313 "Unexpected status during reconnect, ignoring: {}", status);
// Begins listening on host_:port_. Moves FRESH -> LISTENING_RESOLVING and
// resolves the address with async = false (presumably a synchronous resolve).
// How the optional `name` argument is used is not visible in this excerpt.
321 const std::string& host_,
322 const std::string& port_,
323 const std::optional<std::string>& name = std::nullopt)
325 assert_status(FRESH, LISTENING_RESOLVING);
326 bool ret = resolve(host_, port_,
false);
// Sends or queues `len` bytes from `data`.
//
// The bytes are first copied into a fresh heap buffer and paired with a new
// uv_write_t request, so the caller's buffer is not referenced afterwards.
// What happens next depends on status:
//   - CONNECTING_RESOLVING / RESOLVING_FAILED / CONNECTING_FAILED: the request
//     is queued in pending_writes, which on_connect later sends.
//   - Presumably CONNECTED (label not visible): sent at once via send_write.
//   - Disconnected: the write is dropped with a debug log.
//   - Any other status: std::logic_error is thrown.
//
// The trailing sockaddr parameter is unnamed and therefore unused here.
331 bool write(
size_t len,
const uint8_t* data, sockaddr = {})
// NOTE(review): raw new for both the request and the copy. Confirm both are
// released on the paths that neither queue nor send them (the "Ignoring write"
// and throw paths); those lines are not visible in this excerpt.
333 auto* req =
new uv_write_t;
334 auto* copy =
new char[len];
337 memcpy(copy, data, len);
345 case CONNECTING_RESOLVING:
347 case RESOLVING_FAILED:
348 case CONNECTING_FAILED:
351 pending_writes.emplace_back(req, len, sockaddr{}, free_write);
357 return send_write(req, len);
362 LOG_DEBUG_FMT(
"Disconnected: Ignoring write of size {}", len);
370 throw std::logic_error(
371 fmt::format(
"Unexpected status during write: {}", status));
// Initialises uv_handle on the default loop, in this order:
//   1. uv_tcp_init.
//   2. Enable TCP_NODELAY.
//   3. Create a TCP socket by hand, set TCP_USER_TIMEOUT on it if
//      connection_timeout is present, and adopt it with uv_tcp_open.
//   4. Enable keep-alive with a 30-second delay.
// Each failure is logged; what the function returns on failure is not visible
// in this excerpt.
381 assert_status(FRESH, FRESH);
384 if ((rc = uv_tcp_init(uv_default_loop(), &
uv_handle)) < 0)
386 LOG_FAIL_FMT(
"uv_tcp_init failed: {}", uv_strerror(rc));
390 if ((rc = uv_tcp_nodelay(&
uv_handle, 1)) < 0)
392 LOG_FAIL_FMT(
"uv_tcp_nodelay failed: {}", uv_strerror(rc));
// The socket is created manually (rather than by libuv) so the
// TCP_USER_TIMEOUT option can be set on it before it is adopted.
// NOTE(review): the socket is always AF_INET. IPv6 resolved addresses (which
// get_address_name formats specially) would presumably fail to bind/connect
// on it - confirm IPv6 support.
398 uv_os_sock_t sock = 0;
399 if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
402 "socket creation failed: {}",
403 std::strerror(errno));
407 if (connection_timeout.has_value())
// NOTE(review): count() is implicitly narrowed to unsigned int here. A timeout
// that does not fit would be silently truncated - confirm callers bound it.
409 const unsigned int ms = connection_timeout->count();
411 setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms,
sizeof(ms));
415 "Failed to set socket option (TCP_USER_TIMEOUT): {}",
416 std::strerror(errno));
// NOTE(review): confirm `sock` is closed if uv_tcp_open fails (not visible here).
421 if ((rc = uv_tcp_open(&
uv_handle, sock)) < 0)
423 LOG_FAIL_FMT(
"uv_tcp_open failed: {}", uv_strerror(rc));
428 if ((rc = uv_tcp_keepalive(&
uv_handle, 1, 30)) < 0)
430 LOG_FAIL_FMT(
"uv_tcp_keepalive failed: {}", uv_strerror(rc));
// Issues the libuv write for `req`, whose data field holds the char buffer to
// send (presumably the copy made in write()). On the visible failure path it
// moves CONNECTED -> DISCONNECTED and calls behaviour->on_disconnect().
438 bool send_write(uv_write_t* req,
size_t len)
440 auto* copy =
static_cast<char*
>(req->data);
451 reinterpret_cast<uv_stream_t*
>(&
uv_handle),
458 assert_status(CONNECTED, DISCONNECTED);
459 behaviour->on_disconnect();
// Updates the cached address from a sockaddr of the given family. The body is
// not visible in this excerpt. listen_resolved calls it before each bind
// attempt, and again with the address returned by uv_tcp_getsockname.
466 void update_resolved_address(
int address_family, sockaddr* sa)
// Tries each resolved address in turn: bind, then uv_listen with `backlog` and
// on_accept. Each failed attempt logs and advances addr_current to the next
// entry.
//
// If no port was assigned, the address the handle actually bound to is read
// back with uv_tcp_getsockname.
//
// Visible outcomes: LISTENING_RESOLVING -> LISTENING plus
// behaviour->on_listening(host, port), or (presumably once every address has
// failed) LISTENING_RESOLVING -> LISTENING_FAILED plus on_listen_failed().
474 void listen_resolved()
478 while (addr_current !=
nullptr)
480 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
482 if ((rc = uv_tcp_bind(&
uv_handle, addr_current->ai_addr, 0)) < 0)
// NOTE(review): addr_current is advanced BEFORE the failure log below,
// whereas the uv_listen failure path logs first and then advances. If this
// log's (not visible) arguments call get_address_name(), the IPv6 bracketing
// would follow the next entry's family (or none at the end of the list).
// Consider logging before advancing.
484 addr_current = addr_current->ai_next;
486 "uv_tcp_bind failed on {}: {}",
494 reinterpret_cast<uv_stream_t*
>(&
uv_handle), backlog, on_accept)) <
498 "uv_listen failed on {}: {}", get_address_name(), uv_strerror(rc));
499 addr_current = addr_current->ai_next;
506 if (!port_assigned())
508 sockaddr_storage sa_storage{};
509 auto*
const sa =
reinterpret_cast<sockaddr*
>(&sa_storage);
510 int sa_len =
sizeof(sa_storage);
511 if ((rc = uv_tcp_getsockname(&
uv_handle, sa, &sa_len)) != 0)
513 LOG_FAIL_FMT(
"uv_tcp_getsockname failed: {}", uv_strerror(rc));
515 update_resolved_address(addr_current->ai_family, sa);
518 assert_status(LISTENING_RESOLVING, LISTENING);
519 behaviour->on_listening(
host, port);
523 assert_status(LISTENING_RESOLVING, LISTENING_FAILED);
524 behaviour->on_listen_failed();
// Starts uv_tcp_connect against each resolved address in turn. Entries whose
// connect call fails immediately (rc < 0) are skipped by advancing
// addr_current.
//
// Visible outcomes: CONNECTING_RESOLVING -> CONNECTING when a connect is
// started, or, once the list is exhausted, CONNECTING_RESOLVING ->
// CONNECTING_FAILED, a log, and behaviour->on_connect_failed().
527 bool connect_resolved()
// NOTE(review): heap-allocated connect request. Confirm it is deleted on the
// "all resolved addresses failed" path, where on_connect never runs; this is
// not visible here.
529 auto* req =
new uv_connect_t;
532 while (addr_current !=
nullptr)
535 (rc = uv_tcp_connect(
536 req, &
uv_handle, addr_current->ai_addr, on_connect)) < 0)
539 addr_current = addr_current->ai_next;
543 assert_status(CONNECTING_RESOLVING, CONNECTING);
547 assert_status(CONNECTING_RESOLVING, CONNECTING_FAILED);
552 "Unable to connect: all resolved addresses failed: {}:{}",
host, port);
554 behaviour->on_connect_failed();
// State-machine guard: transitions status from `from` to `to`. When the
// current status is not `from`, it throws std::logic_error with the message
// below. The comparison and assignment are not visible in this excerpt.
558 void assert_status(Status from, Status to)
562 throw std::logic_error(fmt::format(
563 "Trying to transition from {} to {} but current status is {}",
// Resolves host_:port_; async defaults to true. Any previously resolved list
// in addr_base is freed and the cursor addr_current is cleared first. A
// failure path, not fully visible here, sets status to RESOLVING_FAILED.
573 const std::string& host_,
const std::string& port_,
bool async =
true)
578 if (addr_base !=
nullptr)
580 uv_freeaddrinfo(addr_base);
582 addr_current =
nullptr;
588 status = RESOLVING_FAILED;
// libuv getaddrinfo completion trampoline.
//
// Under pending_resolve_requests_mtx, it removes req from the pending set and,
// if req->data is still set, forwards to the owning instance. The mutex stays
// held while that instance callback runs.
//
// A cleared data pointer presumably means the owner detached the request
// during cleanup - confirm. On a path not fully visible here, `res` is freed
// directly.
595 static void on_resolved(uv_getaddrinfo_t* req,
int rc,
struct addrinfo* res)
597 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
598 pending_resolve_requests.erase(req);
600 if (req->data !=
nullptr)
602 static_cast<TCPImpl*
>(req->data)->on_resolved(req, rc);
608 uv_freeaddrinfo(res);
// Instance-side resolve completion.
//
// If uv_handle is closing, the result list is freed (and presumably otherwise
// ignored). On a failed resolution (condition not visible), status becomes
// RESOLVING_FAILED and behaviour->on_resolve_failed() is called.
//
// Otherwise the list is adopted into addr_base, with addr_current at its head,
// and the next step is chosen by status: CONNECTING_RESOLVING or
// LISTENING_RESOLVING (the calls made are not visible here). Any other status
// throws std::logic_error.
613 void on_resolved(uv_getaddrinfo_t* req,
int rc)
619 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
622 uv_freeaddrinfo(req->addrinfo);
629 status = RESOLVING_FAILED;
631 behaviour->on_resolve_failed();
635 addr_base = req->addrinfo;
636 addr_current = addr_base;
640 case CONNECTING_RESOLVING:
646 case LISTENING_RESOLVING:
654 throw std::logic_error(
655 fmt::format(
"Unexpected status during on_resolved: {}", status));
// libuv connection callback trampoline: forwards rc to the TCPImpl instance
// stored in the listening handle's data pointer.
663 static void on_accept(uv_stream_t* handle,
int rc)
665 static_cast<TCPImpl*
>(
handle->data)->on_accept(rc);
// Handles an incoming connection on the listening handle. The early-out paths
// (closing handle, rc error) and the creation of `peer` are not fully visible.
//
// The visible path accepts into peer->uv_handle, moves the peer
// FRESH -> CONNECTED, starts reading on it (with a failure branch if
// read_start() returns false), and hands the peer to behaviour->on_accept().
668 void on_accept(
int rc)
670 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
686 reinterpret_cast<uv_stream_t*
>(&
uv_handle),
687 reinterpret_cast<uv_stream_t*
>(&peer->uv_handle))) < 0)
693 peer->assert_status(FRESH, CONNECTED);
695 if (!peer->read_start())
700 behaviour->on_accept(peer);
// libuv connect callback trampoline. Recovers the instance from the stream
// handle's data pointer and forwards rc to it. UV_ECANCELED (presumably the
// handle was closed while connecting) is handled separately; that handling is
// not visible here.
703 static void on_connect(uv_connect_t* req,
int rc)
705 auto* self =
static_cast<TCPImpl*
>(req->handle->data);
708 if (rc == UV_ECANCELED)
715 self->on_connect(rc);
// Instance-side connect completion.
//
// On an asynchronous connect failure (condition not fully visible): logs,
// advances addr_current to the next resolved address, and moves
// CONNECTING -> CONNECTING_RESOLVING, presumably to retry with the remaining
// addresses.
//
// On success: moves CONNECTING -> CONNECTED, sends every queued pending write,
// releases pending_writes, and calls behaviour->on_connect().
718 void on_connect(
int rc)
720 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
729 LOG_DEBUG_FMT(
"uv_tcp_connect async retry: {}", uv_strerror(rc));
730 addr_current = addr_current->ai_next;
731 assert_status(CONNECTING, CONNECTING_RESOLVING);
736 assert_status(CONNECTING, CONNECTED);
743 for (
auto& w : pending_writes)
745 send_write(w.req, w.len);
// Swapping with an empty temporary (rather than clear()) also releases the
// vector's capacity.
749 PendingWrites().swap(pending_writes);
750 behaviour->on_connect();
// Starts reading on uv_handle, using on_alloc for buffers and on_read for
// data. If uv_read_start fails: CONNECTED -> DISCONNECTED, a log, and
// behaviour->on_disconnect().
760 reinterpret_cast<uv_stream_t*
>(&
uv_handle), on_alloc, on_read)) < 0)
762 assert_status(CONNECTED, DISCONNECTED);
763 LOG_FAIL_FMT(
"uv_read_start failed: {}", uv_strerror(rc));
767 behaviour->on_disconnect();
// libuv allocation callback trampoline: forwards to the TCPImpl instance
// stored in the handle's data pointer.
776 static void on_alloc(
777 uv_handle_t* handle,
size_t suggested_size, uv_buf_t* buf)
779 static_cast<TCPImpl*
>(
handle->data)->on_alloc(suggested_size, buf);
// Provides a read buffer whose size is the smallest of libuv's suggestion,
// max_read_size, and the remaining shared read quota; the quota is reduced by
// the size handed out.
//
// Once the quota reaches zero the buffer length is 0. Presumably libuv then
// reports UV_ENOBUFS, which on_read checks for.
//
// The buffer is allocated with new[]; presumably on_free releases it.
782 void on_alloc(
size_t suggested_size, uv_buf_t* buf)
784 auto alloc_size = std::min(suggested_size, max_read_size);
// Clamping to the remaining quota first guarantees the subtraction below
// cannot underflow the unsigned counter.
786 alloc_size = std::min(alloc_size, remaining_read_quota);
787 remaining_read_quota -= alloc_size;
791 "Allocating {} bytes for TCP read ({} of quota remaining)",
793 remaining_read_quota);
797 buf->base =
new char[alloc_size];
798 buf->len = alloc_size;
// Releases a read buffer. The body is not visible in this excerpt; presumably
// it delete[]s buf->base, which on_alloc allocated with new[].
801 void on_free(
const uv_buf_t* buf)
// libuv read callback trampoline: forwards to the TCPImpl instance stored in
// the stream handle's data pointer.
806 static void on_read(uv_stream_t* handle, ssize_t sz,
const uv_buf_t* buf)
808 static_cast<TCPImpl*
>(
handle->data)->on_read(sz, buf);
// Handles a completed read:
//   - UV_ENOBUFS (presumably the read quota was exhausted, so on_alloc handed
//     out a zero-length buffer): one-time handling guarded by
//     alloc_quota_logged.
//   - Error/EOF path (condition not fully visible): CONNECTED -> DISCONNECTED,
//     reading stops, the libuv error is logged, and
//     behaviour->on_disconnect() is called.
//   - Otherwise: the sz bytes are passed to behaviour->on_read, and its result
//     (read_good) gates a further on_disconnect() path.
// Where buf is released on each path is not visible in this excerpt.
811 void on_read(ssize_t sz,
const uv_buf_t* buf)
819 if (sz == UV_ENOBUFS)
821 if (!alloc_quota_logged)
824 alloc_quota_logged =
true;
832 assert_status(CONNECTED, DISCONNECTED);
834 uv_read_stop(
reinterpret_cast<uv_stream_t*
>(&
uv_handle));
836 LOG_DEBUG_FMT(
"TCP on_read: {}", uv_strerror(
static_cast<int>(sz)));
837 behaviour->on_disconnect();
841 auto* p =
reinterpret_cast<uint8_t*
>(buf->base);
842 const bool read_good = behaviour->on_read(
static_cast<size_t>(sz), p, {});
851 behaviour->on_disconnect();
// libuv write-completion callback. The status argument is unnamed and
// therefore ignored. The body is not visible; presumably it releases the
// request, e.g. via free_write.
856 static void on_write(uv_write_t* req,
int )
// Releases a write request together with its data buffer. Only the retrieval
// of the char* buffer from req->data is visible here; presumably it is
// followed by delete[] of the buffer and delete of req - confirm.
861 static void free_write(uv_write_t* req)
868 auto* copy =
static_cast<char*
>(req->data);
// uv_close completion callback used by reconnect(): forwards to the TCPImpl
// instance stored in the handle's data pointer.
873 static void on_reconnect(uv_handle_t* handle)
875 static_cast<TCPImpl*
>(
handle->data)->on_reconnect();
// Runs after uv_handle has finished closing for a reconnect. First moves
// RECONNECTING -> FRESH. Then either:
//   - on a failure path whose condition is not visible (presumably a failed
//     re-initialisation): FRESH -> CONNECTING_FAILED and
//     behaviour->on_connect_failed(); or
//   - restarts the connection: FRESH -> BINDING when a client address was
//     resolved, otherwise FRESH -> CONNECTING_RESOLVING.
880 assert_status(RECONNECTING, FRESH);
884 assert_status(FRESH, CONNECTING_FAILED);
885 behaviour->on_connect_failed();
889 if (client_addr_base !=
nullptr)
891 assert_status(FRESH, BINDING);
896 assert_status(FRESH, CONNECTING_RESOLVING);