CCF
Loading...
Searching...
No Matches
frontend.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/http_status.h"
7#include "ccf/node_context.h"
8#include "ccf/pal/locking.h"
9#include "ccf/rpc_exception.h"
16#include "enclave/rpc_handler.h"
17#include "forwarder.h"
18#include "http/http_jwt.h"
20#include "kv/store.h"
24
25#define FMT_HEADER_ONLY
26
27#include <fmt/format.h>
28#include <utility>
29#include <vector>
30
31namespace ccf
32{
34 {
35 protected:
39
40 private:
// Guards is_open_; taken by both open() and is_open().
41 ccf::pal::Mutex open_lock;
// Set to true by open(); starts out false.
42 bool is_open_ = false;
43
// Forwarder used by forward() to pass a request on to the primary. Installed
// through set_cmd_forwarder(); forward() checks it for null before use.
45 std::shared_ptr<AbstractForwarder> cmd_forwarder;
// Raw, non-owning pointer to the store's history. Initialised to nullptr by
// the constructor and refreshed by update_history().
46 ccf::kv::TxHistory* history;
47
// Signature interval settings. These defaults are overwritten by
// set_sig_intervals(), which also resets ms_to_sig to sig_ms_interval.
48 size_t sig_tx_interval = 5000;
49 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
50 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
51
// Fetched from node_context on first use by check_uri_allowed() and
// get_redirections_config(); null until one of them has run successfully.
52 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
53 nullptr;
54
// Re-reads the consensus object from the store and, if it differs from the
// cached raw pointer, replaces the cached pointer.
// NOTE(review): this listing omits original line 62 inside the branch below.
// Presumably that line hands the new pointer on to `endpoints` (the registry
// exposes set_consensus) - confirm against the real header.
55 void update_consensus()
56 {
57 auto c = tables.get_consensus().get();
58
59 if (consensus != c)
60 {
61 consensus = c;
63 }
64 }
65
66 void update_history()
67 {
68 history = tables.get_history().get();
69 endpoints.set_history(history);
70 }
71
// NOTE(review): the first line of this definition (return type and name) is
// absent from this listing; process_command_inner calls it as
// find_endpoint(ctx, tx).
//
// Asks the endpoint registry for the endpoint matching this request. When no
// endpoint matches, a response is written to ctx before the null result is
// returned:
//  - no verb is allowed on the path: 404 ResourceNotFound
//  - OPTIONS request: 204 No Content, with an ALLOW header
//  - any other verb: 405 UnsupportedHttpVerb, with the ALLOW header and the
//    same list repeated in the error message
73 std::shared_ptr<ccf::RpcContextImpl> ctx, ccf::kv::CommittableTx& tx)
74 {
75 const auto endpoint = endpoints.find_endpoint(tx, *ctx);
76 if (endpoint == nullptr)
77 {
78 // Every path from here should populate an appropriate response error
79 const auto allowed_verbs = endpoints.get_allowed_verbs(tx, *ctx);
80 if (allowed_verbs.empty())
81 {
82 ctx->set_error(
83 HTTP_STATUS_NOT_FOUND,
84 ccf::errors::ResourceNotFound,
85 fmt::format("Unknown path: {}.", ctx->get_method()));
86 }
87 else
88 {
89 std::vector<char const*> allowed_verb_strs;
// OPTIONS is always advertised, ahead of the verbs reported by the registry.
90 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
// NOTE(review): `verb` is a per-iteration copy, and the pointer stored here is
// read after that copy is gone. This is only safe if c_str() returns a pointer
// to storage that outlives the verb object - confirm.
91 for (auto verb : allowed_verbs)
92 {
93 allowed_verb_strs.push_back(verb.c_str());
94 }
95 const std::string allow_header_value =
96 fmt::format("{}", fmt::join(allowed_verb_strs, ", "));
97 // List allowed methods in 2 places:
98 // - ALLOW header for standards compliance + machine parsing
99 // - Body for visibility + human readability (unless this was an
100 // OPTIONS request, which returns a 204 No Content)
101 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
102 if (ctx->get_request_verb() == HTTP_OPTIONS)
103 {
104 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
105 }
106 else
107 {
108 ctx->set_error(
109 HTTP_STATUS_METHOD_NOT_ALLOWED,
110 ccf::errors::UnsupportedHttpVerb,
111 fmt::format(
112 "Allowed methods for '{}' are: {}.",
113 ctx->get_method(),
114 allow_header_value));
115 }
116 }
117 }
118
// Null here means one of the responses above has already been set on ctx.
119 return endpoint;
120 }
121
// Decides whether the resolved endpoint may be served on the interface this
// request arrived on. Returns true to carry on; when it returns false a
// response status has already been set on ctx:
//  - node configuration subsystem unavailable: 500
//  - interface has URI regexes and none fully matches the endpoint's
//    full_uri_path: 503
//  - interface has no regexes and its endorsement authority is UNSECURED: 503
// The check is skipped (returns true) when there is no consensus or the
// session has no interface id.
122 bool check_uri_allowed(
123 std::shared_ptr<ccf::RpcContextImpl> ctx,
124 const endpoints::EndpointDefinitionPtr& endpoint)
125 {
126 auto interface_id = ctx->get_session_context()->interface_id;
127 if (consensus && interface_id)
128 {
// Fetch the subsystem on first use and keep it for later calls.
129 if (!node_configuration_subsystem)
130 {
131 node_configuration_subsystem =
132 node_context.get_subsystem<NodeConfigurationSubsystem>();
133 if (!node_configuration_subsystem)
134 {
135 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
136 return false;
137 }
138 }
139
140 auto& ncs = node_configuration_subsystem->get();
141 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
142
143 if (rit != ncs.rpc_interface_regexes.end())
144 {
// regex_match requires the whole path to match, not just a substring.
145 bool ok = false;
146 for (const auto& re : rit->second)
147 {
148 std::smatch m;
149 if (std::regex_match(endpoint->full_uri_path, m, re))
150 {
151 ok = true;
152 break;
153 }
154 }
155 if (!ok)
156 {
157 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
158 return false;
159 }
160 }
161 else
162 {
// NOTE(review): at() copies the interface config and would throw if the id
// has no entry, and `endorsement` is dereferenced without a check - confirm
// both are guaranteed by configuration validation elsewhere.
163 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
164 if (icfg.endorsement->authority == Authority::UNSECURED)
165 {
166 // Unsecured interfaces are opt-in only.
// NOTE(review): this listing omits original line 167, which opens the call
// that takes the message and argument below - presumably a logging macro.
168 "Request for {} rejected because the interface is unsecured and "
169 "no accepted_endpoints have been configured.",
170 endpoint->full_uri_path);
171 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
172 return false;
173 }
174 }
175 }
176 else
177 {
178 // internal or forwarded: OK because they have been checked by the
179 // forwarder (forward() happens further down).
180 }
181
182 return true;
183 }
184
// Turns a redirection resolver config into the address a client should be
// redirected to, or nullopt when no target can be determined.
// NOTE(review): this listing drops several lines of this function: one
// parameter (callers pass a transaction as the second argument), both case
// labels of the switch, and the statement that declares `nodes`. The index at
// the end of the page lists get_trusted_nodes(tx) returning a map of NodeId to
// NodeInfo, which is presumably where `nodes` comes from - confirm.
185 std::optional<std::string> resolve_redirect_location(
186 const RedirectionResolverConfig& resolver,
188 const ccf::ListenInterfaceID& incoming_interface)
189 {
190 switch (resolver.kind)
191 {
// First branch (case label missing from the listing): choose a node by role.
193 {
// A missing "role" entry is treated the same as "primary". Any value other
// than "primary" or "backup" yields nullopt.
194 const auto role_it = resolver.target.find("role");
195 const bool seeking_primary =
196 role_it == resolver.target.end() || role_it.value() == "primary";
197 const bool seeking_backup =
198 !seeking_primary && role_it.value() == "backup";
199 if (!seeking_primary && !seeking_backup)
200 {
201 return std::nullopt;
202 }
203
// Without an explicit "interface" entry, target the interface with the same
// name as the one the request came in on.
204 const auto interface_it = resolver.target.find("interface");
205 const auto target_interface =
206 (interface_it == resolver.target.end()) ?
207 incoming_interface :
208 interface_it.value().get<std::string>();
209
210 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
211 target_node_its;
213 {
214 const auto primary_id = consensus->primary();
215 if (seeking_primary && primary_id.has_value())
216 {
// find() may return end() here; that case is handled after selection below.
217 target_node_its.push_back(nodes.find(primary_id.value()));
218 }
219 else if (seeking_backup)
220 {
// Every node whose id differs from the current primary is a candidate.
221 for (auto it = nodes.begin(); it != nodes.end(); ++it)
222 {
223 if (it->first != primary_id)
224 {
225 target_node_its.push_back(it);
226 }
227 }
228 }
229 }
230 if (target_node_its.empty())
231 {
232 return std::nullopt;
233 }
234
// Pick one candidate using rand().
235 const auto node_it = target_node_its[rand() % target_node_its.size()];
236 if (node_it != nodes.end())
237 {
238 const auto& interfaces = node_it->second.rpc_interfaces;
239
240 const auto target_interface_it = interfaces.find(target_interface);
241 if (target_interface_it != interfaces.end())
242 {
243 return target_interface_it->second.published_address;
244 }
// If the chosen node lacks the target interface, control reaches the break
// and the function returns nullopt at the end.
245 }
246 else
247 {
248 return std::nullopt;
249 }
250 break;
251 }
252
// Second branch (case label missing from the listing): return the "address"
// entry of the target as-is.
254 {
255 return resolver.target["address"].get<std::string>();
256 break;
257 }
258 }
259
260 return std::nullopt;
261 }
262
// Applies the endpoint's redirection strategy. Returns true when a response
// (a 307 redirect or a 503 error) has been written to ctx and processing
// should stop; returns false when the request should be handled on this node.
// NOTE(review): this listing drops two parameters (used below as `tx` and
// `redirections`; the caller passes them first and last) and the three case
// labels of the switch. The branch notes below are read off each branch's
// body - confirm the labels against the real header.
263 bool check_redirect(
265 std::shared_ptr<ccf::RpcContextImpl> ctx,
266 const endpoints::EndpointDefinitionPtr& endpoint,
268 {
269 auto rs = endpoint->properties.redirection_strategy;
270
271 switch (rs)
272 {
// First branch: never redirect.
274 {
275 return false;
276 }
277
// Second branch: redirect to the primary unless this node can replicate.
// With no consensus at all, is_primary is false and a redirect is attempted.
279 {
280 const bool is_primary =
281 (consensus != nullptr) && consensus->can_replicate();
282
283 if (!is_primary)
284 {
285 auto resolver = redirections.to_primary;
286
287 const auto listen_interface =
288 ctx->get_session_context()->interface_id.value_or(
289 PRIMARY_RPC_INTERFACE);
290 const auto location =
291 resolve_redirect_location(resolver, tx, listen_interface);
292 if (location.has_value())
293 {
// The Location header keeps the original request URL and always uses https.
294 ctx->set_response_header(
295 http::headers::LOCATION,
296 fmt::format(
297 "https://{}{}", location.value(), ctx->get_request_url()));
298 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
299 return true;
300 }
301
302 // Should have redirected, but don't know how to. Return an error
303 ctx->set_error(
304 HTTP_STATUS_SERVICE_UNAVAILABLE,
305 ccf::errors::PrimaryNotFound,
306 "Request should be redirected to primary, but receiving node "
307 "does not know current primary address");
308 return true;
309 }
310 return false;
311 }
312
// Third branch: redirect to a backup unless this node has a consensus and
// cannot replicate. With no consensus at all, is_backup is false and a
// redirect is attempted.
314 {
315 const bool is_backup =
316 (consensus != nullptr) && !consensus->can_replicate();
317
318 if (!is_backup)
319 {
320 auto resolver = redirections.to_backup;
321
322 const auto listen_interface =
323 ctx->get_session_context()->interface_id.value_or(
324 PRIMARY_RPC_INTERFACE);
325 const auto location =
326 resolve_redirect_location(resolver, tx, listen_interface);
327 if (location.has_value())
328 {
329 ctx->set_response_header(
330 http::headers::LOCATION,
331 fmt::format(
332 "https://{}{}", location.value(), ctx->get_request_url()));
333 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
334 return true;
335 }
336
337 // Should have redirected, but don't know how to. Return an error
338 ctx->set_error(
339 HTTP_STATUS_SERVICE_UNAVAILABLE,
340 ccf::errors::BackupNotFound,
341 "Request should be redirected to backup, but receiving node "
342 "does not know any current backup address");
343 return true;
344 }
345 return false;
346 }
347
// Unknown strategy: log it and let the request be handled locally.
348 default:
349 {
350 LOG_FAIL_FMT("Unhandled redirection strategy: {}", rs);
351 return false;
352 }
353 }
354 }
355
// Returns the `redirections` value from the node configuration entry for the
// given interface. Returns nullopt when the node configuration subsystem
// cannot be obtained or the interface has no entry. The subsystem is fetched
// from node_context on first use and cached in node_configuration_subsystem.
356 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
357 get_redirections_config(const ccf::ListenInterfaceID& incoming_interface)
358 {
359 if (!node_configuration_subsystem)
360 {
361 node_configuration_subsystem =
362 node_context.get_subsystem<NodeConfigurationSubsystem>();
363 if (!node_configuration_subsystem)
364 {
365 LOG_FAIL_FMT("Unable to access NodeConfigurationSubsystem");
366 return std::nullopt;
367 }
368 }
369
370 const auto& node_config_state = node_configuration_subsystem->get();
371 const auto& interfaces =
372 node_config_state.node_config.network.rpc_interfaces;
373 const auto interface_it = interfaces.find(incoming_interface);
374 if (interface_it == interfaces.end())
375 {
// NOTE(review): this listing omits original line 376, which opens the call
// that takes the message below - presumably a logging macro.
377 "Could not find startup config for interface {}", incoming_interface);
378 return std::nullopt;
379 }
380
381 return interface_it->second.redirections;
382 }
383
384 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
385 {
386 if (consensus != nullptr)
387 {
388 auto current_view = consensus->get_view();
389 auto session_ctx = ctx->get_session_context();
390 if (!session_ctx->active_view.has_value())
391 {
392 // First request on this session - assign the active term
393 session_ctx->active_view = current_view;
394 }
395 else if (current_view != session_ctx->active_view.value())
396 {
397 auto msg = fmt::format(
398 "Potential loss of session consistency on session {}. Started "
399 "in view {}, now in view {}. Closing session.",
400 session_ctx->client_session_id,
401 session_ctx->active_view.value(),
402 current_view);
403 LOG_INFO_FMT("{}", msg);
404
405 ctx->set_error(
406 HTTP_STATUS_INTERNAL_SERVER_ERROR,
407 ccf::errors::SessionConsistencyLost,
408 std::move(msg));
409 ctx->terminate_session = true;
410 return false;
411 }
412 }
413
414 return true;
415 }
416
// Tries each of the endpoint's authentication policies in order and returns
// the identity produced by the first one that accepts the request.
// Returns nullptr both when the endpoint declares no policies (no response is
// set) and when every policy rejects the request (a 401 response is set on
// ctx); callers must check authn_policies to tell the two cases apart.
// NOTE(review): this listing drops one parameter line; the body uses it as
// `tx` and the caller passes a transaction as the second argument.
417 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
418 std::shared_ptr<ccf::RpcContextImpl> ctx,
420 const endpoints::EndpointDefinitionPtr& endpoint)
421 {
422 if (endpoint->authn_policies.empty())
423 {
424 return nullptr;
425 }
426
427 std::unique_ptr<AuthnIdentity> identity = nullptr;
428
// auth_error_reason is shared across iterations; each failing policy's reason
// is copied into error_details before the next policy is tried.
429 std::string auth_error_reason;
430 std::vector<ODataAuthErrorDetails> error_details;
431 for (const auto& policy : endpoint->authn_policies)
432 {
433 identity = policy->authenticate(tx, ctx, auth_error_reason);
434 if (identity != nullptr)
435 {
436 break;
437 }
438 else
439 {
440 // Collate error details
441 error_details.emplace_back(ODataAuthErrorDetails{
442 policy->get_security_scheme_name(),
443 ccf::errors::InvalidAuthenticationInfo,
444 auth_error_reason});
445 }
446 }
447
448 if (identity == nullptr)
449 {
450 // If none were accepted, let the last set the response header
451 endpoint->authn_policies.back()->set_unauthenticated_error(
452 ctx, std::move(auth_error_reason));
453 // Return collated error details for the auth policies
454 // declared in the request
455 std::vector<nlohmann::json> json_details;
456 for (auto& details : error_details)
457 {
458 json_details.push_back(details);
459 }
// This set_error call runs after the last policy's set_unauthenticated_error
// call above.
460 ctx->set_error(
461 HTTP_STATUS_UNAUTHORIZED,
462 ccf::errors::InvalidAuthenticationInfo,
463 "Invalid authentication credentials.",
464 std::move(json_details));
465 }
466
467 return identity;
468 }
469
470 std::chrono::milliseconds get_forwarding_timeout(
471 std::shared_ptr<ccf::RpcContextImpl> ctx) const
472 {
473 auto r = std::chrono::milliseconds(3'000);
474
475 auto interface_id = ctx->get_session_context()->interface_id;
476 if (interface_id.has_value())
477 {
478 auto& ncs = node_configuration_subsystem->get();
479 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
480 if (rit != ncs.node_config.network.rpc_interfaces.end())
481 {
482 if (rit->second.forwarding_timeout_ms.has_value())
483 {
484 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
485 }
486 }
487 }
488
489 return r;
490 }
491
// Attempts to forward the request to the current primary. On success the
// context is marked as having a pending response and the session is marked as
// forwarding. On any failure an error response is set on ctx and the function
// returns without forwarding:
//  - HTTP/2 request: 501 NotImplemented
//  - no forwarder or no consensus: 500 InternalError
//  - request was itself forwarded: 503 RequestAlreadyForwarded
//  - session consistency check fails: error set by check_session_consistency
//  - primary unknown, or forward_command returns false: 503 InternalError
// NOTE(review): this listing drops one parameter line; callers pass a
// transaction as the second argument. The visible body does not use it.
492 void forward(
493 std::shared_ptr<ccf::RpcContextImpl> ctx,
495 const endpoints::EndpointDefinitionPtr& endpoint)
496 {
497 // HTTP/2 does not support forwarding
498 if (ctx->get_http_version() == HttpVersion::HTTP2)
499 {
500 ctx->set_error(
501 HTTP_STATUS_NOT_IMPLEMENTED,
502 ccf::errors::NotImplemented,
503 "Request cannot be forwarded to primary on HTTP/2 interface.");
504
505 return;
506 }
507
508 if (!cmd_forwarder || !consensus)
509 {
510 ctx->set_error(
511 HTTP_STATUS_INTERNAL_SERVER_ERROR,
512 ccf::errors::InternalError,
513 "No consensus or forwarder to forward request.");
514
515 return;
516 }
517
518 if (ctx->get_session_context()->is_forwarded)
519 {
520 // If the request was already forwarded, return an error to prevent
521 // daisy chains.
522 ctx->set_error(
523 HTTP_STATUS_SERVICE_UNAVAILABLE,
524 ccf::errors::RequestAlreadyForwarded,
525 "RPC was already forwarded.");
526
527 return;
528 }
529
530 // Before attempting to forward, make sure we're in the same View as we
531 // previously thought we were.
532 if (!check_session_consistency(ctx))
533 {
534 return;
535 }
536
537 auto primary_id = consensus->primary();
538 if (!primary_id.has_value())
539 {
540 ctx->set_error(
541 HTTP_STATUS_SERVICE_UNAVAILABLE,
542 ccf::errors::InternalError,
543 "RPC could not be forwarded to unknown primary.");
544
545 return;
546 }
547
// The timeout comes from the per-interface configuration, or the default.
548 if (!cmd_forwarder->forward_command(
549 ctx,
550 primary_id.value(),
551 ctx->get_session_context()->caller_cert,
552 get_forwarding_timeout(ctx)))
553 {
554 ctx->set_error(
555 HTTP_STATUS_SERVICE_UNAVAILABLE,
556 ccf::errors::InternalError,
557 "Unable to establish channel to forward to primary.");
558
559 return;
560 }
561
562 LOG_TRACE_FMT("RPC forwarded to primary {}", primary_id.value());
563
564 // Indicate that the RPC has been forwarded to primary
565 ctx->response_is_pending = true;
566
567 // Ensure future requests on this session are forwarded for session
568 // consistency
569 ctx->get_session_context()->is_forwarding = true;
570
571 return;
572 }
573
// Runs process_command_inner for this request, timing it with
// high_resolution_clock, then fills in an event describing the outcome:
// a RequestCompletedEvent when an endpoint was resolved, otherwise a
// DispatchFailedEvent.
// NOTE(review): this listing omits original lines 599 and 607, the last
// statement of each branch below. The index at the end of the page lists
// handle_event_request_completed and handle_event_dispatch_failed on the
// endpoint registry, so presumably each event is handed to the matching
// handler there - confirm.
574 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
575 {
// Both are filled in by process_command_inner through reference parameters.
576 size_t attempts = 0;
577 endpoints::EndpointDefinitionPtr endpoint = nullptr;
578
579 const auto start_time = std::chrono::high_resolution_clock::now();
580
581 process_command_inner(ctx, endpoint, attempts);
582
583 const auto end_time = std::chrono::high_resolution_clock::now();
584
585 if (endpoint != nullptr)
586 {
587 endpoints::RequestCompletedEvent rce;
588 rce.method = endpoint->dispatch.verb.c_str();
589 rce.dispatch_path = endpoint->dispatch.uri_path;
590 rce.status = ctx->get_response_status();
591 // Although enclave time returns a microsecond value, the actual
592 // precision/granularity depends on the host's TimeUpdater. By default
593 // this only advances each millisecond. Avoid implying more precision
594 // than that, by rounding to milliseconds
595 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
596 end_time - start_time);
597 rce.attempts = attempts;
598
600 }
601 else
602 {
603 endpoints::DispatchFailedEvent dfe;
604 dfe.method = ctx->get_method();
605 dfe.status = ctx->get_response_status();
606
608 }
609 }
610
// Core request pipeline. Each pass of the loop creates a fresh transaction,
// resolves the endpoint, applies the interface URI check, then either
// redirects or forwards the request or executes it locally and commits the
// transaction. The loop repeats, up to max_attempts passes, when the commit
// result takes the branch that breaks out of the switch or when a
// CompactedVersionConflict is caught. Every other path returns with the
// response already set on ctx or marked as pending.
// `attempts` is an in/out counter of passes that got past the is_open() check.
// NOTE(review): this listing drops many lines of this function: the second
// parameter (used below as `endpoint`, passed by the caller as an lvalue that
// it reads afterwards), part of a condition, several case labels, and the
// opening lines of several calls. Each gap is marked below; confirm the
// details against the real header.
611 void process_command_inner(
612 std::shared_ptr<ccf::RpcContextImpl> ctx,
614 size_t& attempts)
615 {
616 constexpr auto max_attempts = 30;
617 while (attempts < max_attempts)
618 {
619 if (consensus != nullptr)
620 {
// NOTE(review): original line 622, the first operand of this condition, is
// missing. The page index lists apply_uncommitted_tx_backpressure() on the
// endpoint registry, which is presumably what is tested here - confirm.
621 if (
623 consensus->is_at_max_capacity())
624 {
625 ctx->set_error(
626 HTTP_STATUS_SERVICE_UNAVAILABLE,
627 ccf::errors::TooManyPendingTransactions,
628 "Too many transactions pending commit on the service.");
629 return;
630 }
631 }
632
// A new transaction is created on every pass, including retries.
633 std::unique_ptr<ccf::kv::CommittableTx> tx_p = tables.create_tx_ptr();
634 set_root_on_proposals(*ctx, *tx_p);
635
636 if (attempts > 0)
637 {
638 // If the endpoint has already been executed, the effects of its
639 // execution should be dropped
640 ctx->reset_response();
641 }
642
643 if (!is_open())
644 {
645 ctx->set_error(
646 HTTP_STATUS_NOT_FOUND,
647 ccf::errors::FrontendNotOpen,
648 "Frontend is not open.");
649 return;
650 }
651
652 ++attempts;
653 update_history();
654
// find_endpoint has already set an error response when it returns null.
655 endpoint = find_endpoint(ctx, *tx_p);
656 if (endpoint == nullptr)
657 {
658 return;
659 }
660
661 try
662 {
663 if (!check_uri_allowed(ctx, endpoint))
664 {
665 return;
666 }
667
668 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
669 redirections = std::nullopt;
670
671 // If there's no interface ID, this is already forwarded or otherwise
672 // special - don't try to redirect it
673 if (ctx->get_session_context()->interface_id.has_value())
674 {
675 redirections = get_redirections_config(
676 ctx->get_session_context()->interface_id.value());
677 }
678
679 // If a redirections config was specified, then redirections are used
680 // and no forwarding is done
681 if (redirections.has_value())
682 {
683 if (check_redirect(*tx_p, ctx, endpoint, redirections.value()))
684 {
685 return;
686 }
687 }
688 else
689 {
// Here, unlike in check_redirect, a node without consensus counts as primary.
690 bool is_primary =
691 (consensus == nullptr) || consensus->can_replicate();
692 const bool forwardable = (consensus != nullptr);
693
694 if (!is_primary && forwardable)
695 {
// NOTE(review): the three case labels (original lines 698, 703, 713) are
// missing. Read from the bodies: the first never forwards, the second
// forwards only when the session is already forwarding, the third always
// forwards.
696 switch (endpoint->properties.forwarding_required)
697 {
699 {
700 break;
701 }
702
704 {
705 if (ctx->get_session_context()->is_forwarding)
706 {
707 forward(ctx, *tx_p, endpoint);
708 return;
709 }
710 break;
711 }
712
714 {
715 forward(ctx, *tx_p, endpoint);
716 return;
717 }
718 }
719 }
720 }
721
// Authentication runs before tx_p is moved into the endpoint context below.
722 std::unique_ptr<AuthnIdentity> identity =
723 get_authenticated_identity(ctx, *tx_p, endpoint);
724
725 auto args = ccf::EndpointContextImpl(ctx, std::move(tx_p));
726 // NB: tx_p is no longer valid, and must be accessed from args, which
727 // may change it!
728
729 // If any auth policy was required, check that at least one is
730 // accepted
731 if (!endpoint->authn_policies.empty())
732 {
733 if (identity == nullptr)
734 {
// get_authenticated_identity has already set the 401 response.
735 return;
736 }
737 else
738 {
739 args.caller = std::move(identity);
740 }
741 }
742
743 endpoints.execute_endpoint(endpoint, args);
744
745 // If we've seen a View change, abandon this transaction as
746 // inconsistent
747 if (!check_session_consistency(ctx))
748 {
749 return;
750 }
751
// Return without committing when the context says writes should not apply.
752 if (!ctx->should_apply_writes())
753 {
754 return;
755 }
756
757 if (ctx->response_is_pending)
758 {
759 return;
760 }
761 else if (args.owned_tx == nullptr)
762 {
// NOTE(review): original line 763, which opens the call taking the message
// below, is missing - presumably a logging macro.
764 "Bad endpoint: During execution of {} {}, returned a non-pending "
765 "response but stole ownership of Tx object",
766 ctx->get_request_verb().c_str(),
767 ctx->get_request_path());
768
769 ctx->clear_response_headers();
770 ctx->set_error(
771 HTTP_STATUS_INTERNAL_SERVER_ERROR,
772 ccf::errors::InternalError,
773 "Illegal endpoint implementation");
774 return;
775 }
776 // else args owns a valid Tx relating to a non-pending response, which
777 // should be applied
778 ccf::kv::CommittableTx& tx = *args.owned_tx;
779 ccf::kv::CommitResult result = tx.commit(ctx->claims);
780
// NOTE(review): the case labels (original lines 783, 833, 838) are missing.
// The page index lists CommitResult values SUCCESS, FAIL_CONFLICT and
// FAIL_NO_REPLICATE in that order; the three bodies below (finish, retry,
// report replication failure) presumably correspond to them - confirm.
781 switch (result)
782 {
784 {
785 auto tx_id = tx.get_txid();
786 if (tx_id.has_value() && consensus != nullptr)
787 {
788 try
789 {
790 // Only transactions that acquired one or more map handles
791 // have a TxID, while others (e.g. unauthenticated commands)
792 // don't. Also, only report a TxID if the consensus is set, as
793 // the consensus is required to verify that a TxID is valid.
// NOTE(review): original line 794, the callee of the call below, is missing.
// The page index lists execute_endpoint_locally_committed(e, ctx, tx_id) on
// the registry - presumably that - confirm.
795 endpoint, args, tx_id.value());
796 }
797 catch (const std::exception& e)
798 {
799 // run default handler to set transaction id in header
800 ctx->clear_response_headers();
// NOTE(review): original line 801, the callee of the call below, is missing.
// The page index lists default_locally_committed_func(ctx, tx_id) -
// presumably that - confirm.
802 args, tx_id.value());
803 ctx->set_error(
804 HTTP_STATUS_INTERNAL_SERVER_ERROR,
805 ccf::errors::InternalError,
806 fmt::format(
807 "Failed to execute local commit handler func: {}",
808 e.what()));
809 }
810 catch (...)
811 {
812 // run default handler to set transaction id in header
813 ctx->clear_response_headers();
// NOTE(review): original line 814 is missing here, as at line 801 above.
815 args, tx_id.value());
816 ctx->set_error(
817 HTTP_STATUS_INTERNAL_SERVER_ERROR,
818 ccf::errors::InternalError,
819 "Failed to execute local commit handler func");
820 }
821 }
822
// Only a node that can replicate asks the history to try emitting a signature.
823 if (
824 consensus != nullptr && consensus->can_replicate() &&
825 history != nullptr)
826 {
827 history->try_emit_signature();
828 }
829
830 return;
831 }
832
// Breaking out of the switch reaches the end of the try block, so the while
// loop runs another pass with a fresh transaction.
834 {
835 break;
836 }
837
839 {
840 ctx->clear_response_headers();
841 ctx->set_error(
842 HTTP_STATUS_SERVICE_UNAVAILABLE,
843 ccf::errors::TransactionReplicationFailed,
844 "Transaction failed to replicate.");
845
846 return;
847 }
848 }
849 }
850 catch (const ccf::kv::CompactedVersionConflict& e)
851 {
852 // The executing transaction failed because of a conflicting
853 // compaction. Reset and retry
// NOTE(review): original line 854, which opens the call taking the message
// below, is missing - presumably a logging macro.
855 "Transaction execution conflicted with compaction: {}", e.what());
856 continue;
857 }
// Caught by non-const reference so that the carried error can be moved out.
858 catch (RpcException& e)
859 {
860 ctx->clear_response_headers();
861 ctx->set_error(std::move(e.error));
862
863 return;
864 }
865 catch (const ccf::JsonParseError& e)
866 {
867 ctx->clear_response_headers();
868 ctx->set_error(
869 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.describe());
870
871 return;
872 }
873 catch (const nlohmann::json::exception& e)
874 {
875 ctx->clear_response_headers();
876 ctx->set_error(
877 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
878
879 return;
880 }
881 catch (const ccf::kv::KvSerialiserException& e)
882 {
883 // If serialising the committed transaction fails, there is no way
884 // to recover safely (https://github.com/microsoft/CCF/issues/338).
885 // Better to abort.
886 LOG_DEBUG_FMT("Failed to serialise: {}", e.what());
887 LOG_FATAL_FMT("Failed to serialise");
888 abort();
889 }
// Catch-all for remaining std::exception types: reported as a 500.
890 catch (const std::exception& e)
891 {
892 ctx->clear_response_headers();
893 ctx->set_error(
894 HTTP_STATUS_INTERNAL_SERVER_ERROR,
895 ccf::errors::InternalError,
896 e.what());
897
898 return;
899 }
900 } // end of while loop
901
// Reached only when the loop ran out of attempts without returning.
902 ctx->clear_response_headers();
903 ctx->set_error(
904 HTTP_STATUS_SERVICE_UNAVAILABLE,
905 ccf::errors::TransactionCommitAttemptsExceedLimit,
906 fmt::format(
907 "Transaction continued to conflict after {} attempts. Retry "
908 "later.",
909 max_attempts));
910
911 static constexpr size_t retry_after_seconds = 3;
912 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
913
914 return;
915 }
916
917 public:
// Constructor. Binds the store, the endpoint registry and the node context,
// and starts with null consensus and history pointers; update_consensus() and
// update_history() assign them later.
// NOTE(review): this listing omits original lines 918 and 920. The page index
// gives the full signature as RpcFrontend(ccf::kv::Store& tables_,
// endpoints::EndpointRegistry& handlers_,
// ccf::AbstractNodeContext& node_context_).
919 ccf::kv::Store& tables_,
921 ccf::AbstractNodeContext& node_context_) :
922 tables(tables_),
923 endpoints(handlers_),
924 node_context(node_context_),
925 consensus(nullptr),
926 history(nullptr)
927 {}
928
// Stores the signature intervals. The second argument is interpreted as a
// count of milliseconds, and ms_to_sig is reset to the new interval.
// NOTE(review): this listing omits original line 929, the start of the
// signature; the page index gives it as
// void set_sig_intervals(size_t sig_tx_interval_, size_t sig_ms_interval_).
930 size_t sig_tx_interval_, size_t sig_ms_interval_) override
931 {
932 sig_tx_interval = sig_tx_interval_;
933 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
934 ms_to_sig = sig_ms_interval;
935 }
936
// Installs the forwarder that forward() uses to reach the primary.
// NOTE(review): this listing omits original line 937, the start of the
// signature; the page index gives the name as set_cmd_forwarder, returning
// void.
938 std::shared_ptr<AbstractForwarder> cmd_forwarder_) override
939 {
940 cmd_forwarder = cmd_forwarder_;
941 }
942
// Marks the frontend as open. The flag is changed under open_lock, and calls
// made after the first have no visible effect in this listing.
// NOTE(review): this listing omits original line 950 inside the branch, after
// the flag is set. The page index lists init_handlers() on the endpoint
// registry, which is presumably called here - confirm.
943 void open() override
944 {
945 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
946 if (!is_open_)
947 {
948 LOG_INFO_FMT("Opening frontend");
949 is_open_ = true;
951 }
952 }
953
954 bool is_open() override
955 {
956 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
957 return is_open_;
958 }
959
// NOTE(review): this listing omits the signature and several body lines of
// this function (original lines 960, 961, 963, 973 and 975). The page index
// gives the signature as
// void set_root_on_proposals(const ccf::RpcContextImpl& ctx,
// ccf::kv::CommittableTx& tx).
// From the visible lines: when a history is available, the transaction's read
// TxID is set from a (txid, root, term_of_next_version) tuple. The index lists
// request_needs_root, get_replicated_state_txid_and_root and
// set_root_at_read_version, which presumably supply the guarding condition,
// the tuple and the use of `root` respectively - confirm.
962 {
964 {
965 update_history();
966 if (history)
967 {
968 // Warning: Retrieving the current TxID and root from the history
969 // should only ever be used for the proposal creation endpoint and
970 // nothing else. Many bad things could happen otherwise (e.g. breaking
971 // session consistency).
972 const auto& [txid, root, term_of_next_version] =
974 tx.set_read_txid(txid, term_of_next_version);
976 }
977 }
978 }
979
988 void process(std::shared_ptr<ccf::RpcContextImpl> ctx) override
989 {
990 update_consensus();
991
992 // NB: If we want to re-execute on backups, the original command could
993 // be propagated from here
994 process_command(ctx);
995 }
996
1001 void process_forwarded(std::shared_ptr<ccf::RpcContextImpl> ctx) override
1002 {
1003 if (!ctx->get_session_context()->is_forwarded)
1004 {
1005 throw std::logic_error(
1006 "Processing forwarded command with unitialised forwarded context");
1007 }
1008
1009 update_consensus();
1010 process_command(ctx);
1011 if (ctx->response_is_pending)
1012 {
1013 // This should never be called when process_command is called with a
1014 // forwarded RPC context
1015 throw std::logic_error("Forwarded RPC cannot be forwarded");
1016 }
1017 }
1018
1019 void tick(std::chrono::milliseconds elapsed) override
1020 {
1021 update_consensus();
1022
1023 endpoints.tick(elapsed);
1024 }
1025 };
1026}
Definition forwarder.h:17
static std::map< NodeId, NodeInfo > get_trusted_nodes(ccf::kv::ReadOnlyTx &tx)
Definition internal_tables_access.h:435
Definition json.h:26
std::string describe() const
Definition json.h:39
Definition rpc_context_impl.h:21
Definition frontend.h:34
ccf::kv::Store & tables
Definition frontend.h:36
RpcFrontend(ccf::kv::Store &tables_, endpoints::EndpointRegistry &handlers_, ccf::AbstractNodeContext &node_context_)
Definition frontend.h:918
void set_root_on_proposals(const ccf::RpcContextImpl &ctx, ccf::kv::CommittableTx &tx)
Definition frontend.h:960
void set_cmd_forwarder(std::shared_ptr< AbstractForwarder > cmd_forwarder_) override
Definition frontend.h:937
bool is_open() override
Definition frontend.h:954
void process(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:988
ccf::AbstractNodeContext & node_context
Definition frontend.h:38
void set_sig_intervals(size_t sig_tx_interval_, size_t sig_ms_interval_) override
Definition frontend.h:929
void process_forwarded(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:1001
void open() override
Definition frontend.h:943
endpoints::EndpointRegistry & endpoints
Definition frontend.h:37
void tick(std::chrono::milliseconds elapsed) override
Definition frontend.h:1019
Definition rpc_handler.h:24
Definition endpoint_registry.h:117
virtual void execute_endpoint_locally_committed(EndpointDefinitionPtr e, CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:513
virtual void handle_event_request_completed(const ccf::endpoints::RequestCompletedEvent &event)
Definition endpoint_registry.h:286
virtual std::set< RESTVerb > get_allowed_verbs(ccf::kv::Tx &, const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:528
virtual void tick(std::chrono::milliseconds)
Definition endpoint_registry.cpp:587
virtual void init_handlers()
Definition endpoint_registry.cpp:417
void set_consensus(ccf::kv::Consensus *c)
Definition endpoint_registry.cpp:589
virtual void execute_endpoint(EndpointDefinitionPtr e, EndpointContext &ctx)
Definition endpoint_registry.cpp:499
virtual bool request_needs_root(const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:560
virtual void handle_event_dispatch_failed(const ccf::endpoints::DispatchFailedEvent &event)
Definition endpoint_registry.h:290
virtual EndpointDefinitionPtr find_endpoint(ccf::kv::Tx &, ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:419
void set_history(ccf::kv::TxHistory *h)
Definition endpoint_registry.cpp:594
virtual bool apply_uncommitted_tx_backpressure() const
Definition endpoint_registry.h:294
Definition committable_tx.h:18
std::optional< TxID > get_txid()
Definition committable_tx.h:305
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:355
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, std::function< void(const std::vector< uint8_t > &write_set, const std::string &commit_evidence)> write_set_observer=nullptr)
Definition committable_tx.h:131
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:365
Definition compacted_version_conflict.h:10
char const * what() const
Definition compacted_version_conflict.h:17
Definition kv_types.h:436
Definition kv_types.h:354
virtual const char * what() const
Definition kv_types.h:361
Definition tx.h:160
Definition store.h:88
std::shared_ptr< TxHistory > get_history() override
Definition store.h:201
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1295
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:188
Definition kv_types.h:368
virtual void try_emit_signature()=0
virtual std::tuple< ccf::kv::TxID, ccf::crypto::Sha256Hash, ccf::kv::Term > get_replicated_state_txid_and_root()=0
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FATAL_FMT
Definition logger.h:364
#define LOG_FAIL_FMT
Definition logger.h:363
void default_locally_committed_func(CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:198
std::shared_ptr< const EndpointDefinition > EndpointDefinitionPtr
Definition endpoint.h:234
CommitResult
Definition kv_types.h:248
@ FAIL_NO_REPLICATE
Definition kv_types.h:251
@ SUCCESS
Definition kv_types.h:249
@ FAIL_CONFLICT
Definition kv_types.h:250
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition consensus_types.h:23
STL namespace.
Definition node_context.h:12
std::shared_ptr< T > get_subsystem(const std::string &name) const
Definition node_context.h:37
Definition endpoint_context_impl.h:13
Definition node_info_network.h:119
RedirectionResolverConfig to_primary
Definition node_info_network.h:120
RedirectionResolverConfig to_backup
Definition node_info_network.h:121