76 std::unique_ptr<ringbuffer::Circuit> circuit_,
77 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory_,
78 std::unique_ptr<oversized::WriterFactory> writer_factory_,
79 size_t sig_tx_interval,
80 size_t sig_ms_interval,
81 size_t chunk_threshold,
85 circuit(
std::move(circuit_)),
86 basic_writer_factory(
std::move(basic_writer_factory_)),
87 writer_factory(
std::move(writer_factory_)),
88 work_beacon(
std::move(work_beacon_)),
90 rpcsessions(
std::make_shared<
RPCSessions>(*writer_factory, rpc_map))
92 to_host = writer_factory->create_writer_to_outside();
97 network.
tables->set_chunker(
98 std::make_shared<ccf::kv::LedgerChunker>(chunk_threshold));
101 node = std::make_unique<ccf::NodeState>(
102 *writer_factory, network, rpcsessions, curve_id);
105 context = std::make_unique<NodeContext>(node->get_node_id());
108 historical_state_cache = std::make_shared<ccf::historical::StateCache>(
111 writer_factory->create_writer_to_outside());
112 context->install_subsystem(historical_state_cache);
114 indexer = std::make_shared<ccf::indexing::Indexer>(
115 std::make_shared<ccf::indexing::HistoricalTransactionFetcher>(
116 historical_state_cache));
117 context->install_subsystem(indexer);
119 lfs_access = std::make_shared<ccf::indexing::EnclaveLFSAccess>(
120 writer_factory->create_writer_to_outside());
121 context->install_subsystem(lfs_access);
123 context->install_subsystem(std::make_shared<ccf::NodeOperation>(*node));
124 context->install_subsystem(
125 std::make_shared<ccf::GovernanceEffects>(*node));
127 context->install_subsystem(
128 std::make_shared<ccf::NetworkIdentitySubsystem>(
129 *node, network.
identity, historical_state_cache));
131 context->install_subsystem(
132 std::make_shared<ccf::NodeConfigurationSubsystem>(*node));
134 auto cpss = std::make_shared<ccf::CustomProtocolSubsystem>(*node);
135 context->install_subsystem(cpss);
136 rpcsessions->set_custom_protocol_subsystem(cpss);
138 static constexpr size_t max_interpreter_cache_size = 10;
139 auto interpreter_cache =
140 std::make_shared<ccf::js::InterpreterCache>(max_interpreter_cache_size);
141 context->install_subsystem(interpreter_cache);
143 context->install_subsystem(
144 std::make_shared<ccf::AbstractCOSESignaturesConfigSubsystem>(*node));
148 std::make_unique<ccf::MemberRpcFrontend>(network, *context));
151 std::make_unique<ccf::UserRpcFrontend>(
155 std::make_unique<ccf::NodeRpcFrontend>(network, *context));
175 std::vector<uint8_t>&& startup_snapshot,
176 std::vector<uint8_t>& node_cert,
177 std::vector<uint8_t>& service_cert)
179 start_type = start_type_;
181 rpcsessions->update_listening_interface_options(ccf_config_.
network);
185 historical_state_cache->set_soft_cache_limit(
190 const auto idle_timeout =
192 node->set_n2n_idle_timeout(idle_timeout);
200 node->create(start_type, ccf_config_, std::move(startup_snapshot));
202 catch (
const std::exception& e)
234 bp, AdminMessage::stop, [
this, &bp](
const uint8_t*,
size_t) {
236 this->worker_stop_signal.store(
true);
240 bp, AdminMessage::stop_notice, [
this](
const uint8_t*,
size_t) {
244 last_tick_time =
decltype(last_tick_time)::clock::now();
250 const auto message_counts = disp.retrieve_message_counts();
251 const auto j = disp.convert_message_counts(message_counts);
252 RINGBUFFER_WRITE_MESSAGE(
253 AdminMessage::work_stats, to_host, j.dump());
255 const auto time_now = decltype(last_tick_time)::clock::now();
257 const auto elapsed_ms =
258 std::chrono::duration_cast<std::chrono::milliseconds>(
259 time_now - last_tick_time);
260 if (elapsed_ms.count() > 0)
262 last_tick_time += elapsed_ms;
264 node->tick(elapsed_ms);
265 historical_state_cache->tick(elapsed_ms);
266 ccf::tasks::tick(elapsed_ms);
269 if (!node->is_reading_public_ledger())
271 for (auto& [actor, frontend] : rpc_map->frontends())
273 frontend->tick(elapsed_ms);
281 bp, ccf::node_inbound, [
this](
const uint8_t* data,
size_t size) {
284 node->recv_node_inbound(data, size);
286 catch (
const std::exception& e)
289 "Ignoring node_inbound message due to exception: {}", e.what());
295 ::consensus::ledger_entry_range,
296 [
this](
const uint8_t* data,
size_t size) {
297 const auto [from_seqno, to_seqno, purpose, body] =
298 ringbuffer::read_message<::consensus::ledger_entry_range>(
302 case ::consensus::LedgerRequestPurpose::Recovery:
304 if (node->is_reading_public_ledger())
306 node->recover_public_ledger_entries(body);
308 else if (node->is_reading_private_ledger())
310 node->recover_private_ledger_entries(body);
314 auto [s, _, __] = node->state();
316 "Cannot recover ledger entry: Unexpected node state {}", s);
320 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
322 historical_state_cache->handle_ledger_entries(
323 from_seqno, to_seqno, body);
335 ::consensus::ledger_no_entry_range,
336 [
this](
const uint8_t* data,
size_t size) {
337 const auto [from_seqno, to_seqno, purpose] =
338 ringbuffer::read_message<::consensus::ledger_no_entry_range>(
342 case ::consensus::LedgerRequestPurpose::Recovery:
344 node->recover_ledger_end();
347 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
349 historical_state_cache->handle_no_entry_range(
350 from_seqno, to_seqno);
362 ::consensus::snapshot_allocated,
363 [
this](
const uint8_t* data,
size_t size) {
364 const auto [snapshot_span, generation_count] =
365 ringbuffer::read_message<::consensus::snapshot_allocated>(
368 node->write_snapshot(snapshot_span, generation_count);
371 rpcsessions->register_message_handlers(bp.get_dispatcher());
375 static constexpr size_t max_messages = 256;
377 while (!bp.get_finished())
381 work_beacon->wait_for_work_with_timeout(
382 std::chrono::milliseconds(100));
385 auto read = bp.read_n(max_messages, circuit->read_from_outside());
390 size_t tasks_done = 0;
391 while (task !=
nullptr)
395 if (tasks_done >= max_messages)
399 task = job_board.get_task();
404 if (read == 0 && tasks_done == 0)
406 std::this_thread::yield();
410 LOG_INFO_FMT(
"Enclave stopped successfully. Stopping host...");