CCF
Loading...
Searching...
No Matches
forwarder.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "enclave/rpc_map.h"
8#include "kv/kv_types.h"
9#include "node/node_to_node.h"
10#include "tasks/basic_task.h"
11#include "tasks/task_system.h"
12
13namespace ccf
14{
15 class RpcContextImpl;
16
18 {
19 public:
20 virtual ~ForwardedRpcHandler() = default;
21
22 virtual void process_forwarded(
23 std::shared_ptr<ccf::RpcContextImpl> fwd_ctx) = 0;
24 };
25
26 template <typename ChannelProxy>
28 {
29 private:
30 std::weak_ptr<ccf::AbstractRPCResponder> rpcresponder;
31 std::shared_ptr<ChannelProxy> n2n_channels;
32 std::weak_ptr<ccf::RPCMap> rpc_map;
33 NodeId self;
34
35 using ForwardedCommandId = ForwardedHeader_v2::ForwardedCommandId;
36 ForwardedCommandId next_command_id = 0;
37
38 std::unordered_map<ForwardedCommandId, ccf::tasks::Task> timeout_tasks;
39 ccf::pal::Mutex timeout_tasks_lock;
40
41 using IsCallerCertForwarded = bool;
42
43 void send_timeout_error_response(
44 NodeId to,
45 size_t client_session_id,
46 const std::chrono::milliseconds& timeout)
47 {
48 auto rpc_responder_shared = rpcresponder.lock();
49 if (rpc_responder_shared)
50 {
51 auto response = ::http::Response(HTTP_STATUS_GATEWAY_TIMEOUT);
52 auto body = fmt::format(
53 "Request was forwarded to node {}, but no response was received "
54 "after {}ms",
55 to,
56 timeout.count());
57 response.set_body(body);
58 response.set_header(
59 http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
60 rpc_responder_shared->reply_async(
61 client_session_id, false, response.build_response());
62 }
63 }
64
65 public:
67 std::weak_ptr<ccf::AbstractRPCResponder> rpcresponder,
68 std::shared_ptr<ChannelProxy> n2n_channels,
69 std::weak_ptr<ccf::RPCMap> rpc_map_) :
70 rpcresponder(std::move(rpcresponder)),
71 n2n_channels(std::move(n2n_channels)),
72 rpc_map(std::move(rpc_map_))
73 {}
74
75 void initialize(const NodeId& self_)
76 {
77 self = self_;
78 }
79
81 std::shared_ptr<ccf::RpcContextImpl> rpc_ctx,
82 const NodeId& to,
83 const std::vector<uint8_t>& caller_cert,
84 const std::chrono::milliseconds& timeout) override
85 {
86 auto session_ctx = rpc_ctx->get_session_context();
87
88 IsCallerCertForwarded include_caller = false;
89 const auto& raw_request = rpc_ctx->get_serialised_request();
90 auto client_session_id = session_ctx->client_session_id;
91 size_t size = sizeof(client_session_id) + sizeof(IsCallerCertForwarded) +
92 raw_request.size();
93 if (!caller_cert.empty())
94 {
95 size += sizeof(size_t) + caller_cert.size();
96 include_caller = true;
97 }
98
99 std::vector<uint8_t> plain(size);
100 auto* data_ = plain.data();
101 auto size_ = plain.size();
102 serialized::write(data_, size_, client_session_id);
103 serialized::write(data_, size_, include_caller);
104 if (include_caller)
105 {
106 serialized::write(data_, size_, caller_cert.size());
107 serialized::write(data_, size_, caller_cert.data(), caller_cert.size());
108 }
109 serialized::write(data_, size_, raw_request.data(), raw_request.size());
110
111 ForwardedCommandId command_id = 0;
112 {
113 std::lock_guard<ccf::pal::Mutex> guard(timeout_tasks_lock);
114 command_id = next_command_id++;
115 auto task =
116 ccf::tasks::make_basic_task([this, to, client_session_id, timeout]() {
117 this->send_timeout_error_response(to, client_session_id, timeout);
118 });
119 timeout_tasks[command_id] = task;
120 ccf::tasks::add_delayed_task(task, timeout);
121 }
122
123 const auto view_opt = session_ctx->active_view;
124 if (!view_opt.has_value())
125 {
126 throw std::logic_error(
127 "Expected active_view to be set before forwarding");
128 }
129 ForwardedCommandHeader_v3 header(command_id, view_opt.value());
130
131 return n2n_channels->send_encrypted(
132 to, NodeMsgType::forwarded_msg, plain, header);
133 }
134
135 template <typename TFwdHdr>
136 std::shared_ptr<::http::HttpRpcContext> recv_forwarded_command(
137 const NodeId& from, const uint8_t* data, size_t size)
138 {
139 std::pair<TFwdHdr, std::vector<uint8_t>> r;
140 try
141 {
142 LOG_TRACE_FMT("Receiving forwarded command of {} bytes", size);
143 LOG_TRACE_FMT(" => {:02x}", fmt::join(data, data + size, ""));
144
145 r = n2n_channels->template recv_encrypted<TFwdHdr>(from, data, size);
146 }
147 catch (const std::logic_error& err)
148 {
149 LOG_FAIL_FMT("Invalid forwarded command");
150 LOG_DEBUG_FMT("Invalid forwarded command: {}", err.what());
151 return nullptr;
152 }
153
154 std::vector<uint8_t> caller_cert;
155 const auto& plain_ = r.second;
156 auto data_ = plain_.data();
157 auto size_ = plain_.size();
158 auto client_session_id = serialized::read<size_t>(data_, size_);
159 auto includes_caller =
160 serialized::read<IsCallerCertForwarded>(data_, size_);
161 if (includes_caller)
162 {
163 auto caller_size = serialized::read<size_t>(data_, size_);
164 // NOLINTNEXTLINE(readability-suspicious-call-argument)
165 caller_cert = serialized::read(data_, size_, caller_size);
166 }
167 std::vector<uint8_t> raw_request = serialized::read(data_, size_, size_);
168
169 auto session =
170 std::make_shared<ccf::SessionContext>(client_session_id, caller_cert);
171 session->is_forwarded = true;
172
173 if constexpr (std::is_same_v<TFwdHdr, ForwardedCommandHeader_v3>)
174 {
175 ccf::View view = r.first.active_view;
176 session->active_view = view;
177 }
178
179 try
180 {
182 session, raw_request, r.first.frame_format);
183 }
184 catch (const ::http::RequestTooLargeException& rexc)
185 {
186 LOG_FAIL_FMT("Forwarded request exceeded limit: {}", rexc.what());
187 return nullptr;
188 }
189 catch (const std::exception& err)
190 {
191 LOG_FAIL_FMT("Invalid forwarded request");
192 LOG_DEBUG_FMT("Invalid forwarded request: {}", err.what());
193 return nullptr;
194 }
195 }
196
197 template <typename TFwdHdr>
199 size_t client_session_id,
200 const NodeId& from_node,
201 const TFwdHdr& header,
202 const std::vector<uint8_t>& data)
203 {
204 std::vector<uint8_t> plain(sizeof(client_session_id) + data.size());
205 auto* data_ = plain.data();
206 auto size_ = plain.size();
207 serialized::write(data_, size_, client_session_id);
208 serialized::write(data_, size_, data.data(), data.size());
209
210 if (!n2n_channels->send_encrypted(
211 from_node, NodeMsgType::forwarded_msg, plain, header))
212 {
213 LOG_FAIL_FMT("Failed to send forwarded response to {}", from_node);
214 }
215 }
216
218 {
220 std::vector<uint8_t> response_body;
222 };
223
224 template <typename TFwdHdr>
225 std::optional<ForwardedResponseResult> recv_forwarded_response(
226 const NodeId& from, const uint8_t* data, size_t size)
227 {
228 std::pair<TFwdHdr, std::vector<uint8_t>> r;
229 try
230 {
231 LOG_TRACE_FMT("Receiving response of {} bytes", size);
232 LOG_TRACE_FMT(" => {:02x}", fmt::join(data, data + size, ""));
233
234 r = n2n_channels->template recv_encrypted<TFwdHdr>(from, data, size);
235 }
236 catch (const std::logic_error& err)
237 {
238 LOG_FAIL_FMT("Invalid forwarded response");
239 LOG_DEBUG_FMT("Invalid forwarded response: {}", err.what());
240 return std::nullopt;
241 }
242
244 if constexpr (std::is_same_v<TFwdHdr, ForwardedResponseHeader_v3>)
245 {
246 ret.should_terminate_session = r.first.terminate_session;
247 }
248
249 const auto& plain_ = r.second;
250 auto data_ = plain_.data();
251 auto size_ = plain_.size();
252 ret.client_session_id = serialized::read<size_t>(data_, size_);
253 ret.response_body = serialized::read(data_, size_, size_);
254
255 return ret;
256 }
257
258 std::shared_ptr<ForwardedRpcHandler> get_forwarder_handler(
259 std::shared_ptr<::http::HttpRpcContext>& ctx)
260 {
261 if (ctx == nullptr)
262 {
263 LOG_FAIL_FMT("Failed to receive forwarded command");
264 return nullptr;
265 }
266
267 std::shared_ptr<ccf::RPCMap> rpc_map_shared = rpc_map.lock();
268 if (rpc_map_shared == nullptr)
269 {
270 LOG_FAIL_FMT("Failed to obtain RPCMap");
271 return nullptr;
272 }
273
274 std::shared_ptr<ccf::RpcHandler> search =
275 ::http::fetch_rpc_handler(ctx, rpc_map_shared);
276
277 auto fwd_handler = std::dynamic_pointer_cast<ForwardedRpcHandler>(search);
278 if (!fwd_handler)
279 {
281 "Failed to process forwarded command: handler is not a "
282 "ForwardedRpcHandler");
283 return nullptr;
284 }
285
286 return fwd_handler;
287 }
288
289 void recv_message(const ccf::NodeId& from, const uint8_t* data, size_t size)
290 {
291 try
292 {
293 const auto forwarded_msg = serialized::peek<ForwardedMsg>(data, size);
295 "recv_message({}, {} bytes) (type={})",
296 from,
297 size,
298 (size_t)forwarded_msg);
299
300 switch (forwarded_msg)
301 {
303 {
304 auto ctx =
305 recv_forwarded_command<ForwardedHeader_v1>(from, data, size);
306
307 auto fwd_handler = get_forwarder_handler(ctx);
308 if (fwd_handler == nullptr)
309 {
310 return;
311 }
312
313 // frame_format is deliberately unset, the forwarder ignores it
314 // and expects the same format they forwarded.
315 ForwardedHeader_v1 response_header{
317
318 LOG_DEBUG_FMT("Sending forwarded response to {}", from);
319 fwd_handler->process_forwarded(ctx);
320
322 ctx->get_session_context()->client_session_id,
323 from,
324 response_header,
325 ctx->serialise_response());
326 break;
327 }
328
330 {
331 auto ctx =
332 recv_forwarded_command<ForwardedHeader_v2>(from, data, size);
333
334 auto fwd_handler = get_forwarder_handler(ctx);
335 if (fwd_handler == nullptr)
336 {
337 return;
338 }
339
340 const auto forwarded_hdr_v2 =
341 serialized::peek<ForwardedHeader_v2>(data, size);
342 const auto cmd_id = forwarded_hdr_v2.id;
343
344 fwd_handler->process_forwarded(ctx);
345
346 // frame_format is deliberately unset, the forwarder ignores it
347 // and expects the same format they forwarded.
348 ForwardedHeader_v2 response_header{
350
351 LOG_DEBUG_FMT("Sending forwarded response to {}", from);
352
354 ctx->get_session_context()->client_session_id,
355 from,
356 response_header,
357 ctx->serialise_response());
358 break;
359 }
360
362 {
363 auto ctx = recv_forwarded_command<ForwardedCommandHeader_v3>(
364 from, data, size);
365
366 auto fwd_handler = get_forwarder_handler(ctx);
367 if (fwd_handler == nullptr)
368 {
369 return;
370 }
371
372 const auto forwarded_hdr_v3 =
373 serialized::peek<ForwardedCommandHeader_v3>(data, size);
374 const auto cmd_id = forwarded_hdr_v3.id;
375
376 fwd_handler->process_forwarded(ctx);
377
378 // frame_format is deliberately unset, the forwarder ignores it
379 // and expects the same format they forwarded.
380 ForwardedResponseHeader_v3 response_header(
381 cmd_id, ctx->terminate_session);
382
383 LOG_DEBUG_FMT("Sending forwarded response to {}", from);
384
386 ctx->get_session_context()->client_session_id,
387 from,
388 response_header,
389 ctx->serialise_response());
390 break;
391 }
392
395 {
396 const auto forwarded_hdr_v2 =
397 serialized::peek<ForwardedHeader_v2>(data, size);
398 const auto cmd_id = forwarded_hdr_v2.id;
399
400 // Cancel and delete the corresponding timeout task, so it will no
401 // longer trigger a timeout error
402 std::lock_guard<ccf::pal::Mutex> guard(timeout_tasks_lock);
403 auto it = timeout_tasks.find(cmd_id);
404 if (it != timeout_tasks.end())
405 {
406 it->second->cancel_task();
407 it = timeout_tasks.erase(it);
408 }
409 else
410 {
412 "Response for {} received too late - already sent timeout "
413 "error to client",
414 cmd_id);
415 return;
416 }
417 // Deliberate fall-through
418 }
419
421 {
422 std::optional<ForwardedResponseResult> rep;
424 {
425 rep = recv_forwarded_response<ForwardedResponseHeader_v3>(
426 from, data, size);
427 }
429 {
430 rep =
431 recv_forwarded_response<ForwardedHeader_v2>(from, data, size);
432 }
433 else
434 {
435 rep =
436 recv_forwarded_response<ForwardedHeader_v1>(from, data, size);
437 }
438
439 if (!rep.has_value())
440 {
441 return;
442 }
443
445 "Sending forwarded response to RPC endpoint {}",
446 rep->client_session_id);
447
448 auto rpc_responder_shared = rpcresponder.lock();
449 if (
450 rpc_responder_shared &&
451 !rpc_responder_shared->reply_async(
452 rep->client_session_id,
453 rep->should_terminate_session,
454 std::move(rep->response_body)))
455 {
456 return;
457 }
458
459 break;
460 }
461
462 default:
463 {
464 LOG_FAIL_FMT("Unknown frontend msg type: {}", forwarded_msg);
465 break;
466 }
467 }
468 }
469 catch (const std::exception& e)
470 {
471 LOG_FAIL_FMT("Exception in {}", __PRETTY_FUNCTION__);
472 LOG_DEBUG_FMT("Error: {}", e.what());
473 return;
474 }
475 }
476 };
477}
Definition forwarder_types.h:22
Definition forwarder.h:18
virtual ~ForwardedRpcHandler()=default
virtual void process_forwarded(std::shared_ptr< ccf::RpcContextImpl > fwd_ctx)=0
Definition forwarder.h:28
void initialize(const NodeId &self_)
Definition forwarder.h:75
std::shared_ptr<::http::HttpRpcContext > recv_forwarded_command(const NodeId &from, const uint8_t *data, size_t size)
Definition forwarder.h:136
void recv_message(const ccf::NodeId &from, const uint8_t *data, size_t size)
Definition forwarder.h:289
Forwarder(std::weak_ptr< ccf::AbstractRPCResponder > rpcresponder, std::shared_ptr< ChannelProxy > n2n_channels, std::weak_ptr< ccf::RPCMap > rpc_map_)
Definition forwarder.h:66
void send_forwarded_response(size_t client_session_id, const NodeId &from_node, const TFwdHdr &header, const std::vector< uint8_t > &data)
Definition forwarder.h:198
std::shared_ptr< ForwardedRpcHandler > get_forwarder_handler(std::shared_ptr<::http::HttpRpcContext > &ctx)
Definition forwarder.h:258
std::optional< ForwardedResponseResult > recv_forwarded_response(const NodeId &from, const uint8_t *data, size_t size)
Definition forwarder.h:225
bool forward_command(std::shared_ptr< ccf::RpcContextImpl > rpc_ctx, const NodeId &to, const std::vector< uint8_t > &caller_cert, const std::chrono::milliseconds &timeout) override
Definition forwarder.h:80
Definition http_builder.h:200
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
std::mutex Mutex
Definition locking.h:12
Task make_basic_task(Ts &&... ts)
Definition basic_task.h:33
void add_delayed_task(Task task, std::chrono::milliseconds delay)
Definition task_system.cpp:70
Definition app_interface.h:13
std::shared_ptr<::http::HttpRpcContext > make_fwd_rpc_context(std::shared_ptr< ccf::SessionContext > s, const std::vector< uint8_t > &packed, ccf::FrameFormat frame_format)
Definition http_rpc_context.h:391
@ forwarded_cmd_v3
Definition node_types.h:52
@ forwarded_cmd_v2
Definition node_types.h:46
@ forwarded_response_v1
Definition node_types.h:42
@ forwarded_response_v3
Definition node_types.h:53
@ forwarded_response_v2
Definition node_types.h:47
@ forwarded_cmd_v1
Definition node_types.h:41
uint64_t View
Definition tx_id.h:23
@ forwarded_msg
Definition node_types.h:24
void write(uint8_t *&data, size_t &size, const T &v)
Definition serialized.h:105
T read(const uint8_t *&data, size_t &size)
Definition serialized.h:58
STL namespace.
Definition node_types.h:80
Definition node_types.h:68
Definition node_types.h:74
size_t ForwardedCommandId
Definition node_types.h:75
Definition node_types.h:96
Definition forwarder.h:218
size_t client_session_id
Definition forwarder.h:219
std::vector< uint8_t > response_body
Definition forwarder.h:220
bool should_terminate_session
Definition forwarder.h:221