CCF
Loading...
Searching...
No Matches
fetch.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
6#include "ccf/rest_verb.h"
8#include "http/curl.h"
9#include "http/http_builder.h"
10
11#include <charconv>
12#include <curl/curl.h>
13#include <llhttp/llhttp.h>
14#include <memory>
15#include <optional>
16#include <span>
17#include <stdexcept>
18#include <string>
19#include <vector>
20
// Throws std::runtime_error when `status_code` differs from `expected`.
// The message names the expected status, the request's method and URL, and
// the status actually received.
//
//   request     - pointer-like handle exposing get_method() and get_url()
//   status_code - status received for that request
//   expected    - status the caller requires
//
// Every argument is parenthesised so that expression arguments bind as the
// caller intended. Arguments may be evaluated more than once, so they should
// be free of side effects.
#define EXPECT_HTTP_RESPONSE_STATUS(request, status_code, expected) \
  do \
  { \
    if ((status_code) != (expected)) \
    { \
      throw std::runtime_error(fmt::format( \
        "Expected {} response from {} {}, instead received {}", \
        ccf::http_status_str(expected), \
        (request)->get_method().c_str(), \
        (request)->get_url(), \
        (status_code))); \
    } \
  } while (0)
34
35namespace snapshots
36{
/// Result of a successful snapshot download.
struct SnapshotResponse
{
  /// Name of the snapshot; try_fetch_from_peer fills this with the last
  /// path component of the URL the snapshot was finally served from.
  std::string snapshot_name;

  /// Raw bytes of the downloaded snapshot.
  std::vector<uint8_t> snapshot_data;
};
42
/// Numeric fields of a `content-range: bytes <begin>-<end>/<total>` header,
/// as produced by parse_content_range_header.
struct ContentRangeHeader
{
  /// The `<begin>` field.
  size_t range_start = 0;

  /// The `<end>` field.
  size_t range_end = 0;

  /// The `<total>` field.
  size_t total_size = 0;
};
49
50 static ContentRangeHeader parse_content_range_header(
51 const ccf::curl::CurlRequest& request)
52 {
53 const auto& headers = request.get_response_headers();
54
55 auto it = headers.find(ccf::http::headers::CONTENT_RANGE);
56 if (it == headers.end())
57 {
58 throw std::runtime_error(
59 "Response is missing expected content-range header");
60 }
61
62 auto [unit, remaining] = ccf::nonstd::split_1(it->second, " ");
63 if (unit != "bytes")
64 {
65 throw std::runtime_error(
66 "Unexpected content-range unit. Only 'bytes' is supported");
67 }
68
69 auto [range, total_size] = ccf::nonstd::split_1(remaining, "/");
70 auto [range_start, range_end] = ccf::nonstd::split_1(range, "-");
71
72 if (range_start.empty() || range_end.empty() || total_size.empty())
73 {
74 throw std::runtime_error(fmt::format(
75 "Unsupported content-range header format. Expected 'bytes "
76 "<begin>-<end>/<total>', received: {}",
77 it->second));
78 }
79
80 ContentRangeHeader parsed_values{};
81
82 {
83 const auto [p, ec] = std::from_chars(
84 range_start.begin(), range_start.end(), parsed_values.range_start);
85 if (ec != std::errc())
86 {
87 throw std::runtime_error(fmt::format(
88 "Could not parse range start ({}) from content-range header: {}",
89 range_start,
90 it->second));
91 }
92 }
93
94 {
95 const auto [p, ec] = std::from_chars(
96 range_end.begin(), range_end.end(), parsed_values.range_end);
97 if (ec != std::errc())
98 {
99 throw std::runtime_error(fmt::format(
100 "Could not parse range end ({}) from content-range header: {}",
101 range_end,
102 it->second));
103 }
104 }
105
106 {
107 const auto [p, ec] = std::from_chars(
108 total_size.begin(), total_size.end(), parsed_values.total_size);
109 if (ec != std::errc())
110 {
111 throw std::runtime_error(fmt::format(
112 "Could not parse total size ({}) from content-range header: {}",
113 total_size,
114 it->second));
115 }
116 }
117
118 return parsed_values;
119 }
120
121 static std::optional<SnapshotResponse> try_fetch_from_peer(
122 const std::string& peer_address,
123 const std::string& path_to_peer_ca,
124 size_t max_size)
125 {
126 try
127 {
128 ccf::curl::UniqueCURL curl_easy;
129 curl_easy.set_opt(CURLOPT_CAINFO, path_to_peer_ca.c_str());
130
131 auto response_body = std::make_unique<ccf::curl::ResponseBody>(max_size);
132
133 // Get snapshot. This may be redirected multiple times, and we follow
134 // these redirects ourself so we can extract the final URL. Once the
135 // redirects terminate, the final response is likely to be extremely large
136 // so is fetched over multiple requests for a sub-range, returning
137 // PARTIAL_CONTENT each time.
138 std::string snapshot_url =
139 fmt::format("https://{}/node/snapshot", peer_address);
140
141 // Fetch 4MB chunks at a time
142 constexpr size_t range_size = 4L * 1024 * 1024;
143 size_t range_start = 0;
144 size_t range_end = range_size;
145 bool fetched_all = false;
146
147 auto process_partial_response =
148 [&](const ccf::curl::CurlRequest& request) {
149 auto content_range = parse_content_range_header(request);
150
151 if (content_range.range_start != range_start)
152 {
153 throw std::runtime_error(fmt::format(
154 "Unexpected range response. Requested bytes {}-{}, received "
155 "range starting at {}",
156 range_start,
157 range_end,
158 content_range.range_start));
159 }
160
161 // The server may give us _less_ than we requested (since they know
162 // where the file ends), but should never give us more
163 if (content_range.range_end > range_end)
164 {
165 throw std::runtime_error(fmt::format(
166 "Unexpected range response. Requested bytes {}-{}, received "
167 "range ending at {}",
168 range_start,
169 range_end,
170 content_range.range_end));
171 }
172
173 const auto range_size =
174 content_range.range_end - content_range.range_start;
176 "Received {}-byte chunk from {}. Now have {}/{}",
177 range_size,
178 request.get_url(),
179 content_range.range_end,
180 content_range.total_size);
181
182 if (content_range.range_end == content_range.total_size)
183 {
184 fetched_all = true;
185 }
186 else
187 {
188 // Advance range for next request
189 range_start = range_end;
190 range_end = range_start + range_size;
191 }
192 };
193
194 const auto max_redirects = 20;
195 for (auto redirect_count = 1; redirect_count <= max_redirects;
196 ++redirect_count)
197 {
199 "Making snapshot discovery request {}/{} to {}",
200 redirect_count,
201 max_redirects,
202 snapshot_url);
203
205 headers.append(
206 "Range", fmt::format("bytes={}-{}", range_start, range_end));
207
208 CURLcode curl_response = CURLE_FAILED_INIT;
209 long status_code = 0;
210 std::unique_ptr<ccf::curl::CurlRequest> request;
212 [&curl_response, &status_code, &request](
213 std::unique_ptr<ccf::curl::CurlRequest>&& request_,
214 CURLcode curl_response_,
215 long status_code_) {
216 curl_response = curl_response_;
217 status_code = status_code_;
218 request = std::move(request_);
219 };
220
222 std::make_unique<ccf::curl::CurlRequest>(
223 std::move(curl_easy),
224 HTTP_GET,
225 snapshot_url,
226 std::move(headers),
227 nullptr, // No request body
228 std::move(response_body),
229 std::move(response_callback)));
230
231 if (curl_response != CURLE_OK)
232 {
233 throw std::runtime_error(fmt::format(
234 "Error fetching snapshot redirect from {}: {} ({})",
235 request->get_url(),
236 curl_easy_strerror(curl_response),
237 status_code));
238 }
239
240 if (status_code == HTTP_STATUS_NOT_FOUND)
241 {
242 LOG_INFO_FMT("Peer has no suitable snapshot");
243 return std::nullopt;
244 }
245
246 if (status_code == HTTP_STATUS_PARTIAL_CONTENT)
247 {
248 process_partial_response(*request);
249
250 response_body = std::move(request->get_response_ptr());
251 curl_easy = std::move(request->get_easy_handle_ptr());
252 break;
253 }
254
256 request, status_code, HTTP_STATUS_PERMANENT_REDIRECT);
257
258 char* redirect_url = nullptr;
260 request->get_easy_handle(), CURLINFO_REDIRECT_URL, &redirect_url);
261 if (redirect_url == nullptr)
262 {
263 throw std::runtime_error(
264 "Redirect response found, but CURLINFO_REDIRECT_URL returned no "
265 "value");
266 }
267
269 "Snapshot fetch received redirect response with location {}",
270 redirect_url);
271 snapshot_url = redirect_url;
272
273 response_body = std::move(request->get_response_ptr());
274 curl_easy = std::move(request->get_easy_handle_ptr());
275
276 // Ignore any body from redirect responses
277 response_body->buffer.clear();
278 }
279
280 while (!fetched_all)
281 {
283 headers.append(
284 "Range", fmt::format("bytes={}-{}", range_start, range_end));
285
286 std::unique_ptr<ccf::curl::CurlRequest> snapshot_range_request;
287 CURLcode curl_response = CURLE_OK;
288 long snapshot_range_status_code = 0;
289
290 ccf::curl::CurlRequest::ResponseCallback snapshot_response_callback =
291 [&](
292 std::unique_ptr<ccf::curl::CurlRequest>&& request_,
293 CURLcode curl_response_,
294 long status_code_) {
295 snapshot_range_request = std::move(request_);
296 curl_response = curl_response_;
297 snapshot_range_status_code = status_code_;
298 };
299
301 std::make_unique<ccf::curl::CurlRequest>(
302 std::move(curl_easy),
303 HTTP_GET,
304 snapshot_url,
305 std::move(headers),
306 nullptr, // No request body
307 std::move(response_body),
308 snapshot_response_callback));
309 if (curl_response != CURLE_OK)
310 {
311 throw std::runtime_error(fmt::format(
312 "Error fetching snapshot chunk range from {}: {} ({})",
313 snapshot_range_request->get_url(),
314 curl_easy_strerror(curl_response),
315 snapshot_range_status_code));
316 }
318 snapshot_range_request,
319 snapshot_range_status_code,
320 HTTP_STATUS_PARTIAL_CONTENT);
321
322 process_partial_response(*snapshot_range_request);
323
324 response_body = std::move(snapshot_range_request->get_response_ptr());
325 curl_easy = std::move(snapshot_range_request->get_easy_handle_ptr());
326 }
327
328 const auto url_components = ccf::nonstd::split(snapshot_url, "/");
329 const std::string snapshot_name(url_components.back());
330
331 return SnapshotResponse{snapshot_name, std::move(response_body->buffer)};
332 }
333 catch (const std::exception& e)
334 {
335 LOG_FAIL_FMT("Error during snapshot fetch: {}", e.what());
336 return std::nullopt;
337 }
338 }
339
340 static std::optional<SnapshotResponse> fetch_from_peer(
341 const std::string& peer_address,
342 const std::string& path_to_peer_ca,
343 size_t max_attempts,
344 size_t retry_delay_ms,
345 size_t max_size)
346 {
347 for (size_t attempt = 0; attempt < max_attempts; ++attempt)
348 {
350 "Fetching snapshot from {} (attempt {}/{})",
351 peer_address,
352 attempt + 1,
353 max_attempts);
354
355 if (attempt > 0)
356 {
357 std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
358 }
359
360 auto response =
361 try_fetch_from_peer(peer_address, path_to_peer_ca, max_size);
362 if (response.has_value())
363 {
364 return response;
365 }
366 }
368 "Exceeded maximum snapshot fetch retries ({}), giving up", max_attempts);
369 return std::nullopt;
370 }
371}
Definition curl.h:374
static void synchronous_perform(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:490
const ResponseHeaders::HeaderMap & get_response_headers() const
Definition curl.h:539
std::string get_url() const
Definition curl.h:524
std::function< void(std::unique_ptr< CurlRequest > &&request, CURLcode curl_response_code, long status_code)> ResponseCallback
Definition curl.h:379
Definition curl.h:53
void set_opt(auto option, auto value)
Definition curl.h:107
Definition curl.h:149
void append(const char *str)
Definition curl.h:165
#define CHECK_CURL_EASY_GETINFO(handle, info, arg)
Definition curl.h:35
#define EXPECT_HTTP_RESPONSE_STATUS(request, status_code, expected)
Definition fetch.h:21
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition fetch.h:36
Definition fetch.h:44
size_t range_start
Definition fetch.h:45
size_t range_end
Definition fetch.h:46
size_t total_size
Definition fetch.h:47
Definition fetch.h:38
std::string snapshot_name
Definition fetch.h:39
std::vector< uint8_t > snapshot_data
Definition fetch.h:40