CCF
Loading...
Searching...
No Matches
tcp.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "before_io.h"
6#include "ccf/pal/locking.h"
7#include "dns.h"
9#include "ds/pending_io.h"
10#include "proxy.h"
11#include "socket.h"
12
13#include <netinet/in.h>
14#include <optional>
15
16namespace asynchost
17{
18 // NOLINTBEGIN(cppcoreguidelines-virtual-class-destructor)
19 class TCPImpl;
21
22 class TCPImpl : public with_uv_handle<uv_tcp_t>
23 {
24 private:
25 friend class close_ptr<TCPImpl>;
26
27 static constexpr int backlog = 128;
28 static constexpr size_t max_read_size = 16384;
29
30 // Each uv iteration, read only a capped amount from all sockets.
31 static constexpr auto max_read_quota = max_read_size * 4;
32 static size_t remaining_read_quota;
33 static bool alloc_quota_logged;
34
35 enum Status : uint8_t
36 {
37 FRESH,
38 LISTENING_RESOLVING,
39 LISTENING,
40 BINDING,
41 BINDING_FAILED,
42 CONNECTING_RESOLVING,
43 CONNECTING,
44 CONNECTED,
45 DISCONNECTED,
46 RESOLVING_FAILED,
47 LISTENING_FAILED,
48 CONNECTING_FAILED,
49 RECONNECTING
50 };
51
52 bool is_client;
53 std::optional<std::chrono::milliseconds> connection_timeout = std::nullopt;
54 Status status{FRESH};
55 std::unique_ptr<SocketBehaviour<TCP>> behaviour;
56 using PendingWrites = std::vector<PendingIO<uv_write_t>>;
57 PendingWrites pending_writes;
58
59 std::string host;
60 std::string port;
61 std::optional<std::string> client_host = std::nullopt;
62 std::optional<std::string> listen_name = std::nullopt;
63
64 addrinfo* client_addr_base = nullptr;
65 addrinfo* addr_base = nullptr;
66 addrinfo* addr_current = nullptr;
67
68 [[nodiscard]] bool port_assigned() const
69 {
70 return port != "0";
71 }
72
73 [[nodiscard]] std::string get_address_name() const
74 {
75 const std::string port_suffix =
76 port_assigned() ? fmt::format(":{}", port) : "";
77
78 if (addr_current != nullptr && addr_current->ai_family == AF_INET6)
79 {
80 return fmt::format("[{}]{}", host, port_suffix);
81 }
82
83 return fmt::format("{}{}", host, port_suffix);
84 }
85
86 TCPImpl(
87 bool is_client_ = false,
88 std::optional<std::chrono::milliseconds> connection_timeout_ =
89 std::nullopt) :
90 is_client(is_client_),
91 connection_timeout(connection_timeout_)
92 {
93 if (!init())
94 {
95 throw std::logic_error("uv tcp initialization failed");
96 }
97
98 uv_handle.data = this;
99 }
100
101 ~TCPImpl() override
102 {
103 {
104 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
105 for (const auto& req : pending_resolve_requests)
106 {
107 // The UV request objects can stay, but if there are any references
108 // to `this` left, we need to remove them.
109 if (req->data == this)
110 {
111 req->data = nullptr;
112 }
113 }
114 }
115 if (addr_base != nullptr)
116 {
117 uv_freeaddrinfo(addr_base);
118 }
119 if (client_addr_base != nullptr)
120 {
121 uv_freeaddrinfo(client_addr_base);
122 }
123 }
124
125 public:
126 static void reset_read_quota()
127 {
128 remaining_read_quota = max_read_quota;
129 alloc_quota_logged = false;
130 }
131
132 void set_behaviour(std::unique_ptr<SocketBehaviour<TCP>> b)
133 {
134 behaviour = std::move(b);
135 }
136
137 [[nodiscard]] std::string get_host() const
138 {
139 return host;
140 }
141
142 [[nodiscard]] std::string get_port() const
143 {
144 return port;
145 }
146
147 [[nodiscard]] std::string get_peer_name() const
148 {
149 sockaddr_storage sa = {};
150 int name_len = sizeof(sa);
151 if (
152 uv_tcp_getpeername(
153 &uv_handle, reinterpret_cast<sockaddr*>(&sa), &name_len) < 0)
154 {
155 LOG_FAIL_FMT("uv_tcp_getpeername failed");
156 return "";
157 }
158 switch (sa.ss_family)
159 {
160 case AF_INET:
161 {
162 char tmp[INET_ADDRSTRLEN];
163 auto* sa4 = reinterpret_cast<sockaddr_in*>(&sa);
164 uv_ip4_name(sa4, tmp, sizeof(tmp));
165 return tmp;
166 }
167 case AF_INET6:
168 {
169 char tmp[INET6_ADDRSTRLEN];
170 auto* sa6 = reinterpret_cast<sockaddr_in6*>(&sa);
171 uv_ip6_name(sa6, tmp, sizeof(tmp));
172 return tmp;
173 }
174 default:
175 return fmt::format("unknown family: {}", sa.ss_family);
176 }
177 }
178
179 [[nodiscard]] std::optional<std::string> get_listen_name() const
180 {
181 return listen_name;
182 }
183
185 {
186 int rc = 0;
187 if ((rc = uv_tcp_bind(&uv_handle, client_addr_base->ai_addr, 0)) < 0)
188 {
189 assert_status(BINDING, BINDING_FAILED);
190 LOG_FAIL_FMT("uv_tcp_bind failed: {}", uv_strerror(rc));
191 behaviour->on_bind_failed();
192 }
193 else
194 {
195 assert_status(BINDING, CONNECTING_RESOLVING);
196 if (addr_current != nullptr)
197 {
198 connect_resolved();
199 }
200 else
201 {
202 resolve(this->host, this->port, true);
203 }
204 }
205 }
206
207 // NOLINTEND(cppcoreguidelines-virtual-class-destructor)
208
210 uv_getaddrinfo_t* req, int rc, struct addrinfo* /*res*/)
211 {
212 static_cast<TCPImpl*>(req->data)->on_client_resolved(req, rc);
213 }
214
215 void on_client_resolved(uv_getaddrinfo_t* req, int rc)
216 {
217 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) == 0)
218 {
219 if (rc < 0)
220 {
221 assert_status(BINDING, BINDING_FAILED);
222 LOG_DEBUG_FMT("TCP client resolve failed: {}", uv_strerror(rc));
223 behaviour->on_bind_failed();
224 }
225 else
226 {
227 client_addr_base = req->addrinfo;
228 client_bind();
229 }
230 }
231
232 delete req; // NOLINT(cppcoreguidelines-owning-memory)
233 }
234
236 void start(int64_t /*id*/) {}
237
239 const std::string& host_,
240 const std::string& port_,
241 const std::optional<std::string>& client_host_ = std::nullopt)
242 {
243 // If a client host is set, bind to this first. Otherwise, connect
244 // straight away.
245 if (client_host_.has_value())
246 {
247 client_host = client_host_;
248 host = host_;
249 port = port_;
250
251 if (client_addr_base != nullptr)
252 {
253 uv_freeaddrinfo(client_addr_base);
254 client_addr_base = nullptr;
255 }
256
257 status = BINDING;
258 if (!DNS::resolve(
259 client_host.value(), "0", this, on_client_resolved, false))
260 {
261 LOG_DEBUG_FMT("Bind to '{}' failed", client_host.value());
262 status = BINDING_FAILED;
263 return false;
264 }
265 }
266 else
267 {
268 assert_status(FRESH, CONNECTING_RESOLVING);
269 return resolve(host_, port_, true);
270 }
271
272 return true;
273 }
274
276 {
277 switch (status)
278 {
279 case FRESH:
280 case BINDING:
281 case LISTENING_RESOLVING:
282 case LISTENING:
283 case CONNECTING_RESOLVING:
284 case CONNECTING:
285 case CONNECTED:
286 case LISTENING_FAILED:
287 case RECONNECTING:
288 {
290 "Unexpected status during reconnect, ignoring: {}", status);
291 break;
292 }
293 case BINDING_FAILED:
294 {
295 // Try again, from the start.
296 LOG_DEBUG_FMT("Reconnect from initial state");
297 assert_status(BINDING_FAILED, BINDING);
298 return connect(host, port, client_host);
299 }
300 case RESOLVING_FAILED:
301 case CONNECTING_FAILED:
302 {
303 // Try again, starting with DNS.
304 LOG_DEBUG_FMT("Reconnect from DNS");
305 status = CONNECTING_RESOLVING;
306 return resolve(host, port, true);
307 }
308
309 case DISCONNECTED:
310 {
311 // It's possible there was a request to close the uv_handle in the
312 // meanwhile; in that case we abort the reconnection attempt.
313 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) == 0)
314 {
315 // Close and reset the uv_handle before trying again with the same
316 // addr_current that succeeded previously.
317 LOG_DEBUG_FMT("Reconnect from resolved address");
318 status = RECONNECTING;
319 uv_close(reinterpret_cast<uv_handle_t*>(&uv_handle), on_reconnect);
320 }
321 return true;
322 }
323
324 default:
325 {
326 throw std::logic_error(
327 fmt::format("Unexpected status during reconnect: {}", status));
328 }
329 }
330
331 return false;
332 }
333
334 bool listen(
335 const std::string& host_,
336 const std::string& port_,
337 const std::optional<std::string>& name = std::nullopt)
338 {
339 assert_status(FRESH, LISTENING_RESOLVING);
340 bool ret = resolve(host_, port_, false);
341 listen_name = name;
342 return ret;
343 }
344
345 bool write(size_t len, const uint8_t* data, sockaddr /*addr*/ = {})
346 {
347 auto* req = new uv_write_t; // NOLINT(cppcoreguidelines-owning-memory)
348 auto* copy = new char[len]; // NOLINT(cppcoreguidelines-owning-memory)
349 if (data != nullptr)
350 {
351 memcpy(copy, data, len);
352 }
353 req->data = copy;
354
355 switch (status)
356 {
357 case BINDING:
358 case BINDING_FAILED:
359 case CONNECTING_RESOLVING:
360 case CONNECTING:
361 case RESOLVING_FAILED:
362 case CONNECTING_FAILED:
363 case RECONNECTING:
364 {
365 pending_writes.emplace_back(req, len, sockaddr{}, free_write);
366 break;
367 }
368
369 case CONNECTED:
370 {
371 return send_write(req, len);
372 }
373
374 case DISCONNECTED:
375 {
376 LOG_DEBUG_FMT("Disconnected: Ignoring write of size {}", len);
377 free_write(req);
378 break;
379 }
380
381 case FRESH:
382 case LISTENING_RESOLVING:
383 case LISTENING:
384 case LISTENING_FAILED:
385 default:
386 {
387 free_write(req);
388 throw std::logic_error(
389 fmt::format("Unexpected status during write: {}", status));
390 }
391 }
392
393 return true;
394 }
395
396 private:
397 bool init()
398 {
399 assert_status(FRESH, FRESH);
400
401 int rc = 0;
402 if ((rc = uv_tcp_init(uv_default_loop(), &uv_handle)) < 0)
403 {
404 LOG_FAIL_FMT("uv_tcp_init failed: {}", uv_strerror(rc));
405 return false;
406 }
407
408 if ((rc = uv_tcp_nodelay(&uv_handle, 1)) < 0)
409 {
410 LOG_FAIL_FMT("uv_tcp_nodelay failed: {}", uv_strerror(rc));
411 return false;
412 }
413
414 if (is_client)
415 {
416 uv_os_sock_t sock = 0;
417 if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1)
418 {
420 "socket creation failed: {}",
421 std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
422 return false;
423 }
424
425 if (connection_timeout.has_value())
426 {
427 const unsigned int ms = connection_timeout->count();
428 const auto ret =
429 setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &ms, sizeof(ms));
430 if (ret != 0)
431 {
433 "Failed to set socket option (TCP_USER_TIMEOUT): {}",
434 std::strerror(errno)); // NOLINT(concurrency-mt-unsafe)
435 return false;
436 }
437 }
438
439 if ((rc = uv_tcp_open(&uv_handle, sock)) < 0)
440 {
441 LOG_FAIL_FMT("uv_tcp_open failed: {}", uv_strerror(rc));
442 return false;
443 }
444 }
445
446 if ((rc = uv_tcp_keepalive(&uv_handle, 1, 30)) < 0)
447 {
448 LOG_FAIL_FMT("uv_tcp_keepalive failed: {}", uv_strerror(rc));
449 return false;
450 }
451
452 uv_handle.data = this;
453 return true;
454 }
455
456 bool send_write(uv_write_t* req, size_t len)
457 {
458 auto* copy = static_cast<char*>(req->data);
459
460 uv_buf_t buf;
461 buf.base = copy;
462 buf.len = len;
463
464 int rc = 0;
465
466 if (
467 (rc = uv_write(
468 req,
469 reinterpret_cast<uv_stream_t*>(&uv_handle),
470 &buf,
471 1,
472 on_write)) < 0)
473 {
474 free_write(req);
475 LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
476 assert_status(CONNECTED, DISCONNECTED);
477 behaviour->on_disconnect();
478 return false;
479 }
480
481 return true;
482 }
483
484 void update_resolved_address(int address_family, sockaddr* sa)
485 {
486 auto [h, p] = addr_to_str(sa, address_family);
487 host = h;
488 port = p;
489 LOG_TRACE_FMT("TCP update address to {}:{}", host, port);
490 }
491
492 void listen_resolved()
493 {
494 int rc = 0;
495
496 while (addr_current != nullptr)
497 {
498 update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
499
500 if ((rc = uv_tcp_bind(&uv_handle, addr_current->ai_addr, 0)) < 0)
501 {
502 addr_current = addr_current->ai_next;
504 "uv_tcp_bind failed on {}: {}",
505 get_address_name(),
506 uv_strerror(rc));
507 continue;
508 }
509
510 if (
511 (rc = uv_listen(
512 reinterpret_cast<uv_stream_t*>(&uv_handle), backlog, on_accept)) <
513 0)
514 {
516 "uv_listen failed on {}: {}", get_address_name(), uv_strerror(rc));
517 addr_current = addr_current->ai_next;
518 continue;
519 }
520
521 // If bound on port 0 (ie - asking the OS to assign a port), then we
522 // need to call uv_tcp_getsockname to retrieve the bound port
523 // (addr_current will not contain it)
524 if (!port_assigned())
525 {
526 sockaddr_storage sa_storage{};
527 auto* const sa = reinterpret_cast<sockaddr*>(&sa_storage);
528 int sa_len = sizeof(sa_storage);
529 if ((rc = uv_tcp_getsockname(&uv_handle, sa, &sa_len)) != 0)
530 {
531 LOG_FAIL_FMT("uv_tcp_getsockname failed: {}", uv_strerror(rc));
532 }
533 update_resolved_address(addr_current->ai_family, sa);
534 }
535
536 assert_status(LISTENING_RESOLVING, LISTENING);
537 behaviour->on_listening(host, port);
538 return;
539 }
540
541 assert_status(LISTENING_RESOLVING, LISTENING_FAILED);
542 behaviour->on_listen_failed();
543 }
544
545 bool connect_resolved()
546 {
547 auto* req = new uv_connect_t; // NOLINT(cppcoreguidelines-owning-memory)
548 int rc = 0;
549
550 while (addr_current != nullptr)
551 {
552 if (
553 (rc = uv_tcp_connect(
554 req, &uv_handle, addr_current->ai_addr, on_connect)) < 0)
555 {
556 LOG_DEBUG_FMT("uv_tcp_connect retry: {}", uv_strerror(rc));
557 addr_current = addr_current->ai_next;
558 continue;
559 }
560
561 assert_status(CONNECTING_RESOLVING, CONNECTING);
562 return true;
563 }
564
565 assert_status(CONNECTING_RESOLVING, CONNECTING_FAILED);
566 delete req; // NOLINT(cppcoreguidelines-owning-memory)
567
568 // This should show even when verbose logs are off
570 "Unable to connect: all resolved addresses failed: {}:{}", host, port);
571
572 behaviour->on_connect_failed();
573 return false;
574 }
575
576 void assert_status(Status from, Status to)
577 {
578 if (status != from)
579 {
580 throw std::logic_error(fmt::format(
581 "Trying to transition from {} to {} but current status is {}",
582 from,
583 to,
584 status));
585 }
586
587 status = to;
588 }
589
590 bool resolve(
591 const std::string& host_, const std::string& port_, bool async = true)
592 {
593 host = host_;
594 port = port_;
595
596 if (addr_base != nullptr)
597 {
598 uv_freeaddrinfo(addr_base);
599 addr_base = nullptr;
600 addr_current = nullptr;
601 }
602
603 if (!DNS::resolve(host, port, this, on_resolved, async))
604 {
605 LOG_DEBUG_FMT("Resolving '{}' failed", host);
606 status = RESOLVING_FAILED;
607 return false;
608 }
609
610 return true;
611 }
612
613 static void on_resolved(uv_getaddrinfo_t* req, int rc, struct addrinfo* res)
614 {
615 std::unique_lock<ccf::pal::Mutex> guard(pending_resolve_requests_mtx);
616 pending_resolve_requests.erase(req);
617
618 if (req->data != nullptr)
619 {
620 static_cast<TCPImpl*>(req->data)->on_resolved(req, rc);
621 }
622 else
623 {
624 // The TCPImpl that submitted the request has been destroyed, but we
625 // need to clean up the request object.
626 uv_freeaddrinfo(res);
627 delete req; // NOLINT(cppcoreguidelines-owning-memory)
628 }
629 }
630
631 void on_resolved(uv_getaddrinfo_t* req, int rc)
632 {
633 // It is possible that on_resolved is triggered after there has been a
634 // request to close uv_handle. In this scenario, we should not try to
635 // do anything with the handle and return immediately (otherwise,
636 // uv_close cb will abort).
637 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
638 {
639 LOG_DEBUG_FMT("on_resolved: closing");
640 uv_freeaddrinfo(req->addrinfo);
641 delete req; // NOLINT(cppcoreguidelines-owning-memory)
642 return;
643 }
644
645 if (rc < 0)
646 {
647 status = RESOLVING_FAILED;
648 LOG_DEBUG_FMT("TCP resolve failed: {}", uv_strerror(rc));
649 behaviour->on_resolve_failed();
650 }
651 else
652 {
653 addr_base = req->addrinfo;
654 addr_current = addr_base;
655
656 switch (status)
657 {
658 case CONNECTING_RESOLVING:
659 {
660 connect_resolved();
661 break;
662 }
663
664 case LISTENING_RESOLVING:
665 {
666 listen_resolved();
667 break;
668 }
669
670 case FRESH:
671 case LISTENING:
672 case BINDING:
673 case BINDING_FAILED:
674 case CONNECTING:
675 case CONNECTED:
676 case DISCONNECTED:
677 case RESOLVING_FAILED:
678 case LISTENING_FAILED:
679 case CONNECTING_FAILED:
680 case RECONNECTING:
681 default:
682 {
683 throw std::logic_error(
684 fmt::format("Unexpected status during on_resolved: {}", status));
685 }
686 }
687 }
688
689 delete req; // NOLINT(cppcoreguidelines-owning-memory)
690 }
691
692 static void on_accept(uv_stream_t* handle, int rc)
693 {
694 static_cast<TCPImpl*>(handle->data)->on_accept(rc);
695 }
696
697 void on_accept(int rc)
698 {
699 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
700 {
701 LOG_DEBUG_FMT("on_accept: closing");
702 return;
703 }
704
705 if (rc < 0)
706 {
707 LOG_DEBUG_FMT("on_accept failed: {}", uv_strerror(rc));
708 return;
709 }
710
711 TCP peer;
712
713 if (
714 (rc = uv_accept(
715 reinterpret_cast<uv_stream_t*>(&uv_handle),
716 reinterpret_cast<uv_stream_t*>(&peer->uv_handle))) < 0)
717 {
718 LOG_DEBUG_FMT("uv_accept failed: {}", uv_strerror(rc));
719 return;
720 }
721
722 peer->assert_status(FRESH, CONNECTED);
723
724 if (!peer->read_start())
725 {
726 return;
727 }
728
729 behaviour->on_accept(peer);
730 }
731
732 static void on_connect(uv_connect_t* req, int rc)
733 {
734 auto* self = static_cast<TCPImpl*>(req->handle->data);
735 delete req; // NOLINT(cppcoreguidelines-owning-memory)
736
737 if (rc == UV_ECANCELED)
738 {
739 // Break reconnection loop early if cancelled
740 LOG_FAIL_FMT("on_connect: cancelled");
741 return;
742 }
743
744 self->on_connect(rc);
745 }
746
747 void on_connect(int rc)
748 {
749 if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&uv_handle)) != 0)
750 {
751 LOG_DEBUG_FMT("on_connect: closing");
752 return;
753 }
754
755 if (rc < 0)
756 {
757 // Try again on the next address.
758 LOG_DEBUG_FMT("uv_tcp_connect async retry: {}", uv_strerror(rc));
759 addr_current = addr_current->ai_next;
760 assert_status(CONNECTING, CONNECTING_RESOLVING);
761 connect_resolved();
762 }
763 else
764 {
765 assert_status(CONNECTING, CONNECTED);
766
767 if (!read_start())
768 {
769 return;
770 }
771
772 for (auto& w : pending_writes) // NOLINT(readability-qualified-auto)
773 {
774 send_write(w.req, w.len);
775 w.req = nullptr;
776 }
777
778 PendingWrites().swap(pending_writes);
779 behaviour->on_connect();
780 }
781 }
782
783 bool read_start()
784 {
785 int rc = 0;
786
787 if (
788 (rc = uv_read_start(
789 reinterpret_cast<uv_stream_t*>(&uv_handle), on_alloc, on_read)) < 0)
790 {
791 assert_status(CONNECTED, DISCONNECTED);
792 LOG_FAIL_FMT("uv_read_start failed: {}", uv_strerror(rc));
793
794 if (behaviour)
795 {
796 behaviour->on_disconnect();
797 }
798
799 return false;
800 }
801
802 return true;
803 }
804
805 static void on_alloc(
806 uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
807 {
808 static_cast<TCPImpl*>(handle->data)->on_alloc(suggested_size, buf);
809 }
810
811 void on_alloc(size_t suggested_size, uv_buf_t* buf)
812 {
813 auto alloc_size = std::min(suggested_size, max_read_size);
814
815 alloc_size = std::min(alloc_size, remaining_read_quota);
816 remaining_read_quota -= alloc_size;
817 if (alloc_size != 0)
818 {
820 "Allocating {} bytes for TCP read ({} of quota remaining)",
821 alloc_size,
822 remaining_read_quota);
823 }
824
825 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
826 buf->base = new char[alloc_size];
827 buf->len = alloc_size;
828 }
829
830 void on_free(const uv_buf_t* buf)
831 {
832 delete[] buf->base; // NOLINT(cppcoreguidelines-owning-memory)
833 }
834
835 static void on_read(uv_stream_t* handle, ssize_t sz, const uv_buf_t* buf)
836 {
837 static_cast<TCPImpl*>(handle->data)->on_read(sz, buf);
838 }
839
840 void on_read(ssize_t sz, const uv_buf_t* buf)
841 {
842 if (sz == 0)
843 {
844 on_free(buf);
845 return;
846 }
847
848 if (sz == UV_ENOBUFS)
849 {
850 if (!alloc_quota_logged)
851 {
852 LOG_DEBUG_FMT("TCP on_read reached allocation quota");
853 alloc_quota_logged = true;
854 }
855 on_free(buf);
856 return;
857 }
858
859 if (sz < 0)
860 {
861 assert_status(CONNECTED, DISCONNECTED);
862 on_free(buf);
863 uv_read_stop(reinterpret_cast<uv_stream_t*>(&uv_handle));
864
865 LOG_DEBUG_FMT("TCP on_read: {}", uv_strerror(static_cast<int>(sz)));
866 behaviour->on_disconnect();
867 return;
868 }
869
870 auto* p = reinterpret_cast<uint8_t*>(buf->base);
871 const bool read_good = behaviour->on_read(static_cast<size_t>(sz), p, {});
872
873 if (p != nullptr)
874 {
875 on_free(buf);
876 }
877
878 if (!read_good)
879 {
880 behaviour->on_disconnect();
881 return;
882 }
883 }
884
885 static void on_write(uv_write_t* req, int /*status*/)
886 {
887 free_write(req);
888 }
889
890 static void free_write(uv_write_t* req)
891 {
892 if (req == nullptr)
893 {
894 return;
895 }
896
897 auto* copy = static_cast<char*>(req->data);
898 delete[] copy; // NOLINT(cppcoreguidelines-owning-memory)
899 delete req; // NOLINT(cppcoreguidelines-owning-memory)
900 }
901
902 static void on_reconnect(uv_handle_t* handle)
903 {
904 static_cast<TCPImpl*>(handle->data)->on_reconnect();
905 }
906
907 void on_reconnect()
908 {
909 assert_status(RECONNECTING, FRESH);
910
911 if (!init())
912 {
913 assert_status(FRESH, CONNECTING_FAILED);
914 behaviour->on_connect_failed();
915 return;
916 }
917
918 if (client_addr_base != nullptr)
919 {
920 assert_status(FRESH, BINDING);
921 client_bind();
922 }
923 else
924 {
925 assert_status(FRESH, CONNECTING_RESOLVING);
926 connect_resolved();
927 }
928 }
929 };
930
932 {
933 public:
935
937 {
939 }
940 };
941
943}
static bool resolve(const std::string &host_, const std::string &service, void *ud, uv_getaddrinfo_cb cb, bool async)
Definition dns.h:19
void before_io()
Definition tcp.h:936
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
Definition tcp.h:23
void client_bind()
Definition tcp.h:184
static void on_client_resolved(uv_getaddrinfo_t *req, int rc, struct addrinfo *)
Definition tcp.h:209
bool listen(const std::string &host_, const std::string &port_, const std::optional< std::string > &name=std::nullopt)
Definition tcp.h:334
std::string get_port() const
Definition tcp.h:142
std::string get_host() const
Definition tcp.h:137
void set_behaviour(std::unique_ptr< SocketBehaviour< TCP > > b)
Definition tcp.h:132
std::optional< std::string > get_listen_name() const
Definition tcp.h:179
bool write(size_t len, const uint8_t *data, sockaddr={})
Definition tcp.h:345
void on_client_resolved(uv_getaddrinfo_t *req, int rc)
Definition tcp.h:215
std::string get_peer_name() const
Definition tcp.h:147
bool reconnect()
Definition tcp.h:275
static void reset_read_quota()
Definition tcp.h:126
void start(int64_t)
This is to mimic UDP's implementation. TCP's start is on_accept.
Definition tcp.h:236
bool connect(const std::string &host_, const std::string &port_, const std::optional< std::string > &client_host_=std::nullopt)
Definition tcp.h:238
Definition proxy.h:15
Definition proxy.h:84
uv_tcp_t uv_handle
Definition proxy.h:90
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition after_io.h:8
std::pair< std::string, std::string > addr_to_str(const sockaddr *addr, int address_family=AF_INET)
Definition socket.h:84
proxy_ptr< TCPImpl > TCP
Definition tcp.h:20
auto * handle
Definition kv_helpers.h:87
Definition configuration.h:14