27namespace fs = std::filesystem;
// Default upper bound on the number of ledger files kept in the read cache
// (used as the default value of the Ledger's max_read_cache_files).
static constexpr size_t ledger_max_read_cache_files_default = 5;

// Building blocks of ledger file names. New files are created as
// "ledger_<start idx>"; the helpers in this file look for the start idx after
// the first "_" and for the last idx after the first "-". The suffixes below
// are matched at the very end of the file name.
static constexpr auto ledger_committed_suffix = "committed";
static constexpr auto ledger_start_idx_delimiter = "_";
static constexpr auto ledger_last_idx_delimiter = "-";
static constexpr auto ledger_recovery_file_suffix = "recovery";
static constexpr auto ledger_ignored_file_suffix = "ignored";
39 static inline size_t get_start_idx_from_file_name(
40 const std::string& file_name)
42 auto pos = file_name.find(ledger_start_idx_delimiter);
43 if (pos == std::string::npos)
45 throw std::logic_error(fmt::format(
46 "Ledger file name {} does not contain a start seqno", file_name));
49 return std::stol(file_name.substr(pos + 1));
52 static inline std::optional<size_t> get_last_idx_from_file_name(
53 const std::string& file_name)
55 auto pos = file_name.find(ledger_last_idx_delimiter);
56 if (pos == std::string::npos)
62 return std::stol(file_name.substr(pos + 1));
65 static inline bool is_ledger_file_name_committed(
const std::string& file_name)
67 return file_name.ends_with(ledger_committed_suffix);
70 static inline bool is_ledger_file_name_recovery(
const std::string& file_name)
72 return file_name.ends_with(ledger_recovery_file_suffix);
75 static inline bool is_ledger_file_name_ignored(
const std::string& file_name)
77 return file_name.ends_with(ledger_ignored_file_suffix);
80 static inline bool is_ledger_file_ignored(
const std::string& file_name)
83 return is_ledger_file_name_recovery(file_name) ||
84 is_ledger_file_name_ignored(file_name);
87 static inline fs::path remove_suffix(
88 std::string_view file_name,
const std::string& suffix)
90 if (file_name.ends_with(suffix))
92 file_name.remove_suffix(suffix.size());
97 static inline fs::path remove_recovery_suffix(std::string_view file_name)
100 file_name, fmt::format(
".{}", ledger_recovery_file_suffix));
103 static std::optional<std::string> get_file_name_with_idx(
104 const std::string& dir,
size_t idx,
bool allow_recovery_files)
106 std::optional<std::string> match = std::nullopt;
107 for (
auto const& f : fs::directory_iterator(dir))
111 auto f_name = f.path().filename();
113 is_ledger_file_name_ignored(f_name) ||
114 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
119 size_t start_idx = 0;
120 std::optional<size_t> last_idx = std::nullopt;
123 start_idx = get_start_idx_from_file_name(f_name);
124 last_idx = get_last_idx_from_file_name(f_name);
126 catch (
const std::exception& e)
131 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
150 using positions_offset_header_t = size_t;
151 static constexpr auto file_name_prefix =
"ledger";
158 FILE* file =
nullptr;
161 size_t start_idx = 1;
162 size_t total_len = 0;
163 std::vector<uint32_t> positions;
165 bool completed =
false;
166 bool committed =
false;
168 bool recovery =
false;
174 bool from_existing_file =
false;
178 LedgerFile(
const fs::path& dir,
size_t start_idx,
bool recovery =
false) :
180 file_name(fmt::format(
"{}_{}", file_name_prefix, start_idx)),
181 start_idx(start_idx),
187 fmt::format(
"{}.{}", file_name.string(), ledger_recovery_file_suffix);
190 auto file_path = dir / file_name;
191 if (fs::exists(file_path))
193 throw std::logic_error(fmt::format(
194 "Cannot create new ledger file {} in main ledger directory {} as it "
199 file = fopen(file_path.c_str(),
"w+b");
202 throw std::logic_error(fmt::format(
203 "Unable to open ledger file {}: {}", file_path, strerror(errno)));
207 fseeko(file,
sizeof(positions_offset_header_t), SEEK_SET);
208 total_len =
sizeof(positions_offset_header_t);
213 const std::string& dir,
214 const std::string& file_name_,
215 bool from_existing_file_ =
false) :
217 file_name(file_name_),
219 from_existing_file(from_existing_file_)
221 auto file_path = (fs::path(dir) / fs::path(file_name));
223 committed = is_ledger_file_name_committed(file_name);
224 start_idx = get_start_idx_from_file_name(file_name);
226 const auto mode = committed ?
"rb" :
"r+b";
228 file = fopen(file_path.c_str(), mode);
232 throw std::logic_error(fmt::format(
233 "Unable to open ledger file {}: {}", file_path, strerror(errno)));
237 fseeko(file, 0, SEEK_END);
238 size_t total_file_size = ftello(file);
241 fseeko(file, 0, SEEK_SET);
242 positions_offset_header_t table_offset = 0;
243 if (fread(&table_offset,
sizeof(positions_offset_header_t), 1, file) != 1)
245 throw std::logic_error(fmt::format(
246 "Failed to read positions offset from ledger file {}", file_path));
249 if (committed && table_offset == 0)
251 throw std::logic_error(fmt::format(
252 "Committed ledger file {} cannot be read: invalid table offset (0)",
256 total_len =
sizeof(positions_offset_header_t);
258 if (from_existing_file)
266 if (table_offset != 0)
269 total_len = table_offset;
270 fseeko(file, table_offset, SEEK_SET);
272 if (table_offset > total_file_size)
274 throw std::logic_error(fmt::format(
275 "Invalid table offset {} greater than total file size {}",
281 (total_file_size - table_offset) /
sizeof(positions.at(0)));
286 sizeof(positions.at(0)),
288 file) != positions.size())
290 throw std::logic_error(fmt::format(
291 "Failed to read positions table from ledger file {}", file_path));
299 total_len =
sizeof(positions_offset_header_t);
300 auto len = total_file_size - total_len;
303 size_t current_idx = start_idx;
304 while (len >= ccf::kv::serialised_entry_header_size)
308 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
312 "Failed to read entry header from ledger file {} at seqno {}",
318 len -= ccf::kv::serialised_entry_header_size;
320 const auto& entry_size = entry_header.
size;
321 if (len < entry_size)
324 "Malformed incomplete ledger file {} at seqno {} (expecting "
335 fseeko(file, entry_size, SEEK_CUR);
339 "Recovered one entry of size {} at seqno {}",
344 positions.push_back(total_len);
345 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
366 return start_idx + positions.size() - 1;
392 const uint8_t* data,
size_t size,
bool committable)
394 fseeko(file, total_len, SEEK_SET);
396 bool should_write =
true;
397 bool has_truncated =
false;
398 if (from_existing_file)
400 std::vector<uint8_t> entry(size);
402 fread(entry.data(), size, 1, file) != 1 ||
403 memcmp(entry.data(), data, size) != 0)
410 has_truncated =
true;
411 from_existing_file =
false;
415 should_write =
false;
421 if (fwrite(data, size, 1, file) != 1)
423 throw std::logic_error(
"Failed to write entry to ledger");
427 if (committable && fflush(file) != 0)
429 throw std::logic_error(fmt::format(
430 "Failed to flush entry to ledger: {}", strerror(errno)));
434 positions.push_back(total_len);
444 std::optional<size_t> max_size = std::nullopt)
const
446 if ((from < start_idx) || (to < from) || (to >
get_last_idx()))
457 (to ==
get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
458 size = position_to - positions.at(from - start_idx);
460 if (!max_size.has_value() || size <= max_size.value())
470 "Single ledger entry at {} in file {} is too large for remaining "
471 "space (size {} > max {})",
478 size_t to_ = from + (to - from) / 2;
480 "Requesting ledger entries from {} to {} in file {} but size {} > "
481 "max size {}: now requesting up to {}",
496 size_t from,
size_t to, std::optional<size_t> max_size = std::nullopt)
498 if ((from < start_idx) || (to >
get_last_idx()) || (to < from))
501 "Cannot find entries: {} - {} in ledger file {}",
509 "Read entries from {} to {} in {} [max size: {}]",
513 max_size.value_or(0));
515 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
521 std::vector<uint8_t> entries(size);
522 fseeko(file, positions.at(from - start_idx), SEEK_SET);
524 if (fread(entries.data(), size, 1, file) != 1)
526 throw std::logic_error(fmt::format(
527 "Failed to read entry range {} - {} from file {}",
536 bool truncate(
size_t idx,
bool remove_file_if_empty =
true)
539 committed || (idx < start_idx - 1) ||
545 if (remove_file_if_empty && idx == start_idx - 1)
548 if (!fs::remove(dir / file_name))
550 throw std::logic_error(
551 fmt::format(
"Could not remove file {}", file_name));
554 "Removed ledger file {} on truncation at {}", file_name, idx);
559 fseeko(file, 0, SEEK_SET);
560 positions_offset_header_t table_offset = 0;
561 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
563 throw std::logic_error(
"Failed to reset positions table offset");
569 total_len = positions.at(idx - start_idx + 1);
570 positions.resize(idx - start_idx + 1);
573 if (fflush(file) != 0)
575 throw std::logic_error(
576 fmt::format(
"Failed to flush ledger file: {}", strerror(errno)));
579 if (ftruncate(fileno(file), total_len))
581 throw std::logic_error(
582 fmt::format(
"Failed to truncate ledger: {}", strerror(errno)));
585 fseeko(file, total_len, SEEK_SET);
586 LOG_TRACE_FMT(
"Truncated ledger file {} at seqno {}", file_name, idx);
603 fseeko(file, total_len, SEEK_SET);
604 size_t table_offset = ftello(file);
608 reinterpret_cast<uint8_t*
>(positions.data()),
609 sizeof(positions.at(0)),
611 file) != positions.size())
613 throw std::logic_error(
"Failed to write positions table to ledger");
617 if (fseeko(file, 0, SEEK_SET) != 0)
619 throw std::logic_error(
"Failed to set file offset to 0");
622 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
624 throw std::logic_error(
"Failed to write positions table to ledger");
627 if (fflush(file) != 0)
629 throw std::logic_error(
630 fmt::format(
"Failed to flush ledger file: {}", strerror(errno)));
638 bool rename(
const std::string& new_file_name)
640 auto file_path = dir / file_name;
641 auto new_file_path = dir / new_file_name;
645 files::rename(file_path, new_file_path);
647 catch (
const std::exception& e)
651 LOG_FAIL_FMT(
"Error renaming ledger file: {}", e.what());
653 file_name = new_file_name;
659 auto new_file_name = remove_recovery_suffix(file_name.c_str());
662 LOG_DEBUG_FMT(
"Open recovery ledger file {}", new_file_name);
673 if (fflush(file) != 0)
675 throw std::logic_error(
676 fmt::format(
"Failed to flush ledger file: {}", strerror(errno)));
679 auto committed_file_name = fmt::format(
684 ledger_committed_suffix);
688 committed_file_name = fmt::format(
689 "{}.{}", committed_file_name, ledger_recovery_file_suffix);
692 if (!
rename(committed_file_name))
712 const fs::path ledger_dir;
715 std::vector<fs::path> read_ledger_dirs;
719 std::list<std::shared_ptr<LedgerFile>>
files;
722 size_t max_read_cache_files;
723 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
727 size_t committed_idx = 0;
729 size_t end_of_committed_files_idx = 0;
733 bool use_existing_files =
false;
736 std::optional<size_t> last_idx_on_init = std::nullopt;
740 std::optional<size_t> recovery_start_idx = std::nullopt;
742 auto get_it_contains_idx(
size_t idx)
const
749 auto f = std::upper_bound(
753 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
754 return (idx <= f->get_last_idx());
760 std::shared_ptr<LedgerFile> get_file_from_cache(
size_t idx)
768 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
771 for (
auto const& f : files_read_cache)
773 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
784 std::string ledger_dir_;
785 auto match = get_file_name_with_idx(ledger_dir, idx,
true);
786 if (match.has_value())
788 ledger_dir_ = ledger_dir;
792 for (
auto const& dir : read_ledger_dirs)
794 match = get_file_name_with_idx(dir, idx,
false);
795 if (match.has_value())
803 if (!match.has_value())
810 std::shared_ptr<LedgerFile> match_file =
nullptr;
813 match_file = std::make_shared<LedgerFile>(ledger_dir_, match.value());
815 catch (
const std::exception& e)
818 "Could not open ledger file {} to read seqno {}: {}",
826 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
828 files_read_cache.emplace_back(match_file);
829 if (files_read_cache.size() > max_read_cache_files)
831 files_read_cache.erase(files_read_cache.begin());
838 std::shared_ptr<LedgerFile> get_file_from_idx(
839 size_t idx,
bool read_cache_only =
false)
846 if (!read_cache_only)
849 auto f = std::upper_bound(
853 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
854 return idx >= f->get_start_idx();
857 if (f !=
files.rend())
864 return get_file_from_cache(idx);
867 std::shared_ptr<LedgerFile> get_latest_file(
868 bool incomplete_only =
true)
const
874 const auto& last_file =
files.back();
875 if (incomplete_only && last_file->is_complete())
883 std::optional<LedgerReadResult> read_entries_range(
886 bool read_cache_only =
false,
887 std::optional<size_t> max_entries_size = std::nullopt)
892 if ((from <= 0) || (to < from))
911 auto f_from = get_file_from_idx(idx, read_cache_only);
912 if (f_from ==
nullptr)
914 LOG_FAIL_FMT(
"Cannot find ledger file for seqno {}", idx);
917 auto to_ = std::min(f_from->get_last_idx(), to);
918 std::optional<size_t> max_size = std::nullopt;
919 if (max_entries_size.has_value())
921 max_size = max_entries_size.value() - rr.
data.size();
923 auto v = f_from->read_entries(idx, to_, max_size);
931 std::make_move_iterator(v->data.begin()),
932 std::make_move_iterator(v->data.end()));
933 if (v->end_idx != to_)
944 if (!rr.
data.empty())
954 void ignore_ledger_file(
const std::string& file_name)
956 if (is_ledger_file_name_ignored(file_name))
961 auto ignored_file_name =
962 fmt::format(
"{}.{}", file_name, ledger_ignored_file_suffix);
963 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
966 void delete_ledger_files_after_idx(
size_t idx)
969 for (
auto const& f : fs::directory_iterator(ledger_dir))
971 auto file_name = f.path().filename();
972 auto start_idx = get_start_idx_from_file_name(file_name);
975 if (!fs::remove(ledger_dir / file_name))
977 throw std::logic_error(
978 fmt::format(
"Could not remove file {}", file_name));
981 "Forcing removal of ledger file {} as start idx {} > {}",
989 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(
size_t idx)
991 if (!use_existing_files)
996 for (
auto const& f : fs::directory_iterator(ledger_dir))
998 auto file_name = f.path().filename();
1000 idx == get_start_idx_from_file_name(file_name) &&
1001 !is_ledger_file_ignored(file_name))
1003 return std::make_shared<LedgerFile>(
1004 ledger_dir, file_name,
true );
1015 const fs::path& ledger_dir,
1017 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1018 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1019 to_enclave(writer_factory.create_writer_to_inside()),
1020 ledger_dir(ledger_dir),
1021 max_read_cache_files(max_read_cache_files)
1024 for (
const auto& read_dir : read_ledger_dirs_)
1026 LOG_INFO_FMT(
"Recovering read-only ledger directory \"{}\"", read_dir);
1027 if (!fs::is_directory(read_dir))
1029 throw std::logic_error(
1030 fmt::format(
"{} read-only ledger is not a directory", read_dir));
1033 read_ledger_dirs.emplace_back(read_dir);
1035 for (
auto const& f : fs::directory_iterator(read_dir))
1037 auto file_name = f.path().filename();
1038 auto last_idx_ = get_last_idx_from_file_name(file_name);
1040 !last_idx_.has_value() ||
1041 !is_ledger_file_name_committed(file_name) ||
1042 is_ledger_file_name_ignored(file_name))
1045 "Read-only ledger file {} is ignored as not committed",
1050 if (last_idx_.value() > last_idx)
1052 last_idx = last_idx_.value();
1053 committed_idx = last_idx;
1054 end_of_committed_files_idx = last_idx;
1058 "Recovering file from read-only ledger directory: {}", file_name);
1065 "Recovered read-only ledger directories up to {}, committed up to "
1071 if (fs::is_directory(ledger_dir))
1077 LOG_INFO_FMT(
"Recovering main ledger directory {}", ledger_dir);
1079 for (
auto const& f : fs::directory_iterator(ledger_dir))
1081 auto file_name = f.path().filename();
1083 if (is_ledger_file_ignored(file_name))
1086 "Ignoring ledger file {} in main ledger directory", file_name);
1088 ignore_ledger_file(file_name);
1093 const auto file_end_idx = get_last_idx_from_file_name(file_name);
1095 if (is_ledger_file_name_committed(file_name))
1097 if (!file_end_idx.has_value())
1100 "Unexpected file {} in {}: committed but not completed",
1106 if (file_end_idx.value() > committed_idx)
1108 committed_idx = file_end_idx.value();
1109 end_of_committed_files_idx = file_end_idx.value();
1116 if (file_end_idx.has_value() && file_end_idx.value() <= committed_idx)
1119 "Ignoring ledger file {} in main ledger directory - already "
1120 "discovered commit up to {} from read-only directories",
1124 ignore_ledger_file(file_name);
1129 std::shared_ptr<LedgerFile> ledger_file =
nullptr;
1132 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1136 if (ledger_file->truncate(ledger_file->get_last_idx()))
1144 catch (
const std::exception& e)
1147 "Error reading ledger file {}: {}", file_name, e.what());
1149 ignore_ledger_file(file_name);
1154 "Recovering file from main ledger directory: {}", file_name);
1155 files.emplace_back(std::move(ledger_file));
1161 "Main ledger directory {} is empty: no ledger file to "
1169 last_idx = committed_idx;
1175 "Main ledger directory {} contains {} restored (writeable) files",
1181 const std::shared_ptr<LedgerFile>& a,
1182 const std::shared_ptr<LedgerFile>& b) {
1183 return a->get_last_idx() < b->get_last_idx();
1186 const auto main_ledger_dir_last_idx =
1187 get_latest_file(
false)->get_last_idx();
1188 if (main_ledger_dir_last_idx > last_idx)
1190 last_idx = main_ledger_dir_last_idx;
1195 if (!fs::create_directory(ledger_dir))
1197 throw std::logic_error(fmt::format(
1198 "Error: Could not create ledger directory: {}", ledger_dir));
1203 "Recovered ledger entries up to {}, committed to {}",
1210 void init(
size_t idx,
size_t recovery_start_idx_ = 0)
1213 fmt::format(
"Initing ledger - seqno={}", idx));
1222 for (
auto const& f : fs::directory_iterator(ledger_dir))
1224 auto file_name = f.path().filename();
1226 is_ledger_file_name_committed(file_name) &&
1227 (get_start_idx_from_file_name(file_name) > idx))
1229 auto last_idx_file = get_last_idx_from_file_name(file_name);
1230 if (!last_idx_file.has_value())
1232 throw std::logic_error(fmt::format(
1233 "Committed ledger file {} does not include last idx in file name",
1238 "Remove committed suffix from ledger file {} after init at {}: "
1242 last_idx_file.value());
1245 ledger_dir / file_name,
1251 ledger_last_idx_delimiter,
1252 last_idx_file.value(),
1253 ledger_committed_suffix)));
1261 use_existing_files =
true;
1262 last_idx_on_init = last_idx;
1264 committed_idx = idx;
1265 if (recovery_start_idx_ > 0)
1269 recovery_start_idx = recovery_start_idx_;
1273 "Set last known/commit seqno to {}, recovery seqno to {}",
1275 recovery_start_idx_);
1285 for (
auto it =
files.begin(); it !=
files.end();)
1288 if (f->is_recovery())
1296 if (f->is_committed())
1298 it =
files.erase(it);
1305 recovery_start_idx.reset();
1315 recovery_start_idx = idx;
1321 fmt::format(
"Reading ledger entry at {}", idx));
1323 return read_entries_range(idx, idx);
1329 std::optional<size_t> max_entries_size = std::nullopt)
1332 fmt::format(
"Reading ledger entries from {} to {}", from, to));
1334 return read_entries_range(from, to,
false, max_entries_size);
1337 size_t write_entry(
const uint8_t* data,
size_t size,
bool committable)
1340 "Writing ledger entry - {} bytes, committable={}", size, committable));
1343 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1348 "Forcing ledger chunk before entry as required by the entry header "
1351 auto file = get_latest_file();
1352 if (file !=
nullptr)
1355 LOG_DEBUG_FMT(
"Ledger chunk completed at {}", file->get_last_idx());
1359 bool force_chunk_after =
1361 if (force_chunk_after)
1365 throw std::logic_error(
1366 "Ledger chunks cannot end in a non-committable transaction");
1369 "Forcing ledger chunk after entry as required by the entry header "
1373 auto file = get_latest_file();
1374 if (file ==
nullptr)
1377 size_t start_idx = last_idx + 1;
1378 if (use_existing_files)
1382 file = get_existing_ledger_file_for_idx(start_idx);
1384 if (file ==
nullptr)
1386 bool is_recovery = recovery_start_idx.has_value() &&
1387 start_idx > recovery_start_idx.value();
1389 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1391 files.emplace_back(file);
1393 auto [last_idx_, has_truncated] =
1394 file->write_entry(data, size, committable);
1395 last_idx = last_idx_;
1401 LOG_INFO_FMT(
"Found divergent ledger entry at {}", last_idx);
1402 delete_ledger_files_after_idx(last_idx);
1403 use_existing_files =
false;
1407 use_existing_files && last_idx_on_init.has_value() &&
1408 last_idx > last_idx_on_init.value())
1410 use_existing_files =
false;
1414 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1419 if (committable && force_chunk_after)
1430 TimeBoundLogger log_if_slow(fmt::format(
"Truncating ledger at {}", idx));
1437 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1440 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1447 auto f_from = get_it_contains_idx(idx + 1);
1448 auto f_to = get_it_contains_idx(last_idx);
1449 auto f_end = std::next(f_to);
1451 for (
auto it = f_from; it != f_end;)
1455 auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1;
1456 if ((*it)->truncate(truncate_idx))
1458 it =
files.erase(it);
1472 fmt::format(
"Committing ledger entry {}", idx));
1476 if (idx <= committed_idx || idx > last_idx)
1481 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1482 get_it_contains_idx(committed_idx);
1483 auto f_to = get_it_contains_idx(idx);
1484 auto f_end = std::next(f_to);
1486 for (
auto it = f_from; it != f_end;)
1490 const auto last_idx_in_file = (*it)->get_last_idx();
1491 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1493 (*it)->commit(commit_idx) &&
1494 (it != f_to || (idx == last_idx_in_file)))
1496 end_of_committed_files_idx = last_idx_in_file;
1497 it =
files.erase(it);
1505 committed_idx = idx;
1510 return idx <= end_of_committed_files_idx;
1524 std::function<void(std::optional<LedgerReadResult>&&,
int)>;
1535 data->
read_result = data->ledger->read_entries_range(
1536 data->from_idx, data->to_idx,
true, data->max_size);
1543 data->
result_cb(std::move(data->read_result), status);
1552 std::optional<LedgerReadResult>&& read_result,
1555 if (read_result.has_value())
1558 ::consensus::ledger_entry_range,
1561 read_result->end_idx,
1568 ::consensus::ledger_no_entry_range,
1581 ::consensus::ledger_init,
1582 [
this](
const uint8_t* data,
size_t size) {
1583 auto idx = serialized::read<::consensus::Index>(data, size);
1584 auto recovery_start_index =
1585 serialized::read<::consensus::Index>(data, size);
1586 init(idx, recovery_start_index);
1591 ::consensus::ledger_append,
1592 [
this](
const uint8_t* data,
size_t size) {
1593 auto committable = serialized::read<bool>(data, size);
1599 ::consensus::ledger_truncate,
1600 [
this](
const uint8_t* data,
size_t size) {
1601 auto idx = serialized::read<::consensus::Index>(data, size);
1602 auto recovery_mode = serialized::read<bool>(data, size);
1612 ::consensus::ledger_commit,
1613 [
this](
const uint8_t* data,
size_t size) {
1614 auto idx = serialized::read<::consensus::Index>(data, size);
1619 disp, ::consensus::ledger_open, [
this](
const uint8_t*,
size_t) {
1625 ::consensus::ledger_get_range,
1626 [&](
const uint8_t* data,
size_t size) {
1627 auto [from_idx, to_idx, purpose] =
1628 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1632 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1633 auto max_entries_size = to_enclave->get_max_message_size() -
1634 write_ledger_range_response_metadata_size;
1640 uv_work_t* work_handle =
new uv_work_t;
1645 job->from_idx = from_idx;
1646 job->to_idx = to_idx;
1647 job->max_size = max_entries_size;
1648 job->result_cb = [
this,
1649 from_idx_ = from_idx,
1652 purpose](
auto&& read_result,
int status) {
1656 from_idx_, to_idx_, std::move(read_result), purpose_);
1659 work_handle->data = job;
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:391
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:212
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:178
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:536
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:495
~LedgerFile()
Definition ledger.h:351
bool commit(size_t idx)
Definition ledger.h:665
bool is_committed() const
Definition ledger.h:374
void complete()
Definition ledger.h:590
bool is_recovery() const
Definition ledger.h:384
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:441
bool rename(const std::string &new_file_name)
Definition ledger.h:638
bool is_complete() const
Definition ledger.h:379
void open()
Definition ledger.h:657
size_t get_last_idx() const
Definition ledger.h:364
size_t get_current_size() const
Definition ledger.h:369
size_t get_start_idx() const
Definition ledger.h:359
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1318
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:1014
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1326
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1337
void truncate(size_t idx)
Definition ledger.h:1428
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1539
void complete_recovery()
Definition ledger.h:1278
bool is_in_committed_file(size_t idx)
Definition ledger.h:1508
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1576
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1531
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1549
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1313
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1210
size_t get_last_idx() const
Definition ledger.h:1308
void commit(size_t idx)
Definition ledger.h:1469
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:12
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
std::vector< uint8_t > data
Definition ledger.h:143
size_t end_idx
Definition ledger.h:144
std::optional< LedgerReadResult > read_result
Definition ledger.h:1528
Ledger * ledger
Definition ledger.h:1516
size_t max_size
Definition ledger.h:1519
ResultCallback result_cb
Definition ledger.h:1525
size_t from_idx
Definition ledger.h:1517
size_t to_idx
Definition ledger.h:1518
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1524
Definition time_bound_logger.h:14