CCF
Loading...
Searching...
No Matches
frontend.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/http_status.h"
7#include "ccf/node_context.h"
8#include "ccf/pal/locking.h"
9#include "ccf/rpc_exception.h"
16#include "enclave/rpc_handler.h"
17#include "forwarder.h"
18#include "http/http_jwt.h"
20#include "kv/store.h"
24
25#define FMT_HEADER_ONLY
26
27#include <fmt/format.h>
28#include <utility>
29#include <vector>
30
31namespace ccf
32{
34 {
35 protected:
39
40 private:
41 ccf::pal::Mutex open_lock;
42 bool is_open_ = false;
43
45 std::shared_ptr<AbstractForwarder> cmd_forwarder;
46 ccf::kv::TxHistory* history{nullptr};
47
48 size_t sig_tx_interval = 5000;
49 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
50 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
51
52 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
53 nullptr;
54
55 void update_consensus()
56 {
57 auto* c = tables.get_consensus().get();
58
59 if (consensus != c)
60 {
61 consensus = c;
63 }
64 }
65
66 void update_history()
67 {
68 history = tables.get_history().get();
69 endpoints.set_history(history);
70 }
71
73 std::shared_ptr<ccf::RpcContextImpl> ctx, ccf::kv::CommittableTx& tx)
74 {
75 const auto endpoint = endpoints.find_endpoint(tx, *ctx);
76 if (endpoint == nullptr)
77 {
78 // Every path from here should populate an appropriate response error
79 const auto allowed_verbs = endpoints.get_allowed_verbs(tx, *ctx);
80 if (allowed_verbs.empty())
81 {
82 ctx->set_error(
83 HTTP_STATUS_NOT_FOUND,
84 ccf::errors::ResourceNotFound,
85 fmt::format("Unknown path: {}.", ctx->get_method()));
86 }
87 else
88 {
89 std::vector<char const*> allowed_verb_strs;
90 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
91 for (auto verb : allowed_verbs)
92 {
93 allowed_verb_strs.push_back(verb.c_str());
94 }
95 const std::string allow_header_value =
96 fmt::format("{}", fmt::join(allowed_verb_strs, ", "));
97 // List allowed methods in 2 places:
98 // - ALLOW header for standards compliance + machine parsing
99 // - Body for visibility + human readability (unless this was an
100 // OPTIONS request, which returns a 204 No Content)
101 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
102 if (ctx->get_request_verb() == HTTP_OPTIONS)
103 {
104 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
105 }
106 else
107 {
108 ctx->set_error(
109 HTTP_STATUS_METHOD_NOT_ALLOWED,
110 ccf::errors::UnsupportedHttpVerb,
111 fmt::format(
112 "Allowed methods for '{}' are: {}.",
113 ctx->get_method(),
114 allow_header_value));
115 }
116 }
117 }
118
119 return endpoint;
120 }
121
122 bool check_uri_allowed(
123 std::shared_ptr<ccf::RpcContextImpl> ctx,
124 const endpoints::EndpointDefinitionPtr& endpoint)
125 {
126 auto interface_id = ctx->get_session_context()->interface_id;
127 if ((consensus != nullptr) && interface_id)
128 {
129 if (!node_configuration_subsystem)
130 {
131 node_configuration_subsystem =
132 node_context.get_subsystem<NodeConfigurationSubsystem>();
133 if (!node_configuration_subsystem)
134 {
135 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
136 return false;
137 }
138 }
139
140 const auto& ncs = node_configuration_subsystem->get();
141
142 const auto& required_features = endpoint->required_operator_features;
143 if (!required_features.empty())
144 {
145 // Check that all required opt-in features are present on this
146 // interface's enabled features
147 const auto& interfaces = ncs.node_config.network.rpc_interfaces;
148 auto interface_it = interfaces.find(*interface_id);
149 if (interface_it == interfaces.end())
150 {
151 throw std::runtime_error(fmt::format(
152 "Could not find RPC interface named '{}' in startup config",
153 *interface_id));
154 }
155
156 const auto& enabled_features =
157 interface_it->second.enabled_operator_features;
158 for (const auto& required_feature : required_features)
159 {
160 if (
161 enabled_features.find(required_feature) == enabled_features.end())
162 {
164 "Incoming request {} requires opt-in feature {}, which is not "
165 "enabled on interface {} where this request was received - "
166 "returning error",
167 endpoint->full_uri_path,
168 required_feature,
169 *interface_id);
170 ctx->set_response_status(HTTP_STATUS_NOT_FOUND);
171 return false;
172 }
173 }
174 }
175
176 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
177
178 if (rit != ncs.rpc_interface_regexes.end())
179 {
180 bool ok = false;
181 for (const auto& re : rit->second)
182 {
183 std::smatch m;
184 if (std::regex_match(endpoint->full_uri_path, m, re))
185 {
186 ok = true;
187 break;
188 }
189 }
190 if (!ok)
191 {
192 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
193 return false;
194 }
195 }
196 else
197 {
198 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
199 if (
200 icfg.endorsement.has_value() &&
201 icfg.endorsement->authority == Authority::UNSECURED)
202 {
203 // Unsecured interfaces are opt-in only.
205 "Request for {} rejected because the interface is unsecured and "
206 "no accepted_endpoints have been configured.",
207 endpoint->full_uri_path);
208 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
209 return false;
210 }
211 }
212 }
213 else
214 {
215 // internal or forwarded: OK because they have been checked by the
216 // forwarder (forward() happens further down).
217 }
218
219 return true;
220 }
221
222 std::optional<std::string> resolve_redirect_location(
223 const RedirectionResolverConfig& resolver,
225 const ccf::ListenInterfaceID& incoming_interface)
226 {
227 switch (resolver.kind)
228 {
230 {
231 const auto role_it = resolver.target.find("role");
232 const bool seeking_primary =
233 role_it == resolver.target.end() || role_it.value() == "primary";
234 const bool seeking_backup =
235 !seeking_primary && role_it.value() == "backup";
236 if (!seeking_primary && !seeking_backup)
237 {
238 return std::nullopt;
239 }
240
241 const auto interface_it = resolver.target.find("interface");
242 const auto target_interface =
243 (interface_it == resolver.target.end()) ?
244 incoming_interface :
245 interface_it.value().get<std::string>();
246
247 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
248 target_node_its;
250 {
251 const auto primary_id = consensus->primary();
252 if (seeking_primary && primary_id.has_value())
253 {
254 target_node_its.push_back(nodes.find(primary_id.value()));
255 }
256 else if (seeking_backup)
257 {
258 for (auto it = nodes.begin(); it != nodes.end(); ++it)
259 {
260 if (it->first != primary_id)
261 {
262 target_node_its.push_back(it);
263 }
264 }
265 }
266 }
267 if (target_node_its.empty())
268 {
269 return std::nullopt;
270 }
271
272 const auto node_it =
273 target_node_its[random() % target_node_its.size()];
274 if (node_it != nodes.end())
275 {
276 const auto& interfaces = node_it->second.rpc_interfaces;
277
278 const auto target_interface_it = interfaces.find(target_interface);
279 if (target_interface_it != interfaces.end())
280 {
281 return target_interface_it->second.published_address;
282 }
283 }
284 else
285 {
286 return std::nullopt;
287 }
288 break;
289 }
290
292 {
293 return resolver.target["address"].get<std::string>();
294 break;
295 }
296 }
297
298 return std::nullopt;
299 }
300
301 bool check_redirect(
303 std::shared_ptr<ccf::RpcContextImpl> ctx,
304 const endpoints::EndpointDefinitionPtr& endpoint,
306 {
307 auto rs = endpoint->properties.redirection_strategy;
308
309 switch (rs)
310 {
312 {
313 return false;
314 }
315
317 {
318 const bool is_primary =
319 (consensus != nullptr) && consensus->can_replicate();
320
321 if (!is_primary)
322 {
323 auto resolver = redirections.to_primary;
324
325 const auto listen_interface =
326 ctx->get_session_context()->interface_id.value_or(
327 PRIMARY_RPC_INTERFACE);
328 const auto location =
329 resolve_redirect_location(resolver, tx, listen_interface);
330 if (location.has_value())
331 {
332 ctx->set_response_header(
333 http::headers::LOCATION,
334 fmt::format(
335 "https://{}{}", location.value(), ctx->get_request_url()));
336 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
337 return true;
338 }
339
340 // Should have redirected, but don't know how to. Return an error
341 ctx->set_error(
342 HTTP_STATUS_SERVICE_UNAVAILABLE,
343 ccf::errors::PrimaryNotFound,
344 "Request should be redirected to primary, but receiving node "
345 "does not know current primary address");
346 return true;
347 }
348 return false;
349 }
350
352 {
353 const bool is_backup =
354 (consensus != nullptr) && !consensus->can_replicate();
355
356 if (!is_backup)
357 {
358 auto resolver = redirections.to_backup;
359
360 const auto listen_interface =
361 ctx->get_session_context()->interface_id.value_or(
362 PRIMARY_RPC_INTERFACE);
363 const auto location =
364 resolve_redirect_location(resolver, tx, listen_interface);
365 if (location.has_value())
366 {
367 ctx->set_response_header(
368 http::headers::LOCATION,
369 fmt::format(
370 "https://{}{}", location.value(), ctx->get_request_url()));
371 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
372 return true;
373 }
374
375 // Should have redirected, but don't know how to. Return an error
376 ctx->set_error(
377 HTTP_STATUS_SERVICE_UNAVAILABLE,
378 ccf::errors::BackupNotFound,
379 "Request should be redirected to backup, but receiving node "
380 "does not know any current backup address");
381 return true;
382 }
383 return false;
384 }
385
386 default:
387 {
388 LOG_FAIL_FMT("Unhandled redirection strategy: {}", rs);
389 return false;
390 }
391 }
392 }
393
394 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
395 get_redirections_config(const ccf::ListenInterfaceID& incoming_interface)
396 {
397 if (!node_configuration_subsystem)
398 {
399 node_configuration_subsystem =
400 node_context.get_subsystem<NodeConfigurationSubsystem>();
401 if (!node_configuration_subsystem)
402 {
403 LOG_FAIL_FMT("Unable to access NodeConfigurationSubsystem");
404 return std::nullopt;
405 }
406 }
407
408 const auto& node_config_state = node_configuration_subsystem->get();
409 const auto& interfaces =
410 node_config_state.node_config.network.rpc_interfaces;
411 const auto interface_it = interfaces.find(incoming_interface);
412 if (interface_it == interfaces.end())
413 {
415 "Could not find startup config for interface {}", incoming_interface);
416 return std::nullopt;
417 }
418
419 return interface_it->second.redirections;
420 }
421
422 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
423 {
424 if (consensus != nullptr)
425 {
426 auto current_view = consensus->get_view();
427 auto session_ctx = ctx->get_session_context();
428 if (!session_ctx->active_view.has_value())
429 {
430 // First request on this session - assign the active term
431 session_ctx->active_view = current_view;
432 }
433 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
434 else if (current_view != *session_ctx->active_view)
435 {
436 auto msg = fmt::format(
437 "Potential loss of session consistency on session {}. Started "
438 "in view {}, now in view {}. Closing session.",
439 session_ctx->client_session_id,
440 *session_ctx // NOLINT(bugprone-unchecked-optional-access)
441 ->active_view,
442 current_view);
443 LOG_INFO_FMT("{}", msg);
444
445 ctx->set_error(
446 HTTP_STATUS_INTERNAL_SERVER_ERROR,
447 ccf::errors::SessionConsistencyLost,
448 std::move(msg));
449 ctx->terminate_session = true;
450 return false;
451 }
452 }
453
454 return true;
455 }
456
457 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
458 std::shared_ptr<ccf::RpcContextImpl> ctx,
460 const endpoints::EndpointDefinitionPtr& endpoint)
461 {
462 if (endpoint->authn_policies.empty())
463 {
464 return nullptr;
465 }
466
467 std::unique_ptr<AuthnIdentity> identity = nullptr;
468
469 std::string auth_error_reason;
470 std::vector<ODataAuthErrorDetails> error_details;
471 for (const auto& policy : endpoint->authn_policies)
472 {
473 identity = policy->authenticate(tx, ctx, auth_error_reason);
474 if (identity != nullptr)
475 {
476 break;
477 }
478 // Collate error details
479 error_details.emplace_back(ODataAuthErrorDetails{
480 policy->get_security_scheme_name(),
481 ccf::errors::InvalidAuthenticationInfo,
482 auth_error_reason});
483 }
484
485 if (identity == nullptr)
486 {
487 // If none were accepted, let the last set the response header
488 endpoint->authn_policies.back()->set_unauthenticated_error(
489 ctx, std::move(auth_error_reason));
490 // Return collated error details for the auth policies
491 // declared in the request
492 std::vector<nlohmann::json> json_details;
493 json_details.reserve(error_details.size());
494 for (auto& details : error_details)
495 {
496 json_details.emplace_back(details);
497 }
498 ctx->set_error(
499 HTTP_STATUS_UNAUTHORIZED,
500 ccf::errors::InvalidAuthenticationInfo,
501 "Invalid authentication credentials.",
502 json_details);
503 }
504
505 return identity;
506 }
507
508 [[nodiscard]] std::chrono::milliseconds get_forwarding_timeout(
509 std::shared_ptr<ccf::RpcContextImpl> ctx) const
510 {
511 auto r = std::chrono::milliseconds(3'000);
512
513 auto interface_id = ctx->get_session_context()->interface_id;
514 if (interface_id.has_value())
515 {
516 const auto& ncs = node_configuration_subsystem->get();
517 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
518 if (rit != ncs.node_config.network.rpc_interfaces.end())
519 {
520 if (rit->second.forwarding_timeout_ms.has_value())
521 {
522 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
523 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
524 }
525 }
526 }
527
528 return r;
529 }
530
531 void forward(
532 std::shared_ptr<ccf::RpcContextImpl> ctx,
533 ccf::kv::ReadOnlyTx& /*tx*/,
534 const endpoints::EndpointDefinitionPtr& /*endpoint*/)
535 {
536 // HTTP/2 does not support forwarding
537 if (ctx->get_http_version() == HttpVersion::HTTP2)
538 {
539 ctx->set_error(
540 HTTP_STATUS_NOT_IMPLEMENTED,
541 ccf::errors::NotImplemented,
542 "Request cannot be forwarded to primary on HTTP/2 interface.");
543
544 return;
545 }
546
547 if (!cmd_forwarder || (consensus == nullptr))
548 {
549 ctx->set_error(
550 HTTP_STATUS_INTERNAL_SERVER_ERROR,
551 ccf::errors::InternalError,
552 "No consensus or forwarder to forward request.");
553
554 return;
555 }
556
557 if (ctx->get_session_context()->is_forwarded)
558 {
559 // If the request was already forwarded, return an error to prevent
560 // daisy chains.
561 ctx->set_error(
562 HTTP_STATUS_SERVICE_UNAVAILABLE,
563 ccf::errors::RequestAlreadyForwarded,
564 "RPC was already forwarded.");
565
566 return;
567 }
568
569 // Before attempting to forward, make sure we're in the same View as we
570 // previously thought we were.
571 if (!check_session_consistency(ctx))
572 {
573 return;
574 }
575
576 auto primary_id = consensus->primary();
577 if (!primary_id.has_value())
578 {
579 ctx->set_error(
580 HTTP_STATUS_SERVICE_UNAVAILABLE,
581 ccf::errors::InternalError,
582 "RPC could not be forwarded to unknown primary.");
583
584 return;
585 }
586
587 if (!cmd_forwarder->forward_command(
588 ctx,
589 primary_id.value(),
590 ctx->get_session_context()->caller_cert,
591 get_forwarding_timeout(ctx)))
592 {
593 ctx->set_error(
594 HTTP_STATUS_SERVICE_UNAVAILABLE,
595 ccf::errors::InternalError,
596 "Unable to establish channel to forward to primary.");
597
598 return;
599 }
600
601 LOG_TRACE_FMT("RPC forwarded to primary {}", primary_id.value());
602
603 // Indicate that the RPC has been forwarded to primary
604 ctx->response_is_pending = true;
605
606 // Ensure future requests on this session are forwarded for session
607 // consistency
608 ctx->get_session_context()->is_forwarding = true;
609 }
610
611 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
612 {
613 size_t attempts = 0;
614 endpoints::EndpointDefinitionPtr endpoint = nullptr;
615
616 const auto start_time = std::chrono::high_resolution_clock::now();
617
618 process_command_inner(ctx, endpoint, attempts);
619
620 const auto end_time = std::chrono::high_resolution_clock::now();
621
622 if (endpoint != nullptr)
623 {
624 endpoints::RequestCompletedEvent rce;
625 rce.method = endpoint->dispatch.verb.c_str();
626 rce.dispatch_path = endpoint->dispatch.uri_path;
627 rce.status = ctx->get_response_status();
628 // Although enclave time returns a microsecond value, the actual
629 // precision/granularity depends on the host's TimeUpdater. By default
630 // this only advances each millisecond. Avoid implying more precision
631 // than that, by rounding to milliseconds
632 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
633 end_time - start_time);
634 rce.attempts = attempts;
635
637 }
638 else
639 {
640 endpoints::DispatchFailedEvent dfe;
641 dfe.method = ctx->get_method();
642 dfe.status = ctx->get_response_status();
643
645 }
646 }
647
648 void process_command_inner(
649 std::shared_ptr<ccf::RpcContextImpl> ctx,
651 size_t& attempts)
652 {
653 constexpr auto max_attempts = 30;
654 while (attempts < max_attempts)
655 {
656 if (consensus != nullptr)
657 {
658 if (
660 consensus->is_at_max_capacity())
661 {
662 ctx->set_error(
663 HTTP_STATUS_SERVICE_UNAVAILABLE,
664 ccf::errors::TooManyPendingTransactions,
665 "Too many transactions pending commit on the service.");
666 return;
667 }
668 }
669
670 std::unique_ptr<ccf::kv::CommittableTx> tx_p = tables.create_tx_ptr();
671 set_root_on_proposals(*ctx, *tx_p);
672
673 if (attempts > 0)
674 {
675 // If the endpoint has already been executed, the effects of its
676 // execution should be dropped
677 ctx->reset_response();
678 }
679
680 if (!is_open())
681 {
682 ctx->set_error(
683 HTTP_STATUS_NOT_FOUND,
684 ccf::errors::FrontendNotOpen,
685 "Frontend is not open.");
686 return;
687 }
688
689 ++attempts;
690 update_history();
691
692 endpoint = find_endpoint(ctx, *tx_p);
693 if (endpoint == nullptr)
694 {
695 return;
696 }
697
698 try
699 {
700 if (!check_uri_allowed(ctx, endpoint))
701 {
702 return;
703 }
704
705 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
706 redirections = std::nullopt;
707
708 // If there's no interface ID, this is already forwarded or otherwise
709 // special - don't try to redirect it
710 if (ctx->get_session_context()->interface_id.has_value())
711 {
712 redirections = get_redirections_config(
713 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
714 *ctx->get_session_context()->interface_id);
715 }
716
717 // If a redirections config was specified, then redirections are used
718 // and no forwarding is done
719 if (redirections.has_value())
720 {
721 if (check_redirect(*tx_p, ctx, endpoint, *redirections))
722 {
723 return;
724 }
725 }
726 else
727 {
728 bool is_primary =
729 (consensus == nullptr) || consensus->can_replicate();
730 const bool forwardable = (consensus != nullptr);
731
732 if (!is_primary && forwardable)
733 {
734 switch (endpoint->properties.forwarding_required)
735 {
737 {
738 break;
739 }
740
742 {
743 if (ctx->get_session_context()->is_forwarding)
744 {
745 forward(ctx, *tx_p, endpoint);
746 return;
747 }
748 break;
749 }
750
752 {
753 forward(ctx, *tx_p, endpoint);
754 return;
755 }
756 }
757 }
758 }
759
760 std::unique_ptr<AuthnIdentity> identity =
761 get_authenticated_identity(ctx, *tx_p, endpoint);
762
763 auto args = ccf::EndpointContextImpl(ctx, std::move(tx_p));
764 // NB: tx_p is no longer valid, and must be accessed from args, which
765 // may change it!
766
767 // If any auth policy was required, check that at least one is
768 // accepted
769 if (!endpoint->authn_policies.empty())
770 {
771 if (identity == nullptr)
772 {
773 return;
774 }
775 args.caller = std::move(identity);
776 }
777
778 endpoints.execute_endpoint(endpoint, args);
779
780 // If we've seen a View change, abandon this transaction as
781 // inconsistent
782 if (!check_session_consistency(ctx))
783 {
784 return;
785 }
786
787 if (!ctx->should_apply_writes())
788 {
789 return;
790 }
791
792 if (ctx->response_is_pending)
793 {
794 return;
795 }
796
797 if (args.owned_tx == nullptr)
798 {
800 "Bad endpoint: During execution of {} {}, returned a non-pending "
801 "response but stole ownership of Tx object",
802 ctx->get_request_verb().c_str(),
803 ctx->get_request_path());
804
805 ctx->clear_response_headers();
806 ctx->set_error(
807 HTTP_STATUS_INTERNAL_SERVER_ERROR,
808 ccf::errors::InternalError,
809 "Illegal endpoint implementation");
810 return;
811 }
812 // else args owns a valid Tx relating to a non-pending response, which
813 // should be applied
814 ccf::kv::CommittableTx& tx = *args.owned_tx;
815 ccf::kv::CommitResult result = tx.commit(ctx->claims);
816
817 switch (result)
818 {
820 {
821 auto tx_id = tx.get_txid();
822 if (tx_id.has_value() && consensus != nullptr)
823 {
824 try
825 {
826 // Only transactions that acquired one or more map handles
827 // have a TxID, while others (e.g. unauthenticated commands)
828 // don't. Also, only report a TxID if the consensus is set, as
829 // the consensus is required to verify that a TxID is valid.
831 endpoint, args, tx_id.value());
832 }
833 catch (const std::exception& e)
834 {
835 // run default handler to set transaction id in header
836 ctx->clear_response_headers();
838 args, tx_id.value());
839 ctx->set_error(
840 HTTP_STATUS_INTERNAL_SERVER_ERROR,
841 ccf::errors::InternalError,
842 fmt::format(
843 "Failed to execute local commit handler func: {}",
844 e.what()));
845 }
846 catch (...)
847 {
848 // run default handler to set transaction id in header
849 ctx->clear_response_headers();
851 args, tx_id.value());
852 ctx->set_error(
853 HTTP_STATUS_INTERNAL_SERVER_ERROR,
854 ccf::errors::InternalError,
855 "Failed to execute local commit handler func");
856 }
857 }
858
859 if (
860 consensus != nullptr && consensus->can_replicate() &&
861 history != nullptr)
862 {
863 history->try_emit_signature();
864 }
865
866 return;
867 }
868
870 {
871 break;
872 }
873
875 {
876 ctx->clear_response_headers();
877 ctx->set_error(
878 HTTP_STATUS_SERVICE_UNAVAILABLE,
879 ccf::errors::TransactionReplicationFailed,
880 "Transaction failed to replicate.");
881
882 return;
883 }
884 }
885 }
886 catch (const ccf::kv::CompactedVersionConflict& e)
887 {
888 // The executing transaction failed because of a conflicting
889 // compaction. Reset and retry
891 "Transaction execution conflicted with compaction: {}", e.what());
892 continue;
893 }
894 catch (RpcException& e)
895 {
896 ctx->clear_response_headers();
897 ctx->set_error(std::move(e.error));
898
899 return;
900 }
901 catch (const ccf::JsonParseError& e)
902 {
903 ctx->clear_response_headers();
904 ctx->set_error(
905 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.describe());
906
907 return;
908 }
909 catch (const nlohmann::json::exception& e)
910 {
911 ctx->clear_response_headers();
912 ctx->set_error(
913 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
914
915 return;
916 }
917 catch (const ccf::kv::KvSerialiserException& e)
918 {
919 // If serialising the committed transaction fails, there is no way
920 // to recover safely (https://github.com/microsoft/CCF/issues/338).
921 // Better to abort.
922 LOG_DEBUG_FMT("Failed to serialise: {}", e.what());
923 LOG_FATAL_FMT("Failed to serialise");
924 abort();
925 }
926 catch (const std::exception& e)
927 {
928 ctx->clear_response_headers();
929 ctx->set_error(
930 HTTP_STATUS_INTERNAL_SERVER_ERROR,
931 ccf::errors::InternalError,
932 e.what());
933
934 return;
935 }
936 } // end of while loop
937
938 ctx->clear_response_headers();
939 ctx->set_error(
940 HTTP_STATUS_SERVICE_UNAVAILABLE,
941 ccf::errors::TransactionCommitAttemptsExceedLimit,
942 fmt::format(
943 "Transaction continued to conflict after {} attempts. Retry "
944 "later.",
945 max_attempts));
946
947 static constexpr size_t retry_after_seconds = 3;
948 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
949 }
950
951 public:
953 ccf::kv::Store& tables_,
955 ccf::AbstractNodeContext& node_context_) :
956 tables(tables_),
957 endpoints(handlers_),
958 node_context(node_context_)
959 {}
960
962 size_t sig_tx_interval_, size_t sig_ms_interval_) override
963 {
964 sig_tx_interval = sig_tx_interval_;
965 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
966 ms_to_sig = sig_ms_interval;
967 }
968
970 std::shared_ptr<AbstractForwarder> cmd_forwarder_) override
971 {
972 cmd_forwarder = cmd_forwarder_;
973 }
974
975 void open() override
976 {
977 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
978 if (!is_open_)
979 {
980 LOG_INFO_FMT("Opening frontend");
981 is_open_ = true;
983 }
984 }
985
986 bool is_open() override
987 {
988 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
989 return is_open_;
990 }
991
994 {
996 {
997 update_history();
998 if (history != nullptr)
999 {
1000 // Warning: Retrieving the current TxID and root from the history
1001 // should only ever be used for the proposal creation endpoint and
1002 // nothing else. Many bad things could happen otherwise (e.g. breaking
1003 // session consistency).
1004 const auto& [txid, root, term_of_next_version] =
1006 tx.set_read_txid(txid, term_of_next_version);
1007 tx.set_root_at_read_version(root);
1008 }
1009 }
1010 }
1011
1020 void process(std::shared_ptr<ccf::RpcContextImpl> ctx) override
1021 {
1022 update_consensus();
1023
1024 // NB: If we want to re-execute on backups, the original command could
1025 // be propagated from here
1026 process_command(ctx);
1027 }
1028
1033 void process_forwarded(std::shared_ptr<ccf::RpcContextImpl> ctx) override
1034 {
1035 if (!ctx->get_session_context()->is_forwarded)
1036 {
1037 throw std::logic_error(
1038 "Processing forwarded command with unitialised forwarded context");
1039 }
1040
1041 update_consensus();
1042 process_command(ctx);
1043 if (ctx->response_is_pending)
1044 {
1045 // This should never be called when process_command is called with a
1046 // forwarded RPC context
1047 throw std::logic_error("Forwarded RPC cannot be forwarded");
1048 }
1049 }
1050
1051 void tick(std::chrono::milliseconds elapsed) override
1052 {
1053 update_consensus();
1054
1055 endpoints.tick(elapsed);
1056 }
1057 };
1058}
Definition forwarder.h:19
static std::map< NodeId, NodeInfo > get_trusted_nodes(ccf::kv::ReadOnlyTx &tx)
Definition internal_tables_access.h:438
Definition json.h:26
std::string describe() const
Definition json.h:41
Definition rpc_context_impl.h:21
Definition frontend.h:34
ccf::kv::Store & tables
Definition frontend.h:36
RpcFrontend(ccf::kv::Store &tables_, endpoints::EndpointRegistry &handlers_, ccf::AbstractNodeContext &node_context_)
Definition frontend.h:952
void set_root_on_proposals(const ccf::RpcContextImpl &ctx, ccf::kv::CommittableTx &tx)
Definition frontend.h:992
void set_cmd_forwarder(std::shared_ptr< AbstractForwarder > cmd_forwarder_) override
Definition frontend.h:969
bool is_open() override
Definition frontend.h:986
void process(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:1020
ccf::AbstractNodeContext & node_context
Definition frontend.h:38
void set_sig_intervals(size_t sig_tx_interval_, size_t sig_ms_interval_) override
Definition frontend.h:961
void process_forwarded(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:1033
void open() override
Definition frontend.h:975
endpoints::EndpointRegistry & endpoints
Definition frontend.h:37
void tick(std::chrono::milliseconds elapsed) override
Definition frontend.h:1051
Definition rpc_handler.h:24
Definition endpoint_registry.h:117
virtual void execute_endpoint_locally_committed(EndpointDefinitionPtr e, CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:489
virtual void handle_event_request_completed(const ccf::endpoints::RequestCompletedEvent &event)
Definition endpoint_registry.h:266
virtual void tick(std::chrono::milliseconds duration)
Definition endpoint_registry.cpp:563
virtual void init_handlers()
Definition endpoint_registry.cpp:393
void set_consensus(ccf::kv::Consensus *c)
Definition endpoint_registry.cpp:565
virtual void execute_endpoint(EndpointDefinitionPtr e, EndpointContext &ctx)
Definition endpoint_registry.cpp:475
virtual EndpointDefinitionPtr find_endpoint(ccf::kv::Tx &tx, ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:395
virtual bool request_needs_root(const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:536
virtual void handle_event_dispatch_failed(const ccf::endpoints::DispatchFailedEvent &event)
Definition endpoint_registry.h:270
void set_history(ccf::kv::TxHistory *h)
Definition endpoint_registry.cpp:570
virtual bool apply_uncommitted_tx_backpressure() const
Definition endpoint_registry.h:274
virtual std::set< RESTVerb > get_allowed_verbs(ccf::kv::Tx &tx, const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:504
Definition committable_tx.h:19
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:333
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, std::function< void(const std::vector< uint8_t > &write_set, const std::string &commit_evidence)> write_set_observer=nullptr)
Definition committable_tx.h:136
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:343
std::optional< TxID > get_txid() const
Definition committable_tx.h:308
Definition compacted_version_conflict.h:10
char const * what() const
Definition compacted_version_conflict.h:17
Definition kv_types.h:364
Definition kv_types.h:315
const char * what() const noexcept override
Definition kv_types.h:322
Definition tx.h:159
Definition store.h:89
std::shared_ptr< TxHistory > get_history() override
Definition store.h:195
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1287
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:182
Definition kv_types.h:329
virtual void try_emit_signature()=0
virtual std::tuple< ccf::TxID, ccf::crypto::Sha256Hash, ccf::kv::Term > get_replicated_state_txid_and_root()=0
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FATAL_FMT
Definition internal_logger.h:17
#define LOG_FAIL_FMT
Definition internal_logger.h:16
void default_locally_committed_func(CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:198
std::shared_ptr< const EndpointDefinition > EndpointDefinitionPtr
Definition endpoint.h:240
CommitResult
Definition kv_types.h:209
@ FAIL_NO_REPLICATE
Definition kv_types.h:212
@ SUCCESS
Definition kv_types.h:210
@ FAIL_CONFLICT
Definition kv_types.h:211
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition consensus_types.h:23
STL namespace.
Definition node_context.h:12
std::shared_ptr< T > get_subsystem(const std::string &name) const
Definition node_context.h:37
Definition endpoint_context_impl.h:13
Definition node_info_network.h:119
RedirectionResolverConfig to_primary
Definition node_info_network.h:120
RedirectionResolverConfig to_backup
Definition node_info_network.h:121