39#include <openssl/engine.h>
46 std::unique_ptr<ringbuffer::Circuit> circuit;
47 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory;
48 std::unique_ptr<oversized::WriterFactory> writer_factory;
51 std::shared_ptr<RPCMap> rpc_map;
52 std::shared_ptr<RPCSessions> rpcsessions;
53 std::unique_ptr<ccf::NodeState> node;
55 std::chrono::high_resolution_clock::time_point last_tick_time;
56 std::atomic<bool> worker_stop_signal =
false;
64 NodeContext(
ccf::NodeId id) : this_node(std::move(
id)) {}
66 [[nodiscard]]
ccf::NodeId get_node_id()
const override
72 std::unique_ptr<NodeContext> context =
nullptr;
74 std::shared_ptr<ccf::historical::StateCache> historical_state_cache =
76 std::shared_ptr<ccf::indexing::Indexer> indexer =
nullptr;
77 std::shared_ptr<ccf::indexing::EnclaveLFSAccess> lfs_access =
nullptr;
81 std::unique_ptr<ringbuffer::Circuit> circuit_,
82 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory_,
83 std::unique_ptr<oversized::WriterFactory> writer_factory_,
84 size_t sig_tx_interval,
85 size_t sig_ms_interval,
86 size_t chunk_threshold,
91 circuit(
std::move(circuit_)),
92 basic_writer_factory(
std::move(basic_writer_factory_)),
93 writer_factory(
std::move(writer_factory_)),
94 work_beacon(
std::move(work_beacon_)),
96 rpcsessions(
std::make_shared<
RPCSessions>(*writer_factory, rpc_map))
98 to_host = writer_factory->create_writer_to_outside();
103 network.
tables->set_chunker(
104 std::make_shared<ccf::kv::LedgerChunker>(chunk_threshold));
107 node = std::make_unique<ccf::NodeState>(
108 *writer_factory, network, rpcsessions, curve_id);
111 context = std::make_unique<NodeContext>(node->get_node_id());
114 historical_state_cache = std::make_shared<ccf::historical::StateCache>(
117 writer_factory->create_writer_to_outside());
118 context->install_subsystem(historical_state_cache);
120 indexer = std::make_shared<ccf::indexing::Indexer>(
121 std::make_shared<ccf::indexing::HistoricalTransactionFetcher>(
122 historical_state_cache));
123 context->install_subsystem(indexer);
125 lfs_access = std::make_shared<ccf::indexing::EnclaveLFSAccess>(
126 writer_factory->create_writer_to_outside());
127 context->install_subsystem(lfs_access);
129 context->install_subsystem(std::make_shared<ccf::NodeOperation>(*node));
130 context->install_subsystem(
131 std::make_shared<ccf::GovernanceEffects>(*node));
133 context->install_subsystem(
134 std::make_shared<ccf::NetworkIdentitySubsystem>(
135 *node, network.
identity, historical_state_cache));
137 context->install_subsystem(
138 std::make_shared<ccf::NodeConfigurationSubsystem>(*node));
140 auto cpss = std::make_shared<ccf::CustomProtocolSubsystem>(*node);
141 context->install_subsystem(cpss);
142 rpcsessions->set_custom_protocol_subsystem(cpss);
144 auto ledger_subsystem =
145 std::make_shared<ccf::ReadLedgerSubsystem>(ledger_);
146 context->install_subsystem(ledger_subsystem);
148 static constexpr size_t max_interpreter_cache_size = 10;
149 auto interpreter_cache =
150 std::make_shared<ccf::js::InterpreterCache>(max_interpreter_cache_size);
151 context->install_subsystem(interpreter_cache);
153 context->install_subsystem(
154 std::make_shared<ccf::AbstractCOSESignaturesConfigSubsystem>(*node));
156 auto commit_callbacks = std::make_shared<ccf::CommitCallbackSubsystem>();
157 context->install_subsystem(commit_callbacks);
158 rpcsessions->set_commit_callbacks_subsystem(commit_callbacks);
160 auto signature_cache = std::make_shared<ccf::SignatureCacheSubsystem>();
161 context->install_subsystem(signature_cache);
165 std::make_unique<ccf::MemberRpcFrontend>(network, *context));
168 std::make_unique<ccf::UserRpcFrontend>(
172 std::make_unique<ccf::NodeRpcFrontend>(network, *context));
194 std::vector<uint8_t>& node_cert,
195 std::vector<uint8_t>& service_cert)
197 start_type = start_type_;
199 rpcsessions->update_listening_interface_options(ccf_config_.
network);
203 historical_state_cache->set_soft_cache_limit(
208 const auto idle_timeout =
210 node->set_n2n_idle_timeout(idle_timeout);
217 create_info = node->create(start_type, ccf_config_);
219 catch (
const std::exception& e)
251 bp, AdminMessage::stop, [
this, &bp](
const uint8_t*,
size_t) {
253 this->worker_stop_signal.store(
true);
257 bp, AdminMessage::stop_notice, [
this](
const uint8_t*,
size_t) {
261 last_tick_time =
decltype(last_tick_time)::clock::now();
267 const auto time_now = decltype(last_tick_time)::clock::now();
269 const auto elapsed_ms =
270 std::chrono::duration_cast<std::chrono::milliseconds>(
271 time_now - last_tick_time);
272 if (elapsed_ms.count() > 0)
274 last_tick_time += elapsed_ms;
276 node->tick(elapsed_ms);
277 historical_state_cache->tick(elapsed_ms);
278 ccf::tasks::tick(elapsed_ms);
281 if (!node->is_reading_public_ledger())
283 for (auto& [actor, frontend] : rpc_map->frontends())
285 frontend->tick(elapsed_ms);
293 bp, ccf::node_inbound, [
this](
const uint8_t* data,
size_t size) {
296 node->recv_node_inbound(data, size);
298 catch (
const std::exception& e)
301 "Ignoring node_inbound message due to exception: {}", e.what());
307 ::consensus::ledger_entry_range,
308 [
this](
const uint8_t* data,
size_t size) {
309 const auto [from_seqno, to_seqno, purpose, body] =
310 ringbuffer::read_message<::consensus::ledger_entry_range>(
314 case ::consensus::LedgerRequestPurpose::Recovery:
316 if (node->is_reading_public_ledger())
318 node->recover_public_ledger_entries(body);
320 else if (node->is_reading_private_ledger())
322 node->recover_private_ledger_entries(body);
326 auto [s, _, __] = node->state();
328 "Cannot recover ledger entry: Unexpected node state {}", s);
332 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
334 historical_state_cache->handle_ledger_entries(
335 from_seqno, to_seqno, body);
347 ::consensus::ledger_no_entry_range,
348 [
this](
const uint8_t* data,
size_t size) {
349 const auto [from_seqno, to_seqno, purpose] =
350 ringbuffer::read_message<::consensus::ledger_no_entry_range>(
354 case ::consensus::LedgerRequestPurpose::Recovery:
356 node->recover_ledger_end();
359 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
361 historical_state_cache->handle_no_entry_range(
362 from_seqno, to_seqno);
374 ::consensus::snapshot_allocated,
375 [
this](
const uint8_t* data,
size_t size) {
376 const auto [snapshot_span, generation_count] =
377 ringbuffer::read_message<::consensus::snapshot_allocated>(
380 node->write_snapshot(snapshot_span, generation_count);
383 rpcsessions->register_message_handlers(bp.get_dispatcher());
387 static constexpr size_t max_messages = 256;
389 while (!bp.get_finished())
393 work_beacon->wait_for_work_with_timeout(
394 std::chrono::milliseconds(100));
397 auto read = bp.read_n(max_messages, circuit->read_from_outside());
402 size_t tasks_done = 0;
403 while (task !=
nullptr)
407 if (tasks_done >= max_messages)
411 task = job_board.get_task();
416 if (read == 0 && tasks_done == 0)
418 std::this_thread::yield();
422 LOG_INFO_FMT(
"Enclave stopped successfully. Stopping host...");
435 const auto timeout = std::chrono::milliseconds(100);
437 while (!worker_stop_signal.load())
439 auto task = job_board.wait_for_task(timeout);
CreateNodeStatus create_new_node(StartType start_type_, const ccf::StartupConfig &ccf_config_, std::vector< uint8_t > &node_cert, std::vector< uint8_t > &service_cert)
Definition enclave.h:191
~Enclave()
Definition enclave.h:186
bool run_main()
Definition enclave.h:238
bool run_worker()
Definition enclave.h:429
Enclave(std::unique_ptr< ringbuffer::Circuit > circuit_, std::unique_ptr< ringbuffer::WriterFactory > basic_writer_factory_, std::unique_ptr< oversized::WriterFactory > writer_factory_, size_t sig_tx_interval, size_t sig_ms_interval, size_t chunk_threshold, const ccf::consensus::Configuration &consensus_config, const ccf::crypto::CurveID &curve_id, ccf::ds::WorkBeaconPtr work_beacon_, asynchost::Ledger &ledger_)
Definition enclave.h:80
Definition rpc_sessions.h:44
std::vector< uint8_t > raw() const
Definition pem.h:71
Definition messaging.h:187
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:195
void set_finished(bool v=true)
Definition messaging.h:206
Definition oversized.h:30
CreateNodeStatus
Definition enclave_interface_types.h:8
@ OK
Definition enclave_interface_types.h:10
@ InternalError
Definition enclave_interface_types.h:13
StartType
Definition enclave_interface_types.h:92
@ Recover
Definition enclave_interface_types.h:95
@ Start
Definition enclave_interface_types.h:93
constexpr char const * start_type_to_str(StartType type)
Definition enclave_interface_types.h:98
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:292
CurveID
Definition curve.h:18
std::shared_ptr< WorkBeacon > WorkBeaconPtr
Definition work_beacon.h:60
JobBoard & get_main_job_board()
Definition task_system.cpp:53
void try_do_task(BaseTask &task, bool abort_on_throw=true)
Definition worker.h:22
std::shared_ptr< BaseTask > Task
Definition task.h:36
Definition app_interface.h:13
std::unique_ptr< ccf::endpoints::EndpointRegistry > make_user_endpoints(ccf::AbstractNodeContext &context)
Definition js_generic.cpp:9
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition node_context.h:12
size_t node_to_node_message_limit
Definition startup_config.h:28
ccf::NodeInfoNetwork network
Definition startup_config.h:33
ccf::ds::SizeString historical_cache_soft_limit
Definition startup_config.h:30
ccf::consensus::Configuration consensus
Definition startup_config.h:32
Definition network_state.h:12
std::shared_ptr< LedgerSecrets > ledger_secrets
Definition network_state.h:14
std::unique_ptr< NetworkIdentity > identity
Definition network_state.h:13
std::shared_ptr< ccf::kv::Store > tables
Definition network_tables.h:46
Definition node_state.h:84
ccf::crypto::Pem service_cert
Definition node_state.h:86
ccf::crypto::Pem self_signed_node_cert
Definition node_state.h:85
Definition startup_config.h:148
Definition consensus_config.h:11
ccf::ds::TimeString election_timeout
Definition consensus_config.h:13