CCF
Loading...
Searching...
No Matches
enclave.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4#include "ccf/app_interface.h"
6#include "ccf/node_context.h"
8#include "ccf/pal/mem.h"
10#include "ds/internal_logger.h"
11#include "ds/oversized.h"
12#include "ds/work_beacon.h"
13#include "host/ledger.h"
16#include "interface.h"
18#include "kv/ledger_chunker.h"
21#include "node/network_state.h"
22#include "node/node_state.h"
23#include "node/node_types.h"
26#include "node/rpc/forwarder.h"
35#include "rpc_map.h"
36#include "rpc_sessions.h"
37#include "tasks/worker.h"
38
39#include <openssl/engine.h>
40
41namespace ccf
42{
43 class Enclave
44 {
45 private:
46 std::unique_ptr<ringbuffer::Circuit> circuit;
47 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory;
48 std::unique_ptr<oversized::WriterFactory> writer_factory;
49 ccf::ds::WorkBeaconPtr work_beacon;
50 ccf::NetworkState network;
51 std::shared_ptr<RPCMap> rpc_map;
52 std::shared_ptr<RPCSessions> rpcsessions;
53 std::unique_ptr<ccf::NodeState> node;
54 ringbuffer::WriterPtr to_host = nullptr;
55 std::chrono::high_resolution_clock::time_point last_tick_time;
56 std::atomic<bool> worker_stop_signal = false;
57
58 StartType start_type{};
59
60 struct NodeContext : public ccf::AbstractNodeContext
61 {
62 const ccf::NodeId this_node;
63
64 NodeContext(ccf::NodeId id) : this_node(std::move(id)) {}
65
66 [[nodiscard]] ccf::NodeId get_node_id() const override
67 {
68 return this_node;
69 }
70 };
71
72 std::unique_ptr<NodeContext> context = nullptr;
73
74 std::shared_ptr<ccf::historical::StateCache> historical_state_cache =
75 nullptr;
76 std::shared_ptr<ccf::indexing::Indexer> indexer = nullptr;
77 std::shared_ptr<ccf::indexing::EnclaveLFSAccess> lfs_access = nullptr;
78
79 public:
81 std::unique_ptr<ringbuffer::Circuit> circuit_,
82 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory_,
83 std::unique_ptr<oversized::WriterFactory> writer_factory_,
84 size_t sig_tx_interval,
85 size_t sig_ms_interval,
86 size_t chunk_threshold,
87 const ccf::consensus::Configuration& consensus_config,
88 const ccf::crypto::CurveID& curve_id,
89 ccf::ds::WorkBeaconPtr work_beacon_,
90 asynchost::Ledger& ledger_) :
91 circuit(std::move(circuit_)),
92 basic_writer_factory(std::move(basic_writer_factory_)),
93 writer_factory(std::move(writer_factory_)),
94 work_beacon(std::move(work_beacon_)),
95 rpc_map(std::make_shared<RPCMap>()),
96 rpcsessions(std::make_shared<RPCSessions>(*writer_factory, rpc_map))
97 {
98 to_host = writer_factory->create_writer_to_outside();
99
100 LOG_TRACE_FMT("Creating ledger secrets");
101 network.ledger_secrets = std::make_shared<ccf::LedgerSecrets>();
102
103 network.tables->set_chunker(
104 std::make_shared<ccf::kv::LedgerChunker>(chunk_threshold));
105
106 LOG_TRACE_FMT("Creating node");
107 node = std::make_unique<ccf::NodeState>(
108 *writer_factory, network, rpcsessions, curve_id);
109
110 LOG_TRACE_FMT("Creating context");
111 context = std::make_unique<NodeContext>(node->get_node_id());
112
113 LOG_TRACE_FMT("Creating context subsystems");
114 historical_state_cache = std::make_shared<ccf::historical::StateCache>(
115 *network.tables,
116 network.ledger_secrets,
117 writer_factory->create_writer_to_outside());
118 context->install_subsystem(historical_state_cache);
119
120 indexer = std::make_shared<ccf::indexing::Indexer>(
121 std::make_shared<ccf::indexing::HistoricalTransactionFetcher>(
122 historical_state_cache));
123 context->install_subsystem(indexer);
124
125 lfs_access = std::make_shared<ccf::indexing::EnclaveLFSAccess>(
126 writer_factory->create_writer_to_outside());
127 context->install_subsystem(lfs_access);
128
129 context->install_subsystem(std::make_shared<ccf::NodeOperation>(*node));
130 context->install_subsystem(
131 std::make_shared<ccf::GovernanceEffects>(*node));
132
133 context->install_subsystem(
134 std::make_shared<ccf::NetworkIdentitySubsystem>(
135 *node, network.identity, historical_state_cache));
136
137 context->install_subsystem(
138 std::make_shared<ccf::NodeConfigurationSubsystem>(*node));
139
140 auto cpss = std::make_shared<ccf::CustomProtocolSubsystem>(*node);
141 context->install_subsystem(cpss);
142 rpcsessions->set_custom_protocol_subsystem(cpss);
143
144 auto ledger_subsystem =
145 std::make_shared<ccf::ReadLedgerSubsystem>(ledger_);
146 context->install_subsystem(ledger_subsystem);
147
148 static constexpr size_t max_interpreter_cache_size = 10;
149 auto interpreter_cache =
150 std::make_shared<ccf::js::InterpreterCache>(max_interpreter_cache_size);
151 context->install_subsystem(interpreter_cache);
152
153 context->install_subsystem(
154 std::make_shared<ccf::AbstractCOSESignaturesConfigSubsystem>(*node));
155
156 auto commit_callbacks = std::make_shared<ccf::CommitCallbackSubsystem>();
157 context->install_subsystem(commit_callbacks);
158 rpcsessions->set_commit_callbacks_subsystem(commit_callbacks);
159
160 auto signature_cache = std::make_shared<ccf::SignatureCacheSubsystem>();
161 context->install_subsystem(signature_cache);
162
163 LOG_TRACE_FMT("Creating RPC actors / ffi");
164 rpc_map->register_frontend<ccf::ActorsType::members>(
165 std::make_unique<ccf::MemberRpcFrontend>(network, *context));
166
167 rpc_map->register_frontend<ccf::ActorsType::users>(
168 std::make_unique<ccf::UserRpcFrontend>(
169 network, ccf::make_user_endpoints(*context), *context));
170
171 rpc_map->register_frontend<ccf::ActorsType::nodes>(
172 std::make_unique<ccf::NodeRpcFrontend>(network, *context));
173
174 LOG_TRACE_FMT("Initialize node");
175 node->initialize(
176 consensus_config,
177 rpc_map,
178 rpcsessions,
179 indexer,
180 commit_callbacks,
181 signature_cache,
182 sig_tx_interval,
183 sig_ms_interval);
184 }
185
187 {
188 LOG_TRACE_FMT("Shutting down enclave");
189 }
190
192 StartType start_type_,
193 const ccf::StartupConfig& ccf_config_,
194 std::vector<uint8_t>& node_cert,
195 std::vector<uint8_t>& service_cert)
196 {
197 start_type = start_type_;
198
199 rpcsessions->update_listening_interface_options(ccf_config_.network);
200
201 node->set_n2n_message_limit(ccf_config_.node_to_node_message_limit);
202
203 historical_state_cache->set_soft_cache_limit(
204 ccf_config_.historical_cache_soft_limit);
205
206 // If we haven't heard from a node for multiple elections, then cleanup
207 // their node-to-node channel
208 const auto idle_timeout =
209 std::chrono::milliseconds(ccf_config_.consensus.election_timeout) * 4;
210 node->set_n2n_idle_timeout(idle_timeout);
211
212 ccf::NodeCreateInfo create_info;
213 try
214 {
216 "Creating node with start_type {}", start_type_to_str(start_type));
217 create_info = node->create(start_type, ccf_config_);
218 }
219 catch (const std::exception& e)
220 {
221 LOG_FAIL_FMT("Error starting node: {}", e.what());
223 }
224
225 // Copy node and service certs out
226 node_cert = create_info.self_signed_node_cert.raw();
227
228 if (start_type == StartType::Start || start_type == StartType::Recover)
229 {
230 // When starting a node in start or recover modes, fresh network secrets
231 // are created and the associated certificate can be passed to the host
232 service_cert = create_info.service_cert.raw();
233 }
234
236 }
237
238 bool run_main()
239 {
240 LOG_DEBUG_FMT("Running main thread");
241
242 {
243 messaging::BufferProcessor bp("Enclave");
244
245 // reconstruct oversized messages sent to the enclave
247
248 lfs_access->register_message_handlers(bp.get_dispatcher());
249
251 bp, AdminMessage::stop, [this, &bp](const uint8_t*, size_t) {
252 bp.set_finished();
253 this->worker_stop_signal.store(true);
254 });
255
257 bp, AdminMessage::stop_notice, [this](const uint8_t*, size_t) {
258 node->stop_notice();
259 });
260
261 last_tick_time = decltype(last_tick_time)::clock::now();
262
264 bp,
265 AdminMessage::tick,
266 [this, &disp = bp.get_dispatcher()](const uint8_t*, size_t) {
267 const auto time_now = decltype(last_tick_time)::clock::now();
268
269 const auto elapsed_ms =
270 std::chrono::duration_cast<std::chrono::milliseconds>(
271 time_now - last_tick_time);
272 if (elapsed_ms.count() > 0)
273 {
274 last_tick_time += elapsed_ms;
275
276 node->tick(elapsed_ms);
277 historical_state_cache->tick(elapsed_ms);
278 ccf::tasks::tick(elapsed_ms);
279 // When recovering, no signature should be emitted while the
280 // public ledger is being read
281 if (!node->is_reading_public_ledger())
282 {
283 for (auto& [actor, frontend] : rpc_map->frontends())
284 {
285 frontend->tick(elapsed_ms);
286 }
287 }
288 node->tick_end();
289 }
290 });
291
293 bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
294 try
295 {
296 node->recv_node_inbound(data, size);
297 }
298 catch (const std::exception& e)
299 {
301 "Ignoring node_inbound message due to exception: {}", e.what());
302 }
303 });
304
306 bp,
307 ::consensus::ledger_entry_range,
308 [this](const uint8_t* data, size_t size) {
309 const auto [from_seqno, to_seqno, purpose, body] =
310 ringbuffer::read_message<::consensus::ledger_entry_range>(
311 data, size);
312 switch (purpose)
313 {
314 case ::consensus::LedgerRequestPurpose::Recovery:
315 {
316 if (node->is_reading_public_ledger())
317 {
318 node->recover_public_ledger_entries(body);
319 }
320 else if (node->is_reading_private_ledger())
321 {
322 node->recover_private_ledger_entries(body);
323 }
324 else
325 {
326 auto [s, _, __] = node->state();
328 "Cannot recover ledger entry: Unexpected node state {}", s);
329 }
330 break;
331 }
332 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
333 {
334 historical_state_cache->handle_ledger_entries(
335 from_seqno, to_seqno, body);
336 break;
337 }
338 default:
339 {
340 LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
341 }
342 }
343 });
344
346 bp,
347 ::consensus::ledger_no_entry_range,
348 [this](const uint8_t* data, size_t size) {
349 const auto [from_seqno, to_seqno, purpose] =
350 ringbuffer::read_message<::consensus::ledger_no_entry_range>(
351 data, size);
352 switch (purpose)
353 {
354 case ::consensus::LedgerRequestPurpose::Recovery:
355 {
356 node->recover_ledger_end();
357 break;
358 }
359 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
360 {
361 historical_state_cache->handle_no_entry_range(
362 from_seqno, to_seqno);
363 break;
364 }
365 default:
366 {
367 LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
368 }
369 }
370 });
371
373 bp,
374 ::consensus::snapshot_allocated,
375 [this](const uint8_t* data, size_t size) {
376 const auto [snapshot_span, generation_count] =
377 ringbuffer::read_message<::consensus::snapshot_allocated>(
378 data, size);
379
380 node->write_snapshot(snapshot_span, generation_count);
381 });
382
383 rpcsessions->register_message_handlers(bp.get_dispatcher());
384
385 // Maximum number of inbound ringbuffer messages which will be
386 // processed in a single iteration
387 static constexpr size_t max_messages = 256;
388
389 while (!bp.get_finished())
390 {
391 // Wait until the host indicates that some ringbuffer messages are
392 // available, but wake at least every 100ms to check thread messages
393 work_beacon->wait_for_work_with_timeout(
394 std::chrono::milliseconds(100));
395
396 // First, read some messages from the ringbuffer
397 auto read = bp.read_n(max_messages, circuit->read_from_outside());
398
399 // Then, execute some tasks
400 auto& job_board = ccf::tasks::get_main_job_board();
401 ccf::tasks::Task task = job_board.get_task();
402 size_t tasks_done = 0;
403 while (task != nullptr)
404 {
406 ++tasks_done;
407 if (tasks_done >= max_messages)
408 {
409 break;
410 }
411 task = job_board.get_task();
412 }
413
414 // If no messages were read from the ringbuffer and tasks were
415 // executed, idle
416 if (read == 0 && tasks_done == 0)
417 {
418 std::this_thread::yield();
419 }
420 }
421
422 LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
423 RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, to_host);
424
425 return true;
426 }
427 }
428
430 {
431 LOG_DEBUG_FMT("Running worker thread");
432
433 {
434 auto& job_board = ccf::tasks::get_main_job_board();
435 const auto timeout = std::chrono::milliseconds(100);
436
437 while (!worker_stop_signal.load())
438 {
439 auto task = job_board.wait_for_task(timeout);
440 if (task != nullptr)
441 {
443 }
444 }
445 }
446
447 return true;
448 }
449 };
450}
Definition ledger.h:749
Definition enclave.h:44
CreateNodeStatus create_new_node(StartType start_type_, const ccf::StartupConfig &ccf_config_, std::vector< uint8_t > &node_cert, std::vector< uint8_t > &service_cert)
Definition enclave.h:191
~Enclave()
Definition enclave.h:186
bool run_main()
Definition enclave.h:238
bool run_worker()
Definition enclave.h:429
Enclave(std::unique_ptr< ringbuffer::Circuit > circuit_, std::unique_ptr< ringbuffer::WriterFactory > basic_writer_factory_, std::unique_ptr< oversized::WriterFactory > writer_factory_, size_t sig_tx_interval, size_t sig_ms_interval, size_t chunk_threshold, const ccf::consensus::Configuration &consensus_config, const ccf::crypto::CurveID &curve_id, ccf::ds::WorkBeaconPtr work_beacon_, asynchost::Ledger &ledger_)
Definition enclave.h:80
Definition rpc_map.h:11
Definition rpc_sessions.h:44
std::vector< uint8_t > raw() const
Definition pem.h:71
Definition messaging.h:187
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:195
void set_finished(bool v=true)
Definition messaging.h:206
Definition oversized.h:30
CreateNodeStatus
Definition enclave_interface_types.h:8
@ OK
Definition enclave_interface_types.h:10
@ InternalError
Definition enclave_interface_types.h:13
StartType
Definition enclave_interface_types.h:92
@ Recover
Definition enclave_interface_types.h:95
@ Start
Definition enclave_interface_types.h:93
constexpr char const * start_type_to_str(StartType type)
Definition enclave_interface_types.h:98
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:292
CurveID
Definition curve.h:18
std::shared_ptr< WorkBeacon > WorkBeaconPtr
Definition work_beacon.h:60
JobBoard & get_main_job_board()
Definition task_system.cpp:53
void try_do_task(BaseTask &task, bool abort_on_throw=true)
Definition worker.h:22
std::shared_ptr< BaseTask > Task
Definition task.h:36
Definition app_interface.h:13
std::unique_ptr< ccf::endpoints::EndpointRegistry > make_user_endpoints(ccf::AbstractNodeContext &context)
Definition js_generic.cpp:9
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition node_context.h:12
size_t node_to_node_message_limit
Definition startup_config.h:28
ccf::NodeInfoNetwork network
Definition startup_config.h:33
ccf::ds::SizeString historical_cache_soft_limit
Definition startup_config.h:30
ccf::consensus::Configuration consensus
Definition startup_config.h:32
Definition network_state.h:12
std::shared_ptr< LedgerSecrets > ledger_secrets
Definition network_state.h:14
std::unique_ptr< NetworkIdentity > identity
Definition network_state.h:13
std::shared_ptr< ccf::kv::Store > tables
Definition network_tables.h:46
Definition node_state.h:84
ccf::crypto::Pem service_cert
Definition node_state.h:86
ccf::crypto::Pem self_signed_node_cert
Definition node_state.h:85
Definition startup_config.h:148
Definition consensus_config.h:11
ccf::ds::TimeString election_timeout
Definition consensus_config.h:13