42 bool is_open_ =
false;
45 std::shared_ptr<AbstractForwarder> cmd_forwarder;
48 size_t sig_tx_interval = 5000;
49 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
50 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
52 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
55 void update_consensus()
76 if (endpoint ==
nullptr)
80 if (allowed_verbs.empty())
83 HTTP_STATUS_NOT_FOUND,
84 ccf::errors::ResourceNotFound,
85 fmt::format(
"Unknown path: {}.", ctx->get_method()));
89 std::vector<char const*> allowed_verb_strs;
90 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
91 for (
auto verb : allowed_verbs)
93 allowed_verb_strs.push_back(verb.c_str());
95 const std::string allow_header_value =
96 fmt::format(
"{}", fmt::join(allowed_verb_strs,
", "));
101 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
102 if (ctx->get_request_verb() == HTTP_OPTIONS)
104 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
109 HTTP_STATUS_METHOD_NOT_ALLOWED,
110 ccf::errors::UnsupportedHttpVerb,
112 "Allowed methods for '{}' are: {}.",
114 allow_header_value));
122 bool check_uri_allowed(
123 std::shared_ptr<ccf::RpcContextImpl> ctx,
126 auto interface_id = ctx->get_session_context()->interface_id;
129 if (!node_configuration_subsystem)
131 node_configuration_subsystem =
133 if (!node_configuration_subsystem)
135 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
140 auto& ncs = node_configuration_subsystem->get();
141 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
143 if (rit != ncs.rpc_interface_regexes.end())
146 for (
const auto& re : rit->second)
149 if (std::regex_match(endpoint->full_uri_path, m, re))
157 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
163 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
168 "Request for {} rejected because the interface is unsecured and "
169 "no accepted_endpoints have been configured.",
170 endpoint->full_uri_path);
171 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
185 std::optional<std::string> resolve_redirect_location(
186 const RedirectionResolverConfig& resolver,
190 switch (resolver.kind)
194 const auto role_it = resolver.target.find(
"role");
195 const bool seeking_primary =
196 role_it == resolver.target.end() || role_it.value() ==
"primary";
197 const bool seeking_backup =
198 !seeking_primary && role_it.value() ==
"backup";
199 if (!seeking_primary && !seeking_backup)
204 const auto interface_it = resolver.target.find(
"interface");
205 const auto target_interface =
206 (interface_it == resolver.target.end()) ?
208 interface_it.value().get<
std::string>();
210 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
214 const auto primary_id =
consensus->primary();
215 if (seeking_primary && primary_id.has_value())
217 target_node_its.push_back(
nodes.find(primary_id.value()));
219 else if (seeking_backup)
221 for (
auto it =
nodes.begin(); it !=
nodes.end(); ++it)
223 if (it->first != primary_id)
225 target_node_its.push_back(it);
230 if (target_node_its.empty())
235 const auto node_it = target_node_its[rand() % target_node_its.size()];
236 if (node_it !=
nodes.end())
238 const auto& interfaces = node_it->second.rpc_interfaces;
240 const auto target_interface_it = interfaces.find(target_interface);
241 if (target_interface_it != interfaces.end())
243 return target_interface_it->second.published_address;
255 return resolver.target[
"address"].get<std::string>();
265 std::shared_ptr<ccf::RpcContextImpl> ctx,
269 auto rs = endpoint->properties.redirection_strategy;
280 const bool is_primary =
287 const auto listen_interface =
288 ctx->get_session_context()->interface_id.value_or(
289 PRIMARY_RPC_INTERFACE);
290 const auto location =
291 resolve_redirect_location(resolver, tx, listen_interface);
292 if (location.has_value())
294 ctx->set_response_header(
295 http::headers::LOCATION,
297 "https://{}{}", location.value(), ctx->get_request_url()));
298 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
304 HTTP_STATUS_SERVICE_UNAVAILABLE,
305 ccf::errors::PrimaryNotFound,
306 "Request should be redirected to primary, but receiving node "
307 "does not know current primary address");
315 const bool is_backup =
322 const auto listen_interface =
323 ctx->get_session_context()->interface_id.value_or(
324 PRIMARY_RPC_INTERFACE);
325 const auto location =
326 resolve_redirect_location(resolver, tx, listen_interface);
327 if (location.has_value())
329 ctx->set_response_header(
330 http::headers::LOCATION,
332 "https://{}{}", location.value(), ctx->get_request_url()));
333 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
339 HTTP_STATUS_SERVICE_UNAVAILABLE,
340 ccf::errors::BackupNotFound,
341 "Request should be redirected to backup, but receiving node "
342 "does not know any current backup address");
356 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
359 if (!node_configuration_subsystem)
361 node_configuration_subsystem =
363 if (!node_configuration_subsystem)
365 LOG_FAIL_FMT(
"Unable to access NodeConfigurationSubsystem");
370 const auto& node_config_state = node_configuration_subsystem->get();
371 const auto& interfaces =
372 node_config_state.node_config.network.rpc_interfaces;
373 const auto interface_it = interfaces.find(incoming_interface);
374 if (interface_it == interfaces.end())
377 "Could not find startup config for interface {}", incoming_interface);
381 return interface_it->second.redirections;
384 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
388 auto current_view =
consensus->get_view();
389 auto session_ctx = ctx->get_session_context();
390 if (!session_ctx->active_view.has_value())
393 session_ctx->active_view = current_view;
395 else if (current_view != session_ctx->active_view.value())
397 auto msg = fmt::format(
398 "Potential loss of session consistency on session {}. Started "
399 "in view {}, now in view {}. Closing session.",
400 session_ctx->client_session_id,
401 session_ctx->active_view.value(),
406 HTTP_STATUS_INTERNAL_SERVER_ERROR,
407 ccf::errors::SessionConsistencyLost,
409 ctx->terminate_session =
true;
417 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
418 std::shared_ptr<ccf::RpcContextImpl> ctx,
422 if (endpoint->authn_policies.empty())
427 std::unique_ptr<AuthnIdentity> identity =
nullptr;
429 std::string auth_error_reason;
430 std::vector<ODataAuthErrorDetails> error_details;
431 for (
const auto& policy : endpoint->authn_policies)
433 identity = policy->authenticate(tx, ctx, auth_error_reason);
434 if (identity !=
nullptr)
441 error_details.emplace_back(ODataAuthErrorDetails{
442 policy->get_security_scheme_name(),
443 ccf::errors::InvalidAuthenticationInfo,
448 if (identity ==
nullptr)
451 endpoint->authn_policies.back()->set_unauthenticated_error(
452 ctx, std::move(auth_error_reason));
455 std::vector<nlohmann::json> json_details;
456 for (
auto& details : error_details)
458 json_details.push_back(details);
461 HTTP_STATUS_UNAUTHORIZED,
462 ccf::errors::InvalidAuthenticationInfo,
463 "Invalid authentication credentials.",
464 std::move(json_details));
470 std::chrono::milliseconds get_forwarding_timeout(
471 std::shared_ptr<ccf::RpcContextImpl> ctx)
const
473 auto r = std::chrono::milliseconds(3'000);
475 auto interface_id = ctx->get_session_context()->interface_id;
476 if (interface_id.has_value())
478 auto& ncs = node_configuration_subsystem->get();
479 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
480 if (rit != ncs.node_config.network.rpc_interfaces.end())
482 if (rit->second.forwarding_timeout_ms.has_value())
484 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
493 std::shared_ptr<ccf::RpcContextImpl> ctx,
501 HTTP_STATUS_NOT_IMPLEMENTED,
502 ccf::errors::NotImplemented,
503 "Request cannot be forwarded to primary on HTTP/2 interface.");
511 HTTP_STATUS_INTERNAL_SERVER_ERROR,
512 ccf::errors::InternalError,
513 "No consensus or forwarder to forward request.");
518 if (ctx->get_session_context()->is_forwarded)
523 HTTP_STATUS_SERVICE_UNAVAILABLE,
524 ccf::errors::RequestAlreadyForwarded,
525 "RPC was already forwarded.");
532 if (!check_session_consistency(ctx))
538 if (!primary_id.has_value())
541 HTTP_STATUS_SERVICE_UNAVAILABLE,
542 ccf::errors::InternalError,
543 "RPC could not be forwarded to unknown primary.");
548 if (!cmd_forwarder->forward_command(
551 ctx->get_session_context()->caller_cert,
552 get_forwarding_timeout(ctx)))
555 HTTP_STATUS_SERVICE_UNAVAILABLE,
556 ccf::errors::InternalError,
557 "Unable to establish channel to forward to primary.");
562 LOG_TRACE_FMT(
"RPC forwarded to primary {}", primary_id.value());
565 ctx->response_is_pending =
true;
569 ctx->get_session_context()->is_forwarding =
true;
574 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
579 const auto start_time = std::chrono::high_resolution_clock::now();
581 process_command_inner(ctx, endpoint, attempts);
583 const auto end_time = std::chrono::high_resolution_clock::now();
585 if (endpoint !=
nullptr)
587 endpoints::RequestCompletedEvent rce;
588 rce.method = endpoint->dispatch.verb.c_str();
589 rce.dispatch_path = endpoint->dispatch.uri_path;
590 rce.status = ctx->get_response_status();
595 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
596 end_time - start_time);
597 rce.attempts = attempts;
603 endpoints::DispatchFailedEvent dfe;
604 dfe.method = ctx->get_method();
605 dfe.status = ctx->get_response_status();
611 void process_command_inner(
612 std::shared_ptr<ccf::RpcContextImpl> ctx,
616 constexpr auto max_attempts = 30;
617 while (attempts < max_attempts)
626 HTTP_STATUS_SERVICE_UNAVAILABLE,
627 ccf::errors::TooManyPendingTransactions,
628 "Too many transactions pending commit on the service.");
640 ctx->reset_response();
646 HTTP_STATUS_NOT_FOUND,
647 ccf::errors::FrontendNotOpen,
648 "Frontend is not open.");
655 endpoint = find_endpoint(ctx, *tx_p);
656 if (endpoint ==
nullptr)
663 if (!check_uri_allowed(ctx, endpoint))
668 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
669 redirections = std::nullopt;
673 if (ctx->get_session_context()->interface_id.has_value())
675 redirections = get_redirections_config(
676 ctx->get_session_context()->interface_id.value());
681 if (redirections.has_value())
683 if (check_redirect(*tx_p, ctx, endpoint, redirections.value()))
692 const bool forwardable = (
consensus !=
nullptr);
694 if (!is_primary && forwardable)
696 switch (endpoint->properties.forwarding_required)
705 if (ctx->get_session_context()->is_forwarding)
707 forward(ctx, *tx_p, endpoint);
715 forward(ctx, *tx_p, endpoint);
722 std::unique_ptr<AuthnIdentity> identity =
723 get_authenticated_identity(ctx, *tx_p, endpoint);
731 if (!endpoint->authn_policies.empty())
733 if (identity ==
nullptr)
739 args.caller = std::move(identity);
747 if (!check_session_consistency(ctx))
752 if (!ctx->should_apply_writes())
757 if (ctx->response_is_pending)
761 else if (args.owned_tx ==
nullptr)
764 "Bad endpoint: During execution of {} {}, returned a non-pending "
765 "response but stole ownership of Tx object",
766 ctx->get_request_verb().c_str(),
767 ctx->get_request_path());
769 ctx->clear_response_headers();
771 HTTP_STATUS_INTERNAL_SERVER_ERROR,
772 ccf::errors::InternalError,
773 "Illegal endpoint implementation");
786 if (tx_id.has_value() &&
consensus !=
nullptr)
795 endpoint, args, tx_id.value());
797 catch (
const std::exception& e)
800 ctx->clear_response_headers();
802 args, tx_id.value());
804 HTTP_STATUS_INTERNAL_SERVER_ERROR,
805 ccf::errors::InternalError,
807 "Failed to execute local commit handler func: {}",
813 ctx->clear_response_headers();
815 args, tx_id.value());
817 HTTP_STATUS_INTERNAL_SERVER_ERROR,
818 ccf::errors::InternalError,
819 "Failed to execute local commit handler func");
840 ctx->clear_response_headers();
842 HTTP_STATUS_SERVICE_UNAVAILABLE,
843 ccf::errors::TransactionReplicationFailed,
844 "Transaction failed to replicate.");
855 "Transaction execution conflicted with compaction: {}", e.
what());
858 catch (RpcException& e)
860 ctx->clear_response_headers();
861 ctx->set_error(std::move(e.error));
867 ctx->clear_response_headers();
869 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.
describe());
873 catch (
const nlohmann::json::exception& e)
875 ctx->clear_response_headers();
877 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
890 catch (
const std::exception& e)
892 ctx->clear_response_headers();
894 HTTP_STATUS_INTERNAL_SERVER_ERROR,
895 ccf::errors::InternalError,
902 ctx->clear_response_headers();
904 HTTP_STATUS_SERVICE_UNAVAILABLE,
905 ccf::errors::TransactionCommitAttemptsExceedLimit,
907 "Transaction continued to conflict after {} attempts. Retry "
911 static constexpr size_t retry_after_seconds = 3;
912 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
930 size_t sig_tx_interval_,
size_t sig_ms_interval_)
override
932 sig_tx_interval = sig_tx_interval_;
933 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
934 ms_to_sig = sig_ms_interval;
938 std::shared_ptr<AbstractForwarder> cmd_forwarder_)
override
940 cmd_forwarder = cmd_forwarder_;
945 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
956 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
972 const auto& [txid, root, term_of_next_version] =
988 void process(std::shared_ptr<ccf::RpcContextImpl> ctx)
override
994 process_command(ctx);
1003 if (!ctx->get_session_context()->is_forwarded)
1005 throw std::logic_error(
1006 "Processing forwarded command with unitialised forwarded context");
1010 process_command(ctx);
1011 if (ctx->response_is_pending)
1015 throw std::logic_error(
"Forwarded RPC cannot be forwarded");
1019 void tick(std::chrono::milliseconds elapsed)
override