CCF
Loading...
Searching...
No Matches
enclave.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4#include "ccf/app_interface.h"
6#include "ccf/ds/logger.h"
8#include "ccf/node_context.h"
10#include "ccf/pal/mem.h"
11#include "crypto/openssl/hash.h"
12#include "ds/oversized.h"
13#include "ds/work_beacon.h"
16#include "interface.h"
18#include "kv/ledger_chunker.h"
21#include "node/network_state.h"
22#include "node/node_state.h"
23#include "node/node_types.h"
27#include "node/rpc/forwarder.h"
35#include "rpc_map.h"
36#include "rpc_sessions.h"
37
38#include <openssl/engine.h>
39
40namespace ccf
41{
42 class Enclave
43 {
44 private:
45 std::unique_ptr<ringbuffer::Circuit> circuit;
46 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory;
47 std::unique_ptr<oversized::WriterFactory> writer_factory;
48 ccf::ds::WorkBeaconPtr work_beacon;
49 ccf::NetworkState network;
50 std::shared_ptr<RPCMap> rpc_map;
51 std::shared_ptr<RPCSessions> rpcsessions;
52 std::unique_ptr<ccf::NodeState> node;
53 ringbuffer::WriterPtr to_host = nullptr;
54 std::chrono::high_resolution_clock::time_point last_tick_time;
55
56 StartType start_type;
57
58 struct NodeContext : public ccf::AbstractNodeContext
59 {
60 const ccf::NodeId this_node;
61
62 NodeContext(const ccf::NodeId& id) : this_node(id) {}
63
64 ccf::NodeId get_node_id() const override
65 {
66 return this_node;
67 }
68 };
69
70 std::unique_ptr<NodeContext> context = nullptr;
71
72 std::shared_ptr<ccf::historical::StateCache> historical_state_cache =
73 nullptr;
74 std::shared_ptr<ccf::indexing::Indexer> indexer = nullptr;
75 std::shared_ptr<ccf::indexing::EnclaveLFSAccess> lfs_access = nullptr;
76 std::shared_ptr<ccf::HostProcesses> host_processes = nullptr;
77
78 public:
80 std::unique_ptr<ringbuffer::Circuit> circuit_,
81 std::unique_ptr<ringbuffer::WriterFactory> basic_writer_factory_,
82 std::unique_ptr<oversized::WriterFactory> writer_factory_,
83 size_t sig_tx_interval,
84 size_t sig_ms_interval,
85 size_t chunk_threshold,
86 const ccf::consensus::Configuration& consensus_config,
87 const ccf::crypto::CurveID& curve_id,
88 const ccf::ds::WorkBeaconPtr& work_beacon_) :
89 circuit(std::move(circuit_)),
90 basic_writer_factory(std::move(basic_writer_factory_)),
91 writer_factory(std::move(writer_factory_)),
92 work_beacon(work_beacon_),
93 network(),
94 rpc_map(std::make_shared<RPCMap>()),
95 rpcsessions(std::make_shared<RPCSessions>(*writer_factory, rpc_map))
96 {
98
99 to_host = writer_factory->create_writer_to_outside();
100
101 LOG_TRACE_FMT("Creating ledger secrets");
102 network.ledger_secrets = std::make_shared<ccf::LedgerSecrets>();
103
104 network.tables->set_chunker(
105 std::make_shared<ccf::kv::LedgerChunker>(chunk_threshold));
106
107 LOG_TRACE_FMT("Creating node");
108 node = std::make_unique<ccf::NodeState>(
109 *writer_factory, network, rpcsessions, curve_id);
110
111 LOG_TRACE_FMT("Creating context");
112 context = std::make_unique<NodeContext>(node->get_node_id());
113
114 LOG_TRACE_FMT("Creating context subsystems");
115 historical_state_cache = std::make_shared<ccf::historical::StateCache>(
116 *network.tables,
117 network.ledger_secrets,
118 writer_factory->create_writer_to_outside());
119 context->install_subsystem(historical_state_cache);
120
121 indexer = std::make_shared<ccf::indexing::Indexer>(
122 std::make_shared<ccf::indexing::HistoricalTransactionFetcher>(
123 historical_state_cache));
124 context->install_subsystem(indexer);
125
126 lfs_access = std::make_shared<ccf::indexing::EnclaveLFSAccess>(
127 writer_factory->create_writer_to_outside());
128 context->install_subsystem(lfs_access);
129
130 context->install_subsystem(std::make_shared<ccf::HostProcesses>(*node));
131 context->install_subsystem(std::make_shared<ccf::NodeOperation>(*node));
132 context->install_subsystem(
133 std::make_shared<ccf::GovernanceEffects>(*node));
134
135 context->install_subsystem(
136 std::make_shared<ccf::NetworkIdentitySubsystem>(
137 *node, network.identity));
138
139 context->install_subsystem(
140 std::make_shared<ccf::NodeConfigurationSubsystem>(*node));
141
142 context->install_subsystem(std::make_shared<ccf::ACMESubsystem>(*node));
143
144 auto cpss = std::make_shared<ccf::CustomProtocolSubsystem>(*node);
145 context->install_subsystem(cpss);
146 rpcsessions->set_custom_protocol_subsystem(cpss);
147
148 static constexpr size_t max_interpreter_cache_size = 10;
149 auto interpreter_cache =
150 std::make_shared<ccf::js::InterpreterCache>(max_interpreter_cache_size);
151 context->install_subsystem(interpreter_cache);
152
153 context->install_subsystem(
154 std::make_shared<ccf::AbstractCOSESignaturesConfigSubsystem>(*node));
155
156 LOG_TRACE_FMT("Creating RPC actors / ffi");
157 rpc_map->register_frontend<ccf::ActorsType::members>(
158 std::make_unique<ccf::MemberRpcFrontend>(network, *context));
159
160 rpc_map->register_frontend<ccf::ActorsType::users>(
161 std::make_unique<ccf::UserRpcFrontend>(
162 network, ccf::make_user_endpoints(*context), *context));
163
164 rpc_map->register_frontend<ccf::ActorsType::nodes>(
165 std::make_unique<ccf::NodeRpcFrontend>(network, *context));
166
167 // Note: for ACME challenges, the well-known frontend should really only
168 // listen on the interface specified in the ACMEClientConfig, but we don't
169 // have support for frontends restricted to particular interfaces yet.
170 rpc_map->register_frontend<ccf::ActorsType::acme_challenge>(
171 std::make_unique<ccf::ACMERpcFrontend>(network, *context));
172
173 LOG_TRACE_FMT("Initialize node");
174 node->initialize(
175 consensus_config,
176 rpc_map,
177 rpcsessions,
178 indexer,
179 sig_tx_interval,
180 sig_ms_interval);
181 }
182
184 {
185 LOG_TRACE_FMT("Shutting down enclave");
187 }
188
190 StartType start_type_,
191 const ccf::StartupConfig& ccf_config_,
192 std::vector<uint8_t>&& startup_snapshot,
193 std::vector<uint8_t>& node_cert,
194 std::vector<uint8_t>& service_cert)
195 {
196 start_type = start_type_;
197
198 rpcsessions->update_listening_interface_options(ccf_config_.network);
199
200 node->set_n2n_message_limit(ccf_config_.node_to_node_message_limit);
201
202 historical_state_cache->set_soft_cache_limit(
203 ccf_config_.historical_cache_soft_limit);
204
205 // If we haven't heard from a node for multiple elections, then cleanup
206 // their node-to-node channel
207 const auto idle_timeout =
208 std::chrono::milliseconds(ccf_config_.consensus.election_timeout) * 4;
209 node->set_n2n_idle_timeout(idle_timeout);
210
211 ccf::NodeCreateInfo create_info;
212 try
213 {
215 "Creating node with start_type {}", start_type_to_str(start_type));
216 create_info =
217 node->create(start_type, ccf_config_, std::move(startup_snapshot));
218 }
219 catch (const std::exception& e)
220 {
221 LOG_FAIL_FMT("Error starting node: {}", e.what());
223 }
224
225 // Copy node and service certs out
226 node_cert = create_info.self_signed_node_cert.raw();
227
228 if (start_type == StartType::Start || start_type == StartType::Recover)
229 {
230 // When starting a node in start or recover modes, fresh network secrets
231 // are created and the associated certificate can be passed to the host
232 service_cert = create_info.service_cert.raw();
233 }
234
236 }
237
238 bool run_main()
239 {
241 LOG_DEBUG_FMT("Running main thread");
242
243 {
244 messaging::BufferProcessor bp("Enclave");
245
246 // reconstruct oversized messages sent to the enclave
248
249 lfs_access->register_message_handlers(bp.get_dispatcher());
250
252 bp, AdminMessage::stop, [&bp](const uint8_t*, size_t) {
253 bp.set_finished();
255 });
256
258 bp, AdminMessage::stop_notice, [this](const uint8_t*, size_t) {
259 node->stop_notice();
260 });
261
262 last_tick_time = decltype(last_tick_time)::clock::now();
263
265 bp,
266 AdminMessage::tick,
267 [this, &disp = bp.get_dispatcher()](const uint8_t*, size_t) {
268 const auto message_counts = disp.retrieve_message_counts();
269 const auto j = disp.convert_message_counts(message_counts);
270 RINGBUFFER_WRITE_MESSAGE(
271 AdminMessage::work_stats, to_host, j.dump());
272
273 const auto time_now = decltype(last_tick_time)::clock::now();
274
275 const auto elapsed_ms =
276 std::chrono::duration_cast<std::chrono::milliseconds>(
277 time_now - last_tick_time);
278 if (elapsed_ms.count() > 0)
279 {
280 last_tick_time += elapsed_ms;
281
282 node->tick(elapsed_ms);
283 historical_state_cache->tick(elapsed_ms);
284 ::threading::ThreadMessaging::instance().tick(elapsed_ms);
285 // When recovering, no signature should be emitted while the
286 // public ledger is being read
287 if (!node->is_reading_public_ledger())
288 {
289 for (auto& [actor, frontend] : rpc_map->frontends())
290 {
291 frontend->tick(elapsed_ms);
292 }
293 }
294 node->tick_end();
295 }
296 });
297
299 bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
300 try
301 {
302 node->recv_node_inbound(data, size);
303 }
304 catch (const std::exception& e)
305 {
307 "Ignoring node_inbound message due to exception: {}", e.what());
308 }
309 });
310
312 bp,
313 ::consensus::ledger_entry_range,
314 [this](const uint8_t* data, size_t size) {
315 const auto [from_seqno, to_seqno, purpose, body] =
316 ringbuffer::read_message<::consensus::ledger_entry_range>(
317 data, size);
318 switch (purpose)
319 {
320 case ::consensus::LedgerRequestPurpose::Recovery:
321 {
322 if (node->is_reading_public_ledger())
323 {
324 node->recover_public_ledger_entries(body);
325 }
326 else if (node->is_reading_private_ledger())
327 {
328 node->recover_private_ledger_entries(body);
329 }
330 else
331 {
332 auto [s, _, __] = node->state();
334 "Cannot recover ledger entry: Unexpected node state {}", s);
335 }
336 break;
337 }
338 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
339 {
340 historical_state_cache->handle_ledger_entries(
341 from_seqno, to_seqno, body);
342 break;
343 }
344 default:
345 {
346 LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
347 }
348 }
349 });
350
352 bp,
353 ::consensus::ledger_no_entry_range,
354 [this](const uint8_t* data, size_t size) {
355 const auto [from_seqno, to_seqno, purpose] =
356 ringbuffer::read_message<::consensus::ledger_no_entry_range>(
357 data, size);
358 switch (purpose)
359 {
360 case ::consensus::LedgerRequestPurpose::Recovery:
361 {
362 node->recover_ledger_end();
363 break;
364 }
365 case ::consensus::LedgerRequestPurpose::HistoricalQuery:
366 {
367 historical_state_cache->handle_no_entry_range(
368 from_seqno, to_seqno);
369 break;
370 }
371 default:
372 {
373 LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
374 }
375 }
376 });
377
379 bp,
380 ::consensus::snapshot_allocated,
381 [this](const uint8_t* data, size_t size) {
382 const auto [snapshot_span, generation_count] =
383 ringbuffer::read_message<::consensus::snapshot_allocated>(
384 data, size);
385
386 node->write_snapshot(snapshot_span, generation_count);
387 });
388
389 rpcsessions->register_message_handlers(bp.get_dispatcher());
390
391 // Maximum number of inbound ringbuffer messages which will be
392 // processed in a single iteration
393 static constexpr size_t max_messages = 256;
394
395 while (!bp.get_finished())
396 {
397 // Wait until the host indicates that some ringbuffer messages are
398 // available, but wake at least every 100ms to check thread messages
399 work_beacon->wait_for_work_with_timeout(
400 std::chrono::milliseconds(100));
401
402 // First, read some messages from the ringbuffer
403 auto read = bp.read_n(max_messages, circuit->read_from_outside());
404
405 // Then, execute some thread messages
406 size_t thread_msg = 0;
407 while (thread_msg < max_messages &&
409 {
410 thread_msg++;
411 }
412
413 // If no messages were read from the ringbuffer and no thread
414 // messages were executed, idle
415 if (read == 0 && thread_msg == 0)
416 {
417 std::this_thread::yield();
418 }
419 }
420
421 LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
422 RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, to_host);
423
425
426 return true;
427 }
428 }
429
430 struct Msg
431 {
432 uint64_t tid;
433 };
434
435 static void init_thread_cb(std::unique_ptr<::threading::Tmsg<Msg>> msg)
436 {
437 LOG_DEBUG_FMT("First thread CB:{}", msg->data.tid);
438 }
439
441 {
443 LOG_DEBUG_FMT("Running worker thread");
444
445 {
446 auto msg = std::make_unique<::threading::Tmsg<Msg>>(&init_thread_cb);
449 msg->data.tid, std::move(msg));
450
453 }
454
455 return true;
456 }
457 };
458}
Definition enclave.h:43
static void init_thread_cb(std::unique_ptr<::threading::Tmsg< Msg > > msg)
Definition enclave.h:435
CreateNodeStatus create_new_node(StartType start_type_, const ccf::StartupConfig &ccf_config_, std::vector< uint8_t > &&startup_snapshot, std::vector< uint8_t > &node_cert, std::vector< uint8_t > &service_cert)
Definition enclave.h:189
Enclave(std::unique_ptr< ringbuffer::Circuit > circuit_, std::unique_ptr< ringbuffer::WriterFactory > basic_writer_factory_, std::unique_ptr< oversized::WriterFactory > writer_factory_, size_t sig_tx_interval, size_t sig_ms_interval, size_t chunk_threshold, const ccf::consensus::Configuration &consensus_config, const ccf::crypto::CurveID &curve_id, const ccf::ds::WorkBeaconPtr &work_beacon_)
Definition enclave.h:79
~Enclave()
Definition enclave.h:183
bool run_main()
Definition enclave.h:238
bool run_worker()
Definition enclave.h:440
Definition rpc_map.h:11
Definition rpc_sessions.h:46
std::vector< uint8_t > raw() const
Definition pem.h:71
Definition messaging.h:211
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:219
void set_finished(bool v=true)
Definition messaging.h:230
Definition oversized.h:29
void run()
Definition thread_messaging.h:301
static ThreadMessaging & instance()
Definition thread_messaging.h:283
bool run_one()
Definition thread_messaging.h:311
void add_task(uint16_t tid, std::unique_ptr< Tmsg< Payload > > msg)
Definition thread_messaging.h:318
void set_finished(bool v=true)
Definition thread_messaging.h:296
CreateNodeStatus
Definition enclave_interface_types.h:8
@ OK
Definition enclave_interface_types.h:10
@ InternalError
Definition enclave_interface_types.h:13
StartType
Definition enclave_interface_types.h:92
@ Recover
Definition enclave_interface_types.h:95
@ Start
Definition enclave_interface_types.h:93
constexpr char const * start_type_to_str(StartType type)
Definition enclave_interface_types.h:98
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
void openssl_sha256_init()
Definition hash.cpp:61
CurveID
Definition curve.h:18
void openssl_sha256_shutdown()
Definition hash.cpp:87
std::shared_ptr< WorkBeacon > WorkBeaconPtr
Definition work_beacon.h:60
uint16_t get_current_thread_id()
Definition thread_local.cpp:15
Definition app_interface.h:14
std::unique_ptr< ccf::endpoints::EndpointRegistry > make_user_endpoints(ccf::AbstractNodeContext &context)
Definition js_generic.cpp:9
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
STL namespace.
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition node_context.h:12
size_t node_to_node_message_limit
Definition startup_config.h:26
ccf::NodeInfoNetwork network
Definition startup_config.h:31
ccf::ds::SizeString historical_cache_soft_limit
Definition startup_config.h:28
ccf::consensus::Configuration consensus
Definition startup_config.h:30
Definition enclave.h:431
uint64_t tid
Definition enclave.h:432
Definition network_state.h:12
std::shared_ptr< LedgerSecrets > ledger_secrets
Definition network_state.h:14
std::unique_ptr< NetworkIdentity > identity
Definition network_state.h:13
std::shared_ptr< ccf::kv::Store > tables
Definition network_tables.h:47
Definition node_state.h:76
ccf::crypto::Pem service_cert
Definition node_state.h:78
ccf::crypto::Pem self_signed_node_cert
Definition node_state.h:77
Definition startup_config.h:106
Definition consensus_config.h:11
ccf::ds::TimeString election_timeout
Definition consensus_config.h:13
Definition thread_messaging.h:27