CCF
Loading...
Searching...
No Matches
tcp.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "before_io.h"
6#include "ccf/pal/locking.h"
7#include "dns.h"
9#include "ds/pending_io.h"
10#include "proxy.h"
11#include "socket.h"
12
13#include <netinet/in.h>
14#include <optional>
15
16namespace asynchost
17{
18 // NOLINTBEGIN(cppcoreguidelines-virtual-class-destructor)
19 class TCPImpl;
21
22 class TCPImpl : public with_uv_handle<uv_tcp_t>
23 {
24 private:
25 friend class close_ptr<TCPImpl>;
26
27 static constexpr int backlog = 128;
28 static constexpr size_t max_read_size = 16384;
29
30 // Each uv iteration, read only a capped amount from all sockets.
31 static constexpr auto max_read_quota = max_read_size * 4;
32 static size_t remaining_read_quota;
33 static bool alloc_quota_logged;
34
35 enum Status : uint8_t
36 {
37 FRESH,
38 LISTENING_RESOLVING,
39 LISTENING,
40 BINDING,
41 BINDING_FAILED,
42 CONNECTING_RESOLVING,
43 CONNECTING,
44 CONNECTED,
45 DISCONNECTED,
46 RESOLVING_FAILED,
47 LISTENING_FAILED,
48 CONNECTING_FAILED,
49 RECONNECTING
50 };
51
52 bool is_client;
53 std::optional<std::chrono::milliseconds> connection_timeout = std::nullopt;
54 Status status{FRESH};
55 std::unique_ptr<SocketBehaviour<TCP>> behaviour;
56 using PendingWrites = std::vector<PendingIO<uv_write_t>>;
57 PendingWrites pending_writes;
58
59 std::string host;
60 std::string port;
61 std::optional<std::string> client_host = std::nullopt;
62 std::optional<std::string> listen_name = std::nullopt;
63
64 addrinfo* client_addr_base = nullptr;
65 addrinfo* addr_base = nullptr;
66 addrinfo* addr_current = nullptr;
67
68 [[nodiscard]] bool port_assigned() const
69 {
70 return port != "0";
71 }
72
73 [[nodiscard]] std::string get_address_name() const
74 {
75 const std::string port_suffix =
76 port_assigned() ? fmt::format(":{}", port) : "";
77
78 if (addr_current != nullptr && addr_current->ai_family == AF_INET6)
79 {
80 return fmt::format("[{}]{}", host, port_suffix);
81 }
82
83 return fmt::format("{}{}", host, port_suffix);
84 }
85
86 TCPImpl(
87 bool is_client_ = false,
88 std::optional<std::chrono::milliseconds> connection_timeout_ =
89 std::nullopt) :
90 is_client(is_client_),
91 connection_timeout(connection_timeout_)
92 {
93 if (!init())
94 {
95 throw std::logic_error("uv tcp initialization failed");
96 }
97
98 uv_handle.data = this;
99 }
100
101 ~TCPImpl() override
102 {
103 {
104 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
105 for (const auto& req : pending_resolve_requests)
106 {
107 // The UV request objects can stay, but if there are any references
108 // to `this` left, we need to remove them.
109 if (req->data == this)
110 {
111 req->data = nullptr;
112 }
113 }
114 }
115 if (addr_base != nullptr)
116 {
117 uv_freeaddrinfo(addr_base);
118 }
119 if (client_addr_base != nullptr)
120 {
121 uv_freeaddrinfo(client_addr_base);
122 }
123 }
124
125 public:
126 static void reset_read_quota()
127 {
128 remaining_read_quota = max_read_quota;
129 alloc_quota_logged = false;
130 }
131
132 void set_behaviour(std::unique_ptr<SocketBehaviour<TCP>> b)
133 {
134 behaviour = std::move(b);
135 }
136
137 [[nodiscard]] std::string get_host() const
138 {
139 return host;
140 }
141
142 [[nodiscard]] std::string get_port() const
143 {
144 return port;
145 }
146
147 [[nodiscard]] std::string get_peer_name() const
148 {
149 sockaddr_storage sa = {};
150 int name_len = sizeof(sa);
151 if (
152 uv_tcp_getpeername(
153 &uv_handle, reinterpret_cast<sockaddr*>(&sa), &name_len) < 0)
154 {
155 LOG_FAIL_FMT("uv_tcp_getpeername failed");
156 return "";
157 }
158 switch (sa.ss_family)
159 {
160 case AF_INET:
161 {
162 char tmp[INET_ADDRSTRLEN];
163 auto* sa4 = reinterpret_cast<sockaddr_in*>(&sa);
164 uv_ip4_name(sa4, tmp, sizeof(tmp));
165 return tmp;
166 }
167 case AF_INET6:
168 {
169 char tmp[INET6_ADDRSTRLEN];
170 auto* sa6 = reinterpret_cast<sockaddr_in6*>(&sa);
171 uv_ip6_name(sa6, tmp, sizeof(tmp));
172 return tmp;
173 }
174 default:
175 return fmt::format("unknown family: {}", sa.ss_family);
176 }
177 }
178
179 [[nodiscard]] std::optional<std::string> get_listen_name() const
180 {
181 return listen_name;
182 }
183
185 {
186 int rc = 0;
187 if ((rc = uv_tcp_bind(&uv_handle, client_addr_base->ai_addr, 0)) < 0)
188 {
189 assert_status(BINDING, BINDING_FAILED);
190 LOG_FAIL_FMT("uv_tcp_bind failed: {}", uv_strerror(rc));
191 behaviour->on_bind_failed();
192 }
193 else
194 {
195 assert_status(BINDING, CONNECTING_RESOLVING);
196 if (addr_current != nullptr)
197 {
198 connect_resolved();
199 }
200 else
201 {
202 resolve(this->host, this->port, true);
203 }
204 }
205 }
206
207 // NOLINTEND(cppcoreguidelines-virtual-class-destructor)
208
210 uv_getaddrinfo_t* req, int rc, struct addrinfo* /*res*/)
211 {
212 static_cast<TCPImpl*>(req->data)->on_client_resolved(req, rc);
213 }
214
215 void on_client_resolved(uv_getaddrinfo_t* req, int rc)
216 {
217 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) == 0)
218 {
219 if (rc < 0)
220 {
221 assert_status(BINDING, BINDING_FAILED);
222 LOG_DEBUG_FMT("TCP client resolve failed: {}", uv_strerror(rc));
223 behaviour->on_bind_failed();
224 }
225 else
226 {
227 client_addr_base = req->addrinfo;
228 client_bind();
229 }
230 }
231
232 delete req; // NOLINT(cppcoreguidelines-owning-memory)
233 }
234
236 void start(int64_t /*id*/) {}
237
239 const std::string& host_,
240 const std::string& port_,
241 const std::optional<std::string>& client_host_ = std::nullopt)
242 {
243 // If a client host is set, bind to this first. Otherwise, connect
244 // straight away.
245 if (client_host_.has_value())
246 {
247 client_host = client_host_;
248 host = host_;
249 port = port_;
250
251 if (client_addr_base != nullptr)
252 {
253 uv_freeaddrinfo(client_addr_base);
254 client_addr_base = nullptr;
255 }
256
257 status = BINDING;
258 if (!DNS::resolve(
259 client_host.value(), "0", this, on_client_resolved, false))
260 {
261 LOG_DEBUG_FMT("Bind to '{}' failed", client_host.value());
262 status = BINDING_FAILED;
263 return false;
264 }
265 }
266 else
267 {
268 assert_status(FRESH, CONNECTING_RESOLVING);
269 return resolve(host_, port_, true);
270 }
271
272 return true;
273 }
274
276 {
277 switch (status)
278 {
279 case BINDING_FAILED:
280 {
281 // Try again, from the start.
282 LOG_DEBUG_FMT("Reconnect from initial state");
283 assert_status(BINDING_FAILED, BINDING);
284 return connect(host, port, client_host);
285 }
286 case RESOLVING_FAILED:
287 case CONNECTING_FAILED:
288 {
289 // Try again, starting with DNS.
290 LOG_DEBUG_FMT("Reconnect from DNS");
291 status = CONNECTING_RESOLVING;
292 return resolve(host, port, true);
293 }
294
295 case DISCONNECTED:
296 {
297 // It's possible there was a request to close the uv_handle in the
298 // meanwhile; in that case we abort the reconnection attempt.
299 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) == 0)
300 {
301 // Close and reset the uv_handle before trying again with the same
302 // addr_current that succeeded previously.
303 LOG_DEBUG_FMT("Reconnect from resolved address");
304 status = RECONNECTING;
305 uv_close(reinterpret_cast<uv_handle_t*>(&uv_handle), on_reconnect);
306 }
307 return true;
308 }
309
310 default:
311 {
313 "Unexpected status during reconnect, ignoring: {}", status);
314 }
315 }
316
317 return false;
318 }
319
320 bool listen(
321 const std::string& host_,
322 const std::string& port_,
323 const std::optional<std::string>& name = std::nullopt)
324 {
325 assert_status(FRESH, LISTENING_RESOLVING);
326 bool ret = resolve(host_, port_, false);
327 listen_name = name;
328 return ret;
329 }
330
331 bool write(size_t len, const uint8_t* data, sockaddr /*addr*/ = {})
332 {
333 auto* req = new uv_write_t; // NOLINT(cppcoreguidelines-owning-memory)
334 auto* copy = new char[len]; // NOLINT(cppcoreguidelines-owning-memory)
335 if (data != nullptr)
336 {
337 memcpy(copy, data, len);
338 }
339 req->data = copy;
340
341 switch (status)
342 {
343 case BINDING:
344 case BINDING_FAILED:
345 case CONNECTING_RESOLVING:
346 case CONNECTING:
347 case RESOLVING_FAILED:
348 case CONNECTING_FAILED:
349 case RECONNECTING:
350 {
351 pending_writes.emplace_back(req, len, sockaddr{}, free_write);
352 break;
353 }
354
355 case CONNECTED:
356 {
357 return send_write(req, len);
358 }
359
360 case DISCONNECTED:
361 {
362 LOG_DEBUG_FMT("Disconnected: Ignoring write of size {}", len);
363 free_write(req);
364 break;
365 }
366
367 default:
368 {
369 free_write(req);
370 throw std::logic_error(
371 fmt::format("Unexpected status during write: {}", status));
372 }
373 }
374
375 return true;
376 }
377
378 private:
379 bool init()
380 {
381 assert_status(FRESH, FRESH);
382
383 int rc = 0;
384 if ((rc = uv_tcp_init(uv_default_loop(), &uv_handle)) < 0)
385 {
386 LOG_FAIL_FMT("uv_tcp_init failed: {}", uv_strerror(rc));
387 return false;
388 }
389
390 if ((rc = uv_tcp_nodelay(&uv_handle, 1)) < 0)
391 {
392 LOG_FAIL_FMT("uv_tcp_nodelay failed: {}", uv_strerror(rc));
393 return false;
394 }
395
396 if (is_client)
397 {
398 uv_os_sock_t sock = 0;
399 if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
400 {
402 "socket creation failed: {}",
403 std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
404 return false;
405 }
406
407 if (connection_timeout.has_value())
408 {
409 const unsigned int ms = connection_timeout->count();
410 const auto ret =
411 setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms, sizeof(ms));
412 if (ret != 0)
413 {
415 "Failed to set socket option (TCP_USER_TIMEOUT): {}",
416 std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
417 return false;
418 }
419 }
420
421 if ((rc = uv_tcp_open(&uv_handle, sock)) < 0)
422 {
423 LOG_FAIL_FMT("uv_tcp_open failed: {}", uv_strerror(rc));
424 return false;
425 }
426 }
427
428 if ((rc = uv_tcp_keepalive(&uv_handle, 1, 30)) < 0)
429 {
430 LOG_FAIL_FMT("uv_tcp_keepalive failed: {}", uv_strerror(rc));
431 return false;
432 }
433
434 uv_handle.data = this;
435 return true;
436 }
437
438 bool send_write(uv_write_t* req, size_t len)
439 {
440 auto* copy = static_cast<char*>(req->data);
441
442 uv_buf_t buf;
443 buf.base = copy;
444 buf.len = len;
445
446 int rc = 0;
447
448 if (
449 (rc = uv_write(
450 req,
451 reinterpret_cast<uv_stream_t*>(&uv_handle),
452 &buf,
453 1,
454 on_write)) < 0)
455 {
456 free_write(req);
457 LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
458 assert_status(CONNECTED, DISCONNECTED);
459 behaviour->on_disconnect();
460 return false;
461 }
462
463 return true;
464 }
465
466 void update_resolved_address(int address_family, sockaddr* sa)
467 {
468 auto [h, p] = addr_to_str(sa, address_family);
469 host = h;
470 port = p;
471 LOG_TRACE_FMT("TCP update address to {}:{}", host, port);
472 }
473
474 void listen_resolved()
475 {
476 int rc = 0;
477
478 while (addr_current != nullptr)
479 {
480 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
481
482 if ((rc = uv_tcp_bind(&uv_handle, addr_current->ai_addr, 0)) < 0)
483 {
484 addr_current = addr_current->ai_next;
486 "uv_tcp_bind failed on {}: {}",
487 get_address_name(),
488 uv_strerror(rc));
489 continue;
490 }
491
492 if (
493 (rc = uv_listen(
494 reinterpret_cast<uv_stream_t*>(&uv_handle), backlog, on_accept)) <
495 0)
496 {
498 "uv_listen failed on {}: {}", get_address_name(), uv_strerror(rc));
499 addr_current = addr_current->ai_next;
500 continue;
501 }
502
503 // If bound on port 0 (ie - asking the OS to assign a port), then we
504 // need to call uv_tcp_getsockname to retrieve the bound port
505 // (addr_current will not contain it)
506 if (!port_assigned())
507 {
508 sockaddr_storage sa_storage{};
509 auto* const sa = reinterpret_cast<sockaddr*>(&sa_storage);
510 int sa_len = sizeof(sa_storage);
511 if ((rc = uv_tcp_getsockname(&uv_handle, sa, &sa_len)) != 0)
512 {
513 LOG_FAIL_FMT("uv_tcp_getsockname failed: {}", uv_strerror(rc));
514 }
515 update_resolved_address(addr_current->ai_family, sa);
516 }
517
518 assert_status(LISTENING_RESOLVING, LISTENING);
519 behaviour->on_listening(host, port);
520 return;
521 }
522
523 assert_status(LISTENING_RESOLVING, LISTENING_FAILED);
524 behaviour->on_listen_failed();
525 }
526
527 bool connect_resolved()
528 {
529 auto* req = new uv_connect_t; // NOLINT(cppcoreguidelines-owning-memory)
530 int rc = 0;
531
532 while (addr_current != nullptr)
533 {
534 if (
535 (rc = uv_tcp_connect(
536 req, &uv_handle, addr_current->ai_addr, on_connect)) < 0)
537 {
538 LOG_DEBUG_FMT("uv_tcp_connect retry: {}", uv_strerror(rc));
539 addr_current = addr_current->ai_next;
540 continue;
541 }
542
543 assert_status(CONNECTING_RESOLVING, CONNECTING);
544 return true;
545 }
546
547 assert_status(CONNECTING_RESOLVING, CONNECTING_FAILED);
548 delete req; // NOLINT(cppcoreguidelines-owning-memory)
549
550 // This should show even when verbose logs are off
552 "Unable to connect: all resolved addresses failed: {}:{}", host, port);
553
554 behaviour->on_connect_failed();
555 return false;
556 }
557
558 void assert_status(Status from, Status to)
559 {
560 if (status != from)
561 {
562 throw std::logic_error(fmt::format(
563 "Trying to transition from {} to {} but current status is {}",
564 from,
565 to,
566 status));
567 }
568
569 status = to;
570 }
571
572 bool resolve(
573 const std::string& host_, const std::string& port_, bool async = true)
574 {
575 host = host_;
576 port = port_;
577
578 if (addr_base != nullptr)
579 {
580 uv_freeaddrinfo(addr_base);
581 addr_base = nullptr;
582 addr_current = nullptr;
583 }
584
585 if (!DNS::resolve(host, port, this, on_resolved, async))
586 {
587 LOG_DEBUG_FMT("Resolving '{}' failed", host);
588 status = RESOLVING_FAILED;
589 return false;
590 }
591
592 return true;
593 }
594
595 static void on_resolved(uv_getaddrinfo_t* req, int rc, struct addrinfo* res)
596 {
597 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
598 pending_resolve_requests.erase(req);
599
600 if (req->data != nullptr)
601 {
602 static_cast<TCPImpl*>(req->data)->on_resolved(req, rc);
603 }
604 else
605 {
606 // The TCPImpl that submitted the request has been destroyed, but we
607 // need to clean up the request object.
608 uv_freeaddrinfo(res);
609 delete req; // NOLINT(cppcoreguidelines-owning-memory)
610 }
611 }
612
613 void on_resolved(uv_getaddrinfo_t* req, int rc)
614 {
615 // It is possible that on_resolved is triggered after there has been a
616 // request to close uv_handle. In this scenario, we should not try to
617 // do anything with the handle and return immediately (otherwise,
618 // uv_close cb will abort).
619 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
620 {
621 LOG_DEBUG_FMT("on_resolved: closing");
622 uv_freeaddrinfo(req->addrinfo);
623 delete req; // NOLINT(cppcoreguidelines-owning-memory)
624 return;
625 }
626
627 if (rc < 0)
628 {
629 status = RESOLVING_FAILED;
630 LOG_DEBUG_FMT("TCP resolve failed: {}", uv_strerror(rc));
631 behaviour->on_resolve_failed();
632 }
633 else
634 {
635 addr_base = req->addrinfo;
636 addr_current = addr_base;
637
638 switch (status)
639 {
640 case CONNECTING_RESOLVING:
641 {
642 connect_resolved();
643 break;
644 }
645
646 case LISTENING_RESOLVING:
647 {
648 listen_resolved();
649 break;
650 }
651
652 default:
653 {
654 throw std::logic_error(
655 fmt::format("Unexpected status during on_resolved: {}", status));
656 }
657 }
658 }
659
660 delete req; // NOLINT(cppcoreguidelines-owning-memory)
661 }
662
663 static void on_accept(uv_stream_t* handle, int rc)
664 {
665 static_cast<TCPImpl*>(handle->data)->on_accept(rc);
666 }
667
668 void on_accept(int rc)
669 {
670 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
671 {
672 LOG_DEBUG_FMT("on_accept: closing");
673 return;
674 }
675
676 if (rc < 0)
677 {
678 LOG_DEBUG_FMT("on_accept failed: {}", uv_strerror(rc));
679 return;
680 }
681
682 TCP peer;
683
684 if (
685 (rc = uv_accept(
686 reinterpret_cast<uv_stream_t*>(&uv_handle),
687 reinterpret_cast<uv_stream_t*>(&peer->uv_handle))) < 0)
688 {
689 LOG_DEBUG_FMT("uv_accept failed: {}", uv_strerror(rc));
690 return;
691 }
692
693 peer->assert_status(FRESH, CONNECTED);
694
695 if (!peer->read_start())
696 {
697 return;
698 }
699
700 behaviour->on_accept(peer);
701 }
702
703 static void on_connect(uv_connect_t* req, int rc)
704 {
705 auto* self = static_cast<TCPImpl*>(req->handle->data);
706 delete req; // NOLINT(cppcoreguidelines-owning-memory)
707
708 if (rc == UV_ECANCELED)
709 {
710 // Break reconnection loop early if cancelled
711 LOG_FAIL_FMT("on_connect: cancelled");
712 return;
713 }
714
715 self->on_connect(rc);
716 }
717
718 void on_connect(int rc)
719 {
720 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
721 {
722 LOG_DEBUG_FMT("on_connect: closing");
723 return;
724 }
725
726 if (rc < 0)
727 {
728 // Try again on the next address.
729 LOG_DEBUG_FMT("uv_tcp_connect async retry: {}", uv_strerror(rc));
730 addr_current = addr_current->ai_next;
731 assert_status(CONNECTING, CONNECTING_RESOLVING);
732 connect_resolved();
733 }
734 else
735 {
736 assert_status(CONNECTING, CONNECTED);
737
738 if (!read_start())
739 {
740 return;
741 }
742
743 for (auto& w : pending_writes) // NOLINT(readability-qualified-auto)
744 {
745 send_write(w.req, w.len);
746 w.req = nullptr;
747 }
748
749 PendingWrites().swap(pending_writes);
750 behaviour->on_connect();
751 }
752 }
753
754 bool read_start()
755 {
756 int rc = 0;
757
758 if (
759 (rc = uv_read_start(
760 reinterpret_cast<uv_stream_t*>(&uv_handle), on_alloc, on_read)) < 0)
761 {
762 assert_status(CONNECTED, DISCONNECTED);
763 LOG_FAIL_FMT("uv_read_start failed: {}", uv_strerror(rc));
764
765 if (behaviour)
766 {
767 behaviour->on_disconnect();
768 }
769
770 return false;
771 }
772
773 return true;
774 }
775
776 static void on_alloc(
777 uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
778 {
779 static_cast<TCPImpl*>(handle->data)->on_alloc(suggested_size, buf);
780 }
781
782 void on_alloc(size_t suggested_size, uv_buf_t* buf)
783 {
784 auto alloc_size = std::min(suggested_size, max_read_size);
785
786 alloc_size = std::min(alloc_size, remaining_read_quota);
787 remaining_read_quota -= alloc_size;
788 if (alloc_size != 0)
789 {
791 "Allocating {} bytes for TCP read ({} of quota remaining)",
792 alloc_size,
793 remaining_read_quota);
794 }
795
796 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
797 buf->base = new char[alloc_size];
798 buf->len = alloc_size;
799 }
800
801 void on_free(const uv_buf_t* buf)
802 {
803 delete[] buf->base; // NOLINT(cppcoreguidelines-owning-memory)
804 }
805
806 static void on_read(uv_stream_t* handle, ssize_t sz, const uv_buf_t* buf)
807 {
808 static_cast<TCPImpl*>(handle->data)->on_read(sz, buf);
809 }
810
811 void on_read(ssize_t sz, const uv_buf_t* buf)
812 {
813 if (sz == 0)
814 {
815 on_free(buf);
816 return;
817 }
818
819 if (sz == UV_ENOBUFS)
820 {
821 if (!alloc_quota_logged)
822 {
823 LOG_DEBUG_FMT("TCP on_read reached allocation quota");
824 alloc_quota_logged = true;
825 }
826 on_free(buf);
827 return;
828 }
829
830 if (sz < 0)
831 {
832 assert_status(CONNECTED, DISCONNECTED);
833 on_free(buf);
834 uv_read_stop(reinterpret_cast<uv_stream_t*>(&uv_handle));
835
836 LOG_DEBUG_FMT("TCP on_read: {}", uv_strerror(static_cast<int>(sz)));
837 behaviour->on_disconnect();
838 return;
839 }
840
841 auto* p = reinterpret_cast<uint8_t*>(buf->base);
842 const bool read_good = behaviour->on_read(static_cast<size_t>(sz), p, {});
843
844 if (p != nullptr)
845 {
846 on_free(buf);
847 }
848
849 if (!read_good)
850 {
851 behaviour->on_disconnect();
852 return;
853 }
854 }
855
856 static void on_write(uv_write_t* req, int /*status*/)
857 {
858 free_write(req);
859 }
860
861 static void free_write(uv_write_t* req)
862 {
863 if (req == nullptr)
864 {
865 return;
866 }
867
868 auto* copy = static_cast<char*>(req->data);
869 delete[] copy; // NOLINT(cppcoreguidelines-owning-memory)
870 delete req; // NOLINT(cppcoreguidelines-owning-memory)
871 }
872
873 static void on_reconnect(uv_handle_t* handle)
874 {
875 static_cast<TCPImpl*>(handle->data)->on_reconnect();
876 }
877
878 void on_reconnect()
879 {
880 assert_status(RECONNECTING, FRESH);
881
882 if (!init())
883 {
884 assert_status(FRESH, CONNECTING_FAILED);
885 behaviour->on_connect_failed();
886 return;
887 }
888
889 if (client_addr_base != nullptr)
890 {
891 assert_status(FRESH, BINDING);
892 client_bind();
893 }
894 else
895 {
896 assert_status(FRESH, CONNECTING_RESOLVING);
897 connect_resolved();
898 }
899 }
900 };
901
903 {
904 public:
906
908 {
910 }
911 };
912
914}
static bool resolve(const std::string &host_, const std::string &service, void *ud, uv_getaddrinfo_cb cb, bool async)
Definition dns.h:19
void before_io()
Definition tcp.h:907
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
Definition tcp.h:23
void client_bind()
Definition tcp.h:184
static void on_client_resolved(uv_getaddrinfo_t *req, int rc, struct addrinfo *)
Definition tcp.h:209
bool listen(const std::string &host_, const std::string &port_, const std::optional< std::string > &name=std::nullopt)
Definition tcp.h:320
std::string get_port() const
Definition tcp.h:142
std::string get_host() const
Definition tcp.h:137
void set_behaviour(std::unique_ptr< SocketBehaviour< TCP > > b)
Definition tcp.h:132
std::optional< std::string > get_listen_name() const
Definition tcp.h:179
bool write(size_t len, const uint8_t *data, sockaddr={})
Definition tcp.h:331
void on_client_resolved(uv_getaddrinfo_t *req, int rc)
Definition tcp.h:215
std::string get_peer_name() const
Definition tcp.h:147
bool reconnect()
Definition tcp.h:275
static void reset_read_quota()
Definition tcp.h:126
void start(int64_t)
This is to mimic UDP's implementation. TCP's start is on_accept.
Definition tcp.h:236
bool connect(const std::string &host_, const std::string &port_, const std::optional< std::string > &client_host_=std::nullopt)
Definition tcp.h:238
Definition proxy.h:15
Definition proxy.h:84
uv_tcp_t uv_handle
Definition proxy.h:90
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition after_io.h:8
std::pair< std::string, std::string > addr_to_str(const sockaddr *addr, int address_family=AF_INET)
Definition socket.h:84
proxy_ptr< TCPImpl > TCP
Definition tcp.h:20
auto * handle
Definition kv_helpers.h:87
Definition configuration.h:14