CCF
Loading...
Searching...
No Matches
ledger.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
6#include "ccf/ds/nonstd.h"
7#include "ccf/pal/locking.h"
9#include "ds/files.h"
10#include "ds/messaging.h"
11#include "ds/serialized.h"
12#include "kv/kv_types.h"
14#include "time_bound_logger.h"
15
16#include <cstdint>
17#include <cstdio>
18#include <filesystem>
19#include <list>
20#include <map>
21#include <string>
22#include <sys/types.h>
23#include <unistd.h>
24#include <uv.h>
25#include <vector>
26
27namespace fs = std::filesystem;
28
29namespace asynchost
30{
// Default for the maximum number of ledger files kept in the Ledger read
// cache.
31 static constexpr size_t ledger_max_read_cache_files_default = 5;
32
// Pieces of ledger file names. The suffixes are matched with ends_with() by
// the is_ledger_file_name_*() helpers; the delimiters are searched for when
// parsing the start idx and last idx out of a file name.
33 static constexpr auto ledger_committed_suffix = "committed";
34 static constexpr auto ledger_start_idx_delimiter = "_";
35 static constexpr auto ledger_last_idx_delimiter = "-";
36 static constexpr auto ledger_recovery_file_suffix = "recovery";
37 static constexpr auto ledger_ignored_file_suffix = "ignored";
38
39 static inline size_t get_start_idx_from_file_name(
40 const std::string& file_name)
41 {
42 auto pos = file_name.find(ledger_start_idx_delimiter);
43 if (pos == std::string::npos)
44 {
45 throw std::logic_error(fmt::format(
46 "Ledger file name {} does not contain a start seqno", file_name));
47 }
48
49 return std::stol(file_name.substr(pos + 1));
50 }
51
52 static inline std::optional<size_t> get_last_idx_from_file_name(
53 const std::string& file_name)
54 {
55 auto pos = file_name.find(ledger_last_idx_delimiter);
56 if (pos == std::string::npos)
57 {
58 // Non-committed file names do not contain a last idx
59 return std::nullopt;
60 }
61
62 return std::stol(file_name.substr(pos + 1));
63 }
64
65 static inline bool is_ledger_file_name_committed(const std::string& file_name)
66 {
67 return file_name.ends_with(ledger_committed_suffix);
68 }
69
70 static inline bool is_ledger_file_name_recovery(const std::string& file_name)
71 {
72 return file_name.ends_with(ledger_recovery_file_suffix);
73 }
74
75 static inline bool is_ledger_file_name_ignored(const std::string& file_name)
76 {
77 return file_name.ends_with(ledger_ignored_file_suffix);
78 }
79
80 static inline bool is_ledger_file_ignored(const std::string& file_name)
81 {
82 // Catch-all for all files that should be ignored
83 return is_ledger_file_name_recovery(file_name) ||
84 is_ledger_file_name_ignored(file_name);
85 }
86
// Returns file_name as a path, with `suffix` stripped from its end when (and
// only when) file_name ends with it.
static inline fs::path remove_suffix(
  std::string_view file_name, const std::string& suffix)
{
  const auto suffix_len = suffix.size();
  const bool carries_suffix = file_name.size() >= suffix_len &&
    file_name.compare(file_name.size() - suffix_len, suffix_len, suffix) == 0;

  if (!carries_suffix)
  {
    return file_name;
  }

  return file_name.substr(0, file_name.size() - suffix_len);
}
96
97 static inline fs::path remove_recovery_suffix(std::string_view file_name)
98 {
99 return remove_suffix(
100 file_name, fmt::format(".{}", ledger_recovery_file_suffix));
101 }
102
103 static std::optional<std::string> get_file_name_with_idx(
104 const std::string& dir, size_t idx, bool allow_recovery_files)
105 {
106 std::optional<std::string> match = std::nullopt;
107 for (auto const& f : fs::directory_iterator(dir))
108 {
109 // If any file, based on its name, contains idx. Only committed
110 // (i.e. those with a last idx) are considered here.
111 auto f_name = f.path().filename();
112 if (
113 is_ledger_file_name_ignored(f_name) ||
114 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
115 {
116 continue;
117 }
118
119 size_t start_idx = 0;
120 std::optional<size_t> last_idx = std::nullopt;
121 try
122 {
123 start_idx = get_start_idx_from_file_name(f_name);
124 last_idx = get_last_idx_from_file_name(f_name);
125 }
126 catch (const std::exception& e)
127 {
128 // Ignoring invalid ledger file
129 continue;
130 }
131 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
132 {
133 match = f_name;
134 break;
135 }
136 }
137
138 return match;
139 }
140
// Result of reading a contiguous range of ledger entries.
struct LedgerReadResult
{
  // Raw bytes of the entries that were read, concatenated in order.
  std::vector<uint8_t> data;
  // Index of the last entry included in `data`.
  size_t end_idx = 0;
};
146
148 {
149 private:
// Type of the header stored at the very start of the file. It holds the
// offset to the positions table, and is 0 while the file is not completed.
150 using positions_offset_header_t = size_t;
// Prefix of the names of newly created and committed ledger files.
151 static constexpr auto file_name_prefix = "ledger";
152
// Directory containing the file, and current name of the file in it (the
// name changes when the file is renamed).
153 const fs::path dir;
154 fs::path file_name;
155
156 // This uses C stdio instead of fstream because an fstream
157 // cannot be truncated.
158 FILE* file = nullptr;
// Held by read_entries() while it seeks and reads from `file`.
159 ccf::pal::Mutex file_lock;
160
// Index of the first entry held by this file.
161 size_t start_idx = 1;
162 size_t total_len = 0; // Points to end of last written entry
// Offset in the file of each entry, in order; element 0 is the entry at
// start_idx.
163 std::vector<uint32_t> positions;
164
// completed: the positions table has been written to the file.
// committed: the file carries the committed suffix (or commit() succeeded).
165 bool completed = false;
166 bool committed = false;
167
// Set when the file was created as a recovery file; cleared by open().
168 bool recovery = false;
169
170 // This flag is set when an existing ledger is recovered and started (init)
171 // from an old idx. In this case, further ledger files (i.e. those which
172 // contain entries later than init idx), remain on disk and new entries are
173 // checked against the existing ones, until a divergence is found.
174 bool from_existing_file = false;
175
176 public:
177 // Used when creating a new (empty) ledger file
178 LedgerFile(const fs::path& dir, size_t start_idx, bool recovery = false) :
179 dir(dir),
180 file_name(fmt::format("{}_{}", file_name_prefix, start_idx)),
181 start_idx(start_idx),
182 recovery(recovery)
183 {
184 if (recovery)
185 {
186 file_name =
187 fmt::format("{}.{}", file_name.string(), ledger_recovery_file_suffix);
188 }
189
190 auto file_path = dir / file_name;
191 if (fs::exists(file_path))
192 {
193 throw std::logic_error(fmt::format(
194 "Cannot create new ledger file {} in main ledger directory {} as it "
195 "already exists",
196 file_name,
197 dir));
198 }
199 file = fopen(file_path.c_str(), "w+b");
200 if (!file)
201 {
202 throw std::logic_error(fmt::format(
203 "Unable to open ledger file {}: {}", file_path, strerror(errno)));
204 }
205
206 // Header reserved for the offset to the position table
207 fseeko(file, sizeof(positions_offset_header_t), SEEK_SET);
208 total_len = sizeof(positions_offset_header_t);
209 }
210
211 // Used when recovering an existing ledger file
// Opens dir/file_name_ (read-only if the name carries the committed suffix,
// read-write otherwise) and rebuilds the in-memory state from its contents:
//  - if from_existing_file_ is set, no entry is recovered: only the header
//    is read and total_len is left just past it;
//  - if the header holds a non-zero table offset, the positions table is
//    read from that offset and the file is marked as completed;
//  - otherwise, entries are walked one by one to rebuild the positions,
//    stopping at the first entry that cannot be read in full.
// Throws std::logic_error if the file cannot be opened, if the header or the
// positions table cannot be read, if the table offset is past the end of the
// file, or if a committed file has a table offset of 0.
// NOTE(review): the line opening this constructor (presumably `LedgerFile(`)
// appears to be missing from this listing - confirm against the original
// file.
213 const std::string& dir,
214 const std::string& file_name_,
215 bool from_existing_file_ = false) :
216 dir(dir),
217 file_name(file_name_),
218 completed(false),
219 from_existing_file(from_existing_file_)
220 {
221 auto file_path = (fs::path(dir) / fs::path(file_name));
222
223 committed = is_ledger_file_name_committed(file_name);
224 start_idx = get_start_idx_from_file_name(file_name);
225
// Committed files are only ever read; other files may be appended to or
// truncated.
226 const auto mode = committed ? "rb" : "r+b";
227
228 file = fopen(file_path.c_str(), mode);
229
230 if (!file)
231 {
232 throw std::logic_error(fmt::format(
233 "Unable to open ledger file {}: {}", file_path, strerror(errno)));
234 }
235
236 // First, get full size of file
237 fseeko(file, 0, SEEK_END);
238 size_t total_file_size = ftello(file);
239
240 // Second, read offset to header table
241 fseeko(file, 0, SEEK_SET);
242 positions_offset_header_t table_offset = 0;
243 if (fread(&table_offset, sizeof(positions_offset_header_t), 1, file) != 1)
244 {
245 throw std::logic_error(fmt::format(
246 "Failed to read positions offset from ledger file {}", file_path));
247 }
248
249 if (committed && table_offset == 0)
250 {
251 throw std::logic_error(fmt::format(
252 "Committed ledger file {} cannot be read: invalid table offset (0)",
253 file_path));
254 }
255
256 total_len = sizeof(positions_offset_header_t);
257
258 if (from_existing_file)
259 {
260 // When recovering a file from persistence, do not recover entries to
261 // start with as these are expected to be written again at a later
262 // point.
263 return;
264 }
265
266 if (table_offset != 0)
267 {
268 // If the chunk was completed, read positions table from file directly
269 total_len = table_offset;
270 fseeko(file, table_offset, SEEK_SET);
271
272 if (table_offset > total_file_size)
273 {
274 throw std::logic_error(fmt::format(
275 "Invalid table offset {} greater than total file size {}",
276 table_offset,
277 total_file_size));
278 }
279
// Everything from the table offset to the end of the file is the positions
// table. sizeof() does not evaluate its operand, so positions.at(0) is safe
// here even while positions is empty.
280 positions.resize(
281 (total_file_size - table_offset) / sizeof(positions.at(0)));
282
283 if (
284 fread(
285 positions.data(),
286 sizeof(positions.at(0)),
287 positions.size(),
288 file) != positions.size())
289 {
290 throw std::logic_error(fmt::format(
291 "Failed to read positions table from ledger file {}", file_path));
292 }
293 completed = true;
294 }
295 else
296 {
297 // If the chunk was not completed, read all entries to reconstruct
298 // positions table
299 total_len = sizeof(positions_offset_header_t);
// Number of bytes of the file that have not been walked yet.
300 auto len = total_file_size - total_len;
301
302 ccf::kv::SerialisedEntryHeader entry_header = {};
303 size_t current_idx = start_idx;
304 while (len >= ccf::kv::serialised_entry_header_size)
305 {
306 if (
307 fread(
308 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
309 1)
310 {
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
312 "Failed to read entry header from ledger file {} at seqno {}",
313 file_path,
314 current_idx);
315 return;
316 }
317
318 len -= ccf::kv::serialised_entry_header_size;
319
320 const auto& entry_size = entry_header.size;
// The header announces more bytes than remain in the file: stop here and
// keep only the entries recovered so far.
321 if (len < entry_size)
322 {
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
324 "Malformed incomplete ledger file {} at seqno {} (expecting "
325 "entry of size "
326 "{}, remaining {})",
327 file_path,
328 current_idx,
329 entry_size,
330 len);
331
332 return;
333 }
334
// Skip over the entry payload without reading it.
335 fseeko(file, entry_size, SEEK_CUR);
336 len -= entry_size;
337
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
339 "Recovered one entry of size {} at seqno {}",
340 entry_size,
341 current_idx);
342
343 current_idx++;
// The entry starts where the previous one ended; total_len then moves past
// its header and payload.
344 positions.push_back(total_len);
345 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
346 }
347 completed = false;
348 }
349 }
350
352 {
353 if (file)
354 {
355 fclose(file);
356 }
357 }
358
359 size_t get_start_idx() const
360 {
361 return start_idx;
362 }
363
364 size_t get_last_idx() const
365 {
366 return start_idx + positions.size() - 1;
367 }
368
369 size_t get_current_size() const
370 {
371 return total_len;
372 }
373
374 bool is_committed() const
375 {
376 return committed;
377 }
378
379 bool is_complete() const
380 {
381 return completed;
382 }
383
384 bool is_recovery() const
385 {
386 return recovery;
387 }
388
389 // Returns idx of new entry, and boolean to indicate that file was truncated
390 // before writing the entry
391 std::pair<size_t, bool> write_entry(
392 const uint8_t* data, size_t size, bool committable)
393 {
394 fseeko(file, total_len, SEEK_SET);
395
396 bool should_write = true;
397 bool has_truncated = false;
398 if (from_existing_file)
399 {
400 std::vector<uint8_t> entry(size);
401 if (
402 fread(entry.data(), size, 1, file) != 1 ||
403 memcmp(entry.data(), data, size) != 0)
404 {
405 // Divergence between existing and new entry. Truncate this file,
406 // write the new entry and notify the caller for further cleanup.
407 // Note that even if the truncation results in an empty file, we keep
408 // it on disk as a new entry is about to be written.
409 truncate(get_last_idx(), false /* remove_file_if_empty */);
410 has_truncated = true;
411 from_existing_file = false;
412 }
413 else
414 {
415 should_write = false;
416 }
417 }
418
419 if (should_write)
420 {
421 if (fwrite(data, size, 1, file) != 1)
422 {
423 throw std::logic_error("Failed to write entry to ledger");
424 }
425
426 // Committable entries get flushed straight away
427 if (committable && fflush(file) != 0)
428 {
429 throw std::logic_error(fmt::format(
430 "Failed to flush entry to ledger: {}", strerror(errno)));
431 }
432 }
433
434 positions.push_back(total_len);
435 total_len += size;
436
437 return std::make_pair(get_last_idx(), has_truncated);
438 }
439
440 // Return pair containing entries size and index of last entry included
// Computes the number of bytes occupied by entries [from, to]. When max_size
// is set and the range is too large, `to` is repeatedly moved to the
// midpoint of the range until the range fits.
// Returns {0, 0} if the range is not fully contained in this file, or if the
// single entry at `from` is larger than max_size.
441 std::pair<size_t, size_t> entries_size(
442 size_t from,
443 size_t to,
444 std::optional<size_t> max_size = std::nullopt) const
445 {
446 if ((from < start_idx) || (to < from) || (to > get_last_idx()))
447 {
448 return {0, 0};
449 }
450
451 size_t size = 0;
452
453 // If max_size is set, return entries that fit within it (best effort).
454 while (true)
455 {
// End of the range: start of the entry after `to`, or end of the last
// written entry when `to` is the last entry of the file.
456 auto position_to =
457 (to == get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
458 size = position_to - positions.at(from - start_idx);
459
460 if (!max_size.has_value() || size <= max_size.value())
461 {
462 break;
463 }
464 else
465 {
466 if (from == to)
467 {
468 // Request one entry that is too large: no entries are found
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
470 "Single ledger entry at {} in file {} is too large for remaining "
471 "space (size {} > max {})",
472 from,
473 file_name,
474 size,
475 max_size.value());
476 return {0, 0};
477 }
// Halve the range and try again.
478 size_t to_ = from + (to - from) / 2;
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
480 "Requesting ledger entries from {} to {} in file {} but size {} > "
481 "max size {}: now requesting up to {}",
482 from,
483 to,
484 file_name,
485 size,
486 max_size.value(),
487 to_);
488 to = to_;
489 }
490 }
491
492 return {size, to};
493 }
494
// Reads entries [from, to] from this file, optionally limited to max_size
// bytes (see entries_size()).
// Returns the bytes read along with the index of the last entry included, or
// std::nullopt if the range is not fully contained in this file or if no
// entry fits in max_size.
// Throws std::logic_error if the entries cannot be read from the file.
495 std::optional<LedgerReadResult> read_entries(
496 size_t from, size_t to, std::optional<size_t> max_size = std::nullopt)
497 {
498 if ((from < start_idx) || (to > get_last_idx()) || (to < from))
499 {
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
501 "Cannot find entries: {} - {} in ledger file {}",
502 from,
503 to,
504 file_name);
505 return std::nullopt;
506 }
507
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
509 "Read entries from {} to {} in {} [max size: {}]",
510 from,
511 to,
512 file_name,
513 max_size.value_or(0));
514
// file_lock is held for the rest of the function, covering the seek and the
// read on the file handle.
515 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
516 auto [size, to_] = entries_size(from, to, max_size);
517 if (size == 0)
518 {
519 return std::nullopt;
520 }
521 std::vector<uint8_t> entries(size);
522 fseeko(file, positions.at(from - start_idx), SEEK_SET);
523
524 if (fread(entries.data(), size, 1, file) != 1)
525 {
526 throw std::logic_error(fmt::format(
527 "Failed to read entry range {} - {} from file {}",
528 from,
529 to,
530 file_name));
531 }
532
533 return LedgerReadResult{entries, to_};
534 }
535
// Truncates this file so that `idx` becomes its last entry.
// Returns true only if the file was removed from disk (which happens when
// remove_file_if_empty is set and idx is start_idx - 1). Returns false
// otherwise, including when nothing is done: the file is committed, idx is
// lower than start_idx - 1, or the file is completed and idx is not lower
// than its last idx.
// Throws std::logic_error if the file cannot be removed, or if the header
// reset, the flush or the truncation fails.
536 bool truncate(size_t idx, bool remove_file_if_empty = true)
537 {
538 if (
539 committed || (idx < start_idx - 1) ||
540 (completed && idx >= get_last_idx()))
541 {
542 return false;
543 }
544
545 if (remove_file_if_empty && idx == start_idx - 1)
546 {
547 // Truncating everything triggers file deletion
548 if (!fs::remove(dir / file_name))
549 {
550 throw std::logic_error(
551 fmt::format("Could not remove file {}", file_name));
552 }
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
554 "Removed ledger file {} on truncation at {}", file_name, idx);
555 return true;
556 }
557
558 // Reset positions offset header
559 fseeko(file, 0, SEEK_SET);
560 positions_offset_header_t table_offset = 0;
561 if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
562 {
563 throw std::logic_error("Failed to reset positions table offset");
564 }
565
566 completed = false;
// Drop the entries after idx: the new end of the file is where the entry
// following idx used to start.
567 if (idx != get_last_idx())
568 {
569 total_len = positions.at(idx - start_idx + 1);
570 positions.resize(idx - start_idx + 1);
571 }
572
573 if (fflush(file) != 0)
574 {
575 throw std::logic_error(
576 fmt::format("Failed to flush ledger file: {}", strerror(errno)));
577 }
578
579 if (ftruncate(fileno(file), total_len))
580 {
581 throw std::logic_error(
582 fmt::format("Failed to truncate ledger: {}", strerror(errno)));
583 }
584
585 fseeko(file, total_len, SEEK_SET);
586 LOG_TRACE_FMT("Truncated ledger file {} at seqno {}", file_name, idx);
587 return false;
588 }
589
590 void complete()
591 {
592 if (completed)
593 {
594 return;
595 }
596 // It may happen (e.g. during recovery) that the incomplete ledger gets
597 // truncated on the primary, so we have to make sure that whenever we
598 // complete the file it doesn't contain anything past the last_idx, which
599 // can happen on the follower unless explicitly truncated before
600 // completion.
601 truncate(get_last_idx(), /* remove_file_if_empty = */ false);
602
603 fseeko(file, total_len, SEEK_SET);
604 size_t table_offset = ftello(file);
605
606 if (
607 fwrite(
608 reinterpret_cast<uint8_t*>(positions.data()),
609 sizeof(positions.at(0)),
610 positions.size(),
611 file) != positions.size())
612 {
613 throw std::logic_error("Failed to write positions table to ledger");
614 }
615
616 // Write positions table offset at start of file
617 if (fseeko(file, 0, SEEK_SET) != 0)
618 {
619 throw std::logic_error("Failed to set file offset to 0");
620 }
621
622 if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
623 {
624 throw std::logic_error("Failed to write positions table to ledger");
625 }
626
627 if (fflush(file) != 0)
628 {
629 throw std::logic_error(
630 fmt::format("Failed to flush ledger file: {}", strerror(errno)));
631 }
632
633 LOG_TRACE_FMT("Completed ledger file {}", file_name);
634
635 completed = true;
636 }
637
638 bool rename(const std::string& new_file_name)
639 {
640 auto file_path = dir / file_name;
641 auto new_file_path = dir / new_file_name;
642
643 try
644 {
645 files::rename(file_path, new_file_path);
646 }
647 catch (const std::exception& e)
648 {
649 // If the file cannot be renamed (e.g. file was removed), report an
650 // error and continue
651 LOG_FAIL_FMT("Error renaming ledger file: {}", e.what());
652 }
653 file_name = new_file_name;
654 return true;
655 }
656
657 void open()
658 {
659 auto new_file_name = remove_recovery_suffix(file_name.c_str());
660 rename(new_file_name);
661 recovery = false;
662 LOG_DEBUG_FMT("Open recovery ledger file {}", new_file_name);
663 }
664
665 bool commit(size_t idx)
666 {
667 if (!completed || committed || (idx != get_last_idx()))
668 {
669 // No effect if commit idx is not last idx
670 return false;
671 }
672
673 if (fflush(file) != 0)
674 {
675 throw std::logic_error(
676 fmt::format("Failed to flush ledger file: {}", strerror(errno)));
677 }
678
679 auto committed_file_name = fmt::format(
680 "{}_{}-{}.{}",
681 file_name_prefix,
682 start_idx,
683 get_last_idx(),
684 ledger_committed_suffix);
685
686 if (recovery)
687 {
688 committed_file_name = fmt::format(
689 "{}.{}", committed_file_name, ledger_recovery_file_suffix);
690 }
691
692 if (!rename(committed_file_name))
693 {
694 return false;
695 }
696
697 committed = true;
698 LOG_DEBUG_FMT("Committed ledger file {}", file_name);
699
700 // Committed recovery files stay in the list of active files until the
701 // ledger is open
702 return !recovery;
703 }
704 };
705
706 class Ledger
707 {
708 private:
// Writer obtained from the writer factory's create_writer_to_inside().
709 ringbuffer::WriterPtr to_enclave;
710
711 // Main ledger directory (write and read)
712 const fs::path ledger_dir;
713
714 // Ledger directories (read-only)
715 std::vector<fs::path> read_ledger_dirs;
716
717 // Keep tracks of all ledger files for writing.
718 // Current ledger file is always the last one
719 std::list<std::shared_ptr<LedgerFile>> files;
720
721 // Cache of ledger files for reading
// At most max_read_cache_files files are kept: the oldest one is dropped
// when a new file makes the cache grow past that size. Accesses to the cache
// are guarded by read_cache_lock.
722 size_t max_read_cache_files;
723 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
724 ccf::pal::Mutex read_cache_lock;
725
// Last idx known to the ledger, and last idx known to be committed.
726 size_t last_idx = 0;
727 size_t committed_idx = 0;
728
// Highest last idx found in the names of committed files on construction.
729 size_t end_of_committed_files_idx = 0;
730
731 // Indicates if the ledger has been initialised at a specific idx and
732 // may still be replaying existing entries.
733 bool use_existing_files = false;
734 // Used to remember the last recovered idx on init so that
735 // use_existing_files can be disabled once this idx is passed
736 std::optional<size_t> last_idx_on_init = std::nullopt;
737
738 // Set during recovery to mark files as temporary until the recovery is
739 // complete
740 std::optional<size_t> recovery_start_idx = std::nullopt;
741
742 auto get_it_contains_idx(size_t idx) const
743 {
744 if (idx == 0)
745 {
746 return files.end();
747 }
748
749 auto f = std::upper_bound(
750 files.begin(),
751 files.end(),
752 idx,
753 [](size_t idx, const std::shared_ptr<LedgerFile>& f) {
754 return (idx <= f->get_last_idx());
755 });
756
757 return f;
758 }
759
// Returns the ledger file containing idx for reading, or nullptr if idx is 0,
// if no file containing idx can be found, or if the file cannot be opened.
// The read cache is searched first; on a miss, the file is looked up by name
// in the main ledger directory and then in the read-only directories, opened
// and added to the read cache.
760 std::shared_ptr<LedgerFile> get_file_from_cache(size_t idx)
761 {
762 if (idx == 0)
763 {
764 return nullptr;
765 }
766
767 {
768 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
769
770 // First, try to find file from read cache
771 for (auto const& f : files_read_cache)
772 {
773 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
774 {
775 return f;
776 }
777 }
778 }
779
780 // If the file is not in the cache, find the file from the ledger
781 // directories, inspecting the main ledger directory first
782 // Note: reading recovery chunks from main ledger directory is
783 // acceptable and in fact required to complete private recovery.
784 std::string ledger_dir_;
785 auto match = get_file_name_with_idx(ledger_dir, idx, true);
786 if (match.has_value())
787 {
788 ledger_dir_ = ledger_dir;
789 }
790 else
791 {
// Recovery files are not accepted from the read-only directories.
792 for (auto const& dir : read_ledger_dirs)
793 {
794 match = get_file_name_with_idx(dir, idx, false);
795 if (match.has_value())
796 {
797 ledger_dir_ = dir;
798 break;
799 }
800 }
801 }
802
803 if (!match.has_value())
804 {
805 return nullptr;
806 }
807
808 // Emplace file in the max-sized read cache, replacing the oldest entry if
809 // the read cache is full
810 std::shared_ptr<LedgerFile> match_file = nullptr;
811 try
812 {
813 match_file = std::make_shared<LedgerFile>(ledger_dir_, match.value());
814 }
815 catch (const std::exception& e)
816 {
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
818 "Could not open ledger file {} to read seqno {}: {}",
819 match.value(),
820 idx,
821 e.what());
822 return nullptr;
823 }
824
825 {
826 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
827
828 files_read_cache.emplace_back(match_file);
829 if (files_read_cache.size() > max_read_cache_files)
830 {
831 files_read_cache.erase(files_read_cache.begin());
832 }
833 }
834
835 return match_file;
836 }
837
838 std::shared_ptr<LedgerFile> get_file_from_idx(
839 size_t idx, bool read_cache_only = false)
840 {
841 if (idx == 0)
842 {
843 return nullptr;
844 }
845
846 if (!read_cache_only)
847 {
848 // First, check if the file is in the list of files open for writing
849 auto f = std::upper_bound(
850 files.rbegin(),
851 files.rend(),
852 idx,
853 [](size_t idx, const std::shared_ptr<LedgerFile>& f) {
854 return idx >= f->get_start_idx();
855 });
856
857 if (f != files.rend())
858 {
859 return *f;
860 }
861 }
862
863 // Otherwise, return file from read cache
864 return get_file_from_cache(idx);
865 }
866
867 std::shared_ptr<LedgerFile> get_latest_file(
868 bool incomplete_only = true) const
869 {
870 if (files.empty())
871 {
872 return nullptr;
873 }
874 const auto& last_file = files.back();
875 if (incomplete_only && last_file->is_complete())
876 {
877 return nullptr;
878 }
879
880 return last_file;
881 }
882
883 std::optional<LedgerReadResult> read_entries_range(
884 size_t from,
885 size_t to,
886 bool read_cache_only = false,
887 std::optional<size_t> max_entries_size = std::nullopt)
888 {
889 // Note: if max_entries_size is set, this returns contiguous ledger
890 // entries on a best effort basis, so that the returned entries fit in
891 // max_entries_size but without maximising the number of entries returned.
892 if ((from <= 0) || (to < from))
893 {
894 return std::nullopt;
895 }
896
897 // During recovery or other low-knowledge batch operations, we might
898 // request entries past the end of the ledger - truncate to the true end
899 // here.
900 if (to > last_idx)
901 {
902 to = last_idx;
903 }
904
906 rr.end_idx = to;
907
908 size_t idx = from;
909 while (idx <= to)
910 {
911 auto f_from = get_file_from_idx(idx, read_cache_only);
912 if (f_from == nullptr)
913 {
914 LOG_FAIL_FMT("Cannot find ledger file for seqno {}", idx);
915 return std::nullopt;
916 }
917 auto to_ = std::min(f_from->get_last_idx(), to);
918 std::optional<size_t> max_size = std::nullopt;
919 if (max_entries_size.has_value())
920 {
921 max_size = max_entries_size.value() - rr.data.size();
922 }
923 auto v = f_from->read_entries(idx, to_, max_size);
924 if (!v.has_value())
925 {
926 break;
927 }
928 rr.end_idx = v->end_idx;
929 rr.data.insert(
930 rr.data.end(),
931 std::make_move_iterator(v->data.begin()),
932 std::make_move_iterator(v->data.end()));
933 if (v->end_idx != to_)
934 {
935 // If all the entries requested from a file are not returned (i.e.
936 // because the requested entries are larger than max_entries_size),
937 // return immediately to avoid returning non-contiguous entries from a
938 // subsequent ledger file.
939 break;
940 }
941 idx = to_ + 1;
942 }
943
944 if (!rr.data.empty())
945 {
946 return rr;
947 }
948 else
949 {
950 return std::nullopt;
951 }
952 }
953
954 void ignore_ledger_file(const std::string& file_name)
955 {
956 if (is_ledger_file_name_ignored(file_name))
957 {
958 return;
959 }
960
961 auto ignored_file_name =
962 fmt::format("{}.{}", file_name, ledger_ignored_file_suffix);
963 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
964 }
965
// Removes from the main ledger directory every file whose start idx (as
// parsed from its name) is greater than idx.
// Throws std::logic_error if a file cannot be removed, or if the name of a
// file in the directory does not contain a start idx.
966 void delete_ledger_files_after_idx(size_t idx)
967 {
968 // Use with caution! Delete all ledger files later than idx
969 for (auto const& f : fs::directory_iterator(ledger_dir))
970 {
971 auto file_name = f.path().filename();
972 auto start_idx = get_start_idx_from_file_name(file_name);
973 if (start_idx > idx)
974 {
975 if (!fs::remove(ledger_dir / file_name))
976 {
977 throw std::logic_error(
978 fmt::format("Could not remove file {}", file_name));
979 }
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
981 "Forcing removal of ledger file {} as start idx {} > {}",
982 file_name,
983 start_idx,
984 idx);
985 }
986 }
987 }
988
989 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(size_t idx)
990 {
991 if (!use_existing_files)
992 {
993 return nullptr;
994 }
995
996 for (auto const& f : fs::directory_iterator(ledger_dir))
997 {
998 auto file_name = f.path().filename();
999 if (
1000 idx == get_start_idx_from_file_name(file_name) &&
1001 !is_ledger_file_ignored(file_name))
1002 {
1003 return std::make_shared<LedgerFile>(
1004 ledger_dir, file_name, true /* from_existing_file */);
1005
1006 break;
1007 }
1008 }
1009
1010 return nullptr;
1011 }
1012
1013 public:
// Builds the ledger state from the files found on disk.
// The read-only directories are scanned first: the highest last idx found in
// the names of their committed files becomes last_idx, committed_idx and
// end_of_committed_files_idx. The main ledger directory is then either
// created (if it does not exist) or scanned: ignored files are renamed as
// ignored, committed files only advance committed_idx, and the remaining
// files are opened, truncated to their last recovered entry and added to the
// list of files open for writing.
// Throws std::logic_error if a read-only directory is not a directory, or if
// the main ledger directory cannot be created.
// NOTE(review): the line opening this constructor (presumably `Ledger(`)
// appears to be missing from this listing, as are the lines opening several
// calls below (bare argument lists, presumably for logging macros) - confirm
// against the original file.
1015 const fs::path& ledger_dir,
1016 ringbuffer::AbstractWriterFactory& writer_factory,
1017 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1018 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1019 to_enclave(writer_factory.create_writer_to_inside()),
1020 ledger_dir(ledger_dir),
1021 max_read_cache_files(max_read_cache_files)
1022 {
1023 // Recover last idx from read-only ledger directories
1024 for (const auto& read_dir : read_ledger_dirs_)
1025 {
1026 LOG_INFO_FMT("Recovering read-only ledger directory \"{}\"", read_dir);
1027 if (!fs::is_directory(read_dir))
1028 {
1029 throw std::logic_error(
1030 fmt::format("{} read-only ledger is not a directory", read_dir));
1031 }
1032
1033 read_ledger_dirs.emplace_back(read_dir);
1034
1035 for (auto const& f : fs::directory_iterator(read_dir))
1036 {
1037 auto file_name = f.path().filename();
1038 auto last_idx_ = get_last_idx_from_file_name(file_name);
// Only committed, non-ignored files with a last idx in their name count.
1039 if (
1040 !last_idx_.has_value() ||
1041 !is_ledger_file_name_committed(file_name) ||
1042 is_ledger_file_name_ignored(file_name))
1043 {
1045 "Read-only ledger file {} is ignored as not committed",
1046 file_name);
1047 continue;
1048 }
1049
1050 if (last_idx_.value() > last_idx)
1051 {
1052 last_idx = last_idx_.value();
1053 committed_idx = last_idx;
1054 end_of_committed_files_idx = last_idx;
1055 }
1056
1058 "Recovering file from read-only ledger directory: {}", file_name);
1059 }
1060 }
1061
1062 if (last_idx > 0)
1063 {
1065 "Recovered read-only ledger directories up to {}, committed up to "
1066 "{} ",
1067 last_idx,
1068 committed_idx);
1069 }
1070
1071 if (fs::is_directory(ledger_dir))
1072 {
1073 // If the ledger directory exists, populate this->files with the
1074 // writeable files from it. These must have no suffix, and must not
1075 // end-before the current committed_idx found from the read-only
1076 // directories
1077 LOG_INFO_FMT("Recovering main ledger directory {}", ledger_dir);
1078
1079 for (auto const& f : fs::directory_iterator(ledger_dir))
1080 {
1081 auto file_name = f.path().filename();
1082
1083 if (is_ledger_file_ignored(file_name))
1084 {
1086 "Ignoring ledger file {} in main ledger directory", file_name);
1087
1088 ignore_ledger_file(file_name);
1089
1090 continue;
1091 }
1092
1093 const auto file_end_idx = get_last_idx_from_file_name(file_name);
1094
// Committed files are not opened for writing: they only advance the
// committed idx.
1095 if (is_ledger_file_name_committed(file_name))
1096 {
1097 if (!file_end_idx.has_value())
1098 {
1100 "Unexpected file {} in {}: committed but not completed",
1101 file_name,
1102 ledger_dir);
1103 }
1104 else
1105 {
1106 if (file_end_idx.value() > committed_idx)
1107 {
1108 committed_idx = file_end_idx.value();
1109 end_of_committed_files_idx = file_end_idx.value();
1110 }
1111 }
1112
1113 continue;
1114 }
1115
1116 if (file_end_idx.has_value() && file_end_idx.value() <= committed_idx)
1117 {
1119 "Ignoring ledger file {} in main ledger directory - already "
1120 "discovered commit up to {} from read-only directories",
1121 file_name,
1122 committed_idx);
1123
1124 ignore_ledger_file(file_name);
1125
1126 continue;
1127 }
1128
1129 std::shared_ptr<LedgerFile> ledger_file = nullptr;
1130 try
1131 {
1132 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1133
1134 // Truncate file to latest recovered index to cleanup entries that
1135 // may have been corrupted (no-op if file isn't corrupted)
1136 if (ledger_file->truncate(ledger_file->get_last_idx()))
1137 {
1138 // If truncation of corrupted entries removes file, file is not
1139 // recovered
1140 LOG_FAIL_FMT("Removed ledger file {}", file_name);
1141 continue;
1142 }
1143 }
1144 catch (const std::exception& e)
1145 {
1147 "Error reading ledger file {}: {}", file_name, e.what());
1148 // Ignore file if it cannot be recovered.
1149 ignore_ledger_file(file_name);
1150 continue;
1151 }
1152
1154 "Recovering file from main ledger directory: {}", file_name);
1155 files.emplace_back(std::move(ledger_file));
1156 }
1157
1158 if (files.empty())
1159 {
1161 "Main ledger directory {} is empty: no ledger file to "
1162 "recover",
1163 ledger_dir);
1164
1165 // If we had any uncommitted files, we wouldn't be in this path and
1166 // we'd populate last_idx below. In this branch, we need to ensure
1167 // last_idx is correctly initialised. Since there are no uncommitted
1168 // files, it must match the last committed_idx we've discovered.
1169 last_idx = committed_idx;
1170 return;
1171 }
1172 else
1173 {
1175 "Main ledger directory {} contains {} restored (writeable) files",
1176 ledger_dir,
1177 files.size());
1178 }
1179
// Directory iteration order is unspecified: order the files by last idx so
// that the current file is the last one of the list.
1180 files.sort([](
1181 const std::shared_ptr<LedgerFile>& a,
1182 const std::shared_ptr<LedgerFile>& b) {
1183 return a->get_last_idx() < b->get_last_idx();
1184 });
1185
1186 const auto main_ledger_dir_last_idx =
1187 get_latest_file(false)->get_last_idx();
1188 if (main_ledger_dir_last_idx > last_idx)
1189 {
1190 last_idx = main_ledger_dir_last_idx;
1191 }
1192 }
1193 else
1194 {
1195 if (!fs::create_directory(ledger_dir))
1196 {
1197 throw std::logic_error(fmt::format(
1198 "Error: Could not create ledger directory: {}", ledger_dir));
1199 }
1200 }
1201
1203 "Recovered ledger entries up to {}, committed to {}",
1204 last_idx,
1205 committed_idx);
1206 }
1207
// A Ledger cannot be copy-constructed.
1208 Ledger(const Ledger& that) = delete;
1209
// Initialises the ledger at idx: committed files of the main ledger
// directory that start after idx lose their "-<last idx>.<committed suffix>"
// name ending, all files open for writing are closed, and both the last idx
// and the committed idx are set to idx. The previous last idx is remembered
// in last_idx_on_init, and existing files are used from then on. A non-zero
// recovery_start_idx_ is recorded as the recovery start idx.
// Throws std::logic_error if the name of a committed file starting after idx
// does not include a last idx.
1210 void init(size_t idx, size_t recovery_start_idx_ = 0)
1211 {
1212 TimeBoundLogger log_if_slow(
1213 fmt::format("Initing ledger - seqno={}", idx));
1214
1215 // Used by backup nodes to initialise the ledger when starting from a
1216 // non-empty state, i.e. snapshot. It is assumed that idx is included in a
1217 // committed ledger file.
1218
1219 // To restart from a snapshot cleanly, in the main ledger directory,
1220 // mark all subsequent ledger as non-committed as their contents will be
1221 // replayed.
1222 for (auto const& f : fs::directory_iterator(ledger_dir))
1223 {
1224 auto file_name = f.path().filename();
1225 if (
1226 is_ledger_file_name_committed(file_name) &&
1227 (get_start_idx_from_file_name(file_name) > idx))
1228 {
1229 auto last_idx_file = get_last_idx_from_file_name(file_name);
1230 if (!last_idx_file.has_value())
1231 {
1232 throw std::logic_error(fmt::format(
1233 "Committed ledger file {} does not include last idx in file name",
1234 file_name));
1235 }
1236
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
1238 "Remove committed suffix from ledger file {} after init at {}: "
1239 "last_idx {}",
1240 file_name,
1241 idx,
1242 last_idx_file.value());
1243
1244 files::rename(
1245 ledger_dir / file_name,
1246 ledger_dir /
1247 remove_suffix(
1248 file_name.string(),
1249 fmt::format(
1250 "{}{}.{}",
1251 ledger_last_idx_delimiter,
1252 last_idx_file.value(),
1253 ledger_committed_suffix)));
1254 }
1255 }
1256
1257 // Close all open write files as the ledger should
1258 // restart cleanly, from a new chunk.
1259 files.clear();
1260
1261 use_existing_files = true;
1262 last_idx_on_init = last_idx;
1263 last_idx = idx;
1264 committed_idx = idx;
1265 if (recovery_start_idx_ > 0)
1266 {
1267 // Do not set recovery idx and create recovery chunks
1268 // if the ledger is initialised from 0 (i.e. genesis)
1269 recovery_start_idx = recovery_start_idx_;
1270 }
1271
// NOTE(review): the line opening the call that takes the following
// arguments (presumably a logging macro) appears to be missing from this
// listing - confirm against the original file.
1273 "Set last known/commit seqno to {}, recovery seqno to {}",
1274 idx,
1275 recovery_start_idx_);
1276 }
1277
1279 {
1280 // When the recovery is completed (i.e. service is open), temporary
1281 // recovery ledger chunks are renamed as they can now be recovered.
1282 // Note: this operation cannot be rolled back.
1283 LOG_INFO_FMT("Ledger complete recovery");
1284
1285 for (auto it = files.begin(); it != files.end();)
1286 {
1287 auto& f = *it;
1288 if (f->is_recovery())
1289 {
1290 f->open();
1291
1292 // Recovery files are kept in the list of active files when committed
1293 // so that they can be renamed in a stable order when the service is
1294 // open. Once this is done, they can be removed from the list of
1295 // active files.
1296 if (f->is_committed())
1297 {
1298 it = files.erase(it);
1299 continue;
1300 }
1301 }
1302 ++it;
1303 }
1304
1305 recovery_start_idx.reset();
1306 }
1307
1308 size_t get_last_idx() const
1309 {
1310 return last_idx;
1311 }
1312
1313 void set_recovery_start_idx(size_t idx)
1314 {
1315 recovery_start_idx = idx;
1316 }
1317
1318 std::optional<LedgerReadResult> read_entry(size_t idx)
1319 {
1320 TimeBoundLogger log_if_slow(
1321 fmt::format("Reading ledger entry at {}", idx));
1322
1323 return read_entries_range(idx, idx);
1324 }
1325
1326 std::optional<LedgerReadResult> read_entries(
1327 size_t from,
1328 size_t to,
1329 std::optional<size_t> max_entries_size = std::nullopt)
1330 {
1331 TimeBoundLogger log_if_slow(
1332 fmt::format("Reading ledger entries from {} to {}", from, to));
1333
1334 return read_entries_range(from, to, false, max_entries_size);
1335 }
1336
// Appends one serialised entry to the ledger and returns the new last_idx.
//
// data/size:   serialised entry; a ccf::kv::SerialisedEntryHeader is peeked
//              from its start to inspect the chunking flags.
// committable: forwarded to the file's write_entry(); also required to be
//              true when a chunk is forced after this entry.
//
// Throws std::logic_error if a chunk is forced after a non-committable entry.
//
// NOTE(review): this listing is an extract in which some lines are absent
// (listing lines 1345, 1347, 1360, 1368 and 1413). Restore them from the
// upstream file before compiling.
1337 size_t write_entry(const uint8_t* data, size_t size, bool committable)
1338 {
1339 TimeBoundLogger log_if_slow(fmt::format(
1340 "Writing ledger entry - {} bytes, committable={}", size, committable));
1341
1342 auto header =
1343 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1344
// NOTE(review): listing line 1345 is missing; presumably the condition testing
// the header's "force chunk before" flag. TODO confirm against upstream.
1346 {
// NOTE(review): listing line 1347 (opening of this log call) is missing.
1348 "Forcing ledger chunk before entry as required by the entry header "
1349 "flags");
1350
// Complete the file currently being written (if any) so that this entry
// starts a new one.
1351 auto file = get_latest_file();
1352 if (file != nullptr)
1353 {
1354 file->complete();
1355 LOG_DEBUG_FMT("Ledger chunk completed at {}", file->get_last_idx());
1356 }
1357 }
1358
1359 bool force_chunk_after =
// NOTE(review): listing line 1360 (the initialiser expression) is missing;
// presumably tests the header's "force chunk after" flag. TODO confirm.
1361 if (force_chunk_after)
1362 {
1363 if (!committable)
1364 {
1365 throw std::logic_error(
1366 "Ledger chunks cannot end in a non-committable transaction");
1367 }
// NOTE(review): listing line 1368 (opening of this log call) is missing.
1369 "Forcing ledger chunk after entry as required by the entry header "
1370 "flags");
1371 }
1372
1373 auto file = get_latest_file();
1374 if (file == nullptr)
1375 {
1376 // If no file is currently open for writing, create a new one
1377 size_t start_idx = last_idx + 1;
1378 if (use_existing_files)
1379 {
1380 // When recovering files from persistence, try to find one on disk
1381 // first
1382 file = get_existing_ledger_file_for_idx(start_idx);
1383 }
1384 if (file == nullptr)
1385 {
// A new file is created as a recovery file only when a recovery start index
// is set and this file starts strictly after it.
1386 bool is_recovery = recovery_start_idx.has_value() &&
1387 start_idx > recovery_start_idx.value();
1388 file =
1389 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1390 }
1391 files.emplace_back(file);
1392 }
// The file reports the index it wrote at and whether it had to truncate
// existing content to do so.
1393 auto [last_idx_, has_truncated] =
1394 file->write_entry(data, size, committable);
1395 last_idx = last_idx_;
1396
1397 if (has_truncated)
1398 {
1399 // If a divergence was detected when writing the entry, delete all
1400 // further ledger files to cleanly continue
1401 LOG_INFO_FMT("Found divergent ledger entry at {}", last_idx);
1402 delete_ledger_files_after_idx(last_idx);
1403 use_existing_files = false;
1404 }
1405
// Stop looking for on-disk files once writes have moved past the index the
// ledger ended at when init() was called.
1406 if (
1407 use_existing_files && last_idx_on_init.has_value() &&
1408 last_idx > last_idx_on_init.value())
1409 {
1410 use_existing_files = false;
1411 }
1412
// NOTE(review): listing line 1413 (opening of this log call) is missing.
1414 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1415 last_idx,
1416 committable,
1417 force_chunk_after);
1418
1419 if (committable && force_chunk_after)
1420 {
1421 file->complete();
1422 LOG_DEBUG_FMT("Ledger chunk completed at {}", last_idx);
1423 }
1424
1425 return last_idx;
1426 }
1427
// Truncates the ledger so that `idx` becomes the last index.
//
// The request is ignored (after logging) when last_idx is non-zero and idx is
// either not below last_idx or below committed_idx. Otherwise every file from
// the one holding idx + 1 up to the one holding last_idx is truncated, files
// whose truncate() returns true are removed from `files`, and last_idx is set
// to idx.
//
// NOTE(review): listing line 1439 (opening of the log call in the guard) is
// absent from this extract. Restore it from the upstream file.
1428 void truncate(size_t idx)
1429 {
1430 TimeBoundLogger log_if_slow(fmt::format("Truncating ledger at {}", idx));
1431
1432 LOG_DEBUG_FMT("Ledger truncate: {}/{}", idx, last_idx);
1433
1434 // Conservative check to avoid truncating to future indices, or dropping
1435 // committed entries. If the ledger is being initialised from a snapshot
1436 // alone, the first truncation effectively sets the last index.
1437 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1438 {
// NOTE(review): listing line 1439 (opening of this log call) is missing here.
1440 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1441 idx,
1442 last_idx,
1443 committed_idx);
1444 return;
1445 }
1446
// presumably get_it_contains_idx returns the position in `files` of the file
// holding the given index - TODO confirm against its definition.
1447 auto f_from = get_it_contains_idx(idx + 1);
1448 auto f_to = get_it_contains_idx(last_idx);
1449 auto f_end = std::next(f_to);
1450
1451 for (auto it = f_from; it != f_end;)
1452 {
1453 // Truncate the first file to the truncation index while the more
1454 // recent files are deleted entirely
1455 auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1;
// A true result means the file is dropped from the list of active files.
1456 if ((*it)->truncate(truncate_idx))
1457 {
1458 it = files.erase(it);
1459 }
1460 else
1461 {
1462 it++;
1463 }
1464 }
1465
1466 last_idx = idx;
1467 }
1468
1469 void commit(size_t idx)
1470 {
1471 TimeBoundLogger log_if_slow(
1472 fmt::format("Committing ledger entry {}", idx));
1473
1474 LOG_DEBUG_FMT("Ledger commit: {}/{}", idx, last_idx);
1475
1476 if (idx <= committed_idx || idx > last_idx)
1477 {
1478 return;
1479 }
1480
1481 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1482 get_it_contains_idx(committed_idx);
1483 auto f_to = get_it_contains_idx(idx);
1484 auto f_end = std::next(f_to);
1485
1486 for (auto it = f_from; it != f_end;)
1487 {
1488 // Commit all previous file to their latest index while the latest
1489 // file is committed to the committed index
1490 const auto last_idx_in_file = (*it)->get_last_idx();
1491 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1492 if (
1493 (*it)->commit(commit_idx) &&
1494 (it != f_to || (idx == last_idx_in_file)))
1495 {
1496 end_of_committed_files_idx = last_idx_in_file;
1497 it = files.erase(it);
1498 }
1499 else
1500 {
1501 it++;
1502 }
1503 }
1504
1505 committed_idx = idx;
1506 }
1507
1508 bool is_in_committed_file(size_t idx)
1509 {
1510 return idx <= end_of_committed_files_idx;
1511 }
1512
1514 {
1515 // Filled on construction
1517 size_t from_idx;
1518 size_t to_idx;
1519 size_t max_size;
1520
1521 // First argument is ledger entries (or nullopt if not found)
1522 // Second argument is uv status code, which may indicate a cancellation
1524 std::function<void(std::optional<LedgerReadResult>&&, int)>;
1526
1527 // Final result
1528 std::optional<LedgerReadResult> read_result = std::nullopt;
1529 };
1530
1531 static void on_ledger_get_async(uv_work_t* req)
1532 {
1533 auto data = static_cast<AsyncLedgerGet*>(req->data);
1534
1535 data->read_result = data->ledger->read_entries_range(
1536 data->from_idx, data->to_idx, true, data->max_size);
1537 }
1538
1539 static void on_ledger_get_async_complete(uv_work_t* req, int status)
1540 {
1541 auto data = static_cast<AsyncLedgerGet*>(req->data);
1542
1543 data->result_cb(std::move(data->read_result), status);
1544
1545 delete data;
1546 delete req;
1547 }
1548
1550 size_t from_idx,
1551 size_t to_idx,
1552 std::optional<LedgerReadResult>&& read_result,
1554 {
1555 if (read_result.has_value())
1556 {
1558 ::consensus::ledger_entry_range,
1559 to_enclave,
1560 from_idx,
1561 read_result->end_idx,
1562 purpose,
1563 read_result->data);
1564 }
1565 else
1566 {
1568 ::consensus::ledger_no_entry_range,
1569 to_enclave,
1570 from_idx,
1571 to_idx,
1572 purpose);
1573 }
1574 }
1575
1578 {
1580 disp,
1581 ::consensus::ledger_init,
1582 [this](const uint8_t* data, size_t size) {
1583 auto idx = serialized::read<::consensus::Index>(data, size);
1584 auto recovery_start_index =
1585 serialized::read<::consensus::Index>(data, size);
1586 init(idx, recovery_start_index);
1587 });
1588
1590 disp,
1591 ::consensus::ledger_append,
1592 [this](const uint8_t* data, size_t size) {
1593 auto committable = serialized::read<bool>(data, size);
1594 write_entry(data, size, committable);
1595 });
1596
1598 disp,
1599 ::consensus::ledger_truncate,
1600 [this](const uint8_t* data, size_t size) {
1601 auto idx = serialized::read<::consensus::Index>(data, size);
1602 auto recovery_mode = serialized::read<bool>(data, size);
1603 truncate(idx);
1604 if (recovery_mode)
1605 {
1607 }
1608 });
1609
1611 disp,
1612 ::consensus::ledger_commit,
1613 [this](const uint8_t* data, size_t size) {
1614 auto idx = serialized::read<::consensus::Index>(data, size);
1615 commit(idx);
1616 });
1617
1619 disp, ::consensus::ledger_open, [this](const uint8_t*, size_t) {
1621 });
1622
1624 disp,
1625 ::consensus::ledger_get_range,
1626 [&](const uint8_t* data, size_t size) {
1627 auto [from_idx, to_idx, purpose] =
1628 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1629
1630 // Ledger entries response has metadata so cap total entries size
1631 // accordingly
1632 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1633 auto max_entries_size = to_enclave->get_max_message_size() -
1634 write_ledger_range_response_metadata_size;
1635
1636 if (is_in_committed_file(to_idx))
1637 {
1638 // Start an asynchronous job to do this, since it is committed and
1639 // can be accessed independently (and in parallel)
1640 uv_work_t* work_handle = new uv_work_t;
1641
1642 {
1643 auto job = new AsyncLedgerGet;
1644 job->ledger = this;
1645 job->from_idx = from_idx;
1646 job->to_idx = to_idx;
1647 job->max_size = max_entries_size;
1648 job->result_cb = [this,
1649 from_idx_ = from_idx,
1650 to_idx_ = to_idx,
1651 purpose_ =
1652 purpose](auto&& read_result, int status) {
1653 // NB: Even if status is cancelled (and entry is empty), we
1654 // want to write this result back to the enclave
1656 from_idx_, to_idx_, std::move(read_result), purpose_);
1657 };
1658
1659 work_handle->data = job;
1660 }
1661
1662 uv_queue_work(
1663 uv_default_loop(),
1664 work_handle,
1667 }
1668 else
1669 {
1670 // Read synchronously, since this accesses uncommitted state and
1671 // must accurately reflect changing files
1673 from_idx,
1674 to_idx,
1675 read_entries(from_idx, to_idx, max_entries_size),
1676 purpose);
1677 }
1678 });
1679 }
1680 };
1681}
Definition ledger.h:148
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:391
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:212
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:178
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:536
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:495
~LedgerFile()
Definition ledger.h:351
bool commit(size_t idx)
Definition ledger.h:665
bool is_committed() const
Definition ledger.h:374
void complete()
Definition ledger.h:590
bool is_recovery() const
Definition ledger.h:384
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:441
bool rename(const std::string &new_file_name)
Definition ledger.h:638
bool is_complete() const
Definition ledger.h:379
void open()
Definition ledger.h:657
size_t get_last_idx() const
Definition ledger.h:364
size_t get_current_size() const
Definition ledger.h:369
size_t get_start_idx() const
Definition ledger.h:359
Definition ledger.h:707
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1318
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:1014
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1326
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1337
void truncate(size_t idx)
Definition ledger.h:1428
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1539
void complete_recovery()
Definition ledger.h:1278
bool is_in_committed_file(size_t idx)
Definition ledger.h:1508
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1576
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1531
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1549
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1313
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1210
size_t get_last_idx() const
Definition ledger.h:1308
void commit(size_t idx)
Definition ledger.h:1469
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition after_io.h:8
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:12
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
Definition files.h:20
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition ledger.h:142
std::vector< uint8_t > data
Definition ledger.h:143
size_t end_idx
Definition ledger.h:144
Definition ledger.h:1514
std::optional< LedgerReadResult > read_result
Definition ledger.h:1528
Ledger * ledger
Definition ledger.h:1516
size_t max_size
Definition ledger.h:1519
ResultCallback result_cb
Definition ledger.h:1525
size_t from_idx
Definition ledger.h:1517
size_t to_idx
Definition ledger.h:1518
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1524
Definition time_bound_logger.h:14
Definition serialised_entry_format.h:21
uint64_t size
Definition serialised_entry_format.h:28