// Compile-time limits and static read-quota counters used by the TCP code below.
// backlog is passed, together with on_accept, to the listen call in listen_resolved().
27 static constexpr int backlog = 128;
// Upper bound applied to every single read allocation in on_alloc().
28 static constexpr size_t max_read_size = 16384;
// Total read quota: four maximum-size reads. remaining_read_quota is reset to this value.
31 static constexpr auto max_read_quota = max_read_size * 4;
// Bytes still available for read buffers; on_alloc() subtracts each allocation from it.
32 static size_t remaining_read_quota;
// Set to true inside on_read()'s UV_ENOBUFS handling; cleared where the quota is reset.
33 static bool alloc_quota_logged;
// Per-connection state. Only the declarations visible in this listing are described.
// Optional timeout; when it has a value its count() is applied as TCP_USER_TIMEOUT via setsockopt.
53 std::optional<std::chrono::milliseconds> connection_timeout = std::nullopt;
// Receiver of every behaviour->on_*() notification issued by this class.
55 std::unique_ptr<SocketBehaviour<TCP>> behaviour;
// Writes queued by write() and replayed through send_write() in on_connect().
56 using PendingWrites = std::vector<PendingIO<uv_write_t>>;
57 PendingWrites pending_writes;
// client_host is assigned when a client host is supplied to the connect entry point.
61 std::optional<std::string> client_host = std::nullopt;
62 std::optional<std::string> listen_name = std::nullopt;
// addrinfo lists released with uv_freeaddrinfo(). client_addr_base is used for the client-side bind.
64 addrinfo* client_addr_base =
nullptr;
// Head of the list stored by on_resolved().
65 addrinfo* addr_base =
nullptr;
// Cursor into addr_base; advanced with ai_next when a bind/listen/connect attempt fails.
66 addrinfo* addr_current =
nullptr;
// Const query marked [[nodiscard]]; its body is not part of this listing.
// Callers here use it to decide whether to append a ":port" suffix (get_address_name)
// and whether listen_resolved() must query the bound socket name.
68 [[nodiscard]]
bool port_assigned()
const
// Builds a printable "host[:port]" string from the host and port members.
// Returns the formatted std::string; does not modify the object (const).
73 [[nodiscard]] std::string get_address_name()
const
// Suffix is ":<port>" only when port_assigned() is true, otherwise empty.
75 const std::string port_suffix =
76 port_assigned() ? fmt::format(
":{}", port) :
"";
// When the current resolved address is IPv6 the host is wrapped in square brackets.
78 if (addr_current !=
nullptr && addr_current->ai_family == AF_INET6)
80 return fmt::format(
"[{}]{}",
host, port_suffix);
// Any other family, or no current address, is printed without brackets.
83 return fmt::format(
"{}{}",
host, port_suffix);
// Constructor fragment (the line naming the constructor is not part of this listing).
// is_client_ defaults to false; the default of connection_timeout_ is on a line not shown.
87 bool is_client_ =
false,
88 std::optional<std::chrono::milliseconds> connection_timeout_ =
// Both parameters are copied into the members of the same name without the trailing underscore.
90 is_client(is_client_),
91 connection_timeout(connection_timeout_)
// Throws std::logic_error when initialization fails; the failing call itself is not visible here.
95 throw std::logic_error(
"uv tcp initialization failed");
// Cleanup fragment; the enclosing signature is not part of this listing.
// NOTE(review): looks like the destructor - confirm against the full file.
// Holds pending_resolve_requests_mtx while scanning the outstanding resolve requests.
104 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
105 for (
const auto& req : pending_resolve_requests)
// Selects requests whose data pointer refers to this object; the action taken is on lines not shown.
109 if (req->data ==
this)
// Release the resolved address list, if any.
115 if (addr_base !=
nullptr)
117 uv_freeaddrinfo(addr_base);
// Release the client-side address list, if any.
119 if (client_addr_base !=
nullptr)
121 uv_freeaddrinfo(client_addr_base);
// Quota reset fragment (enclosing signature not shown): restores the full read quota
// and clears the flag that on_read() sets when it sees UV_ENOBUFS.
128 remaining_read_quota = max_read_quota;
129 alloc_quota_logged =
false;
// Moves b into the behaviour member (enclosing setter signature not shown).
134 behaviour = std::move(b);
// Fragment that obtains a socket address for uv_handle and renders it as text.
// The enclosing signature and the name of the libuv query call are on lines not shown.
// sa is zero-initialized storage large enough for any address family.
149 sockaddr_storage sa = {};
150 int name_len =
sizeof(sa);
// A negative result from the query is treated as failure (handling not visible here).
153 &
uv_handle,
reinterpret_cast<sockaddr*
>(&sa), &name_len) < 0)
// Dispatch on the address family that was written into sa.
158 switch (sa.ss_family)
// IPv4: textual form written into a buffer of INET_ADDRSTRLEN bytes.
162 char tmp[INET_ADDRSTRLEN];
163 auto* sa4 =
reinterpret_cast<sockaddr_in*
>(&sa);
164 uv_ip4_name(sa4, tmp,
sizeof(tmp));
// IPv6: textual form written into a buffer of INET6_ADDRSTRLEN bytes.
169 char tmp[INET6_ADDRSTRLEN];
170 auto* sa6 =
reinterpret_cast<sockaddr_in6*
>(&sa);
171 uv_ip6_name(sa6, tmp,
sizeof(tmp));
// Any other family is reported by its numeric value.
175 return fmt::format(
"unknown family: {}", sa.ss_family);
// Client-side bind fragment (enclosing signature not shown).
// Binds uv_handle to the first entry of client_addr_base; a negative rc is a failure.
187 if ((rc = uv_tcp_bind(&
uv_handle, client_addr_base->ai_addr, 0)) < 0)
// Failure path: BINDING -> BINDING_FAILED, log the libuv error, notify the behaviour.
189 assert_status(BINDING, BINDING_FAILED);
190 LOG_FAIL_FMT(
"uv_tcp_bind failed: {}", uv_strerror(rc));
191 behaviour->on_bind_failed();
// Success path: BINDING -> CONNECTING_RESOLVING.
195 assert_status(BINDING, CONNECTING_RESOLVING);
// Branches on whether a resolved address is already available; the branch bodies are only partly visible.
196 if (addr_current !=
nullptr)
// Starts resolution of the stored host/port with async = true.
202 resolve(this->host, this->port,
true);
// Client-address resolve callback fragment (function names are on lines not shown).
// The third parameter, the addrinfo result, is unnamed in this signature.
210 uv_getaddrinfo_t* req,
int rc,
struct addrinfo* )
// The visible handling sits below a check that the handle is not closing.
217 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) == 0)
// Failure path: BINDING -> BINDING_FAILED, log, notify the behaviour.
221 assert_status(BINDING, BINDING_FAILED);
222 LOG_DEBUG_FMT(
"TCP client resolve failed: {}", uv_strerror(rc));
223 behaviour->on_bind_failed();
// Success path: keep the resolved list in client_addr_base.
227 client_addr_base = req->addrinfo;
// Connect entry-point fragment (the line carrying the function name is not shown).
// host_/port_ name the remote endpoint; client_host_ is optional and defaults to std::nullopt.
239 const std::string& host_,
240 const std::string& port_,
241 const std::optional<std::string>& client_host_ = std::nullopt)
// When a client host is supplied it is remembered in client_host.
245 if (client_host_.has_value())
247 client_host = client_host_;
// Any previously resolved client address list is freed and the pointer cleared.
251 if (client_addr_base !=
nullptr)
253 uv_freeaddrinfo(client_addr_base);
254 client_addr_base =
nullptr;
// Sets BINDING_FAILED directly, without assert_status; the triggering condition is not visible.
262 status = BINDING_FAILED;
// Otherwise FRESH -> CONNECTING_RESOLVING, then resolve the remote endpoint with async = true.
268 assert_status(FRESH, CONNECTING_RESOLVING);
269 return resolve(host_, port_,
true);
// Reconnect status dispatch fragment (signature and switch header not shown).
// The listed states lead to a log entry saying the reconnect is ignored.
281 case LISTENING_RESOLVING:
283 case CONNECTING_RESOLVING:
286 case LISTENING_FAILED:
290 "Unexpected status during reconnect, ignoring: {}", status);
// A previous bind failure is retried: BINDING_FAILED -> BINDING.
297 assert_status(BINDING_FAILED, BINDING);
// After a resolve or connect failure, set CONNECTING_RESOLVING directly and resolve again.
300 case RESOLVING_FAILED:
301 case CONNECTING_FAILED:
305 status = CONNECTING_RESOLVING;
306 return resolve(
host, port,
true);
// If the handle is not already closing, mark RECONNECTING and close it;
// on_reconnect is the close callback that continues the sequence.
313 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) == 0)
318 status = RECONNECTING;
319 uv_close(
reinterpret_cast<uv_handle_t*
>(&
uv_handle), on_reconnect);
// Remaining states raise std::logic_error.
326 throw std::logic_error(
327 fmt::format(
"Unexpected status during reconnect: {}", status));
// Listen entry-point fragment (the line carrying the function name is not shown).
// host_/port_ name the local endpoint; name is optional and defaults to std::nullopt.
335 const std::string& host_,
336 const std::string& port_,
337 const std::optional<std::string>& name = std::nullopt)
// FRESH -> LISTENING_RESOLVING, then resolve with async = false.
339 assert_status(FRESH, LISTENING_RESOLVING);
340 bool ret = resolve(host_, port_,
false);
// Copies len bytes from data into a freshly allocated buffer and handles the write
// according to the current status. The trailing sockaddr parameter is unnamed.
// Returns bool; throws std::logic_error for a status that is not expected during write.
345 bool write(
size_t len,
const uint8_t* data, sockaddr = {})
// Raw allocations: a libuv write request and a private copy of the payload.
// NOTE(review): confirm every path below releases or hands over both allocations.
347 auto* req =
new uv_write_t;
348 auto* copy =
new char[len];
351 memcpy(copy, data, len);
// In these states the request is queued in pending_writes with free_write as its release function.
359 case CONNECTING_RESOLVING:
361 case RESOLVING_FAILED:
362 case CONNECTING_FAILED:
365 pending_writes.emplace_back(req, len, sockaddr{}, free_write);
// Immediate send path.
371 return send_write(req, len);
// Logs that the write is being ignored.
376 LOG_DEBUG_FMT(
"Disconnected: Ignoring write of size {}", len);
// Listening states are case labels here; their handling is on lines not shown.
382 case LISTENING_RESOLVING:
384 case LISTENING_FAILED:
388 throw std::logic_error(
389 fmt::format(
"Unexpected status during write: {}", status));
// Handle/socket initialization fragment (enclosing signature not shown).
// Requires the status to be FRESH and leaves it FRESH.
399 assert_status(FRESH, FRESH);
// Initialize the libuv TCP handle on the default loop; failures are logged.
402 if ((rc = uv_tcp_init(uv_default_loop(), &
uv_handle)) < 0)
404 LOG_FAIL_FMT(
"uv_tcp_init failed: {}", uv_strerror(rc));
// Enable TCP_NODELAY on the handle.
408 if ((rc = uv_tcp_nodelay(&
uv_handle, 1)) < 0)
410 LOG_FAIL_FMT(
"uv_tcp_nodelay failed: {}", uv_strerror(rc));
// Create the OS socket explicitly so options can be set before libuv adopts it.
// NOTE(review): the socket is always AF_INET although get_address_name() handles AF_INET6;
// confirm this path is never used with IPv6 targets.
416 uv_os_sock_t sock = 0;
417 if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
420 "socket creation failed: {}",
421 std::strerror(errno));
// Apply the optional timeout as TCP_USER_TIMEOUT.
// NOTE(review): count() is narrowed to unsigned int here; negative or very large values would wrap - confirm callers bound it.
425 if (connection_timeout.has_value())
427 const unsigned int ms = connection_timeout->count();
429 setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms,
sizeof(ms));
433 "Failed to set socket option (TCP_USER_TIMEOUT): {}",
434 std::strerror(errno));
// Hand the configured socket to the libuv handle.
439 if ((rc = uv_tcp_open(&
uv_handle, sock)) < 0)
441 LOG_FAIL_FMT(
"uv_tcp_open failed: {}", uv_strerror(rc));
// Enable keepalive with a delay argument of 30.
446 if ((rc = uv_tcp_keepalive(&
uv_handle, 1, 30)) < 0)
448 LOG_FAIL_FMT(
"uv_tcp_keepalive failed: {}", uv_strerror(rc));
// Submits a prepared write request of len bytes on uv_handle. Returns bool.
456 bool send_write(uv_write_t* req,
size_t len)
// The payload buffer is carried in req->data, stored there as char*.
458 auto* copy =
static_cast<char*
>(req->data);
// uv_handle is passed as the stream argument of a call whose name is on a line not shown.
469 reinterpret_cast<uv_stream_t*
>(&
uv_handle),
// CONNECTED -> DISCONNECTED and notify the behaviour; the triggering condition is not visible here.
476 assert_status(CONNECTED, DISCONNECTED);
477 behaviour->on_disconnect();
// Takes an address family and a socket address; the body is not part of this listing.
// listen_resolved() calls it for each candidate address and again with the bound socket name.
484 void update_resolved_address(
int address_family, sockaddr* sa)
// Walks the resolved address list, trying to bind and listen on each entry in turn.
// Ends in LISTENING (behaviour->on_listening) or LISTENING_FAILED (behaviour->on_listen_failed).
492 void listen_resolved()
496 while (addr_current !=
nullptr)
// Record the candidate address before attempting to bind to it.
498 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
500 if ((rc = uv_tcp_bind(&
uv_handle, addr_current->ai_addr, 0)) < 0)
// NOTE(review): the cursor is advanced before the failure is logged, whereas the listen
// failure path below logs first. If the hidden log arguments call get_address_name(),
// its IPv6 bracket check will look at the next entry - confirm.
502 addr_current = addr_current->ai_next;
504 "uv_tcp_bind failed on {}: {}",
// Start listening with the configured backlog; on_accept handles incoming connections.
512 reinterpret_cast<uv_stream_t*
>(&
uv_handle), backlog, on_accept)) <
516 "uv_listen failed on {}: {}", get_address_name(), uv_strerror(rc));
517 addr_current = addr_current->ai_next;
// When port_assigned() is false, query the bound socket name and record it.
524 if (!port_assigned())
526 sockaddr_storage sa_storage{};
527 auto*
const sa =
reinterpret_cast<sockaddr*
>(&sa_storage);
528 int sa_len =
sizeof(sa_storage);
529 if ((rc = uv_tcp_getsockname(&
uv_handle, sa, &sa_len)) != 0)
531 LOG_FAIL_FMT(
"uv_tcp_getsockname failed: {}", uv_strerror(rc));
533 update_resolved_address(addr_current->ai_family, sa);
// Success: LISTENING_RESOLVING -> LISTENING and report host/port to the behaviour.
536 assert_status(LISTENING_RESOLVING, LISTENING);
537 behaviour->on_listening(
host, port);
// Failure: LISTENING_RESOLVING -> LISTENING_FAILED.
541 assert_status(LISTENING_RESOLVING, LISTENING_FAILED);
542 behaviour->on_listen_failed();
// Tries to start a connection to each resolved address in turn. Returns bool.
545 bool connect_resolved()
// Raw allocation of the libuv connect request.
// NOTE(review): its release is not visible in this listing - confirm the failure path frees it.
547 auto* req =
new uv_connect_t;
550 while (addr_current !=
nullptr)
// A negative rc means this address could not be attempted; move on to the next one.
553 (rc = uv_tcp_connect(
554 req, &
uv_handle, addr_current->ai_addr, on_connect)) < 0)
557 addr_current = addr_current->ai_next;
// Attempt started: CONNECTING_RESOLVING -> CONNECTING; on_connect receives the result.
561 assert_status(CONNECTING_RESOLVING, CONNECTING);
// Failure path: CONNECTING_RESOLVING -> CONNECTING_FAILED, log, notify the behaviour.
565 assert_status(CONNECTING_RESOLVING, CONNECTING_FAILED);
570 "Unable to connect: all resolved addresses failed: {}:{}",
host, port);
572 behaviour->on_connect_failed();
// Status transition helper taking the expected current status (from) and the target (to).
// Throws std::logic_error with a message naming from, to and the current status;
// the check that triggers the throw and the assignment are on lines not shown.
576 void assert_status(Status from, Status to)
580 throw std::logic_error(fmt::format(
581 "Trying to transition from {} to {} but current status is {}",
// Resolve fragment (the line carrying the function name is not shown).
// host_/port_ name the endpoint to resolve; async defaults to true.
591 const std::string& host_,
const std::string& port_,
bool async =
true)
// Free any previous result list and clear the cursor.
596 if (addr_base !=
nullptr)
598 uv_freeaddrinfo(addr_base);
600 addr_current =
nullptr;
// Sets RESOLVING_FAILED directly, without assert_status; the triggering condition is not visible.
606 status = RESOLVING_FAILED;
// Static resolve callback: removes the request from the pending set and forwards the
// result to the owning TCPImpl stored in req->data.
613 static void on_resolved(uv_getaddrinfo_t* req,
int rc,
struct addrinfo* res)
// The pending set is modified under its mutex.
615 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
616 pending_resolve_requests.erase(req);
// A non-null data pointer identifies the owning object; forward to its member overload.
618 if (req->data !=
nullptr)
620 static_cast<TCPImpl*
>(req->data)->on_resolved(req, rc);
// Frees the result list; the branch structure around this call is not visible here.
626 uv_freeaddrinfo(res);
// Member resolve handler: stores the resolved list and continues according to status.
// Throws std::logic_error for a status that is not expected here.
631 void on_resolved(uv_getaddrinfo_t* req,
int rc)
// If the handle is already closing, the result list is freed.
637 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
640 uv_freeaddrinfo(req->addrinfo);
// Sets RESOLVING_FAILED directly and notifies the behaviour; the triggering condition is not visible.
647 status = RESOLVING_FAILED;
649 behaviour->on_resolve_failed();
// Keep the list head and point the cursor at its first entry.
653 addr_base = req->addrinfo;
654 addr_current = addr_base;
// Status dispatch; the bodies of these cases are on lines not shown.
658 case CONNECTING_RESOLVING:
664 case LISTENING_RESOLVING:
677 case RESOLVING_FAILED:
678 case LISTENING_FAILED:
679 case CONNECTING_FAILED:
683 throw std::logic_error(
684 fmt::format(
"Unexpected status during on_resolved: {}", status));
// Static accept callback: forwards rc to the TCPImpl stored in handle->data.
692 static void on_accept(uv_stream_t* handle,
int rc)
694 static_cast<TCPImpl*
>(
handle->data)->on_accept(rc);
// Member accept handler: takes an incoming connection into a peer object and hands it
// to the behaviour. Creation of peer is on lines not shown.
697 void on_accept(
int rc)
// Checks whether the listening handle is closing; the handling is not visible here.
699 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
// This handle and the peer's handle are passed to a call whose name is not shown; a negative result is a failure.
715 reinterpret_cast<uv_stream_t*
>(&
uv_handle),
716 reinterpret_cast<uv_stream_t*
>(&peer->uv_handle))) < 0)
// The peer moves FRESH -> CONNECTED.
722 peer->assert_status(FRESH, CONNECTED);
// Checks for read_start() failure on the peer; the handling is not visible here.
724 if (!peer->read_start())
729 behaviour->on_accept(peer);
// Static connect callback: recovers the owning TCPImpl from req->handle->data and forwards rc.
732 static void on_connect(uv_connect_t* req,
int rc)
734 auto* self =
static_cast<TCPImpl*
>(req->handle->data);
// UV_ECANCELED is tested separately; its handling is on lines not shown.
737 if (rc == UV_ECANCELED)
744 self->on_connect(rc);
// Member connect handler: either retries with the next resolved address or completes
// the connection and flushes queued writes.
747 void on_connect(
int rc)
// Checks whether the handle is closing; the handling is not visible here.
749 if (uv_is_closing(
reinterpret_cast<uv_handle_t*
>(&
uv_handle)) != 0)
// Retry path: log, advance to the next address, and go back CONNECTING -> CONNECTING_RESOLVING.
758 LOG_DEBUG_FMT(
"uv_tcp_connect async retry: {}", uv_strerror(rc));
759 addr_current = addr_current->ai_next;
760 assert_status(CONNECTING, CONNECTING_RESOLVING);
// Success path: CONNECTING -> CONNECTED.
765 assert_status(CONNECTING, CONNECTED);
// Replay every write queued before the connection was established.
772 for (
auto& w : pending_writes)
774 send_write(w.req, w.len);
// Swapping with an empty temporary empties pending_writes.
778 PendingWrites().swap(pending_writes);
779 behaviour->on_connect();
// Read-start fragment (signature and the call name are on lines not shown).
// on_alloc and on_read are registered as callbacks on uv_handle; a negative result is a failure.
789 reinterpret_cast<uv_stream_t*
>(&
uv_handle), on_alloc, on_read)) < 0)
// Failure path: CONNECTED -> DISCONNECTED, log, notify the behaviour.
791 assert_status(CONNECTED, DISCONNECTED);
792 LOG_FAIL_FMT(
"uv_read_start failed: {}", uv_strerror(rc));
796 behaviour->on_disconnect();
// Static allocation callback: forwards the request to the TCPImpl stored in handle->data.
805 static void on_alloc(
806 uv_handle_t* handle,
size_t suggested_size, uv_buf_t* buf)
808 static_cast<TCPImpl*
>(
handle->data)->on_alloc(suggested_size, buf);
// Allocates a read buffer for libuv, bounded by max_read_size and by the remaining quota.
// buf receives the buffer pointer and its length.
811 void on_alloc(
size_t suggested_size, uv_buf_t* buf)
// Never allocate more than max_read_size per read.
813 auto alloc_size = std::min(suggested_size, max_read_size);
// Never exceed what is left of the quota; the result is 0 once the quota is exhausted.
815 alloc_size = std::min(alloc_size, remaining_read_quota);
816 remaining_read_quota -= alloc_size;
820 "Allocating {} bytes for TCP read ({} of quota remaining)",
822 remaining_read_quota);
// Raw allocation; the matching release is not visible in this listing.
826 buf->base =
new char[alloc_size];
827 buf->len = alloc_size;
// Takes a read buffer descriptor; the body is not part of this listing.
// NOTE(review): presumably releases the buffer allocated in on_alloc() - confirm.
830 void on_free(
const uv_buf_t* buf)
// Static read callback: forwards the result to the TCPImpl stored in handle->data.
835 static void on_read(uv_stream_t* handle, ssize_t sz,
const uv_buf_t* buf)
837 static_cast<TCPImpl*
>(
handle->data)->on_read(sz, buf);
// Member read handler. sz is the libuv read result; it is compared against UV_ENOBUFS,
// passed to uv_strerror() on the error path, and used as a byte count on the data path.
840 void on_read(ssize_t sz,
const uv_buf_t* buf)
// UV_ENOBUFS is tested separately; alloc_quota_logged guards the handling so it runs once until reset.
848 if (sz == UV_ENOBUFS)
850 if (!alloc_quota_logged)
853 alloc_quota_logged =
true;
// Error path: CONNECTED -> DISCONNECTED, stop reading, log, notify the behaviour.
861 assert_status(CONNECTED, DISCONNECTED);
863 uv_read_stop(
reinterpret_cast<uv_stream_t*
>(&
uv_handle));
865 LOG_DEBUG_FMT(
"TCP on_read: {}", uv_strerror(
static_cast<int>(sz)));
866 behaviour->on_disconnect();
// Data path: hand the received bytes to the behaviour.
870 auto* p =
reinterpret_cast<uint8_t*
>(buf->base);
871 const bool read_good = behaviour->on_read(
static_cast<size_t>(sz), p, {});
// Notifies the behaviour of a disconnect; the condition (presumably !read_good - confirm) is not visible.
880 behaviour->on_disconnect();
// Static write-completion callback; the status argument is unnamed in this signature.
// The body is not part of this listing.
885 static void on_write(uv_write_t* req,
int )
// Release function for a write request; write() registers it with queued pending writes.
890 static void free_write(uv_write_t* req)
// The payload buffer is carried in req->data as char*; the release statements are not visible here.
897 auto* copy =
static_cast<char*
>(req->data);
// Static close callback used by the reconnect path: forwards to the TCPImpl stored in handle->data.
902 static void on_reconnect(uv_handle_t* handle)
904 static_cast<TCPImpl*
>(
handle->data)->on_reconnect();
// Reconnect completion fragment (signature not shown; the definition may continue past this listing).
// The closed handle returns to FRESH.
909 assert_status(RECONNECTING, FRESH);
// Failure path: FRESH -> CONNECTING_FAILED and notify the behaviour; the triggering condition is not visible.
913 assert_status(FRESH, CONNECTING_FAILED);
914 behaviour->on_connect_failed();
// With a client address available go through BINDING first; the other visible transition is FRESH -> CONNECTING_RESOLVING.
918 if (client_addr_base !=
nullptr)
920 assert_status(FRESH, BINDING);
925 assert_status(FRESH, CONNECTING_RESOLVING);