CCF
Loading...
Searching...
No Matches
ledger_chunker.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/kv/version.h"
7
8#include <cstdint>
9#include <limits>
10#include <map>
11#include <numeric>
12
13#define FMT_HEADER_ONLY
14#include <fmt/format.h>
15
16namespace ccf::kv
17{
19 {
20 protected:
// Threshold fixed at construction from the constructor's `threshold` argument.
21 const size_t chunk_threshold;
23

// Per-version sizes that get_unchunked_size() adds up.
// NOTE(review): presumably filled by append_entry_size(); that statement is
// not visible in this listing - confirm.
24 std::map<Version, size_t> transaction_sizes = {};
// Versions at which a chunk was produced (inserted by produced_chunk_at()).
25 std::set<Version> chunk_ends = {};
// Versions at which an end-of-chunk was explicitly requested (inserted by
// force_end_of_chunk()).
26 std::set<Version> forced_chunk_versions = {};
// Locked at the top of each public member function shown in this listing
// (the constructor and get_unchunked_size() do not lock it).
27 std::mutex chunker_lock;
28
29 size_t get_unchunked_size(Version up_to) const
30 {
31 auto begin = transaction_sizes.cbegin();
32 auto end = transaction_sizes.upper_bound(up_to);
33
34 auto chunk_before = chunk_ends.lower_bound(up_to);
35 if (chunk_before != chunk_ends.begin())
36 {
37 std::advance(chunk_before, -1);
38 begin = transaction_sizes.upper_bound(*chunk_before);
39 }
40
41 return std::accumulate(
42 begin, end, 0, [](size_t n, const auto& p) { return n + p.second; });
43 }
44
45 public:
// Largest chunk threshold the constructor accepts: the maximum uint32_t
// value, i.e. one byte short of 4GB.
static constexpr size_t max_chunk_threshold_size{
  std::numeric_limits<uint32_t>::max()};
48
50 size_t threshold = max_chunk_threshold_size, Version start_from = 0) :
51 chunk_threshold(threshold),
52 current_tx_version(start_from)
53 {
// Validate the requested threshold: zero and anything above
// max_chunk_threshold_size are rejected with std::logic_error.
// NOTE(review): the constructor's opening line and the final fmt::format
// argument are not visible in this listing; the second `{}` placeholder is
// presumably filled with max_chunk_threshold_size - confirm.
54 if (threshold == 0 || threshold > max_chunk_threshold_size)
55 {
56 throw std::logic_error(fmt::format(
57 "Error: Ledger chunk threshold ({}) must be between 1-{}",
58 threshold,
60 }
61 }
62
// Called with the size `n` of an entry; takes chunker_lock for the whole call.
// NOTE(review): the statement that records `n` is not visible in this
// listing (one body line is missing) - presumably it stores `n` in
// transaction_sizes under the next transaction version; confirm.
63 void append_entry_size(size_t n) override
64 {
65 std::lock_guard<std::mutex> l(chunker_lock);
67 }
68
69 void force_end_of_chunk(Version v) override
70 {
71 std::lock_guard<std::mutex> l(chunker_lock);
72 forced_chunk_versions.insert(v);
73 }
74
76 {
// Body of is_chunk_end_requested(Version v). Returns true early when a forced
// chunk request at or before v has not yet been followed by a produced chunk.
// NOTE(review): the signature line and the final return statement are not
// visible in this listing; the fall-through result presumably compares
// get_unchunked_size(v) against chunk_threshold - confirm.
77 std::lock_guard<std::mutex> l(chunker_lock);
78 if (!forced_chunk_versions.empty())
79 {
80 // There is an outstanding forced-chunk request for this if there is a
81 // forced-chunk request at f <= v, and no chunk_end >= f
82
83 // upper_bound > v
84 auto forced_it = forced_chunk_versions.upper_bound(v);
85 if (forced_it != forced_chunk_versions.begin())
86 {
87 // There is some f before forced_it, which is <= v
88 std::advance(forced_it, -1);
89 Version f = *forced_it;
90
// lower_bound(f) finds the first produced chunk end at or after f; if there
// is none, the forced request at f is still unsatisfied.
91 const auto ender_it = chunk_ends.lower_bound(f);
92 if (ender_it == chunk_ends.end())
93 {
94 return true;
95 }
96 }
97 }
98
100 }
101
// Discard bookkeeping for every version strictly greater than v, under
// chunker_lock. Entries at version v itself are kept (upper_bound(v) is the
// first element > v).
// NOTE(review): two lines of this function are not visible in this listing.
// The dangling argument line below presumably belongs to a
// forced_chunk_versions.erase(...) call, and the other missing statement
// presumably resets current_tx_version - confirm.
102 void rolled_back_to(Version v) override
103 {
104 std::lock_guard<std::mutex> l(chunker_lock);
106
107 transaction_sizes.erase(
108 transaction_sizes.upper_bound(v), transaction_sizes.end());
109 chunk_ends.erase(chunk_ends.upper_bound(v), chunk_ends.end());
111 forced_chunk_versions.upper_bound(v), forced_chunk_versions.end());
112 }
113
// Drop bookkeeping that is no longer needed after compaction to version v,
// under chunker_lock. Only data up to the last chunk end strictly below v is
// removed, so sizes recorded after that chunk end are kept.
// NOTE(review): one line of this function is not visible in this listing;
// the two dangling argument lines at the end presumably belong to a
// forced_chunk_versions.erase(...) call - confirm.
114 void compacted_to(Version v) override
115 {
116 std::lock_guard<std::mutex> l(chunker_lock);
117 Version compactable_v;
118
// lower_bound(v) is the first chunk end >= v; stepping back one gives the
// greatest chunk end < v.
119 auto compactable_it = chunk_ends.lower_bound(v);
120 if (compactable_it != chunk_ends.begin())
121 {
122 std::advance(compactable_it, -1);
123 compactable_v = *compactable_it;
124 }
125 else
126 {
// No chunk end lies below v, so 0 is used as the erase bound.
127 compactable_v = 0;
128 }
129
// Erase every entry with version <= compactable_v.
130 transaction_sizes.erase(
131 transaction_sizes.begin(),
132 transaction_sizes.upper_bound(compactable_v));
133 chunk_ends.erase(
134 chunk_ends.begin(), chunk_ends.upper_bound(compactable_v));
136 forced_chunk_versions.begin(),
137 forced_chunk_versions.upper_bound(compactable_v));
138 }
139
140 void produced_chunk_at(Version v) override
141 {
142 std::lock_guard<std::mutex> l(chunker_lock);
143 chunk_ends.insert(v);
144 }
145 };
146}
Definition app_interface.h:19
uint64_t Version
Definition version.h:8
Definition ledger_chunker_interface.h:13
Definition ledger_chunker.h:19
std::mutex chunker_lock
Definition ledger_chunker.h:27
void append_entry_size(size_t n) override
Definition ledger_chunker.h:63
void rolled_back_to(Version v) override
Definition ledger_chunker.h:102
LedgerChunker(size_t threshold=max_chunk_threshold_size, Version start_from=0)
Definition ledger_chunker.h:49
std::map< Version, size_t > transaction_sizes
Definition ledger_chunker.h:24
bool is_chunk_end_requested(Version v) override
Definition ledger_chunker.h:75
static constexpr size_t max_chunk_threshold_size
Definition ledger_chunker.h:46
size_t get_unchunked_size(Version up_to) const
Definition ledger_chunker.h:29
Version current_tx_version
Definition ledger_chunker.h:22
std::set< Version > chunk_ends
Definition ledger_chunker.h:25
std::set< Version > forced_chunk_versions
Definition ledger_chunker.h:26
void compacted_to(Version v) override
Definition ledger_chunker.h:114
const size_t chunk_threshold
Definition ledger_chunker.h:21
void force_end_of_chunk(Version v) override
Definition ledger_chunker.h:69
void produced_chunk_at(Version v) override
Definition ledger_chunker.h:140