// Backlog value passed to uv_listen when the socket starts listening.
26 static constexpr int backlog = 128;
// Largest buffer on_alloc will hand to libuv for a single read.
27 static constexpr size_t max_read_size = 16384;
// Total allocation budget for reads: four maximum-size buffers.
30 static constexpr auto max_read_quota = max_read_size * 4;
// Bytes of the budget still available; on_alloc subtracts each allocation
// from it.
31 static size_t remaining_read_quota;
// Checked when a read reports UV_ENOBUFS and set to true there; presumably
// limits the quota-exhausted log message to once per quota reset (confirm).
32 static bool alloc_quota_logged;
// Optional timeout; when set, its millisecond count is applied to the raw
// socket as TCP_USER_TIMEOUT during handle initialisation.
52 std::optional<std::chrono::milliseconds> connection_timeout = std::nullopt;
// Receiver of socket events (bind/resolve/connect/listen outcomes, reads,
// disconnects).
54 std::unique_ptr<SocketBehaviour<TCP>> behaviour;
// Writes requested before the connection is established; on_connect sends
// them and then empties the container.
55 using PendingWrites = std::vector<PendingIO<uv_write_t>>;
56 PendingWrites pending_writes;
// Optional local host for outgoing connections; stored by the connect path
// when the caller supplies one.
60 std::optional<std::string> client_host = std::nullopt;
// NOTE(review): no use of listen_name is visible in this excerpt.
61 std::optional<std::string> listen_name = std::nullopt;
// Address list obtained for the client-side (local) host; released with
// uv_freeaddrinfo.
63 addrinfo* client_addr_base =
nullptr;
// Head of the address list resolved for host/port; released with
// uv_freeaddrinfo.
64 addrinfo* addr_base =
nullptr;
// Entry of addr_base currently being tried; advanced via ai_next when a
// bind/listen/connect attempt fails.
65 addrinfo* addr_current =
nullptr;
// Predicate consulted by get_address_name (to decide whether to append a
// ":port" suffix) and by listen_resolved (to decide whether to query the
// bound address with uv_tcp_getsockname).
// NOTE(review): the body is not visible in this excerpt.
67 bool port_assigned()
const
// Builds a printable "host[:port]" string for this socket.
// The ":port" suffix is present only when port_assigned() is true, and the
// host is wrapped in square brackets when the current resolved address is
// IPv6.
72 std::string get_address_name()
const
74 const std::string port_suffix =
75 port_assigned() ? fmt::format(
":{}", port) :
"";
// IPv6: bracket the host so the port separator is unambiguous.
77 if (addr_current !=
nullptr && addr_current->ai_family == AF_INET6)
79 return fmt::format(
"[{}]{}",
host, port_suffix);
// Any other family, or no resolved address yet.
83 return fmt::format(
"{}{}",
host, port_suffix);
// Constructor fragment: stores the client flag and the optional connection
// timeout in the corresponding members.
// NOTE(review): the default value of connection_timeout_ is not visible in
// this excerpt.
88 bool is_client_ =
false,
89 std::optional<std::chrono::milliseconds> connection_timeout_ =
91 is_client(is_client_),
92 connection_timeout(connection_timeout_),
// Raised when uv tcp initialisation fails (the failing check itself is not
// visible here).
97 throw std::logic_error(
"uv tcp initialization failed");
// Teardown fragment (presumably the destructor — confirm).
// Holding pending_resolve_requests_mtx, walks the outstanding resolve
// requests and picks out those whose data pointer is this object.
// NOTE(review): the action taken on a match is not visible in this excerpt.
106 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
107 for (
auto& req : pending_resolve_requests)
111 if (req->data ==
this)
// Release the resolved address list for host/port, if any.
117 if (addr_base !=
nullptr)
119 uv_freeaddrinfo(addr_base);
// Release the resolved address list for the client-side host, if any.
121 if (client_addr_base !=
nullptr)
123 uv_freeaddrinfo(client_addr_base);
// Restores the read allocation budget to its full size and clears the
// alloc_quota_logged flag.
130 remaining_read_quota = max_read_quota;
131 alloc_quota_logged =
false;
// Moves the supplied behaviour object `b` into the member, replacing any
// previous one.
136 behaviour = std::move(b);
// Peer-address lookup fragment: asks libuv for the remote endpoint of this
// handle and formats it according to its address family.
// sockaddr_storage is used so either an IPv4 or an IPv6 address fits.
151 sockaddr_storage sa = {};
152 int name_len =
sizeof(sa);
// A negative result means the peer address could not be obtained
// (the handling is not visible in this excerpt).
153 if (uv_tcp_getpeername(&
uv_handle, (sockaddr*)&sa, &name_len) < 0)
158 switch (sa.ss_family)
// IPv4 branch (case label not visible): textual address written into tmp.
162 char tmp[INET_ADDRSTRLEN];
163 sockaddr_in* sa4 = (sockaddr_in*)&sa;
164 uv_ip4_name(sa4, tmp,
sizeof(tmp));
// IPv6 branch (case label not visible): textual address written into tmp.
169 char tmp[INET6_ADDRSTRLEN];
170 sockaddr_in6* sa6 = (sockaddr_in6*)&sa;
171 uv_ip6_name(sa6, tmp,
sizeof(tmp));
// Other families are reported by their numeric value.
175 return fmt::format(
"unknown family: {}", sa.ss_family);
// Client bind fragment: binds the handle to the first address of the
// client-side address list before connecting.
187 if ((rc = uv_tcp_bind(&
uv_handle, client_addr_base->ai_addr, 0)) < 0)
// Bind failed: BINDING -> BINDING_FAILED, log the libuv error and notify the
// behaviour.
189 assert_status(BINDING, BINDING_FAILED);
190 LOG_FAIL_FMT(
"uv_tcp_bind failed: {}", uv_strerror(rc));
191 behaviour->on_bind_failed();
// Bind succeeded: BINDING -> CONNECTING_RESOLVING.
195 assert_status(BINDING, CONNECTING_RESOLVING);
// NOTE(review): the branch bodies are not visible; presumably an already
// resolved target address is used directly, otherwise host/port are resolved
// asynchronously (third argument of resolve is `async`).
196 if (addr_current !=
nullptr)
202 resolve(this->host, this->port,
true);
// Callback fragment for resolution of the client-side (local) host; the
// function name is not visible in this excerpt.
// Parameters: the libuv request, its status code `rc`, and an unnamed
// addrinfo result.
208 uv_getaddrinfo_t* req,
int rc,
struct addrinfo*)
// Only touch state while the handle is not being closed.
215 if (!uv_is_closing((uv_handle_t*)&
uv_handle))
// Failure path (its condition is not visible): BINDING -> BINDING_FAILED,
// log the libuv error and notify the behaviour.
219 assert_status(BINDING, BINDING_FAILED);
220 LOG_DEBUG_FMT(
"TCP client resolve failed: {}", uv_strerror(rc));
221 behaviour->on_bind_failed();
// Keep the resolved list; it is released elsewhere with uv_freeaddrinfo.
225 client_addr_base = req->addrinfo;
// Connect entry-point fragment (function name not visible in this excerpt).
// Parameters: host_ / port_ identify the target; client_host_ optionally
// names the local host to connect from.
237 const std::string& host_,
238 const std::string& port_,
239 const std::optional<std::string>& client_host_ = std::nullopt)
// Remember the requested local host.
243 if (client_host_.has_value())
245 client_host = client_host_;
// Discard the address list from any previous client-host resolution so a
// stale list is never reused or freed twice.
249 if (client_addr_base !=
nullptr)
251 uv_freeaddrinfo(client_addr_base);
252 client_addr_base =
nullptr;
// NOTE(review): the condition that leads to this assignment is not visible.
260 status = BINDING_FAILED;
// Presumably the path taken when no client-side bind is needed (confirm):
// FRESH -> CONNECTING_RESOLVING, then resolve the target asynchronously.
266 assert_status(FRESH, CONNECTING_RESOLVING);
267 return resolve(host_, port_,
true);
// Reconnect fragment: behaviour depends on the current status (the enclosing
// switch and most case labels are not visible in this excerpt).
// BINDING_FAILED -> BINDING.
281 assert_status(BINDING_FAILED, BINDING);
// After a failed resolve or connect: go back to CONNECTING_RESOLVING and
// resolve host/port again, asynchronously.
284 case RESOLVING_FAILED:
285 case CONNECTING_FAILED:
289 status = CONNECTING_RESOLVING;
290 return resolve(
host, port,
true);
// NOTE(review): the status handled by this branch is not visible. If the
// handle is not already closing, mark it RECONNECTING and close it;
// on_reconnect is the close callback.
297 if (!uv_is_closing((uv_handle_t*)&
uv_handle))
302 status = RECONNECTING;
303 uv_close((uv_handle_t*)&
uv_handle, on_reconnect);
// Tail of a log call for statuses that are ignored during reconnect.
311 "Unexpected status during reconnect, ignoring: {}", status);
// Listen entry-point fragment (function name not visible in this excerpt).
// Parameters: host_ / port_ to listen on and an optional `name`.
// NOTE(review): how `name` and `ret` are used is not visible here.
319 const std::string& host_,
320 const std::string& port_,
321 const std::optional<std::string>& name = std::nullopt)
// FRESH -> LISTENING_RESOLVING, then resolve with async = false.
323 assert_status(FRESH, LISTENING_RESOLVING);
324 bool ret = resolve(host_, port_,
false);
// Sends or queues `len` bytes starting at `data`, depending on the current
// status. `addr` is not used in the visible lines.
// Throws std::logic_error for a status the function does not expect.
329 bool write(
size_t len,
const uint8_t* data, sockaddr addr = {})
// The payload is copied into a heap buffer so the caller's memory need not
// outlive the call. NOTE(review): the line attaching `copy` to `req` is not
// visible; send_write and free_write read it back from req->data.
331 auto req =
new uv_write_t;
332 char* copy =
new char[len];
334 memcpy(copy, data, len);
// Not connected yet: queue the request with free_write as its cleanup
// function; on_connect sends everything in pending_writes.
341 case CONNECTING_RESOLVING:
343 case RESOLVING_FAILED:
344 case CONNECTING_FAILED:
347 pending_writes.emplace_back(req, len, sockaddr{}, free_write);
// Immediate send (case label not visible; presumably CONNECTED).
353 return send_write(req, len);
// Disconnected: the write is dropped. NOTE(review): release of req/copy on
// this path is not visible in this excerpt — confirm they are freed.
358 LOG_DEBUG_FMT(
"Disconnected: Ignoring write of size {}", len);
366 throw std::logic_error(
367 fmt::format(
"Unexpected status during write: {}", status));
// Handle initialisation fragment. Each step logs the error on failure;
// NOTE(review): the return/cleanup after each failure is not visible here.
// Same source and target status: this only verifies the status is FRESH.
377 assert_status(FRESH, FRESH);
// Register the handle with the default libuv loop.
380 if ((rc = uv_tcp_init(uv_default_loop(), &
uv_handle)) < 0)
382 LOG_FAIL_FMT(
"uv_tcp_init failed: {}", uv_strerror(rc));
// Enable TCP_NODELAY on the handle.
386 if ((rc = uv_tcp_nodelay(&
uv_handle,
true)) < 0)
388 LOG_FAIL_FMT(
"uv_tcp_nodelay failed: {}", uv_strerror(rc));
// A raw socket is created and later handed to libuv with uv_tcp_open,
// which allows socket options to be set on it first.
// NOTE(review): the socket is created as AF_INET only — confirm how IPv6
// targets are handled; the condition guarding this path is not visible.
395 if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
397 LOG_FAIL_FMT(
"socket creation failed: {}", strerror(errno));
// Apply the optional connection timeout as TCP_USER_TIMEOUT (milliseconds).
401 if (connection_timeout.has_value())
// count() is converted to unsigned int, the type passed to setsockopt.
403 const unsigned int ms = connection_timeout->count();
405 setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms,
sizeof(ms));
409 "Failed to set socket option (TCP_USER_TIMEOUT): {}",
// Hand the configured socket over to the libuv handle.
415 if ((rc = uv_tcp_open(&
uv_handle, sock)) < 0)
417 LOG_FAIL_FMT(
"uv_tcp_open failed: {}", uv_strerror(rc));
// Enable TCP keep-alive with a delay argument of 30.
422 if ((rc = uv_tcp_keepalive(&
uv_handle, 1, 30)) < 0)
424 LOG_FAIL_FMT(
"uv_tcp_keepalive failed: {}", uv_strerror(rc));
// Submits a prepared write request of `len` bytes to libuv.
// The payload pointer is taken from req->data; on_write is the completion
// callback.
432 bool send_write(uv_write_t* req,
size_t len)
434 char* copy = (
char*)req->data;
// NOTE(review): construction of `buf` is not visible in this excerpt.
442 if ((rc = uv_write(req, (uv_stream_t*)&
uv_handle, &buf, 1, on_write)) < 0)
// Submission failed: CONNECTED -> DISCONNECTED and notify the behaviour.
446 assert_status(CONNECTED, DISCONNECTED);
447 behaviour->on_disconnect();
// Called by listen_resolved with an address family and socket address, both
// for each candidate address and for the address reported by
// uv_tcp_getsockname. NOTE(review): the body is not visible in this excerpt;
// presumably it refreshes the stored host/port — confirm.
454 void update_resolved_address(
int address_family, sockaddr* sa)
// Tries each resolved address in turn until one can be bound and listened
// on, then reports the outcome to the behaviour.
462 void listen_resolved()
466 while (addr_current !=
nullptr)
468 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
// Bind failed: move on to the next resolved address.
470 if ((rc = uv_tcp_bind(&
uv_handle, addr_current->ai_addr, 0)) < 0)
472 addr_current = addr_current->ai_next;
474 "uv_tcp_bind failed on {}: {}",
// Listen failed: log against the current address, then try the next one.
480 if ((rc = uv_listen((uv_stream_t*)&
uv_handle, backlog, on_accept)) < 0)
483 "uv_listen failed on {}: {}", get_address_name(), uv_strerror(rc));
484 addr_current = addr_current->ai_next;
// No port assigned yet: read the bound address back from the socket and
// record it via update_resolved_address.
491 if (!port_assigned())
493 sockaddr_storage sa_storage;
494 const auto sa = (sockaddr*)&sa_storage;
495 int sa_len =
sizeof(sa_storage);
496 if ((rc = uv_tcp_getsockname(&
uv_handle, sa, &sa_len)) != 0)
498 LOG_FAIL_FMT(
"uv_tcp_getsockname failed: {}", uv_strerror(rc));
500 update_resolved_address(addr_current->ai_family, sa);
// Success: LISTENING_RESOLVING -> LISTENING and report host/port.
503 assert_status(LISTENING_RESOLVING, LISTENING);
504 behaviour->on_listening(
host, port);
// Failure (presumably reached once every address has been tried):
// LISTENING_RESOLVING -> LISTENING_FAILED.
508 assert_status(LISTENING_RESOLVING, LISTENING_FAILED);
509 behaviour->on_listen_failed();
// Starts a connection attempt against the resolved addresses, beginning at
// addr_current and skipping addresses for which uv_tcp_connect fails
// immediately.
512 bool connect_resolved()
// NOTE(review): release of `req` is not visible in this excerpt — confirm it
// is freed on both the failure path and in the connect callback.
514 auto req =
new uv_connect_t;
517 while (addr_current !=
nullptr)
// Immediate failure: try the next address.
520 (rc = uv_tcp_connect(
521 req, &
uv_handle, addr_current->ai_addr, on_connect)) < 0)
524 addr_current = addr_current->ai_next;
// Attempt started: CONNECTING_RESOLVING -> CONNECTING; on_connect reports
// the result.
528 assert_status(CONNECTING_RESOLVING, CONNECTING);
// No usable address: CONNECTING_RESOLVING -> CONNECTING_FAILED, log and
// notify the behaviour.
532 assert_status(CONNECTING_RESOLVING, CONNECTING_FAILED);
537 "Unable to connect: all resolved addresses failed: {}:{}",
host, port);
539 behaviour->on_connect_failed();
// Status-transition helper taking the expected current status `from` and the
// target status `to`.
// Throws std::logic_error naming `from`, `to` and the current status (per
// the message) — presumably when the current status differs from `from`.
// NOTE(review): the check and the status assignment are not visible in this
// excerpt.
543 void assert_status(Status from, Status to)
547 throw std::logic_error(fmt::format(
548 "Trying to transition from {} to {} but current status is {}",
// Resolve fragment (signature head not visible): starts resolution of
// host_/port_, asynchronously by default.
558 const std::string& host_,
const std::string& port_,
bool async =
true)
// Drop the previous resolution result before starting a new one.
563 if (addr_base !=
nullptr)
565 uv_freeaddrinfo(addr_base);
// addr_current pointed into the freed list, so reset it.
567 addr_current =
nullptr;
// NOTE(review): the failure condition leading here is not visible.
573 status = RESOLVING_FAILED;
// libuv resolve callback (static trampoline): removes the finished request
// from pending_resolve_requests under the mutex and forwards to the
// TCPImpl stored in req->data.
580 static void on_resolved(uv_getaddrinfo_t* req,
int rc,
struct addrinfo* res)
582 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
583 pending_resolve_requests.erase(req);
587 static_cast<TCPImpl*
>(req->data)->on_resolved(req, rc);
// NOTE(review): the condition under which the result is freed here instead
// of being forwarded is not visible in this excerpt.
593 uv_freeaddrinfo(res);
// Instance-side handling of a finished resolve request with status `rc`.
// Throws std::logic_error if the status is not one the function expects.
598 void on_resolved(uv_getaddrinfo_t* req,
int rc)
// Handle is being closed: just release the result.
604 if (uv_is_closing((uv_handle_t*)&
uv_handle))
607 uv_freeaddrinfo(req->addrinfo);
// Resolution failed (condition not visible): record it and notify.
614 status = RESOLVING_FAILED;
616 behaviour->on_resolve_failed();
// Success: keep the list and start from its first entry.
620 addr_base = req->addrinfo;
621 addr_current = addr_base;
// Continue according to what the resolution was for (case bodies are not
// visible; presumably connect_resolved / listen_resolved).
625 case CONNECTING_RESOLVING:
631 case LISTENING_RESOLVING:
639 throw std::logic_error(
640 fmt::format(
"Unexpected status during on_resolved: {}", status));
// libuv connection callback (static trampoline): forwards to the TCPImpl
// stored in handle->data.
648 static void on_accept(uv_stream_t* handle,
int rc)
650 static_cast<TCPImpl*
>(
handle->data)->on_accept(rc);
// Instance-side handling of an incoming connection with status `rc`.
// NOTE(review): creation of `peer` and the handling of rc are not visible in
// this excerpt.
653 void on_accept(
int rc)
// Tail of the accept call: the connection is accepted from this handle into
// the peer's handle; a negative result is the failure case.
665 (uv_stream_t*)&
uv_handle, (uv_stream_t*)&peer->uv_handle)) < 0)
// The accepted peer goes FRESH -> CONNECTED and starts reading.
671 peer->assert_status(FRESH, CONNECTED);
673 if (!peer->read_start())
// Hand the connected peer to the behaviour.
676 behaviour->on_accept(peer);
// libuv connect callback (static trampoline): recovers the TCPImpl from
// req->handle->data and forwards the status.
679 static void on_connect(uv_connect_t* req,
int rc)
681 auto self =
static_cast<TCPImpl*
>(req->handle->data);
// A cancelled request is handled separately (body not visible in this
// excerpt).
684 if (rc == UV_ECANCELED)
691 self->on_connect(rc);
// Instance-side handling of a finished connect attempt with status `rc`.
694 void on_connect(
int rc)
// Failure path (condition not visible): log, advance to the next resolved
// address and go CONNECTING -> CONNECTING_RESOLVING, presumably to retry.
699 LOG_DEBUG_FMT(
"uv_tcp_connect async retry: {}", uv_strerror(rc));
700 addr_current = addr_current->ai_next;
701 assert_status(CONNECTING, CONNECTING_RESOLVING);
// Success: CONNECTING -> CONNECTED.
706 assert_status(CONNECTING, CONNECTED);
// Send every write that was queued while not connected.
713 for (
auto& w : pending_writes)
715 send_write(w.req, w.len);
// Swap with an empty vector so the queue is emptied and its storage
// released.
719 PendingWrites().swap(pending_writes);
720 behaviour->on_connect();
// Read-start fragment (signature not visible): begins reading on the handle
// with on_alloc supplying buffers and on_read receiving data.
728 if ((rc = uv_read_start((uv_stream_t*)&
uv_handle, on_alloc, on_read)) < 0)
// Could not start reading: CONNECTED -> DISCONNECTED, log and notify.
730 assert_status(CONNECTED, DISCONNECTED);
731 LOG_FAIL_FMT(
"uv_read_start failed: {}", uv_strerror(rc));
735 behaviour->on_disconnect();
// libuv allocation callback (static trampoline): forwards to the TCPImpl
// stored in handle->data.
744 static void on_alloc(
745 uv_handle_t* handle,
size_t suggested_size, uv_buf_t* buf)
747 static_cast<TCPImpl*
>(
handle->data)->on_alloc(suggested_size, buf);
// Provides the read buffer for the next read, limited both per read and by
// the remaining read quota.
750 void on_alloc(
size_t suggested_size, uv_buf_t* buf)
// Never allocate more than max_read_size for a single read...
752 auto alloc_size = std::min(suggested_size, max_read_size);
// ...nor more than what is left of the quota, which is then reduced.
// With the quota exhausted this yields a zero-length buffer; libuv reports
// that to the read callback as UV_ENOBUFS, which on_read checks for.
754 alloc_size = std::min(alloc_size, remaining_read_quota);
755 remaining_read_quota -= alloc_size;
759 "Allocating {} bytes for TCP read ({} of quota remaining)",
761 remaining_read_quota);
764 buf->base =
new char[alloc_size];
765 buf->len = alloc_size;
// Counterpart to on_alloc for a read buffer.
// NOTE(review): the body is not visible in this excerpt; presumably it
// releases buf->base — confirm.
768 void on_free(
const uv_buf_t* buf)
// libuv read callback (static trampoline): forwards to the TCPImpl stored in
// handle->data.
773 static void on_read(uv_stream_t* handle, ssize_t sz,
const uv_buf_t* buf)
775 static_cast<TCPImpl*
>(
handle->data)->on_read(sz, buf);
// Instance-side handling of a read result: `sz` is the byte count or a
// libuv error code, `buf` the buffer supplied by on_alloc.
778 void on_read(ssize_t sz,
const uv_buf_t* buf)
// UV_ENOBUFS: no buffer space was supplied for this read. The flag is set
// after the first occurrence, presumably so it is logged only once.
786 if (sz == UV_ENOBUFS)
788 if (!alloc_quota_logged)
791 alloc_quota_logged =
true;
// Error path (condition not visible): CONNECTED -> DISCONNECTED and notify.
799 assert_status(CONNECTED, DISCONNECTED);
804 behaviour->on_disconnect();
// Data path: pass the received bytes to the behaviour.
808 uint8_t* p = (uint8_t*)buf->base;
809 const bool read_good = behaviour->on_read((
size_t)sz, p, {});
// NOTE(review): presumably reached when read_good is false — the condition
// is not visible in this excerpt.
818 behaviour->on_disconnect();
// Completion callback passed to uv_write by send_write; the status argument
// is unnamed and therefore unused.
// NOTE(review): the body is not visible in this excerpt.
823 static void on_write(uv_write_t* req,
int)
// Cleanup function registered with each queued write in pending_writes.
// Reads the payload pointer back from req->data; NOTE(review): the actual
// release statements are not visible in this excerpt.
828 static void free_write(uv_write_t* req)
835 char* copy = (
char*)req->data;
// Close callback passed to uv_close by the reconnect path (static
// trampoline): forwards to the TCPImpl stored in handle->data.
840 static void on_reconnect(uv_handle_t* handle)
842 static_cast<TCPImpl*
>(
handle->data)->on_reconnect();
// Instance-side continuation (signature not visible): the handle has been
// closed, so the status goes RECONNECTING -> FRESH.
847 assert_status(RECONNECTING, FRESH);
// Failure path (condition not visible; presumably re-initialisation
// failed): FRESH -> CONNECTING_FAILED and notify.
851 assert_status(FRESH, CONNECTING_FAILED);
852 behaviour->on_connect_failed();
// With a client-side address list available, go to BINDING first;
// otherwise go straight to CONNECTING_RESOLVING.
856 if (client_addr_base !=
nullptr)
858 assert_status(FRESH, BINDING);
863 assert_status(FRESH, CONNECTING_RESOLVING);