27namespace fs = std::filesystem;
31 static constexpr size_t ledger_max_read_cache_files_default = 5;
33 static constexpr auto ledger_committed_suffix =
"committed";
34 static constexpr auto ledger_start_idx_delimiter =
"_";
35 static constexpr auto ledger_last_idx_delimiter =
"-";
36 static constexpr auto ledger_recovery_file_suffix =
"recovery";
37 static constexpr auto ledger_ignored_file_suffix =
"ignored";
39 static inline size_t get_start_idx_from_file_name(
40 const std::string& file_name)
42 auto pos = file_name.find(ledger_start_idx_delimiter);
43 if (pos == std::string::npos)
45 throw std::logic_error(fmt::format(
46 "Ledger file name {} does not contain a start seqno", file_name));
49 return std::stol(file_name.substr(pos + 1));
52 static inline std::optional<size_t> get_last_idx_from_file_name(
53 const std::string& file_name)
55 auto pos = file_name.find(ledger_last_idx_delimiter);
56 if (pos == std::string::npos)
62 return std::stol(file_name.substr(pos + 1));
65 static inline bool is_ledger_file_name_committed(
const std::string& file_name)
67 return file_name.ends_with(ledger_committed_suffix);
70 static inline bool is_ledger_file_name_recovery(
const std::string& file_name)
72 return file_name.ends_with(ledger_recovery_file_suffix);
75 static inline bool is_ledger_file_name_ignored(
const std::string& file_name)
77 return file_name.ends_with(ledger_ignored_file_suffix);
80 static inline bool is_ledger_file_ignored(
const std::string& file_name)
83 return is_ledger_file_name_recovery(file_name) ||
84 is_ledger_file_name_ignored(file_name);
87 static inline fs::path remove_suffix(
88 std::string_view file_name,
const std::string& suffix)
90 if (file_name.ends_with(suffix))
92 file_name.remove_suffix(suffix.size());
97 static inline fs::path remove_recovery_suffix(std::string_view file_name)
100 file_name, fmt::format(
".{}", ledger_recovery_file_suffix));
103 static std::optional<std::string> get_file_name_with_idx(
104 const std::string& dir,
size_t idx,
bool allow_recovery_files)
106 std::optional<std::string> match = std::nullopt;
107 for (
auto const& f : fs::directory_iterator(dir))
111 auto f_name = f.path().filename();
113 is_ledger_file_name_ignored(f_name) ||
114 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
119 size_t start_idx = 0;
120 std::optional<size_t> last_idx = std::nullopt;
123 start_idx = get_start_idx_from_file_name(f_name);
124 last_idx = get_last_idx_from_file_name(f_name);
126 catch (
const std::exception& e)
131 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
150 using positions_offset_header_t = size_t;
151 static constexpr auto file_name_prefix =
"ledger";
158 FILE* file =
nullptr;
161 size_t start_idx = 1;
162 size_t total_len = 0;
163 std::vector<uint32_t> positions;
165 bool completed =
false;
166 bool committed =
false;
168 bool recovery =
false;
174 bool from_existing_file =
false;
178 LedgerFile(
const fs::path& dir,
size_t start_idx,
bool recovery =
false) :
180 file_name(fmt::format(
"{}_{}", file_name_prefix, start_idx)),
181 start_idx(start_idx),
187 fmt::format(
"{}.{}", file_name.string(), ledger_recovery_file_suffix);
190 auto file_path = dir / file_name;
191 if (fs::exists(file_path))
193 throw std::logic_error(fmt::format(
194 "Cannot create new ledger file {} in main ledger directory {} as it "
200 file = fopen(file_path.c_str(),
"w+b");
203 throw std::logic_error(fmt::format(
204 "Unable to open ledger file {}: {}",
206 std::strerror(errno)));
210 fseeko(file,
sizeof(positions_offset_header_t), SEEK_SET);
211 total_len =
sizeof(positions_offset_header_t);
216 const std::string& dir,
217 const std::string& file_name_,
218 bool from_existing_file_ =
false) :
220 file_name(file_name_),
221 from_existing_file(from_existing_file_)
223 auto file_path = (fs::path(dir) / fs::path(file_name));
225 committed = is_ledger_file_name_committed(file_name);
226 start_idx = get_start_idx_from_file_name(file_name);
228 const auto*
const mode = committed ?
"rb" :
"r+b";
231 file = fopen(file_path.c_str(), mode);
235 throw std::logic_error(fmt::format(
236 "Unable to open ledger file {}: {}",
238 std::strerror(errno)));
242 fseeko(file, 0, SEEK_END);
243 size_t total_file_size = ftello(file);
246 fseeko(file, 0, SEEK_SET);
247 positions_offset_header_t table_offset = 0;
248 if (fread(&table_offset,
sizeof(positions_offset_header_t), 1, file) != 1)
250 throw std::logic_error(fmt::format(
251 "Failed to read positions offset from ledger file {}", file_path));
254 if (committed && table_offset == 0)
256 throw std::logic_error(fmt::format(
257 "Committed ledger file {} cannot be read: invalid table offset (0)",
261 total_len =
sizeof(positions_offset_header_t);
263 if (from_existing_file)
271 if (table_offset != 0)
274 total_len = table_offset;
275 fseeko(file, table_offset, SEEK_SET);
277 if (table_offset > total_file_size)
279 throw std::logic_error(fmt::format(
280 "Invalid table offset {} greater than total file size {}",
286 (total_file_size - table_offset) /
sizeof(positions.at(0)));
291 sizeof(positions.at(0)),
293 file) != positions.size())
295 throw std::logic_error(fmt::format(
296 "Failed to read positions table from ledger file {}", file_path));
304 total_len =
sizeof(positions_offset_header_t);
305 auto len = total_file_size - total_len;
308 size_t current_idx = start_idx;
309 while (len >= ccf::kv::serialised_entry_header_size)
313 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
317 "Failed to read entry header from ledger file {} at seqno {}",
323 len -= ccf::kv::serialised_entry_header_size;
325 const auto& entry_size = entry_header.
size;
326 if (len < entry_size)
329 "Malformed incomplete ledger file {} at seqno {} (expecting "
340 fseeko(file, entry_size, SEEK_CUR);
344 "Recovered one entry of size {} at seqno {}",
349 positions.push_back(total_len);
350 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
372 return start_idx + positions.size() - 1;
398 const uint8_t* data,
size_t size,
bool committable)
400 fseeko(file, total_len, SEEK_SET);
402 bool should_write =
true;
403 bool has_truncated =
false;
404 if (from_existing_file)
406 std::vector<uint8_t> entry(size);
408 fread(entry.data(), size, 1, file) != 1 ||
409 memcmp(entry.data(), data, size) != 0)
416 has_truncated =
true;
417 from_existing_file =
false;
421 should_write =
false;
427 if (fwrite(data, size, 1, file) != 1)
429 throw std::logic_error(
"Failed to write entry to ledger");
433 if (committable && fflush(file) != 0)
435 throw std::logic_error(fmt::format(
436 "Failed to flush entry to ledger: {}",
437 std::strerror(errno)));
441 positions.push_back(total_len);
451 std::optional<size_t> max_size = std::nullopt)
const
453 if ((from < start_idx) || (to < from) || (to >
get_last_idx()))
464 (to ==
get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
465 size = position_to - positions.at(from - start_idx);
467 if (!max_size.has_value() || size <= max_size.value())
476 "Single ledger entry at {} in file {} is too large for remaining "
477 "space (size {} > max {})",
484 size_t to_ = from + (to - from) / 2;
486 "Requesting ledger entries from {} to {} in file {} but size {} > "
487 "max size {}: now requesting up to {}",
501 size_t from,
size_t to, std::optional<size_t> max_size = std::nullopt)
503 if ((from < start_idx) || (to >
get_last_idx()) || (to < from))
506 "Cannot find entries: {} - {} in ledger file {}",
514 "Read entries from {} to {} in {} [max size: {}]",
518 max_size.value_or(0));
520 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
526 std::vector<uint8_t> entries(size);
527 fseeko(file, positions.at(from - start_idx), SEEK_SET);
529 if (fread(entries.data(), size, 1, file) != 1)
531 throw std::logic_error(fmt::format(
532 "Failed to read entry range {} - {} from file {}",
541 bool truncate(
size_t idx,
bool remove_file_if_empty =
true)
544 committed || (idx < start_idx - 1) ||
550 if (remove_file_if_empty && idx == start_idx - 1)
553 if (!fs::remove(dir / file_name))
555 throw std::logic_error(
556 fmt::format(
"Could not remove file {}", file_name));
559 "Removed ledger file {} on truncation at {}", file_name, idx);
564 fseeko(file, 0, SEEK_SET);
565 positions_offset_header_t table_offset = 0;
566 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
568 throw std::logic_error(
"Failed to reset positions table offset");
574 total_len = positions.at(idx - start_idx + 1);
575 positions.resize(idx - start_idx + 1);
578 if (fflush(file) != 0)
580 throw std::logic_error(fmt::format(
581 "Failed to flush ledger file: {}",
582 std::strerror(errno)));
585 if (ftruncate(fileno(file), total_len) != 0)
587 throw std::logic_error(fmt::format(
588 "Failed to truncate ledger: {}",
589 std::strerror(errno)));
592 fseeko(file, total_len, SEEK_SET);
593 LOG_TRACE_FMT(
"Truncated ledger file {} at seqno {}", file_name, idx);
610 fseeko(file, total_len, SEEK_SET);
611 size_t table_offset = ftello(file);
615 reinterpret_cast<uint8_t*
>(positions.data()),
616 sizeof(positions.at(0)),
618 file) != positions.size())
620 throw std::logic_error(
"Failed to write positions table to ledger");
624 if (fseeko(file, 0, SEEK_SET) != 0)
626 throw std::logic_error(
"Failed to set file offset to 0");
629 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
631 throw std::logic_error(
"Failed to write positions table to ledger");
634 if (fflush(file) != 0)
636 throw std::logic_error(fmt::format(
637 "Failed to flush ledger file: {}",
638 std::strerror(errno)));
646 bool rename(
const std::string& new_file_name)
648 auto file_path = dir / file_name;
649 auto new_file_path = dir / new_file_name;
653 files::rename(file_path, new_file_path);
655 catch (
const std::exception& e)
659 LOG_FAIL_FMT(
"Error renaming ledger file: {}", e.what());
661 file_name = new_file_name;
667 auto new_file_name = remove_recovery_suffix(file_name.c_str());
670 LOG_DEBUG_FMT(
"Open recovery ledger file {}", new_file_name);
681 if (fflush(file) != 0)
683 throw std::logic_error(fmt::format(
684 "Failed to flush ledger file: {}",
685 std::strerror(errno)));
688 auto committed_file_name = fmt::format(
693 ledger_committed_suffix);
697 committed_file_name = fmt::format(
698 "{}.{}", committed_file_name, ledger_recovery_file_suffix);
701 if (!
rename(committed_file_name))
721 const fs::path ledger_dir;
724 std::vector<fs::path> read_ledger_dirs;
728 std::list<std::shared_ptr<LedgerFile>>
files;
731 size_t max_read_cache_files;
732 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
735 std::atomic<size_t> last_idx = 0;
736 size_t committed_idx = 0;
738 size_t end_of_committed_files_idx = 0;
742 bool use_existing_files =
false;
745 std::optional<size_t> last_idx_on_init = std::nullopt;
749 std::optional<size_t> recovery_start_idx = std::nullopt;
751 [[nodiscard]]
auto get_it_contains_idx(
size_t idx)
const
758 auto f = std::upper_bound(
762 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
763 return (idx <= f->get_last_idx());
769 std::shared_ptr<LedgerFile> get_file_from_cache(
size_t idx)
777 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
780 for (
auto const& f : files_read_cache)
782 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
793 std::string ledger_dir_;
794 auto match = get_file_name_with_idx(ledger_dir, idx,
true);
795 if (match.has_value())
797 ledger_dir_ = ledger_dir;
801 for (
auto const& dir : read_ledger_dirs)
803 match = get_file_name_with_idx(dir, idx,
false);
804 if (match.has_value())
812 if (!match.has_value())
819 std::shared_ptr<LedgerFile> match_file =
nullptr;
822 match_file = std::make_shared<LedgerFile>(
823 ledger_dir_, *match);
825 catch (
const std::exception& e)
828 "Could not open ledger file {} to read seqno {}: {}",
836 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
838 files_read_cache.emplace_back(match_file);
839 if (files_read_cache.size() > max_read_cache_files)
841 files_read_cache.erase(files_read_cache.begin());
848 std::shared_ptr<LedgerFile> get_file_from_idx(
849 size_t idx,
bool read_cache_only =
false)
856 if (!read_cache_only)
859 auto f = std::upper_bound(
863 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
864 return idx >= f->get_start_idx();
867 if (f !=
files.rend())
874 return get_file_from_cache(idx);
877 [[nodiscard]] std::shared_ptr<LedgerFile> get_latest_file(
878 bool incomplete_only =
true)
const
884 const auto& last_file =
files.back();
885 if (incomplete_only && last_file->is_complete())
893 std::optional<LedgerReadResult> read_entries_range(
896 bool read_cache_only =
false,
897 std::optional<size_t> max_entries_size = std::nullopt)
902 if ((from <= 0) || (to < from))
921 auto f_from = get_file_from_idx(idx, read_cache_only);
922 if (f_from ==
nullptr)
924 LOG_FAIL_FMT(
"Cannot find ledger file for seqno {}", idx);
927 auto to_ = std::min(f_from->get_last_idx(), to);
928 std::optional<size_t> max_size = std::nullopt;
929 if (max_entries_size.has_value())
931 max_size = max_entries_size.value() - rr.
data.size();
933 auto v = f_from->read_entries(idx, to_, max_size);
941 std::make_move_iterator(v->data.begin()),
942 std::make_move_iterator(v->data.end()));
943 if (v->end_idx != to_)
954 if (!rr.
data.empty())
962 void ignore_ledger_file(
const std::string& file_name)
964 if (is_ledger_file_name_ignored(file_name))
969 auto ignored_file_name =
970 fmt::format(
"{}.{}", file_name, ledger_ignored_file_suffix);
971 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
974 void delete_ledger_files_after_idx(
size_t idx)
977 for (
auto const& f : fs::directory_iterator(ledger_dir))
979 auto file_name = f.path().filename();
980 auto start_idx = get_start_idx_from_file_name(file_name);
983 if (!fs::remove(ledger_dir / file_name))
985 throw std::logic_error(
986 fmt::format(
"Could not remove file {}", file_name));
989 "Forcing removal of ledger file {} as start idx {} > {}",
997 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(
size_t idx)
999 if (!use_existing_files)
1004 for (
auto const& f : fs::directory_iterator(ledger_dir))
1006 auto file_name = f.path().filename();
1008 idx == get_start_idx_from_file_name(file_name) &&
1009 !is_ledger_file_ignored(file_name))
1011 return std::make_shared<LedgerFile>(
1012 ledger_dir, file_name,
true );
1023 const fs::path& ledger_dir,
1025 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1026 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1027 to_enclave(writer_factory.create_writer_to_inside()),
1028 ledger_dir(ledger_dir),
1029 max_read_cache_files(max_read_cache_files)
1032 for (
const auto& read_dir : read_ledger_dirs_)
1034 LOG_INFO_FMT(
"Recovering read-only ledger directory \"{}\"", read_dir);
1035 if (!fs::is_directory(read_dir))
1037 throw std::logic_error(
1038 fmt::format(
"{} read-only ledger is not a directory", read_dir));
1041 read_ledger_dirs.emplace_back(read_dir);
1043 for (
auto const& f : fs::directory_iterator(read_dir))
1045 auto file_name = f.path().filename();
1046 auto last_idx_ = get_last_idx_from_file_name(file_name);
1048 !last_idx_.has_value() ||
1049 !is_ledger_file_name_committed(file_name) ||
1050 is_ledger_file_name_ignored(file_name))
1053 "Read-only ledger file {} is ignored as not committed",
1058 if (last_idx_.value() > last_idx)
1060 last_idx = last_idx_.value();
1061 committed_idx = last_idx;
1062 end_of_committed_files_idx = last_idx;
1066 "Recovering file from read-only ledger directory: {}", file_name);
1073 "Recovered read-only ledger directories up to {}, committed up to "
1079 if (fs::is_directory(ledger_dir))
1085 LOG_INFO_FMT(
"Recovering main ledger directory {}", ledger_dir);
1087 for (
auto const& f : fs::directory_iterator(ledger_dir))
1089 auto file_name = f.path().filename();
1091 if (is_ledger_file_ignored(file_name))
1094 "Ignoring ledger file {} in main ledger directory", file_name);
1096 ignore_ledger_file(file_name);
1101 const auto file_end_idx = get_last_idx_from_file_name(file_name);
1103 if (is_ledger_file_name_committed(file_name))
1105 if (!file_end_idx.has_value())
1108 "Unexpected file {} in {}: committed but not completed",
1114 if (file_end_idx.value() > committed_idx)
1116 committed_idx = file_end_idx.value();
1117 end_of_committed_files_idx = file_end_idx.value();
1124 if (file_end_idx.has_value() && file_end_idx.value() <= committed_idx)
1127 "Ignoring ledger file {} in main ledger directory - already "
1128 "discovered commit up to {} from read-only directories",
1132 ignore_ledger_file(file_name);
1137 std::shared_ptr<LedgerFile> ledger_file =
nullptr;
1140 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1144 if (ledger_file->truncate(ledger_file->get_last_idx()))
1152 catch (
const std::exception& e)
1155 "Error reading ledger file {}: {}", file_name, e.what());
1157 ignore_ledger_file(file_name);
1162 "Recovering file from main ledger directory: {}", file_name);
1163 files.emplace_back(std::move(ledger_file));
1169 "Main ledger directory {} is empty: no ledger file to "
1177 last_idx = committed_idx;
1182 "Main ledger directory {} contains {} restored (writeable) files",
1187 const std::shared_ptr<LedgerFile>& a,
1188 const std::shared_ptr<LedgerFile>& b) {
1189 return a->get_last_idx() < b->get_last_idx();
1192 const auto main_ledger_dir_last_idx =
1193 get_latest_file(
false)->get_last_idx();
1194 if (main_ledger_dir_last_idx > last_idx)
1196 last_idx = main_ledger_dir_last_idx;
1201 if (!fs::create_directory(ledger_dir))
1203 throw std::logic_error(fmt::format(
1204 "Error: Could not create ledger directory: {}", ledger_dir));
1209 "Recovered ledger entries up to {}, committed to {}",
1216 void init(
size_t idx,
size_t recovery_start_idx_ = 0)
1219 fmt::format(
"Initing ledger - seqno={}", idx));
1228 for (
auto const& f : fs::directory_iterator(ledger_dir))
1230 auto file_name = f.path().filename();
1232 is_ledger_file_name_committed(file_name) &&
1233 (get_start_idx_from_file_name(file_name) > idx))
1235 auto last_idx_file = get_last_idx_from_file_name(file_name);
1236 if (!last_idx_file.has_value())
1238 throw std::logic_error(fmt::format(
1239 "Committed ledger file {} does not include last idx in file name",
1244 "Remove committed suffix from ledger file {} after init at {}: "
1248 last_idx_file.value());
1251 ledger_dir / file_name,
1257 ledger_last_idx_delimiter,
1258 last_idx_file.value(),
1259 ledger_committed_suffix)));
1267 use_existing_files =
true;
1268 last_idx_on_init = last_idx;
1270 committed_idx = idx;
1271 if (recovery_start_idx_ > 0)
1275 recovery_start_idx = recovery_start_idx_;
1279 "Set last known/commit seqno to {}, recovery seqno to {}",
1281 recovery_start_idx_);
1291 for (
auto it =
files.begin(); it !=
files.end();)
1294 if (f->is_recovery())
1302 if (f->is_committed())
1304 it =
files.erase(it);
1311 recovery_start_idx.reset();
1321 recovery_start_idx = idx;
1327 fmt::format(
"Reading ledger entry at {}", idx));
1329 return read_entries_range(idx, idx);
1335 std::optional<size_t> max_entries_size = std::nullopt)
1338 fmt::format(
"Reading ledger entries from {} to {}", from, to));
1340 return read_entries_range(from, to,
false, max_entries_size);
1343 size_t write_entry(
const uint8_t* data,
size_t size,
bool committable)
1346 "Writing ledger entry - {} bytes, committable={}", size, committable));
1349 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1354 "Forcing ledger chunk before entry as required by the entry header "
1357 auto file = get_latest_file();
1358 if (file !=
nullptr)
1361 LOG_DEBUG_FMT(
"Ledger chunk completed at {}", file->get_last_idx());
1365 bool force_chunk_after =
1367 if (force_chunk_after)
1371 throw std::logic_error(
1372 "Ledger chunks cannot end in a non-committable transaction");
1375 "Forcing ledger chunk after entry as required by the entry header "
1379 auto file = get_latest_file();
1380 if (file ==
nullptr)
1383 size_t start_idx = last_idx + 1;
1384 if (use_existing_files)
1388 file = get_existing_ledger_file_for_idx(start_idx);
1390 if (file ==
nullptr)
1392 bool is_recovery = recovery_start_idx.has_value() &&
1393 start_idx > recovery_start_idx.value();
1395 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1397 files.emplace_back(file);
1399 auto [last_idx_, has_truncated] =
1400 file->write_entry(data, size, committable);
1401 last_idx = last_idx_;
1407 LOG_INFO_FMT(
"Found divergent ledger entry at {}", last_idx);
1408 delete_ledger_files_after_idx(last_idx);
1409 use_existing_files =
false;
1413 use_existing_files && last_idx_on_init.has_value() &&
1414 last_idx > last_idx_on_init.value())
1416 use_existing_files =
false;
1420 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1425 if (committable && force_chunk_after)
1436 TimeBoundLogger log_if_slow(fmt::format(
"Truncating ledger at {}", idx));
1443 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1446 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1453 auto f_from = get_it_contains_idx(idx + 1);
1454 auto f_to = get_it_contains_idx(last_idx);
1455 auto f_end = std::next(f_to);
1457 for (
auto it = f_from; it != f_end;)
1461 auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1;
1462 if ((*it)->truncate(truncate_idx))
1464 it =
files.erase(it);
1478 fmt::format(
"Committing ledger entry {}", idx));
1482 if (idx <= committed_idx || idx > last_idx)
1487 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1488 get_it_contains_idx(committed_idx);
1489 auto f_to = get_it_contains_idx(idx);
1490 auto f_end = std::next(f_to);
1492 for (
auto it = f_from; it != f_end;)
1496 const auto last_idx_in_file = (*it)->get_last_idx();
1497 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1499 (*it)->commit(commit_idx) &&
1500 (it != f_to || (idx == last_idx_in_file)))
1502 end_of_committed_files_idx = last_idx_in_file;
1503 it =
files.erase(it);
1511 committed_idx = idx;
1516 return idx <= end_of_committed_files_idx;
1530 std::function<void(std::optional<LedgerReadResult>&&,
int)>;
1541 data->
read_result = data->ledger->read_entries_range(
1542 data->from_idx, data->to_idx,
true, data->max_size);
1549 data->
result_cb(std::move(data->read_result), status);
1558 std::optional<LedgerReadResult>&& read_result,
1561 if (read_result.has_value())
1564 ::consensus::ledger_entry_range,
1567 read_result->end_idx,
1574 ::consensus::ledger_no_entry_range,
1587 ::consensus::ledger_init,
1588 [
this](
const uint8_t* data,
size_t size) {
1589 auto idx = serialized::read<::consensus::Index>(data, size);
1590 auto recovery_start_index =
1591 serialized::read<::consensus::Index>(data, size);
1592 init(idx, recovery_start_index);
1597 ::consensus::ledger_append,
1598 [
this](
const uint8_t* data,
size_t size) {
1599 auto committable = serialized::read<bool>(data, size);
1605 ::consensus::ledger_truncate,
1606 [
this](
const uint8_t* data,
size_t size) {
1607 auto idx = serialized::read<::consensus::Index>(data, size);
1608 auto recovery_mode = serialized::read<bool>(data, size);
1618 ::consensus::ledger_commit,
1619 [
this](
const uint8_t* data,
size_t size) {
1620 auto idx = serialized::read<::consensus::Index>(data, size);
1625 disp, ::consensus::ledger_open, [
this](
const uint8_t*,
size_t) {
1631 ::consensus::ledger_get_range,
1632 [&](
const uint8_t* data,
size_t size) {
1633 auto [from_idx, to_idx, purpose] =
1634 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1638 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1639 auto max_entries_size = to_enclave->get_max_message_size() -
1640 write_ledger_range_response_metadata_size;
1647 auto* work_handle =
new uv_work_t;
1653 job->from_idx = from_idx;
1654 job->to_idx = to_idx;
1655 job->max_size = max_entries_size;
1656 job->result_cb = [
this,
1657 from_idx_ = from_idx,
1660 purpose](
auto&& read_result,
int ) {
1666 std::forward<
decltype(read_result)>(read_result),
1670 work_handle->data = job;
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:397
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:215
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:178
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:541
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:500
~LedgerFile()
Definition ledger.h:356
bool commit(size_t idx)
Definition ledger.h:673
bool is_committed() const
Definition ledger.h:380
void complete()
Definition ledger.h:597
bool is_recovery() const
Definition ledger.h:390
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:448
bool rename(const std::string &new_file_name)
Definition ledger.h:646
bool is_complete() const
Definition ledger.h:385
void open()
Definition ledger.h:665
size_t get_last_idx() const
Definition ledger.h:370
size_t get_current_size() const
Definition ledger.h:375
size_t get_start_idx() const
Definition ledger.h:365
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1324
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:1022
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1332
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1343
void truncate(size_t idx)
Definition ledger.h:1434
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1545
void complete_recovery()
Definition ledger.h:1284
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1582
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1537
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1555
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1319
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1216
size_t get_last_idx() const
Definition ledger.h:1314
bool is_in_committed_file(size_t idx) const
Definition ledger.h:1514
void commit(size_t idx)
Definition ledger.h:1475
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:12
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
std::vector< uint8_t > data
Definition ledger.h:143
size_t end_idx
Definition ledger.h:144
std::optional< LedgerReadResult > read_result
Definition ledger.h:1534
Ledger * ledger
Definition ledger.h:1522
size_t max_size
Definition ledger.h:1525
ResultCallback result_cb
Definition ledger.h:1531
size_t from_idx
Definition ledger.h:1523
size_t to_idx
Definition ledger.h:1524
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1530
Definition time_bound_logger.h:14