CCF
Loading...
Searching...
No Matches
enclave.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4#include "ccf/app_interface.h"
6#include "ccf/node_context.h"
8#include "ccf/pal/mem.h"
10#include "ds/internal_logger.h"
11#include "ds/oversized.h"
12#include "ds/work_beacon.h"
15#include "interface.h"
17#include "kv/ledger_chunker.h"
19#include "node/network_state.h"
20#include "node/node_state.h"
21#include "node/node_types.h"
24#include "node/rpc/forwarder.h"
31#include "rpc_map.h"
32#include "rpc_sessions.h"
33
34#include <openssl/engine.h>
35
36namespace ccf
37{
38 class Enclave
39 {
40 private:
41 std::unique_ptr<ringbuffer::Circuit> circuit;
42 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory;
43 std::unique_ptr<oversized::WriterFactory> writer_factory;
44 ccf::ds::WorkBeaconPtr work_beacon;
45 ccf::NetworkState network;
46 std::shared_ptr<RPCMap> rpc_map;
47 std::shared_ptr<RPCSessions> rpcsessions;
48 std::unique_ptr<ccf::NodeState> node;
49 ringbuffer::WriterPtr to_host = nullptr;
50 std::chrono::high_resolution_clock::time_point last_tick_time;
51 std::atomic<bool> worker_stop_signal = false;
52
53 StartType start_type{};
54
55 struct NodeContext : public ccf::AbstractNodeContext
56 {
57 const ccf::NodeId this_node;
58
59 NodeContext(ccf::NodeId id) : this_node(std::move(id)) {}
60
61 [[nodiscard]] ccf::NodeId get_node_id() const override
62 {
63 return this_node;
64 }
65 };
66
67 std::unique_ptr<NodeContext> context = nullptr;
68
69 std::shared_ptr<ccf::historical::StateCache> historical_state_cache =
70 nullptr;
71 std::shared_ptr<ccf::indexing::Indexer> indexer = nullptr;
72 std::shared_ptr<ccf::indexing::EnclaveLFSAccess> lfs_access = nullptr;
73
74 public:
76 std::unique_ptr<ringbuffer::Circuit> circuit_,
77 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory_,
78 std::unique_ptr<oversized::WriterFactory> writer_factory_,
79 size_t sig_tx_interval,
80 size_t sig_ms_interval,
81 size_t chunk_threshold,
82 const ccf::consensus::Configuration& consensus_config,
83 const ccf::crypto::CurveID& curve_id,
84 ccf::ds::WorkBeaconPtr work_beacon_) :
85 circuit(std::move(circuit_)),
86 basic_writer_factory(std::move(basic_writer_factory_)),
87 writer_factory(std::move(writer_factory_)),
88 work_beacon(std::move(work_beacon_)),
89 rpc_map(std::make_shared<RPCMap>()),
90 rpcsessions(std::make_shared<RPCSessions>(*writer_factory, rpc_map))
91 {
92 to_host = writer_factory->create_writer_to_outside();
93
94 LOG_TRACE_FMT("Creating ledger secrets");
95 network.ledger_secrets = std::make_shared<ccf::LedgerSecrets>();
96
97 network.tables->set_chunker(
98 std::make_shared<ccf::kv::LedgerChunker>(chunk_threshold));
99
100 LOG_TRACE_FMT("Creating node");
101 node = std::make_unique<ccf::NodeState>(
102 *writer_factory, network, rpcsessions, curve_id);
103
104 LOG_TRACE_FMT("Creating context");
105 context = std::make_unique<NodeContext>(node->get_node_id());
106
107 LOG_TRACE_FMT("Creating context subsystems");
108 historical_state_cache = std::make_shared<ccf::historical::StateCache>(
109 *network.tables,
110 network.ledger_secrets,
111 writer_factory->create_writer_to_outside());
112 context->install_subsystem(historical_state_cache);
113
114 indexer = std::make_shared<ccf::indexing::Indexer>(
115 std::make_shared<ccf::indexing::HistoricalTransactionFetcher>(
116 historical_state_cache));
117 context->install_subsystem(indexer);
118
119 lfs_access = std::make_shared<ccf::indexing::EnclaveLFSAccess>(
120 writer_factory->create_writer_to_outside());
121 context->install_subsystem(lfs_access);
122
123 context->install_subsystem(std::make_shared<ccf::NodeOperation>(*node));
124 context->install_subsystem(
125 std::make_shared<ccf::GovernanceEffects>(*node));
126
127 context->install_subsystem(
128 std::make_shared<ccf::NetworkIdentitySubsystem>(
129 *node, network.identity, historical_state_cache));
130
131 context->install_subsystem(
132 std::make_shared<ccf::NodeConfigurationSubsystem>(*node));
133
134 auto cpss = std::make_shared<ccf::CustomProtocolSubsystem>(*node);
135 context->install_subsystem(cpss);
136 rpcsessions->set_custom_protocol_subsystem(cpss);
137
138 static constexpr size_t max_interpreter_cache_size = 10;
139 auto interpreter_cache =
140 std::make_shared<ccf::js::InterpreterCache>(max_interpreter_cache_size);
141 context->install_subsystem(interpreter_cache);
142
143 context->install_subsystem(
144 std::make_shared<ccf::AbstractCOSESignaturesConfigSubsystem>(*node));
145
146 LOG_TRACE_FMT("Creating RPC actors / ffi");
147 rpc_map->register_frontend<ccf::ActorsType::members>(
148 std::make_unique<ccf::MemberRpcFrontend>(network, *context));
149
150 rpc_map->register_frontend<ccf::ActorsType::users>(
151 std::make_unique<ccf::UserRpcFrontend>(
152 network, ccf::make_user_endpoints(*context), *context));
153
154 rpc_map->register_frontend<ccf::ActorsType::nodes>(
155 std::make_unique<ccf::NodeRpcFrontend>(network, *context));
156
157 LOG_TRACE_FMT("Initialize node");
158 node->initialize(
159 consensus_config,
160 rpc_map,
161 rpcsessions,
162 indexer,
163 sig_tx_interval,
164 sig_ms_interval);
165 }
166
168 {
169 LOG_TRACE_FMT("Shutting down enclave");
170 }
171
173 StartType start_type_,
174 const ccf::StartupConfig& ccf_config_,
175 std::vector<uint8_t>&& startup_snapshot,
176 std::vector<uint8_t>& node_cert,
177 std::vector<uint8_t>& service_cert)
178 {
179 start_type = start_type_;
180
181 rpcsessions->update_listening_interface_options(ccf_config_.network);
182
183 node->set_n2n_message_limit(ccf_config_.node_to_node_message_limit);
184
185 historical_state_cache->set_soft_cache_limit(
186 ccf_config_.historical_cache_soft_limit);
187
188 // If we haven't heard from a node for multiple elections, then cleanup
189 // their node-to-node channel
190 const auto idle_timeout =
191 std::chrono::milliseconds(ccf_config_.consensus.election_timeout) * 4;
192 node->set_n2n_idle_timeout(idle_timeout);
193
194 ccf::NodeCreateInfo create_info;
195 try
196 {
198 "Creating node with start_type {}", start_type_to_str(start_type));
199 create_info =
200 node->create(start_type, ccf_config_, std::move(startup_snapshot));
201 }
202 catch (const std::exception& e)
203 {
204 LOG_FAIL_FMT("Error starting node: {}", e.what());
206 }
207
208 // Copy node and service certs out
209 node_cert = create_info.self_signed_node_cert.raw();
210
211 if (start_type == StartType::Start || start_type == StartType::Recover)
212 {
213 // When starting a node in start or recover modes, fresh network secrets
214 // are created and the associated certificate can be passed to the host
215 service_cert = create_info.service_cert.raw();
216 }
217
219 }
220
221 bool run_main()
222 {
223 LOG_DEBUG_FMT("Running main thread");
224
225 {
226 messaging::BufferProcessor bp("Enclave");
227
228 // reconstruct oversized messages sent to the enclave
230
231 lfs_access->register_message_handlers(bp.get_dispatcher());
232
234 bp, AdminMessage::stop, [this, &bp](const uint8_t*, size_t) {
235 bp.set_finished();
236 this->worker_stop_signal.store(true);
237 });
238
240 bp, AdminMessage::stop_notice, [this](const uint8_t*, size_t) {
241 node->stop_notice();
242 });
243
244 last_tick_time = decltype(last_tick_time)::clock::now();
245
247 bp,
248 AdminMessage::tick,
249 [this, &disp = bp.get_dispatcher()](const uint8_t*, size_t) {
250 const auto message_counts = disp.retrieve_message_counts();
251 const auto j = disp.convert_message_counts(message_counts);
252 RINGBUFFER_WRITE_MESSAGE(
253 AdminMessage::work_stats, to_host, j.dump());
254
255 const auto time_now = decltype(last_tick_time)::clock::now();
256
257 const auto elapsed_ms =
258 std::chrono::duration_cast<std::chrono::milliseconds>(
259 time_now - last_tick_time);
260 if (elapsed_ms.count() > 0)
261 {
262 last_tick_time += elapsed_ms;
263
264 node->tick(elapsed_ms);
265 historical_state_cache->tick(elapsed_ms);
266 ccf::tasks::tick(elapsed_ms);
267 // When recovering, no signature should be emitted while the
268 // public ledger is being read
269 if (!node->is_reading_public_ledger())
270 {
271 for (auto& [actor, frontend] : rpc_map->frontends())
272 {
273 frontend->tick(elapsed_ms);
274 }
275 }
276 node->tick_end();
277 }
278 });
279
281 bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
282 try
283 {
284 node->recv_node_inbound(data, size);
285 }
286 catch (const std::exception& e)
287 {
289 "Ignoring node_inbound message due to exception: {}", e.what());
290 }
291 });
292
294 bp,
295 ::consensus::ledger_entry_range,
296 [this](const uint8_t* data, size_t size) {
297 const auto [from_seqno, to_seqno, purpose, body] =
298 ringbuffer::read_message<::consensus::ledger_entry_range>(
299 data, size);
300 switch (purpose)
301 {
302 case ::consensus::LedgerRequestPurpose::Recovery:
303 {
304 if (node->is_reading_public_ledger())
305 {
306 node->recover_public_ledger_entries(body);
307 }
308 else if (node->is_reading_private_ledger())
309 {
310 node->recover_private_ledger_entries(body);
311 }
312 else
313 {
314 auto [s, _, __] = node->state();
316 "Cannot recover ledger entry: Unexpected node state {}", s);
317 }
318 break;
319 }
320 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
321 {
322 historical_state_cache->handle_ledger_entries(
323 from_seqno, to_seqno, body);
324 break;
325 }
326 default:
327 {
328 LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
329 }
330 }
331 });
332
334 bp,
335 ::consensus::ledger_no_entry_range,
336 [this](const uint8_t* data, size_t size) {
337 const auto [from_seqno, to_seqno, purpose] =
338 ringbuffer::read_message<::consensus::ledger_no_entry_range>(
339 data, size);
340 switch (purpose)
341 {
342 case ::consensus::LedgerRequestPurpose::Recovery:
343 {
344 node->recover_ledger_end();
345 break;
346 }
347 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
348 {
349 historical_state_cache->handle_no_entry_range(
350 from_seqno, to_seqno);
351 break;
352 }
353 default:
354 {
355 LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
356 }
357 }
358 });
359
361 bp,
362 ::consensus::snapshot_allocated,
363 [this](const uint8_t* data, size_t size) {
364 const auto [snapshot_span, generation_count] =
365 ringbuffer::read_message<::consensus::snapshot_allocated>(
366 data, size);
367
368 node->write_snapshot(snapshot_span, generation_count);
369 });
370
371 rpcsessions->register_message_handlers(bp.get_dispatcher());
372
373 // Maximum number of inbound ringbuffer messages which will be
374 // processed in a single iteration
375 static constexpr size_t max_messages = 256;
376
377 while (!bp.get_finished())
378 {
379 // Wait until the host indicates that some ringbuffer messages are
380 // available, but wake at least every 100ms to check thread messages
381 work_beacon->wait_for_work_with_timeout(
382 std::chrono::milliseconds(100));
383
384 // First, read some messages from the ringbuffer
385 auto read = bp.read_n(max_messages, circuit->read_from_outside());
386
387 // Then, execute some tasks
388 auto& job_board = ccf::tasks::get_main_job_board();
389 ccf::tasks::Task task = job_board.get_task();
390 size_t tasks_done = 0;
391 while (task != nullptr)
392 {
393 task->do_task();
394 ++tasks_done;
395 if (tasks_done >= max_messages)
396 {
397 break;
398 }
399 task = job_board.get_task();
400 }
401
402 // If no messages were read from the ringbuffer and tasks were
403 // executed, idle
404 if (read == 0 && tasks_done == 0)
405 {
406 std::this_thread::yield();
407 }
408 }
409
410 LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
411 RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, to_host);
412
413 return true;
414 }
415 }
416
418 {
419 LOG_DEBUG_FMT("Running worker thread");
420
421 {
422 auto& job_board = ccf::tasks::get_main_job_board();
423 const auto timeout = std::chrono::milliseconds(100);
424
425 while (!worker_stop_signal.load())
426 {
427 auto task = job_board.wait_for_task(timeout);
428 if (task != nullptr)
429 {
430 task->do_task();
431 }
432 }
433 }
434
435 return true;
436 }
437 };
438}
Definition enclave.h:39
CreateNodeStatus create_new_node(StartType start_type_, const ccf::StartupConfig &ccf_config_, std::vector< uint8_t > &&startup_snapshot, std::vector< uint8_t > &node_cert, std::vector< uint8_t > &service_cert)
Definition enclave.h:172
~Enclave()
Definition enclave.h:167
bool run_main()
Definition enclave.h:221
Enclave(std::unique_ptr< ringbuffer::Circuit > circuit_, std::unique_ptr< ringbuffer::WriterFactory > basic_writer_factory_, std::unique_ptr< oversized::WriterFactory > writer_factory_, size_t sig_tx_interval, size_t sig_ms_interval, size_t chunk_threshold, const ccf::consensus::Configuration &consensus_config, const ccf::crypto::CurveID &curve_id, ccf::ds::WorkBeaconPtr work_beacon_)
Definition enclave.h:75
bool run_worker()
Definition enclave.h:417
Definition rpc_map.h:11
Definition rpc_sessions.h:44
std::vector< uint8_t > raw() const
Definition pem.h:71
Definition messaging.h:211
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:219
void set_finished(bool v=true)
Definition messaging.h:230
Definition oversized.h:30
CreateNodeStatus
Definition enclave_interface_types.h:8
@ OK
Definition enclave_interface_types.h:10
@ InternalError
Definition enclave_interface_types.h:13
StartType
Definition enclave_interface_types.h:92
@ Recover
Definition enclave_interface_types.h:95
@ Start
Definition enclave_interface_types.h:93
constexpr char const * start_type_to_str(StartType type)
Definition enclave_interface_types.h:98
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
CurveID
Definition curve.h:18
std::shared_ptr< WorkBeacon > WorkBeaconPtr
Definition work_beacon.h:60
JobBoard & get_main_job_board()
Definition task_system.cpp:53
std::shared_ptr< BaseTask > Task
Definition task.h:36
Definition app_interface.h:14
std::unique_ptr< ccf::endpoints::EndpointRegistry > make_user_endpoints(ccf::AbstractNodeContext &context)
Definition js_generic.cpp:9
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition node_context.h:12
size_t node_to_node_message_limit
Definition startup_config.h:26
ccf::NodeInfoNetwork network
Definition startup_config.h:31
ccf::ds::SizeString historical_cache_soft_limit
Definition startup_config.h:28
ccf::consensus::Configuration consensus
Definition startup_config.h:30
Definition network_state.h:12
std::shared_ptr< LedgerSecrets > ledger_secrets
Definition network_state.h:14
std::unique_ptr< NetworkIdentity > identity
Definition network_state.h:13
std::shared_ptr< ccf::kv::Store > tables
Definition network_tables.h:46
Definition node_state.h:75
ccf::crypto::Pem service_cert
Definition node_state.h:77
ccf::crypto::Pem self_signed_node_cert
Definition node_state.h:76
Definition startup_config.h:106
Definition consensus_config.h:11
ccf::ds::TimeString election_timeout
Definition consensus_config.h:13