CCF
Loading...
Searching...
No Matches
file_serving_handlers.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
8
9namespace ccf::node
10{
11 // Helper function to look up the redirect address on target_node matching the
12 // interface on this node which received the request. Will either return that
13 // address, or populate an appropriate error on the response context and
14 // return std::nullopt. Takes both CommandEndpointContext and ReadOnlyTx, so
15 // that it can be called by either read-only or read-write endpoints
16 static std::optional<std::string> get_redirect_address_for_node(
// NOTE(review): the declarations of `ctx` and `ro_tx` (both used below) are
// not present in this listing; only the final parameter is visible.
19 const ccf::NodeId& target_node)
20 {
// Read-only handle onto the nodes table
21 auto* nodes = ro_tx.ro<ccf::Nodes>(ccf::Tables::NODES);
22
// The target must have an entry in the nodes table, otherwise there is
// nothing to build a redirect from
23 auto node_info = nodes->get(target_node);
24 if (!node_info.has_value())
25 {
26 LOG_FAIL_FMT("Node redirection error: Unknown node {}", target_node);
27 ctx.rpc_ctx->set_error(
28 HTTP_STATUS_INTERNAL_SERVER_ERROR,
29 ccf::errors::InternalError,
30 fmt::format(
31 "Cannot find node info to produce redirect response for node {}",
32 target_node));
33 return std::nullopt;
34 }
35
// The session context records which interface the request arrived on; a
// request without one cannot be redirected
36 const auto interface_id = ctx.rpc_ctx->get_session_context()->interface_id;
37 if (!interface_id.has_value())
38 {
39 LOG_FAIL_FMT("Node redirection error: Non-RPC request");
40 ctx.rpc_ctx->set_error(
41 HTTP_STATUS_INTERNAL_SERVER_ERROR,
42 ccf::errors::InternalError,
43 "Cannot redirect non-RPC request");
44 return std::nullopt;
45 }
46
// Look for an interface with the same ID among the target node's RPC
// interfaces
47 const auto& interfaces = node_info->rpc_interfaces;
48 const auto interface_it = interfaces.find(interface_id.value());
49 if (interface_it == interfaces.end())
50 {
// NOTE(review): the call which opens this argument list is not present in
// this listing.
52 "Node redirection error: Target missing interface {}",
53 interface_id.value());
54 ctx.rpc_ctx->set_error(
55 HTTP_STATUS_INTERNAL_SERVER_ERROR,
56 ccf::errors::InternalError,
57 fmt::format(
58 "Cannot redirect request. Received on RPC interface {}, which is "
59 "not present on target node {}",
60 interface_id.value(),
61 target_node));
62 return std::nullopt;
63 }
64
// Success: hand back the address the target publishes for that interface
65 const auto& interface = interface_it->second;
66 return interface.published_address;
67 }
68
// Installs the snapshot-serving endpoints on `registry`:
//  - HEAD/GET /snapshot, handled by find_snapshot
//  - HEAD/GET /snapshot/{snapshot_name}, handled by get_snapshot
// All four are installed with no_auth_required and hidden from OpenAPI.
69 static void init_file_serving_handlers(
// NOTE(review): the parameter list is not present in this listing; the body
// below uses `registry` and `node_context`.
71 {
72 // Redirect to endpoint for a single specific snapshot
// Name of the optional query parameter accepted by the /snapshot endpoints
73 static constexpr auto snapshot_since_param_key = "since";
74
75 auto find_snapshot = [&](ccf::endpoints::ReadOnlyEndpointContext& ctx) {
76 size_t latest_idx = 0;
77 {
78 // Get latest_idx from query param, if present
79 const auto parsed_query =
80 http::parse_query(ctx.rpc_ctx->get_request_query());
81
82 std::string error_reason;
83 auto snapshot_since = http::get_query_value_opt<ccf::SeqNo>(
84 parsed_query, snapshot_since_param_key, error_reason);
85
86 if (snapshot_since.has_value())
87 {
88 if (!error_reason.empty())
89 {
90 ctx.rpc_ctx->set_error(
91 HTTP_STATUS_BAD_REQUEST,
92 ccf::errors::InvalidQueryParameterValue,
93 std::move(error_reason));
94 return;
95 }
96 latest_idx = snapshot_since.value();
97 }
98 }
99
100 auto node_operation = node_context.get_subsystem<AbstractNodeOperation>();
101 if (node_operation == nullptr)
102 {
103 ctx.rpc_ctx->set_error(
104 HTTP_STATUS_INTERNAL_SERVER_ERROR,
105 ccf::errors::InternalError,
106 "Unable to access NodeOperation subsystem");
107 return;
108 }
109
110 if (!node_operation->can_replicate())
111 {
112 // Try to redirect to primary for preferable snapshot, expected to
113 // match later /join request
114 auto primary_id = node_operation->get_primary();
115 if (primary_id.has_value())
116 {
117 const auto address =
118 get_redirect_address_for_node(ctx, ctx.tx, *primary_id);
119 if (!address.has_value())
120 {
121 return;
122 }
123
124 auto location =
125 fmt::format("https://{}/node/snapshot", address.value());
126 if (latest_idx != 0)
127 {
128 location +=
129 fmt::format("?{}={}", snapshot_since_param_key, latest_idx);
130 }
131
132 ctx.rpc_ctx->set_response_header(http::headers::LOCATION, location);
133 ctx.rpc_ctx->set_error(
134 HTTP_STATUS_PERMANENT_REDIRECT,
135 ccf::errors::NodeCannotHandleRequest,
136 "Node is not primary; redirecting for preferable snapshot");
137 return;
138 }
139
140 // If there is no current primary, fall-back to returning this
141 // node's best snapshot rather than terminating the fetch with an
142 // error
143 }
144
145 auto node_configuration_subsystem =
147 if (node_configuration_subsystem == nullptr)
148 {
149 ctx.rpc_ctx->set_error(
150 HTTP_STATUS_INTERNAL_SERVER_ERROR,
151 ccf::errors::InternalError,
152 "NodeConfigurationSubsystem is not available");
153 return;
154 }
155
156 const auto& snapshots_config =
157 node_configuration_subsystem->get().node_config.snapshots;
158
159 const auto orig_latest = latest_idx;
160 auto latest_committed_snapshot =
162 snapshots_config.directory, latest_idx);
163
164 if (!latest_committed_snapshot.has_value())
165 {
166 ctx.rpc_ctx->set_error(
167 HTTP_STATUS_NOT_FOUND,
168 ccf::errors::ResourceNotFound,
169 fmt::format(
170 "This node has no committed snapshots since {}", orig_latest));
171 return;
172 }
173
174 const auto& snapshot_path = latest_committed_snapshot.value();
175
176 const auto address =
177 get_redirect_address_for_node(ctx, ctx.tx, node_context.get_node_id());
178 if (!address.has_value())
179 {
180 return;
181 }
182
183 auto redirect_url = fmt::format(
184 "https://{}/node/snapshot/{}", address.value(), snapshot_path);
185 LOG_DEBUG_FMT("Redirecting to snapshot: {}", redirect_url);
186 ctx.rpc_ctx->set_response_header(
187 ccf::http::headers::LOCATION, redirect_url);
188 ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT);
189 };
// Install find_snapshot for both HEAD and GET on /snapshot. Each endpoint
// needs no authentication, takes the optional "since" query parameter, and is
// hidden from the OpenAPI document.
// NOTE(review): several builder calls in these two chains (including the ones
// which open the argument lists below) are not present in this listing.
190 registry
192 "/snapshot", HTTP_HEAD, find_snapshot, no_auth_required)
195 snapshot_since_param_key, ccf::endpoints::OptionalParameter)
196 .set_openapi_hidden(true)
198 .install();
199 registry
201 "/snapshot", HTTP_GET, find_snapshot, no_auth_required)
204 snapshot_since_param_key, ccf::endpoints::OptionalParameter)
205 .set_openapi_hidden(true)
207 .install();
208
209 auto get_snapshot = [&](ccf::endpoints::CommandEndpointContext& ctx) {
210 auto node_configuration_subsystem =
212 if (node_configuration_subsystem == nullptr)
213 {
214 ctx.rpc_ctx->set_error(
215 HTTP_STATUS_INTERNAL_SERVER_ERROR,
216 ccf::errors::InternalError,
217 "NodeConfigurationSubsystem is not available");
218 return;
219 }
220
221 const auto& snapshots_config =
222 node_configuration_subsystem->get().node_config.snapshots;
223
224 std::string snapshot_name;
225 std::string error;
227 ctx.rpc_ctx->get_request_path_params(),
228 "snapshot_name",
229 snapshot_name,
230 error))
231 {
232 ctx.rpc_ctx->set_error(
233 HTTP_STATUS_BAD_REQUEST,
234 ccf::errors::InvalidResourceName,
235 std::move(error));
236 return;
237 }
238
239 files::fs::path snapshot_path =
240 files::fs::path(snapshots_config.directory) / snapshot_name;
241
242 std::ifstream f(snapshot_path, std::ios::binary);
243 if (!f.good())
244 {
245 ctx.rpc_ctx->set_error(
246 HTTP_STATUS_NOT_FOUND,
247 ccf::errors::ResourceNotFound,
248 fmt::format(
249 "This node does not have a snapshot named {}", snapshot_name));
250 return;
251 }
252
253 LOG_DEBUG_FMT("Found snapshot: {}", snapshot_path.string());
254
255 f.seekg(0, std::ifstream::end);
256 const auto total_size = (size_t)f.tellg();
257
258 ctx.rpc_ctx->set_response_header("accept-ranges", "bytes");
259
260 ctx.rpc_ctx->set_response_header(
261 ccf::http::headers::CCF_SNAPSHOT_NAME, snapshot_name);
262
263 if (ctx.rpc_ctx->get_request_verb() == HTTP_HEAD)
264 {
265 ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK);
266 ctx.rpc_ctx->set_response_header(
267 ccf::http::headers::CONTENT_LENGTH, total_size);
268 return;
269 }
270
271 size_t range_start = 0;
272 size_t range_end = total_size;
273 {
274 const auto range_header = ctx.rpc_ctx->get_request_header("range");
275 if (range_header.has_value())
276 {
277 LOG_TRACE_FMT("Parsing range header {}", range_header.value());
278
279 auto [unit, ranges] = ccf::nonstd::split_1(range_header.value(), "=");
280 if (unit != "bytes")
281 {
282 ctx.rpc_ctx->set_error(
283 HTTP_STATUS_BAD_REQUEST,
284 ccf::errors::InvalidHeaderValue,
285 "Only 'bytes' is supported as a Range header unit");
286 return;
287 }
288
289 if (ranges.find(',') != std::string::npos)
290 {
291 ctx.rpc_ctx->set_error(
292 HTTP_STATUS_BAD_REQUEST,
293 ccf::errors::InvalidHeaderValue,
294 "Multiple ranges are not supported");
295 return;
296 }
297
298 const auto segments = ccf::nonstd::split(ranges, "-");
299 if (segments.size() != 2)
300 {
301 ctx.rpc_ctx->set_error(
302 HTTP_STATUS_BAD_REQUEST,
303 ccf::errors::InvalidHeaderValue,
304 fmt::format(
305 "Invalid format, cannot parse range in {}",
306 range_header.value()));
307 return;
308 }
309
310 const auto s_range_start = segments[0];
311 const auto s_range_end = segments[1];
312
313 if (!s_range_start.empty())
314 {
315 {
316 const auto [p, ec] = std::from_chars(
317 s_range_start.begin(), s_range_start.end(), range_start);
318 if (ec != std::errc())
319 {
320 ctx.rpc_ctx->set_error(
321 HTTP_STATUS_BAD_REQUEST,
322 ccf::errors::InvalidHeaderValue,
323 fmt::format(
324 "Unable to parse start of range value {} in {}",
325 s_range_start,
326 range_header.value()));
327 return;
328 }
329 }
330
331 if (range_start > total_size)
332 {
333 ctx.rpc_ctx->set_error(
334 HTTP_STATUS_BAD_REQUEST,
335 ccf::errors::InvalidHeaderValue,
336 fmt::format(
337 "Start of range {} is larger than total file size {}",
338 range_start,
339 total_size));
340 return;
341 }
342
343 if (!s_range_end.empty())
344 {
345 // Fully-specified range, like "X-Y"
346 {
347 const auto [p, ec] = std::from_chars(
348 s_range_end.begin(), s_range_end.end(), range_end);
349 if (ec != std::errc())
350 {
351 ctx.rpc_ctx->set_error(
352 HTTP_STATUS_BAD_REQUEST,
353 ccf::errors::InvalidHeaderValue,
354 fmt::format(
355 "Unable to parse end of range value {} in {}",
356 s_range_end,
357 range_header.value()));
358 return;
359 }
360 }
361
362 if (range_end > total_size)
363 {
365 "Requested snapshot range ending at {}, but file size is "
366 "only {} - shrinking range end",
367 range_end,
368 total_size);
369 range_end = total_size;
370 }
371
372 if (range_end < range_start)
373 {
374 ctx.rpc_ctx->set_error(
375 HTTP_STATUS_BAD_REQUEST,
376 ccf::errors::InvalidHeaderValue,
377 fmt::format(
378 "Invalid range: Start ({}) and end ({}) out of order",
379 range_start,
380 range_end));
381 return;
382 }
383 }
384 else
385 {
386 // Else this is an open-ended range like "X-"
387 range_end = total_size;
388 }
389 }
390 else
391 {
392 if (!s_range_end.empty())
393 {
394 // Negative range, like "-Y"
395 size_t offset = 0;
396 const auto [p, ec] =
397 std::from_chars(s_range_end.begin(), s_range_end.end(), offset);
398 if (ec != std::errc())
399 {
400 ctx.rpc_ctx->set_error(
401 HTTP_STATUS_BAD_REQUEST,
402 ccf::errors::InvalidHeaderValue,
403 fmt::format(
404 "Unable to parse end of range offset value {} in {}",
405 s_range_end,
406 range_header.value()));
407 return;
408 }
409
410 range_end = total_size;
411 range_start = range_end - offset;
412 }
413 else
414 {
415 ctx.rpc_ctx->set_error(
416 HTTP_STATUS_BAD_REQUEST,
417 ccf::errors::InvalidHeaderValue,
418 "Invalid range: Must contain range-start or range-end");
419 return;
420 }
421 }
422 }
423 }
424
425 const auto range_size = range_end - range_start;
426
428 "Reading {}-byte range from {} to {}",
429 range_size,
430 range_start,
431 range_end);
432
433 // Read requested range into buffer
434 std::vector<uint8_t> contents(range_size);
435 f.seekg(range_start);
436 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
437 f.read(reinterpret_cast<char*>(contents.data()), contents.size());
438 f.close();
439
440 // Build successful response
441 ctx.rpc_ctx->set_response_status(HTTP_STATUS_PARTIAL_CONTENT);
442 ctx.rpc_ctx->set_response_header(
443 ccf::http::headers::CONTENT_TYPE,
444 ccf::http::headervalues::contenttype::OCTET_STREAM);
445 ctx.rpc_ctx->set_response_body(std::move(contents));
446
447 // Partial Content responses describe the current response in
448 // Content-Range
449 ctx.rpc_ctx->set_response_header(
450 ccf::http::headers::CONTENT_RANGE,
451 fmt::format("bytes {}-{}/{}", range_start, range_end, total_size));
452 };
// Install get_snapshot for both HEAD and GET on /snapshot/{snapshot_name}.
// Each endpoint needs no authentication and is hidden from the OpenAPI
// document.
// NOTE(review): several builder calls in these two chains (including the ones
// which open the argument lists below) are not present in this listing.
453 registry
455 "/snapshot/{snapshot_name}", HTTP_HEAD, get_snapshot, no_auth_required)
457 .set_openapi_hidden(true)
459 .install();
460 registry
462 "/snapshot/{snapshot_name}", HTTP_GET, get_snapshot, no_auth_required)
464 .set_openapi_hidden(true)
466 .install();
467 }
468}
Definition node_operation_interface.h:23
Definition base_endpoint_registry.h:121
Definition node_configuration_subsystem.h:13
virtual Endpoint make_command_endpoint(const std::string &method, RESTVerb verb, const CommandEndpointFunction &f, const AuthnPolicies &ap)
Definition endpoint_registry.cpp:254
virtual Endpoint make_read_only_endpoint(const std::string &method, RESTVerb verb, const ReadOnlyEndpointFunction &f, const AuthnPolicies &ap)
Definition endpoint_registry.cpp:235
Definition tx.h:159
M::ReadOnlyHandle * ro(M &m)
Definition tx.h:168
Definition map.h:30
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
@ OptionalParameter
Definition endpoint.h:123
bool get_path_param(const ccf::PathParams &params, const std::string &param_name, T &value, std::string &error)
Definition endpoint_registry.h:64
Definition file_serving_handlers.h:10
@ error
Definition tls_session.h:23
uint64_t SeqNo
Definition tx_id.h:36
std::optional< fs::path > find_latest_committed_snapshot_in_directory(const fs::path &directory, size_t &latest_committed_snapshot_idx)
Definition filenames.h:136
Definition node_context.h:12
std::shared_ptr< T > get_subsystem(const std::string &name) const
Definition node_context.h:37
virtual ccf::NodeId get_node_id() const
Definition node_context.h:65
Definition endpoint_context.h:24
std::shared_ptr< ccf::RpcContext > rpc_ctx
Definition endpoint_context.h:31
Endpoint & add_query_parameter(const std::string &param_name, QueryParamPresence presence=QueryParamPresence::RequiredParameter)
Definition endpoint.h:447
Endpoint & require_operator_feature(OperatorFeature feature)
Definition endpoint.cpp:16
Endpoint & set_openapi_hidden(bool hidden)
Definition endpoint.cpp:10
void install()
Definition endpoint.cpp:135
Endpoint & set_forwarding_required(ForwardingRequired fr)
Definition endpoint.cpp:74
Definition endpoint_context.h:70