31#include <unordered_map>
// Fallback soft and hard limits on open sessions. They are applied to a
// listening interface when its configuration carries no explicit
// max_open_sessions_soft / max_open_sessions_hard value.
37 static constexpr size_t max_open_sessions_soft_default = 1000;
38 static constexpr size_t max_open_sessions_hard_default = 1010;
// Keeps the set of live sessions keyed by connection ID, together with
// per-listening-interface counters, limits and certificates.
// NOTE(review): this listing omits lines (braces, some declarations); the
// comments below describe only the declarations that are visible.
41 class RPCSessions :
public std::enable_shared_from_this<RPCSessions>,
// Per-interface bookkeeping. Other members used elsewhere in this listing
// (errors, endorsement, http_configuration, app_protocol) are declared on
// lines not shown here.
46 struct ListenInterface
// Number of sessions currently counted against this interface.
48 size_t open_sessions = 0;
// Highest value open_sessions has reached.
49 size_t peak_sessions = 0;
// At or above the soft limit new sessions are created as NoMoreSessionsImpl;
// at or above the hard limit they are refused outright.
50 size_t max_open_sessions_soft = 0;
51 size_t max_open_sessions_hard = 0;
// Ordered map of interface ID -> bookkeeping entry.
57 std::map<ListenInterfaceID, ListenInterface> listening_interfaces;
61 std::shared_ptr<RPCMap> rpc_map;
// Certificate to present on each interface; filled in by set_cert.
62 std::unordered_map<ListenInterfaceID, std::shared_ptr<::tls::Cert>> certs;
// Optional subsystems; their initialisers are on lines not shown here.
63 std::shared_ptr<CustomProtocolSubsystem> custom_protocol_subsystem =
65 std::shared_ptr<CommitCallbackSubsystem> commit_callbacks_subsystem =
// Mapped type of the session table: (owning interface ID, session). The
// container declaration itself starts on a line not shown here.
71 std::pair<ListenInterfaceID, std::shared_ptr<ccf::Session>>>
// Highest number of entries the session table has held.
73 size_t sessions_peak = 0;
// Counter for client session IDs; starts at -1 and is decremented on use.
77 std::atomic<ccf::tls::ConnID> next_client_session_id = -1;
// Session wrapper used when an interface has reached its soft session limit.
// It derives from the regular session type `Base` and overrides incoming-data
// handling so that the peer receives a "service unavailable" error.
// NOTE(review): several lines of the body are missing from this listing.
79 template <
typename Base>
80 class NoMoreSessionsImpl :
public Base
// Forwards all constructor arguments to Base unchanged.
83 template <
typename... Ts>
84 NoMoreSessionsImpl(Ts&&... ts) : Base(std::forward<Ts>(ts)...)
87 void handle_incoming_data_thread(std::vector<uint8_t>&& data)
override
// Hand the received bytes to the TLS layer.
89 Base::tls_io->recv_buffered(data.data(), data.size());
// Error payload: HTTP 503 with the SessionCapExhausted error code. The call
// that sends it is on lines not shown here.
95 HTTP_STATUS_SERVICE_UNAVAILABLE,
96 ccf::errors::SessionCapExhausted,
97 "Service is currently busy and unable to serve new connections"});
// Close the TLS connection.
100 Base::tls_io->close();
// Body of get_next_client_id (signature not shown in this listing): picks a
// connection ID for a new client session.
// The current counter value becomes the candidate; the counter is then
// post-decremented.
107 auto id = next_client_session_id--;
// Remember the first candidate. Presumably used to detect that every ID has
// been tried; the comparison is on lines not shown here.
108 const auto initial = id;
// If the counter is positive after the decrement, reset it to -1.
110 if (next_client_session_id > 0)
112 next_client_session_id = -1;
// Keep searching while the candidate is already a key in `sessions`. The
// loop body that advances `id` is not shown in this listing.
115 while (sessions.find(
id) != sessions.end())
126 throw std::runtime_error(
127 "Exhausted all IDs for enclave client sessions");
// Looks up the bookkeeping entry for interface `id` in listening_interfaces.
// Throws std::logic_error when no such interface is registered.
// NOTE(review): the parameter list and the success-path return are on lines
// not shown here; presumably it returns it->second.
134 ListenInterface& get_interface_from_interface_id(
137 auto it = listening_interfaces.find(
id);
138 if (it != listening_interfaces.end())
143 throw std::logic_error(
144 fmt::format(
"No RPC interface for interface ID {}",
id));
// Creates the server-side session object matching `app_protocol`:
//   "HTTP2" -> ::http::HTTP2ServerSession
//   "HTTP1" -> ::http::HTTPServerSession
//   other   -> delegated to custom_protocol_subsystem, if one is set
// Throws std::runtime_error when the protocol is unknown and no custom
// protocol subsystem is available.
// NOTE(review): some parameters and constructor arguments are on lines not
// shown in this listing.
147 std::shared_ptr<ccf::Session> make_server_session(
148 const std::string& app_protocol,
151 std::unique_ptr<tls::Context>&& ctx,
154 if (app_protocol ==
"HTTP2")
156 return std::make_shared<::http::HTTP2ServerSession>(
162 parser_configuration,
165 if (app_protocol ==
"HTTP1")
167 return std::make_shared<::http::HTTPServerSession>(
173 parser_configuration,
175 commit_callbacks_subsystem);
// Neither built-in protocol matched: let the custom subsystem build the
// session. Ownership of ctx is moved into it.
177 if (custom_protocol_subsystem)
179 return custom_protocol_subsystem->create_session(
180 app_protocol,
id, std::move(ctx));
183 throw std::runtime_error(fmt::format(
184 "unknown protocol '{}' and custom protocol subsystem missing",
// Constructor (tail of the parameter list and the initialiser list): stores
// the writer factory and takes ownership of the RPC map by moving it in.
191 std::shared_ptr<RPCMap> rpc_map_) :
192 writer_factory(writer_factory),
193 rpc_map(
std::move(rpc_map_))
// set_custom_protocol_subsystem: replaces the stored custom protocol
// subsystem with `cpss`. No lock is taken in the visible lines.
199 std::shared_ptr<CustomProtocolSubsystem> cpss)
201 custom_protocol_subsystem = cpss;
// set_commit_callbacks_subsystem: replaces the stored commit callback
// subsystem with `fcss`. No lock is taken in the visible lines.
205 std::shared_ptr<CommitCallbackSubsystem> fcss)
207 commit_callbacks_subsystem = fcss;
// Body of report_parsing_error: under the lock, increments the parsing error
// counter of interface `id`. The lookup throws std::logic_error if the
// interface is unknown.
212 std::lock_guard<ccf::pal::Mutex> guard(lock);
213 get_interface_from_interface_id(
id).errors.parsing++;
// Body of report_request_payload_too_large_error: under the lock, increments
// the request_payload_too_large counter of interface `id`. The lookup throws
// std::logic_error if the interface is unknown.
219 std::lock_guard<ccf::pal::Mutex> guard(lock);
220 get_interface_from_interface_id(
id).errors.request_payload_too_large++;
// Body of report_request_header_too_large_error: under the lock, increments
// the request_header_too_large counter of interface `id`. The lookup throws
// std::logic_error if the interface is unknown.
226 std::lock_guard<ccf::pal::Mutex> guard(lock);
227 get_interface_from_interface_id(
id).errors.request_header_too_large++;
// Body of update_listening_interface_options: under the lock, copies the
// configured options of an interface into its bookkeeping entry.
// NOTE(review): the statement that introduces `name` and `interface`
// (presumably a loop over the node's RPC interfaces) is not shown here.
233 std::lock_guard<ccf::pal::Mutex> guard(lock);
// operator[] creates a default entry the first time an interface is seen;
// for an existing entry only the fields assigned below are overwritten.
237 auto& li = listening_interfaces[name];
// Unset limits fall back to the file-level defaults.
239 li.max_open_sessions_soft = interface.max_open_sessions_soft.value_or(
240 max_open_sessions_soft_default);
242 li.max_open_sessions_hard = interface.max_open_sessions_hard.value_or(
243 max_open_sessions_hard_default);
245 li.endorsement = interface.endorsement.value_or(endorsement_default);
// The right-hand side of the http_configuration assignment is on lines not
// shown here. An unset application protocol defaults to "HTTP1".
247 li.http_configuration =
250 li.app_protocol = interface.app_protocol.value_or(
"HTTP1");
// Arguments of a log statement reporting the effective settings; the logging
// macro itself is on a line not shown here.
253 "Setting max open sessions on interface \"{}\" ({}) to [{}, "
254 "{}] and endorsement authority to {}",
256 interface.bind_address,
257 li.max_open_sessions_soft,
258 li.max_open_sessions_hard,
259 li.endorsement.authority);
// Body of get_session_metrics: under the lock, fills a metrics object `sm`
// (declared on a line not shown here) with global and per-interface figures.
266 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Global figures: current size of the session table and its recorded peak.
268 sm.
active = sessions.size();
269 sm.
peak = sessions_peak;
// Per-interface figures. The statement that stores these values is only
// partly visible in this listing.
271 for (
const auto& [name, interface] : listening_interfaces)
274 interface.open_sessions,
275 interface.peak_sessions,
276 interface.max_open_sessions_soft,
277 interface.max_open_sessions_hard,
// Body of get_app_protocol_main_interface: returns the application protocol
// of the first entry in listening_interfaces. Because that container is an
// ordered std::map, "first" means the smallest interface ID in key order.
// Throws std::logic_error when no interface is registered.
290 if (listening_interfaces.empty())
292 throw std::logic_error(
"No listening interface for this node");
295 return listening_interfaces.begin()->second.app_protocol;
// Body of set_cert: builds a certificate object from `cert_` and `pk` and
// installs it for every listening interface whose endorsement authority
// equals `authority`.
// The certificate is constructed before the lock is taken.
319 auto cert = std::make_shared<::tls::Cert>(
320 nullptr, cert_, pk, std::nullopt,
false);
322 std::lock_guard<ccf::pal::Mutex> guard(lock);
// All matching interfaces share the same certificate object; an existing
// entry for an interface is replaced.
324 for (
auto& [listen_interface_id, interface] : listening_interfaces)
326 if (interface.endorsement.authority == authority)
328 certs.insert_or_assign(listen_interface_id, cert);
// Body of accept: registers a new inbound session `id` that arrived on
// `listen_interface_id`, or refuses it. All visible work happens under the
// lock.
// NOTE(review): the signature, braces, several conditions (including the
// branch on the UDP flag) and some arguments are missing from this listing;
// the comments describe only the visible fragments.
338 std::lock_guard<ccf::pal::Mutex> guard(lock);
// A connection ID may be registered only once.
340 if (sessions.find(
id) != sessions.end())
342 throw std::logic_error(
343 fmt::format(
"Duplicate conn ID received inside enclave: {}",
id));
// The session must come from an interface already present in
// listening_interfaces.
346 auto it = listening_interfaces.find(listen_interface_id);
347 if (it == listening_interfaces.end())
349 throw std::logic_error(fmt::format(
350 "Can't accept new RPC session {} - comes from unknown listening "
353 listen_interface_id));
356 auto& per_listen_interface = it->second;
// Refusal 1: no certificate installed yet for this interface. The first part
// of this condition is on a line not shown here. A tcp_stop message is
// written to the host.
360 certs.find(listen_interface_id) == certs.end())
363 "Refusing TLS session {} inside the enclave - interface {} "
364 "has no TLS certificate yet",
366 listen_interface_id);
369 ::tcp::tcp_stop, to_host,
id, std::string(
"Session refused"));
// Refusal 2: the interface is at or above its hard session limit. A tcp_stop
// message is written to the host.
372 per_listen_interface.open_sessions >=
373 per_listen_interface.max_open_sessions_hard)
376 "Refusing TLS session {} inside the enclave - already have {} "
377 "sessions from interface {} and limit is {}",
379 per_listen_interface.open_sessions,
381 per_listen_interface.max_open_sessions_hard);
384 ::tcp::tcp_stop, to_host,
id, std::string(
"Session refused"));
// Soft refusal: at or above the soft limit the session is still created and
// counted, but as a NoMoreSessionsImpl wrapper (HTTP/2 or HTTP/1 flavour
// depending on the interface's application protocol).
387 per_listen_interface.open_sessions >=
388 per_listen_interface.max_open_sessions_soft)
391 "Soft refusing session {} (returning 503) inside the enclave - "
392 "already have {} sessions from interface {} and limit is {}",
394 per_listen_interface.open_sessions,
396 per_listen_interface.max_open_sessions_soft);
398 auto ctx = std::make_unique<::tls::Server>(certs[listen_interface_id]);
399 std::shared_ptr<Session> capped_session;
400 if (per_listen_interface.app_protocol ==
"HTTP2")
403 std::make_shared<NoMoreSessionsImpl<::http::HTTP2ServerSession>>(
409 per_listen_interface.http_configuration,
415 std::make_shared<NoMoreSessionsImpl<::http::HTTPServerSession>>(
421 per_listen_interface.http_configuration,
423 commit_callbacks_subsystem);
425 sessions.insert(std::make_pair(
426 id, std::make_pair(listen_interface_id, std::move(capped_session))));
427 per_listen_interface.open_sessions++;
428 per_listen_interface.peak_sessions = std::max(
429 per_listen_interface.peak_sessions,
430 per_listen_interface.open_sessions);
// Normal acceptance.
435 "Accepting a session {} inside the enclave from interface \"{}\"",
437 listen_interface_id);
// Datagram path (presumably guarded by the UDP flag on a line not shown):
// "QUIC" gets a QUICSessionImpl; otherwise, if a custom protocol subsystem is
// set, the table entry is inserted with a null session; otherwise the
// protocol is rejected with std::runtime_error.
442 if (per_listen_interface.app_protocol ==
"QUIC")
444 auto session = std::make_shared<QUICSessionImpl>(
445 rpc_map,
id, listen_interface_id, writer_factory);
446 sessions.insert(std::make_pair(
447 id, std::make_pair(listen_interface_id, std::move(session))));
449 else if (custom_protocol_subsystem)
455 std::make_pair(
id, std::make_pair(listen_interface_id,
nullptr)));
459 throw std::runtime_error(
460 "unknown UDP protocol and custom protocol subsystem missing");
462 per_listen_interface.open_sessions++;
463 per_listen_interface.peak_sessions = std::max(
464 per_listen_interface.peak_sessions,
465 per_listen_interface.open_sessions);
// Stream path: choose a plaintext or TLS server context (the condition is on
// a line not shown), then build the protocol-specific session. The second
// argument to ::tls::Server is true only for "HTTP2".
469 std::unique_ptr<tls::Context> ctx;
473 ctx = std::make_unique<nontls::PlaintextServer>();
477 ctx = std::make_unique<::tls::Server>(
478 certs[listen_interface_id],
479 per_listen_interface.app_protocol ==
"HTTP2");
482 auto session = make_server_session(
483 per_listen_interface.app_protocol,
487 per_listen_interface.http_configuration);
489 sessions.insert(std::make_pair(
490 id, std::make_pair(listen_interface_id, std::move(session))));
491 per_listen_interface.open_sessions++;
492 per_listen_interface.peak_sessions = std::max(
493 per_listen_interface.peak_sessions,
494 per_listen_interface.open_sessions);
// Track the global high-water mark of the session table.
498 sessions_peak = std::max(sessions_peak, sessions.size());
// Body of find_session: under the lock, looks up `id` in the session table
// and returns the stored session pointer.
// NOTE(review): the not-found branch body is on lines not shown here;
// callers in this listing compare the result against nullptr.
503 std::lock_guard<ccf::pal::Mutex> guard(lock);
505 auto search = sessions.find(
id);
506 if (search == sessions.end())
511 return search->second.second;
// reply_async (first parameter and return statements not shown): sends
// `data` on an existing session and optionally closes it afterwards.
// NOTE(review): `session` is obtained on a line not shown here, presumably
// via a lookup of `id`.
516 bool terminate_after_send,
517 std::vector<uint8_t>&& data)
override
// Unknown session: log and do not send anything.
520 if (session ==
nullptr)
522 LOG_DEBUG_FMT(
"Refusing to reply to unknown session {}",
id);
// The payload buffer is moved into the session.
528 session->send_data(std::move(data));
530 if (terminate_after_send)
532 session->close_session();
// Body of remove_session: under the lock, drops session `id` from the table.
// Removing an ID that is not present is a no-op apart from the debug log.
540 std::lock_guard<ccf::pal::Mutex> guard(lock);
541 LOG_DEBUG_FMT(
"Closing a session inside the enclave: {}",
id);
542 const auto search = sessions.find(
id);
543 if (search != sessions.end())
// Decrement the owning interface's open-session count, if that interface is
// still registered. Entries whose interface ID matches no registered
// interface leave all counters untouched.
545 auto it = listening_interfaces.find(search->second.first);
546 if (it != listening_interfaces.end())
548 it->second.open_sessions--;
550 sessions.erase(search);
// create_client: creates an outbound TLS client session using `cert` and
// registers it in the session table. `app_protocol` selects the session type
// ("HTTP2" or "HTTP1", default "HTTP1"); any other value throws
// std::runtime_error.
// NOTE(review): the return type line and the return statements are not shown
// in this listing.
561 const std::shared_ptr<::tls::Cert>& cert,
562 const std::string& app_protocol =
"HTTP1")
564 std::lock_guard<ccf::pal::Mutex> guard(lock);
565 auto ctx = std::make_unique<::tls::Client>(cert);
566 auto id = get_next_client_id();
568 LOG_DEBUG_FMT(
"Creating a new client session inside the enclave: {}",
id);
// Client sessions are stored with an empty interface ID and no
// per-interface counter is updated here; only the global peak is tracked.
573 if (app_protocol ==
"HTTP2")
575 auto session = std::make_shared<::http::HTTP2ClientSession>(
576 id, writer_factory, std::move(ctx));
577 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
578 sessions_peak = std::max(sessions_peak, sessions.size());
581 if (app_protocol ==
"HTTP1")
583 auto session = std::make_shared<::http::HTTPClientSession>(
584 id, writer_factory, std::move(ctx));
585 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
586 sessions_peak = std::max(sessions_peak, sessions.size());
590 throw std::runtime_error(
"unsupported client application protocol");
// Body of create_unencrypted_client: under the lock, allocates a client
// session ID, creates an UnencryptedHTTPClientSession and registers it with
// an empty interface ID, then updates the global session peak.
// NOTE(review): the constructor arguments and the return statement are on
// lines not shown here.
595 std::lock_guard<ccf::pal::Mutex> guard(lock);
596 auto id = get_next_client_id();
597 auto session = std::make_shared<::http::UnencryptedHTTPClientSession>(
599 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
600 sessions_peak = std::max(sessions_peak, sessions.size());
// Body of register_message_handlers: installs one handler per ring-buffer
// message type on dispatcher `disp`. Each handler captures `this`.
// NOTE(review): the registration macro name, braces and several statements
// are missing from this listing.

// tcp_start: a new stream connection was opened on a listening interface;
// register it through accept.
608 disp, ::tcp::tcp_start, [
this](
const uint8_t* data,
size_t size) {
609 auto [new_tls_id, listen_interface_name] =
610 ringbuffer::read_message<::tcp::tcp_start>(data, size);
611 accept(new_tls_id, listen_interface_name);
// tcp_inbound: bytes received for an existing connection. The session lookup
// is on a line not shown; unknown sessions are logged and ignored, otherwise
// the bytes are passed to the session.
615 disp, ::tcp::tcp_inbound, [
this](
const uint8_t* data,
size_t size) {
616 auto id = serialized::peek<ccf::tls::ConnID>(data, size);
619 if (session ==
nullptr)
622 "Ignoring tls_inbound for unknown or refused session: {}",
id);
626 session->handle_incoming_data({data, size});
// tcp_close: the connection was closed. What is done with `id` is on lines
// not shown here.
630 disp, ::tcp::tcp_close, [
this](
const uint8_t* data,
size_t size) {
631 auto [id] = ringbuffer::read_message<::tcp::tcp_close>(data, size);
// udp_start: same as tcp_start, but accept is called with its third
// argument set to true.
636 disp, udp::udp_start, [
this](
const uint8_t* data,
size_t size) {
637 auto [new_id, listen_interface_name] =
638 ringbuffer::read_message<udp::udp_start>(data, size);
639 accept(new_id, listen_interface_name,
true);
// udp_inbound: datagram received for an existing session.
643 disp, udp::udp_inbound, [
this](
const uint8_t* data,
size_t size) {
644 auto id = serialized::peek<int64_t>(data, size);
646 std::shared_ptr<Session> session;
648 std::lock_guard<ccf::pal::Mutex> guard(lock);
650 auto search = sessions.find(
id);
651 if (search == sessions.end())
654 "Ignoring udp::udp_inbound for unknown or refused session: {}",
// The table entry exists but has no session object yet: create it now via
// the custom protocol subsystem, using the owning interface's protocol.
659 if (!search->second.second && custom_protocol_subsystem)
665 const auto& conn_id = search->first;
666 const auto& interface_id = search->second.first;
668 auto iit = listening_interfaces.find(interface_id);
669 if (iit == listening_interfaces.end())
672 "Failure to create custom protocol session because of "
673 "unknown interface '{}', ignoring udp::udp_inbound for "
680 const auto&
interface = iit->second;
682 search->second.second =
683 custom_protocol_subsystem->create_session(
684 interface.app_protocol, conn_id,
nullptr);
// A null result from the subsystem is reported as a failure.
686 if (!search->second.second)
689 "Failure to create custom protocol session, ignoring "
690 "udp::udp_inbound for session: {}",
// Exceptions thrown while creating the session are caught and logged.
695 catch (
const std::exception& ex)
698 "Failure to create custom protocol session: {}", ex.what());
703 session = search->second.second;
706 session->handle_incoming_data({data, size});
Definition forwarder_types.h:14
Definition rpc_sessions.h:44
RPCSessions(ringbuffer::AbstractWriterFactory &writer_factory, std::shared_ptr< RPCMap > rpc_map_)
Definition rpc_sessions.h:189
ccf::SessionMetrics get_session_metrics()
Definition rpc_sessions.h:263
void report_request_payload_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:216
void report_request_header_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:223
std::shared_ptr< ClientSession > create_client(const std::shared_ptr<::tls::Cert > &cert, const std::string &app_protocol="HTTP1")
Definition rpc_sessions.h:560
void set_node_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:298
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_sessions.h:604
void set_network_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:304
void remove_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:538
void set_commit_callbacks_subsystem(std::shared_ptr< CommitCallbackSubsystem > fcss)
Definition rpc_sessions.h:204
void update_listening_interface_options(const ccf::NodeInfoNetwork &node_info)
Definition rpc_sessions.h:230
void set_custom_protocol_subsystem(std::shared_ptr< CustomProtocolSubsystem > cpss)
Definition rpc_sessions.h:198
bool reply_async(ccf::tls::ConnID id, bool terminate_after_send, std::vector< uint8_t > &&data) override
Definition rpc_sessions.h:514
std::shared_ptr< ClientSession > create_unencrypted_client()
Definition rpc_sessions.h:593
std::shared_ptr< Session > find_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:501
void report_parsing_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:210
void set_cert(ccf::Authority authority, const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:310
ccf::ApplicationProtocol get_app_protocol_main_interface() const
Definition rpc_sessions.h:284
void accept(ccf::tls::ConnID id, const ListenInterfaceID &listen_interface_id, bool udp=false)
Definition rpc_sessions.h:333
Definition error_reporter.h:10
Definition messaging.h:38
Definition quic_session.h:414
Definition ring_buffer_types.h:157
virtual WriterPtr create_writer_to_outside()=0
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:292
std::mutex Mutex
Definition locking.h:12
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
Definition app_interface.h:13
std::string ApplicationProtocol
Definition node_info_network.h:29
std::string ListenInterfaceID
Definition rpc_context.h:21
Authority
Definition node_info_network.h:16
@ ready
Definition tls_session.h:19
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
Definition msg_types.h:10
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition node_info_network.h:32
Definition odata_error.h:58
RpcInterfaces rpc_interfaces
RPC interfaces.
Definition node_info_network.h:151
Definition node_info_network.h:179
Definition session_metrics.h:15
Definition session_metrics.h:13
size_t peak
Definition session_metrics.h:31
std::map< std::string, PerInterface > interfaces
Definition session_metrics.h:32
size_t active
Definition session_metrics.h:30
Definition http_configuration.h:24