// Frontend member state.
// NOTE(review): this listing is sparse - every line keeps its original file
// line number as a prefix and intermediate lines are absent.
// Open flag, initialised to false.
42 bool is_open_ =
false;
// Forwarder handle; forward_command() is invoked through it further down.
45 std::shared_ptr<AbstractForwarder> cmd_forwarder;
// Signature interval defaults: 5000 for sig_tx_interval (presumably a
// transaction count - confirm), and 1000 ms for both sig_ms_interval and
// ms_to_sig.
48 size_t sig_tx_interval = 5000;
49 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
50 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
// Handle to the node configuration subsystem; its initialiser is not visible
// in this listing.
52 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
55 void update_consensus()
// Handling of an endpoint lookup that produced nullptr. The enclosing
// signature is not visible in this listing (presumably find_endpoint - confirm).
// With no allowed verbs the error is 404 ResourceNotFound ("Unknown path").
// Otherwise an Allow header is built and the request is answered with either
// 204 (OPTIONS) or a 405 UnsupportedHttpVerb error.
76 if (endpoint ==
nullptr)
80 if (allowed_verbs.empty())
83 HTTP_STATUS_NOT_FOUND,
84 ccf::errors::ResourceNotFound,
85 fmt::format(
"Unknown path: {}.", ctx->get_method()));
// The Allow list always starts with OPTIONS, followed by every allowed verb.
89 std::vector<char const*> allowed_verb_strs;
90 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
91 for (
auto verb : allowed_verbs)
93 allowed_verb_strs.push_back(verb.c_str());
// Verbs are joined with ", " to form the header value.
95 const std::string allow_header_value =
96 fmt::format(
"{}", fmt::join(allowed_verb_strs,
", "));
101 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
// An OPTIONS request gets 204 No Content; the 405 error below is presumably
// the non-OPTIONS path - the branching lines are not visible here.
102 if (ctx->get_request_verb() == HTTP_OPTIONS)
104 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
109 HTTP_STATUS_METHOD_NOT_ALLOWED,
110 ccf::errors::UnsupportedHttpVerb,
112 "Allowed methods for '{}' are: {}.",
114 allow_header_value));
// Checks the request against the configuration of the interface it arrived
// on. The visible checks run when consensus is non-null and the session
// context carries an interface_id:
//  - node_configuration_subsystem is assigned when null; if it is still null
//    afterwards the response status is set to 500.
//  - each entry of endpoint->required_operator_features is looked up in the
//    interface's enabled_operator_features; a missing one leads to a log
//    message and a 404 status.
//  - endpoint->full_uri_path is matched against the regexes stored for the
//    interface in rpc_interface_regexes.
// Throws std::runtime_error when the interface id is absent from the
// configured rpc_interfaces.
// NOTE(review): return statements and some parameters are not visible in this
// listing, so the exact true/false outcomes cannot be confirmed from here.
122 bool check_uri_allowed(
123 std::shared_ptr<ccf::RpcContextImpl> ctx,
126 auto interface_id = ctx->get_session_context()->interface_id;
127 if ((
consensus !=
nullptr) && interface_id)
// Assign the subsystem handle if it is not set yet (source of the assignment
// is not visible).
129 if (!node_configuration_subsystem)
131 node_configuration_subsystem =
133 if (!node_configuration_subsystem)
135 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
140 const auto& ncs = node_configuration_subsystem->get();
// Operator feature gating: only evaluated when the endpoint requires features.
142 const auto& required_features = endpoint->required_operator_features;
143 if (!required_features.empty())
147 const auto& interfaces = ncs.node_config.network.rpc_interfaces;
148 auto interface_it = interfaces.find(*interface_id);
149 if (interface_it == interfaces.end())
151 throw std::runtime_error(fmt::format(
152 "Could not find RPC interface named '{}' in startup config",
156 const auto& enabled_features =
157 interface_it->second.enabled_operator_features;
158 for (
const auto& required_feature : required_features)
161 enabled_features.find(required_feature) == enabled_features.end())
164 "Incoming request {} requires opt-in feature {}, which is not "
165 "enabled on interface {} where this request was received - "
167 endpoint->full_uri_path,
// A required feature that is not enabled is reported to the client as 404.
170 ctx->set_response_status(HTTP_STATUS_NOT_FOUND);
// Per-interface URI regex filter.
176 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
178 if (rit != ncs.rpc_interface_regexes.end())
181 for (
const auto& re : rit->second)
184 if (std::regex_match(endpoint->full_uri_path, m, re))
// Presumably reached when no regex matched - confirm against the full file.
192 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
// NOTE(review): `auto` copies the interface configuration returned by at();
// a const reference would avoid the copy.
198 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
200 icfg.endorsement.has_value() &&
205 "Request for {} rejected because the interface is unsecured and "
206 "no accepted_endpoints have been configured.",
207 endpoint->full_uri_path);
208 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
// Resolves the address a request should be redirected to, based on
// resolver.kind and the JSON-like resolver.target object.
// Returns std::optional<std::string>; the visible value-returning paths are
// the published_address of a chosen node's interface, and
// resolver.target["address"].
// NOTE(review): the case labels of the switch and the empty-result returns
// are not visible in this listing.
222 std::optional<std::string> resolve_redirect_location(
223 const RedirectionResolverConfig& resolver,
227 switch (resolver.kind)
// Role selection: a missing "role" key or the value "primary" means the
// primary is sought; "backup" means a backup is sought.
231 const auto role_it = resolver.target.find(
"role");
232 const bool seeking_primary =
233 role_it == resolver.target.end() || role_it.value() ==
"primary";
234 const bool seeking_backup =
235 !seeking_primary && role_it.value() ==
"backup";
// Neither role recognised (handling of this case is not visible here).
236 if (!seeking_primary && !seeking_backup)
// Target interface name: taken from the "interface" key when present; the
// fallback expression is not visible in this listing.
241 const auto interface_it = resolver.target.find(
"interface");
242 const auto target_interface =
243 (interface_it == resolver.target.end()) ?
245 interface_it.value().get<
std::string>();
// Candidate nodes are collected as iterators into `nodes`.
247 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
251 const auto primary_id =
consensus->primary();
252 if (seeking_primary && primary_id.has_value())
// find() may yield nodes.end(); that case is checked before use below.
254 target_node_its.push_back(
nodes.find(primary_id.value()));
256 else if (seeking_backup)
// Every node whose id differs from primary_id is a backup candidate.
258 for (
auto it =
nodes.begin(); it !=
nodes.end(); ++it)
260 if (it->first != primary_id)
262 target_node_its.push_back(it);
267 if (target_node_its.empty())
// One candidate is picked using random() modulo the candidate count.
273 target_node_its[random() % target_node_its.size()];
274 if (node_it !=
nodes.end())
276 const auto& interfaces = node_it->second.rpc_interfaces;
278 const auto target_interface_it = interfaces.find(target_interface);
279 if (target_interface_it != interfaces.end())
281 return target_interface_it->second.published_address;
// Another resolver kind returns the configured "address" value directly.
293 return resolver.target[
"address"].get<std::string>();
// Redirect handling driven by endpoint->properties.redirection_strategy.
// The line carrying the function name is not visible in this listing
// (presumably check_redirect, which is called elsewhere in this file).
// Two parallel paths are visible, one for the primary and one for a backup.
// Each resolves a location for the listening interface and then either sets a
// Location header plus 307 Temporary Redirect, or reports a 503 error
// (PrimaryNotFound / BackupNotFound).
303 std::shared_ptr<ccf::RpcContextImpl> ctx,
307 auto rs = endpoint->properties.redirection_strategy;
318 const bool is_primary =
// The listening interface defaults to PRIMARY_RPC_INTERFACE when the session
// has no interface_id.
325 const auto listen_interface =
326 ctx->get_session_context()->interface_id.value_or(
327 PRIMARY_RPC_INTERFACE);
328 const auto location =
329 resolve_redirect_location(resolver, tx, listen_interface);
330 if (location.has_value())
// Location is "https://" + resolved address + the original request URL.
332 ctx->set_response_header(
333 http::headers::LOCATION,
335 "https://{}{}", location.value(), ctx->get_request_url()));
336 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
// Presumably the no-location path - the branching lines are not visible.
342 HTTP_STATUS_SERVICE_UNAVAILABLE,
343 ccf::errors::PrimaryNotFound,
344 "Request should be redirected to primary, but receiving node "
345 "does not know current primary address");
// Backup-directed path, mirroring the primary-directed one above.
353 const bool is_backup =
360 const auto listen_interface =
361 ctx->get_session_context()->interface_id.value_or(
362 PRIMARY_RPC_INTERFACE);
363 const auto location =
364 resolve_redirect_location(resolver, tx, listen_interface);
365 if (location.has_value())
367 ctx->set_response_header(
368 http::headers::LOCATION,
370 "https://{}{}", location.value(), ctx->get_request_url()));
371 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
377 HTTP_STATUS_SERVICE_UNAVAILABLE,
378 ccf::errors::BackupNotFound,
379 "Request should be redirected to backup, but receiving node "
380 "does not know any current backup address");
// Looks up the redirections configuration of the interface a request arrived
// on. The line carrying the function name is not visible in this listing
// (presumably get_redirections_config, which is called elsewhere in this
// file).
// Returns the `redirections` member of the matching rpc_interfaces entry.
// A failure to obtain the NodeConfigurationSubsystem is logged; the values
// returned on the failure paths are not visible here.
394 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
// Assign the subsystem handle if it is not set yet (source of the assignment
// is not visible).
397 if (!node_configuration_subsystem)
399 node_configuration_subsystem =
401 if (!node_configuration_subsystem)
403 LOG_FAIL_FMT(
"Unable to access NodeConfigurationSubsystem");
408 const auto& node_config_state = node_configuration_subsystem->get();
409 const auto& interfaces =
410 node_config_state.node_config.network.rpc_interfaces;
411 const auto interface_it = interfaces.find(incoming_interface);
// Unknown interface name: a message is emitted (the logging call itself is
// not visible).
412 if (interface_it == interfaces.end())
415 "Could not find startup config for interface {}", incoming_interface);
419 return interface_it->second.redirections;
// Compares the current consensus view with the view recorded on the session.
// The first time through (no active_view yet) the current view is stored on
// the session context. If a later call sees a different view, a 500
// SessionConsistencyLost error is prepared and ctx->terminate_session is set.
// NOTE(review): the return statements are not visible in this listing.
422 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
426 auto current_view =
consensus->get_view();
427 auto session_ctx = ctx->get_session_context();
428 if (!session_ctx->active_view.has_value())
// Record the view in which this session is first seen.
431 session_ctx->active_view = current_view;
434 else if (current_view != *session_ctx->active_view)
436 auto msg = fmt::format(
437 "Potential loss of session consistency on session {}. Started "
438 "in view {}, now in view {}. Closing session.",
439 session_ctx->client_session_id,
446 HTTP_STATUS_INTERNAL_SERVER_ERROR,
447 ccf::errors::SessionConsistencyLost,
// Ask for the session to be closed after the error response.
449 ctx->terminate_session =
true;
// Runs the endpoint's authentication policies in order and yields the
// identity produced by policy->authenticate().
// For each policy that fails, an ODataAuthErrorDetails entry is recorded.
// If no policy produces an identity, the last policy's
// set_unauthenticated_error() is invoked and a 401
// InvalidAuthenticationInfo error carrying the collected details is prepared.
// NOTE(review): handling of the empty-policies case and the return
// statements are not visible in this listing.
457 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
458 std::shared_ptr<ccf::RpcContextImpl> ctx,
462 if (endpoint->authn_policies.empty())
467 std::unique_ptr<AuthnIdentity> identity =
nullptr;
469 std::string auth_error_reason;
470 std::vector<ODataAuthErrorDetails> error_details;
471 for (
const auto& policy : endpoint->authn_policies)
473 identity = policy->authenticate(tx, ctx, auth_error_reason);
// A non-null identity means this policy accepted the caller.
474 if (identity !=
nullptr)
479 error_details.emplace_back(ODataAuthErrorDetails{
480 policy->get_security_scheme_name(),
481 ccf::errors::InvalidAuthenticationInfo,
485 if (identity ==
nullptr)
// The final policy in the list decides how the unauthenticated error is set.
488 endpoint->authn_policies.back()->set_unauthenticated_error(
489 ctx, std::move(auth_error_reason));
// Convert the per-policy details to JSON for the error body.
492 std::vector<nlohmann::json> json_details;
493 json_details.reserve(error_details.size());
494 for (
auto& details : error_details)
496 json_details.emplace_back(details);
499 HTTP_STATUS_UNAUTHORIZED,
500 ccf::errors::InvalidAuthenticationInfo,
501 "Invalid authentication credentials.",
// Computes the timeout passed to forward_command().
// Starts from 3000 ms. If the session has an interface_id, that interface is
// present in rpc_interfaces, and it sets forwarding_timeout_ms, the configured
// value replaces the starting value.
// NOTE(review): node_configuration_subsystem is dereferenced here with no
// visible null check - confirm callers guarantee it is set.
508 [[nodiscard]] std::chrono::milliseconds get_forwarding_timeout(
509 std::shared_ptr<ccf::RpcContextImpl> ctx)
const
511 auto r = std::chrono::milliseconds(3'000);
513 auto interface_id = ctx->get_session_context()->interface_id;
514 if (interface_id.has_value())
516 const auto& ncs = node_configuration_subsystem->get();
517 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
518 if (rit != ncs.node_config.network.rpc_interfaces.end())
520 if (rit->second.forwarding_timeout_ms.has_value())
// Per-interface override of the timeout.
523 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
// Forwards a request to the primary. The line carrying the function name is
// not visible in this listing (presumably forward, which is called elsewhere
// in this file).
// Visible error outcomes:
//  - 501 NotImplemented for forwarding on an HTTP/2 interface
//  - 500 InternalError when cmd_forwarder or consensus is missing
//  - 503 RequestAlreadyForwarded when the session is already a forwarded one
//  - 503 InternalError when the primary is unknown
//  - 503 InternalError when forward_command() reports failure
// On success the response is marked pending and the session is marked as
// forwarding.
532 std::shared_ptr<ccf::RpcContextImpl> ctx,
540 HTTP_STATUS_NOT_IMPLEMENTED,
541 ccf::errors::NotImplemented,
542 "Request cannot be forwarded to primary on HTTP/2 interface.");
547 if (!cmd_forwarder || (
consensus ==
nullptr))
550 HTTP_STATUS_INTERNAL_SERVER_ERROR,
551 ccf::errors::InternalError,
552 "No consensus or forwarder to forward request.");
// A request that was itself forwarded is not forwarded again.
557 if (ctx->get_session_context()->is_forwarded)
562 HTTP_STATUS_SERVICE_UNAVAILABLE,
563 ccf::errors::RequestAlreadyForwarded,
564 "RPC was already forwarded.");
// Session consistency is checked before forwarding.
571 if (!check_session_consistency(ctx))
577 if (!primary_id.has_value())
580 HTTP_STATUS_SERVICE_UNAVAILABLE,
581 ccf::errors::InternalError,
582 "RPC could not be forwarded to unknown primary.");
// The caller certificate and the per-interface timeout are passed along.
587 if (!cmd_forwarder->forward_command(
590 ctx->get_session_context()->caller_cert,
591 get_forwarding_timeout(ctx)))
594 HTTP_STATUS_SERVICE_UNAVAILABLE,
595 ccf::errors::InternalError,
596 "Unable to establish channel to forward to primary.");
601 LOG_TRACE_FMT(
"RPC forwarded to primary {}", primary_id.value());
// No response is written now; it is flagged as pending.
604 ctx->response_is_pending =
true;
// Flag the session as forwarding; process_command_inner reads this flag when
// deciding whether to forward later requests.
608 ctx->get_session_context()->is_forwarding =
true;
// Wraps process_command_inner() with timing and event reporting.
// Timestamps are taken with high_resolution_clock before and after the inner
// call. When an endpoint was resolved, a RequestCompletedEvent is filled with
// the dispatch verb and path, the response status, the elapsed time in
// milliseconds and the number of attempts. A DispatchFailedEvent is filled
// with the request method and response status (presumably when no endpoint
// was resolved - the else line is not visible in this listing).
611 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
616 const auto start_time = std::chrono::high_resolution_clock::now();
618 process_command_inner(ctx, endpoint, attempts);
620 const auto end_time = std::chrono::high_resolution_clock::now();
622 if (endpoint !=
nullptr)
624 endpoints::RequestCompletedEvent rce;
625 rce.method = endpoint->dispatch.verb.c_str();
626 rce.dispatch_path = endpoint->dispatch.uri_path;
627 rce.status = ctx->get_response_status();
// Elapsed time is truncated to whole milliseconds by duration_cast.
632 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
633 end_time - start_time);
634 rce.attempts = attempts;
640 endpoints::DispatchFailedEvent dfe;
641 dfe.method = ctx->get_method();
642 dfe.status = ctx->get_response_status();
// Core request-processing loop. Iterates while `attempts` is below
// max_attempts (30). The visible steps are, in order:
//  1. endpoint resolution via find_endpoint() and check_uri_allowed()
//  2. optional redirect via get_redirections_config() / check_redirect()
//  3. optional forwarding when this node is not primary and consensus exists
//  4. authentication via get_authenticated_identity()
//  5. session consistency, write-application and pending-response checks
//  6. local commit handler execution and error mapping
// Exceptions caught in the visible handlers are mapped to HTTP errors:
// RpcException -> its own error, nlohmann::json::exception -> 400,
// std::exception -> 500.
// NOTE(review): many control-flow lines (braces, returns, try, some catch
// headers) are not visible in this listing; the notes below hedge accordingly.
648 void process_command_inner(
649 std::shared_ptr<ccf::RpcContextImpl> ctx,
// Upper bound on attempts per request.
653 constexpr auto max_attempts = 30;
654 while (attempts < max_attempts)
663 HTTP_STATUS_SERVICE_UNAVAILABLE,
664 ccf::errors::TooManyPendingTransactions,
665 "Too many transactions pending commit on the service.");
// The response is reset here; presumably so state from an earlier attempt
// does not carry over - confirm.
677 ctx->reset_response();
683 HTTP_STATUS_NOT_FOUND,
684 ccf::errors::FrontendNotOpen,
685 "Frontend is not open.");
692 endpoint = find_endpoint(ctx, *tx_p);
693 if (endpoint ==
nullptr)
700 if (!check_uri_allowed(ctx, endpoint))
// Redirect configuration is only looked up when the session has an
// interface_id.
705 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
706 redirections = std::nullopt;
710 if (ctx->get_session_context()->interface_id.has_value())
712 redirections = get_redirections_config(
714 *ctx->get_session_context()->interface_id);
719 if (redirections.has_value())
721 if (check_redirect(*tx_p, ctx, endpoint, *redirections))
// Forwarding is only possible when a consensus object exists.
730 const bool forwardable = (
consensus !=
nullptr);
732 if (!is_primary && forwardable)
734 switch (endpoint->properties.forwarding_required)
// In one case forwarding depends on the session's is_forwarding flag; the
// case labels are not visible in this listing.
743 if (ctx->get_session_context()->is_forwarding)
745 forward(ctx, *tx_p, endpoint);
753 forward(ctx, *tx_p, endpoint);
760 std::unique_ptr<AuthnIdentity> identity =
761 get_authenticated_identity(ctx, *tx_p, endpoint);
// A null identity only matters when the endpoint declares policies.
769 if (!endpoint->authn_policies.empty())
771 if (identity ==
nullptr)
775 args.caller = std::move(identity);
782 if (!check_session_consistency(ctx))
787 if (!ctx->should_apply_writes())
792 if (ctx->response_is_pending)
// A null owned_tx at this point is reported as a faulty endpoint
// implementation (500).
797 if (args.owned_tx ==
nullptr)
800 "Bad endpoint: During execution of {} {}, returned a non-pending "
801 "response but stole ownership of Tx object",
802 ctx->get_request_verb().c_str(),
803 ctx->get_request_path());
805 ctx->clear_response_headers();
807 HTTP_STATUS_INTERNAL_SERVER_ERROR,
808 ccf::errors::InternalError,
809 "Illegal endpoint implementation");
// Guards the local commit handler; it needs both a tx_id and consensus.
822 if (tx_id.has_value() &&
consensus !=
nullptr)
831 endpoint, args, tx_id.value());
// Failures in the local commit handler become 500 responses, with and
// without an exception message.
833 catch (
const std::exception& e)
836 ctx->clear_response_headers();
838 args, tx_id.value());
840 HTTP_STATUS_INTERNAL_SERVER_ERROR,
841 ccf::errors::InternalError,
843 "Failed to execute local commit handler func: {}",
849 ctx->clear_response_headers();
851 args, tx_id.value());
853 HTTP_STATUS_INTERNAL_SERVER_ERROR,
854 ccf::errors::InternalError,
855 "Failed to execute local commit handler func");
876 ctx->clear_response_headers();
878 HTTP_STATUS_SERVICE_UNAVAILABLE,
879 ccf::errors::TransactionReplicationFailed,
880 "Transaction failed to replicate.");
891 "Transaction execution conflicted with compaction: {}", e.
what());
// RpcException carries its own error payload, which is moved into the
// response.
894 catch (RpcException& e)
896 ctx->clear_response_headers();
897 ctx->set_error(std::move(e.error));
903 ctx->clear_response_headers();
905 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.
describe());
909 catch (
const nlohmann::json::exception& e)
911 ctx->clear_response_headers();
913 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
926 catch (
const std::exception& e)
928 ctx->clear_response_headers();
930 HTTP_STATUS_INTERNAL_SERVER_ERROR,
931 ccf::errors::InternalError,
// Presumably reached once the attempt limit is exhausted: a 503 with a
// Retry-After header of 3 seconds.
938 ctx->clear_response_headers();
940 HTTP_STATUS_SERVICE_UNAVAILABLE,
941 ccf::errors::TransactionCommitAttemptsExceedLimit,
943 "Transaction continued to conflict after {} attempts. Retry "
947 static constexpr size_t retry_after_seconds = 3;
948 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
// Override that stores the signature intervals (the line carrying the
// function name is not visible in this listing).
// sig_ms_interval_ is an integer converted to std::chrono::milliseconds, and
// ms_to_sig is reset to the new millisecond interval.
962 size_t sig_tx_interval_,
size_t sig_ms_interval_)
override
964 sig_tx_interval = sig_tx_interval_;
965 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
966 ms_to_sig = sig_ms_interval;
// Override that stores the forwarder handle (the line carrying the function
// name is not visible in this listing).
970 std::shared_ptr<AbstractForwarder> cmd_forwarder_)
override
972 cmd_forwarder = cmd_forwarder_;
// Two scopes that each take open_lock via std::lock_guard for their duration.
// Their signatures are not visible in this listing (presumably the open /
// is-open accessors - confirm).
977 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
988 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
// Fragment that, when a history object exists, unpacks a
// (txid, root, term_of_next_version) tuple; the producing call is not visible.
998 if (history !=
nullptr)
1004 const auto& [txid, root, term_of_next_version] =
// Override entry point for a request; the visible body delegates to
// process_command(). Other lines of the body are not visible in this listing.
1020 void process(std::shared_ptr<ccf::RpcContextImpl> ctx)
override
1026 process_command(ctx);
// Processing of a command that was forwarded from another node (signature is
// not visible in this listing).
// Throws std::logic_error when the session context is not marked as
// forwarded, and again when processing leaves the response pending, since a
// forwarded RPC must not be forwarded a second time.
// NOTE(review): the first message spells "unitialised"; it is a runtime
// string and is left untouched here.
1035 if (!ctx->get_session_context()->is_forwarded)
1037 throw std::logic_error(
1038 "Processing forwarded command with unitialised forwarded context");
1042 process_command(ctx);
1043 if (ctx->response_is_pending)
1047 throw std::logic_error(
"Forwarded RPC cannot be forwarded");
1051 void tick(std::chrono::milliseconds elapsed)
override