38#include <openssl/engine.h>
45 std::unique_ptr<ringbuffer::Circuit> circuit;
46 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory;
47 std::unique_ptr<oversized::WriterFactory> writer_factory;
50 std::shared_ptr<RPCMap> rpc_map;
51 std::shared_ptr<RPCSessions> rpcsessions;
52 std::unique_ptr<ccf::NodeState> node;
54 std::chrono::high_resolution_clock::time_point last_tick_time;
62 NodeContext(
const ccf::NodeId&
id) : this_node(
id) {}
70 std::unique_ptr<NodeContext> context =
nullptr;
72 std::shared_ptr<ccf::historical::StateCache> historical_state_cache =
74 std::shared_ptr<ccf::indexing::Indexer> indexer =
nullptr;
75 std::shared_ptr<ccf::indexing::EnclaveLFSAccess> lfs_access =
nullptr;
76 std::shared_ptr<ccf::HostProcesses> host_processes =
nullptr;
80 std::unique_ptr<ringbuffer::Circuit> circuit_,
81 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory_,
82 std::unique_ptr<oversized::WriterFactory> writer_factory_,
83 size_t sig_tx_interval,
84 size_t sig_ms_interval,
85 size_t chunk_threshold,
89 circuit(
std::move(circuit_)),
90 basic_writer_factory(
std::move(basic_writer_factory_)),
91 writer_factory(
std::move(writer_factory_)),
92 work_beacon(work_beacon_),
95 rpcsessions(
std::make_shared<
RPCSessions>(*writer_factory, rpc_map))
99 to_host = writer_factory->create_writer_to_outside();
104 network.
tables->set_chunker(
105 std::make_shared<ccf::kv::LedgerChunker>(chunk_threshold));
108 node = std::make_unique<ccf::NodeState>(
109 *writer_factory, network, rpcsessions, curve_id);
112 context = std::make_unique<NodeContext>(node->get_node_id());
115 historical_state_cache = std::make_shared<ccf::historical::StateCache>(
118 writer_factory->create_writer_to_outside());
119 context->install_subsystem(historical_state_cache);
121 indexer = std::make_shared<ccf::indexing::Indexer>(
122 std::make_shared<ccf::indexing::HistoricalTransactionFetcher>(
123 historical_state_cache));
124 context->install_subsystem(indexer);
126 lfs_access = std::make_shared<ccf::indexing::EnclaveLFSAccess>(
127 writer_factory->create_writer_to_outside());
128 context->install_subsystem(lfs_access);
130 context->install_subsystem(std::make_shared<ccf::HostProcesses>(*node));
131 context->install_subsystem(std::make_shared<ccf::NodeOperation>(*node));
132 context->install_subsystem(
133 std::make_shared<ccf::GovernanceEffects>(*node));
135 context->install_subsystem(
136 std::make_shared<ccf::NetworkIdentitySubsystem>(
139 context->install_subsystem(
140 std::make_shared<ccf::NodeConfigurationSubsystem>(*node));
142 context->install_subsystem(std::make_shared<ccf::ACMESubsystem>(*node));
144 auto cpss = std::make_shared<ccf::CustomProtocolSubsystem>(*node);
145 context->install_subsystem(cpss);
146 rpcsessions->set_custom_protocol_subsystem(cpss);
148 static constexpr size_t max_interpreter_cache_size = 10;
149 auto interpreter_cache =
150 std::make_shared<ccf::js::InterpreterCache>(max_interpreter_cache_size);
151 context->install_subsystem(interpreter_cache);
153 context->install_subsystem(
154 std::make_shared<ccf::AbstractCOSESignaturesConfigSubsystem>(*node));
158 std::make_unique<ccf::MemberRpcFrontend>(network, *context));
161 std::make_unique<ccf::UserRpcFrontend>(
165 std::make_unique<ccf::NodeRpcFrontend>(network, *context));
171 std::make_unique<ccf::ACMERpcFrontend>(network, *context));
192 std::vector<uint8_t>&& startup_snapshot,
193 std::vector<uint8_t>& node_cert,
194 std::vector<uint8_t>& service_cert)
196 start_type = start_type_;
198 rpcsessions->update_listening_interface_options(ccf_config_.
network);
202 historical_state_cache->set_soft_cache_limit(
207 const auto idle_timeout =
209 node->set_n2n_idle_timeout(idle_timeout);
217 node->create(start_type, ccf_config_, std::move(startup_snapshot));
219 catch (
const std::exception& e)
252 bp, AdminMessage::stop, [&bp](
const uint8_t*,
size_t) {
258 bp, AdminMessage::stop_notice, [
this](
const uint8_t*,
size_t) {
262 last_tick_time =
decltype(last_tick_time)::clock::now();
268 const auto message_counts = disp.retrieve_message_counts();
269 const auto j = disp.convert_message_counts(message_counts);
270 RINGBUFFER_WRITE_MESSAGE(
271 AdminMessage::work_stats, to_host, j.dump());
273 const auto time_now = decltype(last_tick_time)::clock::now();
275 const auto elapsed_ms =
276 std::chrono::duration_cast<std::chrono::milliseconds>(
277 time_now - last_tick_time);
278 if (elapsed_ms.count() > 0)
280 last_tick_time += elapsed_ms;
282 node->tick(elapsed_ms);
283 historical_state_cache->tick(elapsed_ms);
284 ::threading::ThreadMessaging::instance().tick(elapsed_ms);
287 if (!node->is_reading_public_ledger())
289 for (auto& [actor, frontend] : rpc_map->frontends())
291 frontend->tick(elapsed_ms);
299 bp, ccf::node_inbound, [
this](
const uint8_t* data,
size_t size) {
302 node->recv_node_inbound(data, size);
304 catch (
const std::exception& e)
307 "Ignoring node_inbound message due to exception: {}", e.what());
313 ::consensus::ledger_entry_range,
314 [
this](
const uint8_t* data,
size_t size) {
315 const auto [from_seqno, to_seqno, purpose, body] =
316 ringbuffer::read_message<::consensus::ledger_entry_range>(
320 case ::consensus::LedgerRequestPurpose::Recovery:
322 if (node->is_reading_public_ledger())
324 node->recover_public_ledger_entries(body);
326 else if (node->is_reading_private_ledger())
328 node->recover_private_ledger_entries(body);
332 auto [s, _, __] = node->state();
334 "Cannot recover ledger entry: Unexpected node state {}", s);
338 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
340 historical_state_cache->handle_ledger_entries(
341 from_seqno, to_seqno, body);
353 ::consensus::ledger_no_entry_range,
354 [
this](
const uint8_t* data,
size_t size) {
355 const auto [from_seqno, to_seqno, purpose] =
356 ringbuffer::read_message<::consensus::ledger_no_entry_range>(
360 case ::consensus::LedgerRequestPurpose::Recovery:
362 node->recover_ledger_end();
365 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
367 historical_state_cache->handle_no_entry_range(
368 from_seqno, to_seqno);
380 ::consensus::snapshot_allocated,
381 [
this](
const uint8_t* data,
size_t size) {
382 const auto [snapshot_span, generation_count] =
383 ringbuffer::read_message<::consensus::snapshot_allocated>(
386 node->write_snapshot(snapshot_span, generation_count);
389 rpcsessions->register_message_handlers(bp.get_dispatcher());
393 static constexpr size_t max_messages = 256;
395 while (!bp.get_finished())
399 work_beacon->wait_for_work_with_timeout(
400 std::chrono::milliseconds(100));
403 auto read = bp.read_n(max_messages, circuit->read_from_outside());
406 size_t thread_msg = 0;
407 while (thread_msg < max_messages &&
415 if (read == 0 && thread_msg == 0)
417 std::this_thread::yield();
421 LOG_INFO_FMT(
"Enclave stopped successfully. Stopping host...");
446 auto msg = std::make_unique<::threading::Tmsg<Msg>>(&init_thread_cb);
449 msg->data.tid, std::move(msg));
static void init_thread_cb(std::unique_ptr<::threading::Tmsg< Msg > > msg)
Definition enclave.h:435
CreateNodeStatus create_new_node(StartType start_type_, const ccf::StartupConfig &ccf_config_, std::vector< uint8_t > &&startup_snapshot, std::vector< uint8_t > &node_cert, std::vector< uint8_t > &service_cert)
Definition enclave.h:189
Enclave(std::unique_ptr< ringbuffer::Circuit > circuit_, std::unique_ptr< ringbuffer::WriterFactory > basic_writer_factory_, std::unique_ptr< oversized::WriterFactory > writer_factory_, size_t sig_tx_interval, size_t sig_ms_interval, size_t chunk_threshold, const ccf::consensus::Configuration &consensus_config, const ccf::crypto::CurveID &curve_id, const ccf::ds::WorkBeaconPtr &work_beacon_)
Definition enclave.h:79
~Enclave()
Definition enclave.h:183
bool run_main()
Definition enclave.h:238
bool run_worker()
Definition enclave.h:440
Definition rpc_sessions.h:46
std::vector< uint8_t > raw() const
Definition pem.h:71
Definition messaging.h:211
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:219
void set_finished(bool v=true)
Definition messaging.h:230
Definition oversized.h:29
void run()
Definition thread_messaging.h:301
static ThreadMessaging & instance()
Definition thread_messaging.h:283
bool run_one()
Definition thread_messaging.h:311
void add_task(uint16_t tid, std::unique_ptr< Tmsg< Payload > > msg)
Definition thread_messaging.h:318
void set_finished(bool v=true)
Definition thread_messaging.h:296
CreateNodeStatus
Definition enclave_interface_types.h:8
@ OK
Definition enclave_interface_types.h:10
@ InternalError
Definition enclave_interface_types.h:13
StartType
Definition enclave_interface_types.h:92
@ Recover
Definition enclave_interface_types.h:95
@ Start
Definition enclave_interface_types.h:93
constexpr char const * start_type_to_str(StartType type)
Definition enclave_interface_types.h:98
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
void openssl_sha256_init()
Definition hash.cpp:61
CurveID
Definition curve.h:18
void openssl_sha256_shutdown()
Definition hash.cpp:87
std::shared_ptr< WorkBeacon > WorkBeaconPtr
Definition work_beacon.h:60
uint16_t get_current_thread_id()
Definition thread_local.cpp:15
Definition app_interface.h:14
std::unique_ptr< ccf::endpoints::EndpointRegistry > make_user_endpoints(ccf::AbstractNodeContext &context)
Definition js_generic.cpp:9
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition node_context.h:12
size_t node_to_node_message_limit
Definition startup_config.h:26
ccf::NodeInfoNetwork network
Definition startup_config.h:31
ccf::ds::SizeString historical_cache_soft_limit
Definition startup_config.h:28
ccf::consensus::Configuration consensus
Definition startup_config.h:30
uint64_t tid
Definition enclave.h:432
Definition network_state.h:12
std::shared_ptr< LedgerSecrets > ledger_secrets
Definition network_state.h:14
std::unique_ptr< NetworkIdentity > identity
Definition network_state.h:13
std::shared_ptr< ccf::kv::Store > tables
Definition network_tables.h:47
Definition node_state.h:76
ccf::crypto::Pem service_cert
Definition node_state.h:78
ccf::crypto::Pem self_signed_node_cert
Definition node_state.h:77
Definition startup_config.h:106
Definition consensus_config.h:11
ccf::ds::TimeString election_timeout
Definition consensus_config.h:13
Definition thread_messaging.h:27