CCF
Loading...
Searching...
No Matches
ledger.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
6#include "ccf/pal/locking.h"
8#include "ds/files.h"
10#include "ds/messaging.h"
11#include "ds/serialized.h"
12#include "kv/kv_types.h"
14#include "time_bound_logger.h"
15
16#include <cstdint>
17#include <cstdio>
18#include <filesystem>
19#include <list>
20#include <map>
21#include <string>
22#include <sys/types.h>
23#include <unistd.h>
24#include <uv.h>
25#include <vector>
26
27namespace fs = std::filesystem;
28
29namespace asynchost
30{
// Default for the maximum number of ledger files kept in the read cache.
static constexpr size_t ledger_max_read_cache_files_default{5};

// Components of ledger file names: the suffixes that mark a file's state and
// the delimiters that introduce the start idx and the last idx.
static constexpr const char* ledger_committed_suffix = "committed";
static constexpr const char* ledger_start_idx_delimiter = "_";
static constexpr const char* ledger_last_idx_delimiter = "-";
static constexpr const char* ledger_recovery_file_suffix = "recovery";
static constexpr const char* ledger_ignored_file_suffix = "ignored";
38
39 static inline size_t get_start_idx_from_file_name(
40 const std::string& file_name)
41 {
42 auto pos = file_name.find(ledger_start_idx_delimiter);
43 if (pos == std::string::npos)
44 {
45 throw std::logic_error(fmt::format(
46 "Ledger file name {} does not contain a start seqno", file_name));
47 }
48
49 return std::stol(file_name.substr(pos + 1));
50 }
51
52 static inline std::optional<size_t> get_last_idx_from_file_name(
53 const std::string& file_name)
54 {
55 auto pos = file_name.find(ledger_last_idx_delimiter);
56 if (pos == std::string::npos)
57 {
58 // Non-committed file names do not contain a last idx
59 return std::nullopt;
60 }
61
62 return std::stol(file_name.substr(pos + 1));
63 }
64
65 static inline bool is_ledger_file_name_committed(const std::string& file_name)
66 {
67 return file_name.ends_with(ledger_committed_suffix);
68 }
69
70 static inline bool is_ledger_file_name_recovery(const std::string& file_name)
71 {
72 return file_name.ends_with(ledger_recovery_file_suffix);
73 }
74
75 static inline bool is_ledger_file_name_ignored(const std::string& file_name)
76 {
77 return file_name.ends_with(ledger_ignored_file_suffix);
78 }
79
80 static inline bool is_ledger_file_ignored(const std::string& file_name)
81 {
82 // Catch-all for all files that should be ignored
83 return is_ledger_file_name_recovery(file_name) ||
84 is_ledger_file_name_ignored(file_name);
85 }
86
87 static inline fs::path remove_suffix(
88 std::string_view file_name, const std::string& suffix)
89 {
90 if (file_name.ends_with(suffix))
91 {
92 file_name.remove_suffix(suffix.size());
93 }
94 return file_name;
95 }
96
97 static inline fs::path remove_recovery_suffix(std::string_view file_name)
98 {
99 return remove_suffix(
100 file_name, fmt::format(".{}", ledger_recovery_file_suffix));
101 }
102
103 static std::optional<std::string> get_file_name_with_idx(
104 const std::string& dir, size_t idx, bool allow_recovery_files)
105 {
106 std::optional<std::string> match = std::nullopt;
107 for (auto const& f : fs::directory_iterator(dir))
108 {
109 // If any file, based on its name, contains idx. Only committed
110 // (i.e. those with a last idx) are considered here.
111 auto f_name = f.path().filename();
112 if (
113 is_ledger_file_name_ignored(f_name) ||
114 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
115 {
116 continue;
117 }
118
119 size_t start_idx = 0;
120 std::optional<size_t> last_idx = std::nullopt;
121 try
122 {
123 start_idx = get_start_idx_from_file_name(f_name);
124 last_idx = get_last_idx_from_file_name(f_name);
125 }
126 catch (const std::exception& e)
127 {
128 // Ignoring invalid ledger file
129 continue;
130 }
131 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
132 {
133 match = f_name;
134 break;
135 }
136 }
137
138 return match;
139 }
140
// Result of reading a range of ledger entries: the raw bytes read, and the
// idx of the last entry included in them.
struct LedgerReadResult
{
  std::vector<uint8_t> data;
  size_t end_idx{};
};
146
148 {
149 private:
150 using positions_offset_header_t = size_t;
151 static constexpr auto file_name_prefix = "ledger";
152
153 const fs::path dir;
154 fs::path file_name;
155
156 // This uses C stdio instead of fstream because an fstream
157 // cannot be truncated.
158 FILE* file = nullptr;
159 ccf::pal::Mutex file_lock;
160
161 size_t start_idx = 1;
162 size_t total_len = 0; // Points to end of last written entry
163 std::vector<uint32_t> positions;
164
165 bool completed = false;
166 bool committed = false;
167
168 bool recovery = false;
169
170 // This flag is set when an existing ledger is recovered and started (init)
171 // from an old idx. In this case, further ledger files (i.e. those which
172 // contain entries later than init idx), remain on disk and new entries are
173 // checked against the existing ones, until a divergence is found.
174 bool from_existing_file = false;
175
176 public:
177 // Used when creating a new (empty) ledger file
178 LedgerFile(const fs::path& dir, size_t start_idx, bool recovery = false) :
179 dir(dir),
180 file_name(fmt::format("{}_{}", file_name_prefix, start_idx)),
181 start_idx(start_idx),
182 recovery(recovery)
183 {
184 if (recovery)
185 {
186 file_name =
187 fmt::format("{}.{}", file_name.string(), ledger_recovery_file_suffix);
188 }
189
190 auto file_path = dir / file_name;
191 if (fs::exists(file_path))
192 {
193 throw std::logic_error(fmt::format(
194 "Cannot create new ledger file {} in main ledger directory {} as it "
195 "already exists",
196 file_name,
197 dir));
198 }
199 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
200 file = fopen(file_path.c_str(), "w+b");
201 if (file == nullptr)
202 {
203 throw std::logic_error(fmt::format(
204 "Unable to open ledger file {}: {}",
205 file_path,
206 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
207 }
208
209 // Header reserved for the offset to the position table
210 fseeko(file, sizeof(positions_offset_header_t), SEEK_SET);
211 total_len = sizeof(positions_offset_header_t);
212 }
213
214 // Used when recovering an existing ledger file
216 const std::string& dir,
217 const std::string& file_name_,
218 bool from_existing_file_ = false) :
219 dir(dir),
220 file_name(file_name_),
221 from_existing_file(from_existing_file_)
222 {
223 auto file_path = (fs::path(dir) / fs::path(file_name));
224
225 committed = is_ledger_file_name_committed(file_name);
226 start_idx = get_start_idx_from_file_name(file_name);
227
228 const auto* const mode = committed ? "rb" : "r+b";
229
230 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
231 file = fopen(file_path.c_str(), mode);
232
233 if (file == nullptr)
234 {
235 throw std::logic_error(fmt::format(
236 "Unable to open ledger file {}: {}",
237 file_path,
238 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
239 }
240
241 // First, get full size of file
242 fseeko(file, 0, SEEK_END);
243 size_t total_file_size = ftello(file);
244
245 // Second, read offset to header table
246 fseeko(file, 0, SEEK_SET);
247 positions_offset_header_t table_offset = 0;
248 if (fread(&table_offset, sizeof(positions_offset_header_t), 1, file) != 1)
249 {
250 throw std::logic_error(fmt::format(
251 "Failed to read positions offset from ledger file {}", file_path));
252 }
253
254 if (committed && table_offset == 0)
255 {
256 throw std::logic_error(fmt::format(
257 "Committed ledger file {} cannot be read: invalid table offset (0)",
258 file_path));
259 }
260
261 total_len = sizeof(positions_offset_header_t);
262
263 if (from_existing_file)
264 {
265 // When recovering a file from persistence, do not recover entries to
266 // start with as these are expected to be written again at a later
267 // point.
268 return;
269 }
270
271 if (table_offset != 0)
272 {
273 // If the chunk was completed, read positions table from file directly
274 total_len = table_offset;
275 fseeko(file, table_offset, SEEK_SET);
276
277 if (table_offset > total_file_size)
278 {
279 throw std::logic_error(fmt::format(
280 "Invalid table offset {} greater than total file size {}",
281 table_offset,
282 total_file_size));
283 }
284
285 positions.resize(
286 (total_file_size - table_offset) / sizeof(positions.at(0)));
287
288 if (
289 fread(
290 positions.data(),
291 sizeof(positions.at(0)),
292 positions.size(),
293 file) != positions.size())
294 {
295 throw std::logic_error(fmt::format(
296 "Failed to read positions table from ledger file {}", file_path));
297 }
298 completed = true;
299 }
300 else
301 {
302 // If the chunk was not completed, read all entries to reconstruct
303 // positions table
304 total_len = sizeof(positions_offset_header_t);
305 auto len = total_file_size - total_len;
306
307 ccf::kv::SerialisedEntryHeader entry_header = {};
308 size_t current_idx = start_idx;
309 while (len >= ccf::kv::serialised_entry_header_size)
310 {
311 if (
312 fread(
313 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
314 1)
315 {
317 "Failed to read entry header from ledger file {} at seqno {}",
318 file_path,
319 current_idx);
320 return;
321 }
322
323 len -= ccf::kv::serialised_entry_header_size;
324
325 const auto& entry_size = entry_header.size;
326 if (len < entry_size)
327 {
329 "Malformed incomplete ledger file {} at seqno {} (expecting "
330 "entry of size "
331 "{}, remaining {})",
332 file_path,
333 current_idx,
334 entry_size,
335 len);
336
337 return;
338 }
339
340 fseeko(file, entry_size, SEEK_CUR);
341 len -= entry_size;
342
344 "Recovered one entry of size {} at seqno {}",
345 entry_size,
346 current_idx);
347
348 current_idx++;
349 positions.push_back(total_len);
350 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
351 }
352 completed = false;
353 }
354 }
355
357 {
358 if (file != nullptr)
359 {
360 std::ignore =
361 fclose(file); // NOLINT(cppcoreguidelines-owning-memory,cert-err33-c)
362 }
363 }
364
365 [[nodiscard]] size_t get_start_idx() const
366 {
367 return start_idx;
368 }
369
370 [[nodiscard]] size_t get_last_idx() const
371 {
372 return start_idx + positions.size() - 1;
373 }
374
375 [[nodiscard]] size_t get_current_size() const
376 {
377 return total_len;
378 }
379
380 [[nodiscard]] bool is_committed() const
381 {
382 return committed;
383 }
384
385 [[nodiscard]] bool is_complete() const
386 {
387 return completed;
388 }
389
390 [[nodiscard]] bool is_recovery() const
391 {
392 return recovery;
393 }
394
395 // Returns idx of new entry, and boolean to indicate that file was truncated
396 // before writing the entry
397 std::pair<size_t, bool> write_entry(
398 const uint8_t* data, size_t size, bool committable)
399 {
400 fseeko(file, total_len, SEEK_SET);
401
402 bool should_write = true;
403 bool has_truncated = false;
404 if (from_existing_file)
405 {
406 std::vector<uint8_t> entry(size);
407 if (
408 fread(entry.data(), size, 1, file) != 1 ||
409 memcmp(entry.data(), data, size) != 0)
410 {
411 // Divergence between existing and new entry. Truncate this file,
412 // write the new entry and notify the caller for further cleanup.
413 // Note that even if the truncation results in an empty file, we keep
414 // it on disk as a new entry is about to be written.
415 truncate(get_last_idx(), false /* remove_file_if_empty */);
416 has_truncated = true;
417 from_existing_file = false;
418 }
419 else
420 {
421 should_write = false;
422 }
423 }
424
425 if (should_write)
426 {
427 if (fwrite(data, size, 1, file) != 1)
428 {
429 throw std::logic_error("Failed to write entry to ledger");
430 }
431
432 // Committable entries get flushed straight away
433 if (committable && fflush(file) != 0)
434 {
435 throw std::logic_error(fmt::format(
436 "Failed to flush entry to ledger: {}",
437 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
438 }
439 }
440
441 positions.push_back(total_len);
442 total_len += size;
443
444 return std::make_pair(get_last_idx(), has_truncated);
445 }
446
447 // Return pair containing entries size and index of last entry included
448 [[nodiscard]] std::pair<size_t, size_t> entries_size(
449 size_t from,
450 size_t to,
451 std::optional<size_t> max_size = std::nullopt) const
452 {
453 if ((from < start_idx) || (to < from) || (to > get_last_idx()))
454 {
455 return {0, 0};
456 }
457
458 size_t size = 0;
459
460 // If max_size is set, return entries that fit within it (best effort).
461 while (true)
462 {
463 auto position_to =
464 (to == get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
465 size = position_to - positions.at(from - start_idx);
466
467 if (!max_size.has_value() || size <= max_size.value())
468 {
469 break;
470 }
471
472 if (from == to)
473 {
474 // Request one entry that is too large: no entries are found
476 "Single ledger entry at {} in file {} is too large for remaining "
477 "space (size {} > max {})",
478 from,
479 file_name,
480 size,
481 max_size.value());
482 return {0, 0};
483 }
484 size_t to_ = from + (to - from) / 2;
486 "Requesting ledger entries from {} to {} in file {} but size {} > "
487 "max size {}: now requesting up to {}",
488 from,
489 to,
490 file_name,
491 size,
492 max_size.value(),
493 to_);
494 to = to_;
495 }
496
497 return {size, to};
498 }
499
500 std::optional<LedgerReadResult> read_entries(
501 size_t from, size_t to, std::optional<size_t> max_size = std::nullopt)
502 {
503 if ((from < start_idx) || (to > get_last_idx()) || (to < from))
504 {
506 "Cannot find entries: {} - {} in ledger file {}",
507 from,
508 to,
509 file_name);
510 return std::nullopt;
511 }
512
514 "Read entries from {} to {} in {} [max size: {}]",
515 from,
516 to,
517 file_name,
518 max_size.value_or(0));
519
520 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
521 auto [size, to_] = entries_size(from, to, max_size);
522 if (size == 0)
523 {
524 return std::nullopt;
525 }
526 std::vector<uint8_t> entries(size);
527 fseeko(file, positions.at(from - start_idx), SEEK_SET);
528
529 if (fread(entries.data(), size, 1, file) != 1)
530 {
531 throw std::logic_error(fmt::format(
532 "Failed to read entry range {} - {} from file {}",
533 from,
534 to,
535 file_name));
536 }
537
538 return LedgerReadResult{entries, to_};
539 }
540
541 bool truncate(size_t idx, bool remove_file_if_empty = true)
542 {
543 if (
544 committed || (idx < start_idx - 1) ||
545 (completed && idx >= get_last_idx()))
546 {
547 return false;
548 }
549
550 if (remove_file_if_empty && idx == start_idx - 1)
551 {
552 // Truncating everything triggers file deletion
553 if (!fs::remove(dir / file_name))
554 {
555 throw std::logic_error(
556 fmt::format("Could not remove file {}", file_name));
557 }
559 "Removed ledger file {} on truncation at {}", file_name, idx);
560 return true;
561 }
562
563 // Reset positions offset header
564 fseeko(file, 0, SEEK_SET);
565 positions_offset_header_t table_offset = 0;
566 if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
567 {
568 throw std::logic_error("Failed to reset positions table offset");
569 }
570
571 completed = false;
572 if (idx != get_last_idx())
573 {
574 total_len = positions.at(idx - start_idx + 1);
575 positions.resize(idx - start_idx + 1);
576 }
577
578 if (fflush(file) != 0)
579 {
580 throw std::logic_error(fmt::format(
581 "Failed to flush ledger file: {}",
582 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
583 }
584
585 if (ftruncate(fileno(file), total_len) != 0)
586 {
587 throw std::logic_error(fmt::format(
588 "Failed to truncate ledger: {}",
589 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
590 }
591
592 fseeko(file, total_len, SEEK_SET);
593 LOG_TRACE_FMT("Truncated ledger file {} at seqno {}", file_name, idx);
594 return false;
595 }
596
597 void complete()
598 {
599 if (completed)
600 {
601 return;
602 }
603 // It may happen (e.g. during recovery) that the incomplete ledger gets
604 // truncated on the primary, so we have to make sure that whenever we
605 // complete the file it doesn't contain anything past the last_idx, which
606 // can happen on the follower unless explicitly truncated before
607 // completion.
608 truncate(get_last_idx(), /* remove_file_if_empty = */ false);
609
610 fseeko(file, total_len, SEEK_SET);
611 size_t table_offset = ftello(file);
612
613 if (
614 fwrite(
615 reinterpret_cast<uint8_t*>(positions.data()),
616 sizeof(positions.at(0)),
617 positions.size(),
618 file) != positions.size())
619 {
620 throw std::logic_error("Failed to write positions table to ledger");
621 }
622
623 // Write positions table offset at start of file
624 if (fseeko(file, 0, SEEK_SET) != 0)
625 {
626 throw std::logic_error("Failed to set file offset to 0");
627 }
628
629 if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
630 {
631 throw std::logic_error("Failed to write positions table to ledger");
632 }
633
634 if (fflush(file) != 0)
635 {
636 throw std::logic_error(fmt::format(
637 "Failed to flush ledger file: {}",
638 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
639 }
640
641 LOG_TRACE_FMT("Completed ledger file {}", file_name);
642
643 completed = true;
644 }
645
646 bool rename(const std::string& new_file_name)
647 {
648 auto file_path = dir / file_name;
649 auto new_file_path = dir / new_file_name;
650
651 try
652 {
653 files::rename(file_path, new_file_path);
654 }
655 catch (const std::exception& e)
656 {
657 // If the file cannot be renamed (e.g. file was removed), report an
658 // error and continue
659 LOG_FAIL_FMT("Error renaming ledger file: {}", e.what());
660 }
661 file_name = new_file_name;
662 return true;
663 }
664
665 void open()
666 {
667 auto new_file_name = remove_recovery_suffix(file_name.c_str());
668 rename(new_file_name);
669 recovery = false;
670 LOG_DEBUG_FMT("Open recovery ledger file {}", new_file_name);
671 }
672
673 bool commit(size_t idx)
674 {
675 if (!completed || committed || (idx != get_last_idx()))
676 {
677 // No effect if commit idx is not last idx
678 return false;
679 }
680
681 if (fflush(file) != 0)
682 {
683 throw std::logic_error(fmt::format(
684 "Failed to flush ledger file: {}",
685 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
686 }
687
688 auto committed_file_name = fmt::format(
689 "{}_{}-{}.{}",
690 file_name_prefix,
691 start_idx,
692 get_last_idx(),
693 ledger_committed_suffix);
694
695 if (recovery)
696 {
697 committed_file_name = fmt::format(
698 "{}.{}", committed_file_name, ledger_recovery_file_suffix);
699 }
700
701 if (!rename(committed_file_name))
702 {
703 return false;
704 }
705
706 committed = true;
707 LOG_DEBUG_FMT("Committed ledger file {}", file_name);
708
709 // Committed recovery files stay in the list of active files until the
710 // ledger is open
711 return !recovery;
712 }
713 };
714
715 class Ledger
716 {
717 private:
718 ringbuffer::WriterPtr to_enclave;
719
720 // Main ledger directory (write and read)
721 const fs::path ledger_dir;
722
723 // Ledger directories (read-only)
724 std::vector<fs::path> read_ledger_dirs;
725
726 // Keeps track of all ledger files for writing.
727 // Current ledger file is always the last one
728 std::list<std::shared_ptr<LedgerFile>> files;
729
730 // Cache of ledger files for reading
731 size_t max_read_cache_files;
732 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
733 ccf::pal::Mutex read_cache_lock;
734
735 std::atomic<size_t> last_idx = 0;
736 size_t committed_idx = 0;
737
738 size_t end_of_committed_files_idx = 0;
739
740 // Indicates if the ledger has been initialised at a specific idx and
741 // may still be replaying existing entries.
742 bool use_existing_files = false;
743 // Used to remember the last recovered idx on init so that
744 // use_existing_files can be disabled once this idx is passed
745 std::optional<size_t> last_idx_on_init = std::nullopt;
746
747 // Set during recovery to mark files as temporary until the recovery is
748 // complete
749 std::optional<size_t> recovery_start_idx = std::nullopt;
750
751 [[nodiscard]] auto get_it_contains_idx(size_t idx) const
752 {
753 if (idx == 0)
754 {
755 return files.end();
756 }
757
758 auto f = std::upper_bound(
759 files.begin(),
760 files.end(),
761 idx,
762 [](size_t idx, const std::shared_ptr<LedgerFile>& f) {
763 return (idx <= f->get_last_idx());
764 });
765
766 return f;
767 }
768
769 std::shared_ptr<LedgerFile> get_file_from_cache(size_t idx)
770 {
771 if (idx == 0)
772 {
773 return nullptr;
774 }
775
776 {
777 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
778
779 // First, try to find file from read cache
780 for (auto const& f : files_read_cache)
781 {
782 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
783 {
784 return f;
785 }
786 }
787 }
788
789 // If the file is not in the cache, find the file from the ledger
790 // directories, inspecting the main ledger directory first
791 // Note: reading recovery chunks from main ledger directory is
792 // acceptable and in fact required to complete private recovery.
793 std::string ledger_dir_;
794 auto match = get_file_name_with_idx(ledger_dir, idx, true);
795 if (match.has_value())
796 {
797 ledger_dir_ = ledger_dir;
798 }
799 else
800 {
801 for (auto const& dir : read_ledger_dirs)
802 {
803 match = get_file_name_with_idx(dir, idx, false);
804 if (match.has_value())
805 {
806 ledger_dir_ = dir;
807 break;
808 }
809 }
810 }
811
812 if (!match.has_value())
813 {
814 return nullptr;
815 }
816
817 // Emplace file in the max-sized read cache, replacing the oldest entry if
818 // the read cache is full
819 std::shared_ptr<LedgerFile> match_file = nullptr;
820 try
821 {
822 match_file = std::make_shared<LedgerFile>(
823 ledger_dir_, *match); // NOLINT(bugprone-unchecked-optional-access)
824 }
825 catch (const std::exception& e)
826 {
828 "Could not open ledger file {} to read seqno {}: {}",
829 match.value(),
830 idx,
831 e.what());
832 return nullptr;
833 }
834
835 {
836 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
837
838 files_read_cache.emplace_back(match_file);
839 if (files_read_cache.size() > max_read_cache_files)
840 {
841 files_read_cache.erase(files_read_cache.begin());
842 }
843 }
844
845 return match_file;
846 }
847
848 std::shared_ptr<LedgerFile> get_file_from_idx(
849 size_t idx, bool read_cache_only = false)
850 {
851 if (idx == 0)
852 {
853 return nullptr;
854 }
855
856 if (!read_cache_only)
857 {
858 // First, check if the file is in the list of files open for writing
859 auto f = std::upper_bound(
860 files.rbegin(),
861 files.rend(),
862 idx,
863 [](size_t idx, const std::shared_ptr<LedgerFile>& f) {
864 return idx >= f->get_start_idx();
865 });
866
867 if (f != files.rend())
868 {
869 return *f;
870 }
871 }
872
873 // Otherwise, return file from read cache
874 return get_file_from_cache(idx);
875 }
876
877 [[nodiscard]] std::shared_ptr<LedgerFile> get_latest_file(
878 bool incomplete_only = true) const
879 {
880 if (files.empty())
881 {
882 return nullptr;
883 }
884 const auto& last_file = files.back();
885 if (incomplete_only && last_file->is_complete())
886 {
887 return nullptr;
888 }
889
890 return last_file;
891 }
892
893 std::optional<LedgerReadResult> read_entries_range(
894 size_t from,
895 size_t to,
896 bool read_cache_only = false,
897 std::optional<size_t> max_entries_size = std::nullopt)
898 {
899 // Note: if max_entries_size is set, this returns contiguous ledger
900 // entries on a best effort basis, so that the returned entries fit in
901 // max_entries_size but without maximising the number of entries returned.
902 if ((from <= 0) || (to < from))
903 {
904 return std::nullopt;
905 }
906
907 // During recovery or other low-knowledge batch operations, we might
908 // request entries past the end of the ledger - truncate to the true end
909 // here.
910 if (to > last_idx)
911 {
912 to = last_idx;
913 }
914
916 rr.end_idx = to;
917
918 size_t idx = from;
919 while (idx <= to)
920 {
921 auto f_from = get_file_from_idx(idx, read_cache_only);
922 if (f_from == nullptr)
923 {
924 LOG_FAIL_FMT("Cannot find ledger file for seqno {}", idx);
925 return std::nullopt;
926 }
927 auto to_ = std::min(f_from->get_last_idx(), to);
928 std::optional<size_t> max_size = std::nullopt;
929 if (max_entries_size.has_value())
930 {
931 max_size = max_entries_size.value() - rr.data.size();
932 }
933 auto v = f_from->read_entries(idx, to_, max_size);
934 if (!v.has_value())
935 {
936 break;
937 }
938 rr.end_idx = v->end_idx;
939 rr.data.insert(
940 rr.data.end(),
941 std::make_move_iterator(v->data.begin()),
942 std::make_move_iterator(v->data.end()));
943 if (v->end_idx != to_)
944 {
945 // If all the entries requested from a file are not returned (i.e.
946 // because the requested entries are larger than max_entries_size),
947 // return immediately to avoid returning non-contiguous entries from a
948 // subsequent ledger file.
949 break;
950 }
951 idx = to_ + 1;
952 }
953
954 if (!rr.data.empty())
955 {
956 return rr;
957 }
958
959 return std::nullopt;
960 }
961
962 void ignore_ledger_file(const std::string& file_name)
963 {
964 if (is_ledger_file_name_ignored(file_name))
965 {
966 return;
967 }
968
969 auto ignored_file_name =
970 fmt::format("{}.{}", file_name, ledger_ignored_file_suffix);
971 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
972 }
973
974 void delete_ledger_files_after_idx(size_t idx)
975 {
976 // Use with caution! Delete all ledger files later than idx
977 for (auto const& f : fs::directory_iterator(ledger_dir))
978 {
979 auto file_name = f.path().filename();
980 auto start_idx = get_start_idx_from_file_name(file_name);
981 if (start_idx > idx)
982 {
983 if (!fs::remove(ledger_dir / file_name))
984 {
985 throw std::logic_error(
986 fmt::format("Could not remove file {}", file_name));
987 }
989 "Forcing removal of ledger file {} as start idx {} > {}",
990 file_name,
991 start_idx,
992 idx);
993 }
994 }
995 }
996
997 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(size_t idx)
998 {
999 if (!use_existing_files)
1000 {
1001 return nullptr;
1002 }
1003
1004 for (auto const& f : fs::directory_iterator(ledger_dir))
1005 {
1006 auto file_name = f.path().filename();
1007 if (
1008 idx == get_start_idx_from_file_name(file_name) &&
1009 !is_ledger_file_ignored(file_name))
1010 {
1011 return std::make_shared<LedgerFile>(
1012 ledger_dir, file_name, true /* from_existing_file */);
1013
1014 break;
1015 }
1016 }
1017
1018 return nullptr;
1019 }
1020
1021 public:
1023 const fs::path& ledger_dir,
1024 ringbuffer::AbstractWriterFactory& writer_factory,
1025 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1026 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1027 to_enclave(writer_factory.create_writer_to_inside()),
1028 ledger_dir(ledger_dir),
1029 max_read_cache_files(max_read_cache_files)
1030 {
1031 // Recover last idx from read-only ledger directories
1032 for (const auto& read_dir : read_ledger_dirs_)
1033 {
1034 LOG_INFO_FMT("Recovering read-only ledger directory \"{}\"", read_dir);
1035 if (!fs::is_directory(read_dir))
1036 {
1037 throw std::logic_error(
1038 fmt::format("{} read-only ledger is not a directory", read_dir));
1039 }
1040
1041 read_ledger_dirs.emplace_back(read_dir);
1042
1043 for (auto const& f : fs::directory_iterator(read_dir))
1044 {
1045 auto file_name = f.path().filename();
1046 auto last_idx_ = get_last_idx_from_file_name(file_name);
1047 if (
1048 !last_idx_.has_value() ||
1049 !is_ledger_file_name_committed(file_name) ||
1050 is_ledger_file_name_ignored(file_name))
1051 {
1053 "Read-only ledger file {} is ignored as not committed",
1054 file_name);
1055 continue;
1056 }
1057
1058 if (last_idx_.value() > last_idx)
1059 {
1060 last_idx = last_idx_.value();
1061 committed_idx = last_idx;
1062 end_of_committed_files_idx = last_idx;
1063 }
1064
1066 "Recovering file from read-only ledger directory: {}", file_name);
1067 }
1068 }
1069
1070 if (last_idx > 0)
1071 {
1073 "Recovered read-only ledger directories up to {}, committed up to "
1074 "{} ",
1075 last_idx,
1076 committed_idx);
1077 }
1078
1079 if (fs::is_directory(ledger_dir))
1080 {
1081 // If the ledger directory exists, populate this->files with the
1082 // writeable files from it. These must have no suffix, and must not
1083 // end-before the current committed_idx found from the read-only
1084 // directories
1085 LOG_INFO_FMT("Recovering main ledger directory {}", ledger_dir);
1086
1087 for (auto const& f : fs::directory_iterator(ledger_dir))
1088 {
1089 auto file_name = f.path().filename();
1090
1091 if (is_ledger_file_ignored(file_name))
1092 {
1094 "Ignoring ledger file {} in main ledger directory", file_name);
1095
1096 ignore_ledger_file(file_name);
1097
1098 continue;
1099 }
1100
1101 const auto file_end_idx = get_last_idx_from_file_name(file_name);
1102
1103 if (is_ledger_file_name_committed(file_name))
1104 {
1105 if (!file_end_idx.has_value())
1106 {
1108 "Unexpected file {} in {}: committed but not completed",
1109 file_name,
1110 ledger_dir);
1111 }
1112 else
1113 {
1114 if (file_end_idx.value() > committed_idx)
1115 {
1116 committed_idx = file_end_idx.value();
1117 end_of_committed_files_idx = file_end_idx.value();
1118 }
1119 }
1120
1121 continue;
1122 }
1123
1124 if (file_end_idx.has_value() && file_end_idx.value() <= committed_idx)
1125 {
1127 "Ignoring ledger file {} in main ledger directory - already "
1128 "discovered commit up to {} from read-only directories",
1129 file_name,
1130 committed_idx);
1131
1132 ignore_ledger_file(file_name);
1133
1134 continue;
1135 }
1136
1137 std::shared_ptr<LedgerFile> ledger_file = nullptr;
1138 try
1139 {
1140 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1141
1142 // Truncate file to latest recovered index to cleanup entries that
1143 // may have been corrupted (no-op if file isn't corrupted)
1144 if (ledger_file->truncate(ledger_file->get_last_idx()))
1145 {
1146 // If truncation of corrupted entries removes file, file is not
1147 // recovered
1148 LOG_FAIL_FMT("Removed ledger file {}", file_name);
1149 continue;
1150 }
1151 }
1152 catch (const std::exception& e)
1153 {
1155 "Error reading ledger file {}: {}", file_name, e.what());
1156 // Ignore file if it cannot be recovered.
1157 ignore_ledger_file(file_name);
1158 continue;
1159 }
1160
1162 "Recovering file from main ledger directory: {}", file_name);
1163 files.emplace_back(std::move(ledger_file));
1164 }
1165
1166 if (files.empty())
1167 {
1169 "Main ledger directory {} is empty: no ledger file to "
1170 "recover",
1171 ledger_dir);
1172
1173 // If we had any uncommitted files, we wouldn't be in this path and
1174 // we'd populate last_idx below. In this branch, we need to ensure
1175 // last_idx is correctly initialised. Since there are no uncommitted
1176 // files, it must match the last committed_idx we've discovered.
1177 last_idx = committed_idx;
1178 return;
1179 }
1180
1182 "Main ledger directory {} contains {} restored (writeable) files",
1183 ledger_dir,
1184 files.size());
1185
1186 files.sort([](
1187 const std::shared_ptr<LedgerFile>& a,
1188 const std::shared_ptr<LedgerFile>& b) {
1189 return a->get_last_idx() < b->get_last_idx();
1190 });
1191
1192 const auto main_ledger_dir_last_idx =
1193 get_latest_file(false)->get_last_idx();
1194 if (main_ledger_dir_last_idx > last_idx)
1195 {
1196 last_idx = main_ledger_dir_last_idx;
1197 }
1198 }
1199 else
1200 {
1201 if (!fs::create_directory(ledger_dir))
1202 {
1203 throw std::logic_error(fmt::format(
1204 "Error: Could not create ledger directory: {}", ledger_dir));
1205 }
1206 }
1207
1209 "Recovered ledger entries up to {}, committed to {}",
1210 last_idx,
1211 committed_idx);
1212 }
1213
1214 Ledger(const Ledger& that) = delete;
1215
1216 void init(size_t idx, size_t recovery_start_idx_ = 0)
1217 {
1218 TimeBoundLogger log_if_slow(
1219 fmt::format("Initing ledger - seqno={}", idx));
1220
1221 // Used by backup nodes to initialise the ledger when starting from a
1222 // non-empty state, i.e. snapshot. It is assumed that idx is included in a
1223 // committed ledger file.
1224
1225 // To restart from a snapshot cleanly, in the main ledger directory,
1226 // mark all subsequent ledger as non-committed as their contents will be
1227 // replayed.
1228 for (auto const& f : fs::directory_iterator(ledger_dir))
1229 {
1230 auto file_name = f.path().filename();
1231 if (
1232 is_ledger_file_name_committed(file_name) &&
1233 (get_start_idx_from_file_name(file_name) > idx))
1234 {
1235 auto last_idx_file = get_last_idx_from_file_name(file_name);
1236 if (!last_idx_file.has_value())
1237 {
1238 throw std::logic_error(fmt::format(
1239 "Committed ledger file {} does not include last idx in file name",
1240 file_name));
1241 }
1242
1244 "Remove committed suffix from ledger file {} after init at {}: "
1245 "last_idx {}",
1246 file_name,
1247 idx,
1248 last_idx_file.value());
1249
1250 files::rename(
1251 ledger_dir / file_name,
1252 ledger_dir /
1253 remove_suffix(
1254 file_name.string(),
1255 fmt::format(
1256 "{}{}.{}",
1257 ledger_last_idx_delimiter,
1258 last_idx_file.value(),
1259 ledger_committed_suffix)));
1260 }
1261 }
1262
1263 // Close all open write files as the the ledger should
1264 // restart cleanly, from a new chunk.
1265 files.clear();
1266
1267 use_existing_files = true;
1268 last_idx_on_init = last_idx;
1269 last_idx = idx;
1270 committed_idx = idx;
1271 if (recovery_start_idx_ > 0)
1272 {
1273 // Do not set recovery idx and create recovery chunks
1274 // if the ledger is initialised from 0 (i.e. genesis)
1275 recovery_start_idx = recovery_start_idx_;
1276 }
1277
1279 "Set last known/commit seqno to {}, recovery seqno to {}",
1280 idx,
1281 recovery_start_idx_);
1282 }
1283
1285 {
1286 // When the recovery is completed (i.e. service is open), temporary
1287 // recovery ledger chunks are renamed as they can now be recovered.
1288 // Note: this operation cannot be rolled back.
1289 LOG_INFO_FMT("Ledger complete recovery");
1290
1291 for (auto it = files.begin(); it != files.end();)
1292 {
1293 auto& f = *it;
1294 if (f->is_recovery())
1295 {
1296 f->open();
1297
1298 // Recovery files are kept in the list of active files when committed
1299 // so that they can be renamed in a stable order when the service is
1300 // open. Once this is done, they can be removed from the list of
1301 // active files.
1302 if (f->is_committed())
1303 {
1304 it = files.erase(it);
1305 continue;
1306 }
1307 }
1308 ++it;
1309 }
1310
1311 recovery_start_idx.reset();
1312 }
1313
1314 [[nodiscard]] size_t get_last_idx() const
1315 {
1316 return last_idx;
1317 }
1318
1319 void set_recovery_start_idx(size_t idx)
1320 {
1321 recovery_start_idx = idx;
1322 }
1323
1324 std::optional<LedgerReadResult> read_entry(size_t idx)
1325 {
1326 TimeBoundLogger log_if_slow(
1327 fmt::format("Reading ledger entry at {}", idx));
1328
1329 return read_entries_range(idx, idx);
1330 }
1331
1332 std::optional<LedgerReadResult> read_entries(
1333 size_t from,
1334 size_t to,
1335 std::optional<size_t> max_entries_size = std::nullopt)
1336 {
1337 TimeBoundLogger log_if_slow(
1338 fmt::format("Reading ledger entries from {} to {}", from, to));
1339
1340 return read_entries_range(from, to, false, max_entries_size);
1341 }
1342
1343 size_t write_entry(const uint8_t* data, size_t size, bool committable)
1344 {
1345 TimeBoundLogger log_if_slow(fmt::format(
1346 "Writing ledger entry - {} bytes, committable={}", size, committable));
1347
1348 auto header =
1349 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1350
1351 if ((header.flags & ccf::kv::EntryFlags::FORCE_LEDGER_CHUNK_BEFORE) != 0)
1352 {
1354 "Forcing ledger chunk before entry as required by the entry header "
1355 "flags");
1356
1357 auto file = get_latest_file();
1358 if (file != nullptr)
1359 {
1360 file->complete();
1361 LOG_DEBUG_FMT("Ledger chunk completed at {}", file->get_last_idx());
1362 }
1363 }
1364
1365 bool force_chunk_after =
1367 if (force_chunk_after)
1368 {
1369 if (!committable)
1370 {
1371 throw std::logic_error(
1372 "Ledger chunks cannot end in a non-committable transaction");
1373 }
1375 "Forcing ledger chunk after entry as required by the entry header "
1376 "flags");
1377 }
1378
1379 auto file = get_latest_file();
1380 if (file == nullptr)
1381 {
1382 // If no file is currently open for writing, create a new one
1383 size_t start_idx = last_idx + 1;
1384 if (use_existing_files)
1385 {
1386 // When recovering files from persistence, try to find one on disk
1387 // first
1388 file = get_existing_ledger_file_for_idx(start_idx);
1389 }
1390 if (file == nullptr)
1391 {
1392 bool is_recovery = recovery_start_idx.has_value() &&
1393 start_idx > recovery_start_idx.value();
1394 file =
1395 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1396 }
1397 files.emplace_back(file);
1398 }
1399 auto [last_idx_, has_truncated] =
1400 file->write_entry(data, size, committable);
1401 last_idx = last_idx_;
1402
1403 if (has_truncated)
1404 {
1405 // If a divergence was detected when writing the entry, delete all
1406 // further ledger files to cleanly continue
1407 LOG_INFO_FMT("Found divergent ledger entry at {}", last_idx);
1408 delete_ledger_files_after_idx(last_idx);
1409 use_existing_files = false;
1410 }
1411
1412 if (
1413 use_existing_files && last_idx_on_init.has_value() &&
1414 last_idx > last_idx_on_init.value())
1415 {
1416 use_existing_files = false;
1417 }
1418
1420 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1421 last_idx,
1422 committable,
1423 force_chunk_after);
1424
1425 if (committable && force_chunk_after)
1426 {
1427 file->complete();
1428 LOG_DEBUG_FMT("Ledger chunk completed at {}", last_idx);
1429 }
1430
1431 return last_idx;
1432 }
1433
1434 void truncate(size_t idx)
1435 {
1436 TimeBoundLogger log_if_slow(fmt::format("Truncating ledger at {}", idx));
1437
1438 LOG_DEBUG_FMT("Ledger truncate: {}/{}", idx, last_idx);
1439
1440 // Conservative check to avoid truncating to future indices, or dropping
1441 // committed entries. If the ledger is being initialised from a snapshot
1442 // alone, the first truncation effectively sets the last index.
1443 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1444 {
1446 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1447 idx,
1448 last_idx,
1449 committed_idx);
1450 return;
1451 }
1452
1453 auto f_from = get_it_contains_idx(idx + 1);
1454 auto f_to = get_it_contains_idx(last_idx);
1455 auto f_end = std::next(f_to);
1456
1457 for (auto it = f_from; it != f_end;)
1458 {
1459 // Truncate the first file to the truncation index while the more
1460 // recent files are deleted entirely
1461 auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1;
1462 if ((*it)->truncate(truncate_idx))
1463 {
1464 it = files.erase(it);
1465 }
1466 else
1467 {
1468 it++;
1469 }
1470 }
1471
1472 last_idx = idx;
1473 }
1474
1475 void commit(size_t idx)
1476 {
1477 TimeBoundLogger log_if_slow(
1478 fmt::format("Committing ledger entry {}", idx));
1479
1480 LOG_DEBUG_FMT("Ledger commit: {}/{}", idx, last_idx);
1481
1482 if (idx <= committed_idx || idx > last_idx)
1483 {
1484 return;
1485 }
1486
1487 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1488 get_it_contains_idx(committed_idx);
1489 auto f_to = get_it_contains_idx(idx);
1490 auto f_end = std::next(f_to);
1491
1492 for (auto it = f_from; it != f_end;)
1493 {
1494 // Commit all previous file to their latest index while the latest
1495 // file is committed to the committed index
1496 const auto last_idx_in_file = (*it)->get_last_idx();
1497 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1498 if (
1499 (*it)->commit(commit_idx) &&
1500 (it != f_to || (idx == last_idx_in_file)))
1501 {
1502 end_of_committed_files_idx = last_idx_in_file;
1503 it = files.erase(it);
1504 }
1505 else
1506 {
1507 it++;
1508 }
1509 }
1510
1511 committed_idx = idx;
1512 }
1513
1514 [[nodiscard]] bool is_in_committed_file(size_t idx) const
1515 {
1516 return idx <= end_of_committed_files_idx;
1517 }
1518
1520 {
1521 // Filled on construction
1523 size_t from_idx{};
1524 size_t to_idx{};
1525 size_t max_size{};
1526
1527 // First argument is ledger entries (or nullopt if not found)
1528 // Second argument is uv status code, which may indicate a cancellation
1530 std::function<void(std::optional<LedgerReadResult>&&, int)>;
1532
1533 // Final result
1534 std::optional<LedgerReadResult> read_result = std::nullopt;
1535 };
1536
1537 static void on_ledger_get_async(uv_work_t* req)
1538 {
1539 auto* data = static_cast<AsyncLedgerGet*>(req->data);
1540
1541 data->read_result = data->ledger->read_entries_range(
1542 data->from_idx, data->to_idx, true, data->max_size);
1543 }
1544
1545 static void on_ledger_get_async_complete(uv_work_t* req, int status)
1546 {
1547 auto* data = static_cast<AsyncLedgerGet*>(req->data);
1548
1549 data->result_cb(std::move(data->read_result), status);
1550
1551 delete data; // NOLINT(cppcoreguidelines-owning-memory)
1552 delete req; // NOLINT(cppcoreguidelines-owning-memory)
1553 }
1554
1556 size_t from_idx,
1557 size_t to_idx,
1558 std::optional<LedgerReadResult>&& read_result,
1560 {
1561 if (read_result.has_value())
1562 {
1564 ::consensus::ledger_entry_range,
1565 to_enclave,
1566 from_idx,
1567 read_result->end_idx,
1568 purpose,
1569 read_result->data);
1570 }
1571 else
1572 {
1574 ::consensus::ledger_no_entry_range,
1575 to_enclave,
1576 from_idx,
1577 to_idx,
1578 purpose);
1579 }
1580 }
1581
1584 {
1586 disp,
1587 ::consensus::ledger_init,
1588 [this](const uint8_t* data, size_t size) {
1589 auto idx = serialized::read<::consensus::Index>(data, size);
1590 auto recovery_start_index =
1591 serialized::read<::consensus::Index>(data, size);
1592 init(idx, recovery_start_index);
1593 });
1594
1596 disp,
1597 ::consensus::ledger_append,
1598 [this](const uint8_t* data, size_t size) {
1599 auto committable = serialized::read<bool>(data, size);
1600 write_entry(data, size, committable);
1601 });
1602
1604 disp,
1605 ::consensus::ledger_truncate,
1606 [this](const uint8_t* data, size_t size) {
1607 auto idx = serialized::read<::consensus::Index>(data, size);
1608 auto recovery_mode = serialized::read<bool>(data, size);
1609 truncate(idx);
1610 if (recovery_mode)
1611 {
1613 }
1614 });
1615
1617 disp,
1618 ::consensus::ledger_commit,
1619 [this](const uint8_t* data, size_t size) {
1620 auto idx = serialized::read<::consensus::Index>(data, size);
1621 commit(idx);
1622 });
1623
1625 disp, ::consensus::ledger_open, [this](const uint8_t*, size_t) {
1627 });
1628
1630 disp,
1631 ::consensus::ledger_get_range,
1632 [&](const uint8_t* data, size_t size) {
1633 auto [from_idx, to_idx, purpose] =
1634 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1635
1636 // Ledger entries response has metadata so cap total entries size
1637 // accordingly
1638 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1639 auto max_entries_size = to_enclave->get_max_message_size() -
1640 write_ledger_range_response_metadata_size;
1641
1642 if (is_in_committed_file(to_idx))
1643 {
1644 // Start an asynchronous job to do this, since it is committed and
1645 // can be accessed independently (and in parallel)
1646 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
1647 auto* work_handle = new uv_work_t;
1648
1649 {
1650 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
1651 auto* job = new AsyncLedgerGet;
1652 job->ledger = this;
1653 job->from_idx = from_idx;
1654 job->to_idx = to_idx;
1655 job->max_size = max_entries_size;
1656 job->result_cb = [this,
1657 from_idx_ = from_idx,
1658 to_idx_ = to_idx,
1659 purpose_ =
1660 purpose](auto&& read_result, int /*status*/) {
1661 // NB: Even if status is cancelled (and entry is empty), we
1662 // want to write this result back to the enclave
1664 from_idx_,
1665 to_idx_,
1666 std::forward<decltype(read_result)>(read_result),
1667 purpose_);
1668 };
1669
1670 work_handle->data = job;
1671 }
1672
1673 uv_queue_work(
1674 uv_default_loop(),
1675 work_handle,
1678 }
1679 else
1680 {
1681 // Read synchronously, since this accesses uncommitted state and
1682 // must accurately reflect changing files
1684 from_idx,
1685 to_idx,
1686 read_entries(from_idx, to_idx, max_entries_size),
1687 purpose);
1688 }
1689 });
1690 }
1691 };
1692}
Definition ledger.h:148
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:397
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:215
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:178
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:541
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:500
~LedgerFile()
Definition ledger.h:356
bool commit(size_t idx)
Definition ledger.h:673
bool is_committed() const
Definition ledger.h:380
void complete()
Definition ledger.h:597
bool is_recovery() const
Definition ledger.h:390
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:448
bool rename(const std::string &new_file_name)
Definition ledger.h:646
bool is_complete() const
Definition ledger.h:385
void open()
Definition ledger.h:665
size_t get_last_idx() const
Definition ledger.h:370
size_t get_current_size() const
Definition ledger.h:375
size_t get_start_idx() const
Definition ledger.h:365
Definition ledger.h:716
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1324
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:1022
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1332
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1343
void truncate(size_t idx)
Definition ledger.h:1434
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1545
void complete_recovery()
Definition ledger.h:1284
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1582
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1537
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1555
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1319
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1216
size_t get_last_idx() const
Definition ledger.h:1314
bool is_in_committed_file(size_t idx) const
Definition ledger.h:1514
void commit(size_t idx)
Definition ledger.h:1475
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition after_io.h:8
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:12
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
Definition files.h:20
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition ledger.h:142
std::vector< uint8_t > data
Definition ledger.h:143
size_t end_idx
Definition ledger.h:144
Definition ledger.h:1520
std::optional< LedgerReadResult > read_result
Definition ledger.h:1534
Ledger * ledger
Definition ledger.h:1522
size_t max_size
Definition ledger.h:1525
ResultCallback result_cb
Definition ledger.h:1531
size_t from_idx
Definition ledger.h:1523
size_t to_idx
Definition ledger.h:1524
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1530
Definition time_bound_logger.h:14
Definition serialised_entry_format.h:21
uint64_t size
Definition serialised_entry_format.h:28