CCF
Loading...
Searching...
No Matches
forwarder.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
7#include "enclave/rpc_map.h"
9#include "kv/kv_types.h"
10#include "node/node_to_node.h"
11#include "tasks/basic_task.h"
12#include "tasks/task_system.h"
13
14namespace ccf
15{
16 class RpcContextImpl;
17
19 {
20 public:
21 virtual ~ForwardedRpcHandler() = default;
22
23 virtual void process_forwarded(
24 std::shared_ptr<ccf::RpcContextImpl> fwd_ctx) = 0;
25 };
26
27 template <typename ChannelProxy>
29 {
30 private:
31 std::weak_ptr<ccf::AbstractRPCResponder> rpcresponder;
32 std::shared_ptr<ChannelProxy> n2n_channels;
33 std::weak_ptr<ccf::RPCMap> rpc_map;
34 NodeId self;
35
36 using ForwardedCommandId = ForwardedHeader_v2::ForwardedCommandId;
37 ForwardedCommandId next_command_id = 0;
38
39 std::unordered_map<ForwardedCommandId, ccf::tasks::Task> timeout_tasks;
40 ccf::pal::Mutex timeout_tasks_lock;
41
42 using IsCallerCertForwarded = bool;
43
44 void send_timeout_error_response(
45 NodeId to,
46 size_t client_session_id,
47 const std::chrono::milliseconds& timeout)
48 {
49 auto rpc_responder_shared = rpcresponder.lock();
50 if (rpc_responder_shared)
51 {
52 auto response = ::http::Response(HTTP_STATUS_GATEWAY_TIMEOUT);
53 auto body = fmt::format(
54 "Request was forwarded to node {}, but no response was received "
55 "after {}ms",
56 to,
57 timeout.count());
58 response.set_body(body);
59 response.set_header(
60 http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
61 rpc_responder_shared->reply_async(
62 client_session_id, false, response.build_response());
63 }
64 }
65
66 public:
68 std::weak_ptr<ccf::AbstractRPCResponder> rpcresponder,
69 std::shared_ptr<ChannelProxy> n2n_channels,
70 std::weak_ptr<ccf::RPCMap> rpc_map_) :
71 rpcresponder(std::move(rpcresponder)),
72 n2n_channels(std::move(n2n_channels)),
73 rpc_map(std::move(rpc_map_))
74 {}
75
76 void initialize(const NodeId& self_)
77 {
78 self = self_;
79 }
80
82 std::shared_ptr<ccf::RpcContextImpl> rpc_ctx,
83 const NodeId& to,
84 const std::vector<uint8_t>& caller_cert,
85 const std::chrono::milliseconds& timeout) override
86 {
87 auto session_ctx = rpc_ctx->get_session_context();
88
89 IsCallerCertForwarded include_caller = false;
90 const auto& raw_request = rpc_ctx->get_serialised_request();
91 auto client_session_id = session_ctx->client_session_id;
92 size_t size = sizeof(client_session_id) + sizeof(IsCallerCertForwarded) +
93 raw_request.size();
94 if (!caller_cert.empty())
95 {
96 size += sizeof(size_t) + caller_cert.size();
97 include_caller = true;
98 }
99
100 std::vector<uint8_t> plain(size);
101 auto* data_ = plain.data();
102 auto size_ = plain.size();
103 serialized::write(data_, size_, client_session_id);
104 serialized::write(data_, size_, include_caller);
105 if (include_caller)
106 {
107 serialized::write(data_, size_, caller_cert.size());
108 serialized::write(data_, size_, caller_cert.data(), caller_cert.size());
109 }
110 serialized::write(data_, size_, raw_request.data(), raw_request.size());
111
112 ForwardedCommandId command_id = 0;
113 {
114 std::lock_guard<ccf::pal::Mutex> guard(timeout_tasks_lock);
115 command_id = next_command_id++;
116 auto task =
117 ccf::tasks::make_basic_task([this, to, client_session_id, timeout]() {
118 this->send_timeout_error_response(to, client_session_id, timeout);
119 });
120 timeout_tasks[command_id] = task;
121 ccf::tasks::add_delayed_task(task, timeout);
122 }
123
124 const auto view_opt = session_ctx->active_view;
125 if (!view_opt.has_value())
126 {
127 throw std::logic_error(
128 "Expected active_view to be set before forwarding");
129 }
130 ForwardedCommandHeader_v3 header(command_id, view_opt.value());
131
132 return n2n_channels->send_encrypted(
133 to, NodeMsgType::forwarded_msg, plain, header);
134 }
135
136 template <typename TFwdHdr>
137 std::shared_ptr<::http::HttpRpcContext> recv_forwarded_command(
138 const NodeId& from, const uint8_t* data, size_t size)
139 {
140 std::pair<TFwdHdr, std::vector<uint8_t>> r;
141 try
142 {
143 LOG_TRACE_FMT("Receiving forwarded command of {} bytes", size);
144 LOG_TRACE_FMT(" => {:02x}", fmt::join(data, data + size, ""));
145
146 r = n2n_channels->template recv_encrypted<TFwdHdr>(from, data, size);
147 }
148 catch (const std::logic_error& err)
149 {
150 LOG_FAIL_FMT("Invalid forwarded command");
151 LOG_DEBUG_FMT("Invalid forwarded command: {}", err.what());
152 return nullptr;
153 }
154
155 std::vector<uint8_t> caller_cert;
156 const auto& plain_ = r.second;
157 auto data_ = plain_.data();
158 auto size_ = plain_.size();
159 auto client_session_id = serialized::read<size_t>(data_, size_);
160 auto includes_caller =
161 serialized::read<IsCallerCertForwarded>(data_, size_);
162 if (includes_caller)
163 {
164 auto caller_size = serialized::read<size_t>(data_, size_);
165 // NOLINTNEXTLINE(readability-suspicious-call-argument)
166 caller_cert = serialized::read(data_, size_, caller_size);
167 }
168 std::vector<uint8_t> raw_request = serialized::read(data_, size_, size_);
169
170 auto session =
171 std::make_shared<ccf::SessionContext>(client_session_id, caller_cert);
172 session->is_forwarded = true;
173
174 if constexpr (std::is_same_v<TFwdHdr, ForwardedCommandHeader_v3>)
175 {
176 ccf::View view = r.first.active_view;
177 session->active_view = view;
178 }
179
180 try
181 {
183 session, raw_request, r.first.frame_format);
184 }
185 catch (const ::http::RequestTooLargeException& rexc)
186 {
187 LOG_FAIL_FMT("Forwarded request exceeded limit: {}", rexc.what());
188 return nullptr;
189 }
190 catch (const std::exception& err)
191 {
192 LOG_FAIL_FMT("Invalid forwarded request");
193 LOG_DEBUG_FMT("Invalid forwarded request: {}", err.what());
194 return nullptr;
195 }
196 }
197
198 template <typename TFwdHdr>
200 size_t client_session_id,
201 const NodeId& from_node,
202 const TFwdHdr& header,
203 const std::vector<uint8_t>& data)
204 {
205 std::vector<uint8_t> plain(sizeof(client_session_id) + data.size());
206 auto* data_ = plain.data();
207 auto size_ = plain.size();
208 serialized::write(data_, size_, client_session_id);
209 serialized::write(data_, size_, data.data(), data.size());
210
211 if (!n2n_channels->send_encrypted(
212 from_node, NodeMsgType::forwarded_msg, plain, header))
213 {
214 LOG_FAIL_FMT("Failed to send forwarded response to {}", from_node);
215 }
216 }
217
219 {
221 std::vector<uint8_t> response_body;
223 };
224
225 template <typename TFwdHdr>
226 std::optional<ForwardedResponseResult> recv_forwarded_response(
227 const NodeId& from, const uint8_t* data, size_t size)
228 {
229 std::pair<TFwdHdr, std::vector<uint8_t>> r;
230 try
231 {
232 LOG_TRACE_FMT("Receiving response of {} bytes", size);
233 LOG_TRACE_FMT(" => {:02x}", fmt::join(data, data + size, ""));
234
235 r = n2n_channels->template recv_encrypted<TFwdHdr>(from, data, size);
236 }
237 catch (const std::logic_error& err)
238 {
239 LOG_FAIL_FMT("Invalid forwarded response");
240 LOG_DEBUG_FMT("Invalid forwarded response: {}", err.what());
241 return std::nullopt;
242 }
243
245 if constexpr (std::is_same_v<TFwdHdr, ForwardedResponseHeader_v3>)
246 {
247 ret.should_terminate_session = r.first.terminate_session;
248 }
249
250 const auto& plain_ = r.second;
251 auto data_ = plain_.data();
252 auto size_ = plain_.size();
253 ret.client_session_id = serialized::read<size_t>(data_, size_);
254 ret.response_body = serialized::read(data_, size_, size_);
255
256 return ret;
257 }
258
259 std::shared_ptr<ForwardedRpcHandler> get_forwarder_handler(
260 std::shared_ptr<::http::HttpRpcContext>& ctx)
261 {
262 if (ctx == nullptr)
263 {
264 LOG_FAIL_FMT("Failed to receive forwarded command");
265 return nullptr;
266 }
267
268 std::shared_ptr<ccf::RPCMap> rpc_map_shared = rpc_map.lock();
269 if (rpc_map_shared == nullptr)
270 {
271 LOG_FAIL_FMT("Failed to obtain RPCMap");
272 return nullptr;
273 }
274
275 std::shared_ptr<ccf::RpcHandler> search =
276 ::http::fetch_rpc_handler(ctx, rpc_map_shared);
277
278 auto fwd_handler = std::dynamic_pointer_cast<ForwardedRpcHandler>(search);
279 if (!fwd_handler)
280 {
282 "Failed to process forwarded command: handler is not a "
283 "ForwardedRpcHandler");
284 return nullptr;
285 }
286
287 return fwd_handler;
288 }
289
290 void recv_message(const ccf::NodeId& from, const uint8_t* data, size_t size)
291 {
292 try
293 {
294 const auto forwarded_msg = serialized::peek<ForwardedMsg>(data, size);
296 "recv_message({}, {} bytes) (type={})",
297 from,
298 size,
299 (size_t)forwarded_msg);
300
301 switch (forwarded_msg)
302 {
304 {
305 auto ctx =
306 recv_forwarded_command<ForwardedHeader_v1>(from, data, size);
307
308 auto fwd_handler = get_forwarder_handler(ctx);
309 if (fwd_handler == nullptr)
310 {
311 return;
312 }
313
314 // frame_format is deliberately unset, the forwarder ignores it
315 // and expects the same format they forwarded.
316 ForwardedHeader_v1 response_header{
318
319 LOG_DEBUG_FMT("Sending forwarded response to {}", from);
320 fwd_handler->process_forwarded(ctx);
321
323 ctx->get_session_context()->client_session_id,
324 from,
325 response_header,
326 ctx->serialise_response());
327 break;
328 }
329
331 {
332 auto ctx =
333 recv_forwarded_command<ForwardedHeader_v2>(from, data, size);
334
335 auto fwd_handler = get_forwarder_handler(ctx);
336 if (fwd_handler == nullptr)
337 {
338 return;
339 }
340
341 const auto forwarded_hdr_v2 =
342 serialized::peek<ForwardedHeader_v2>(data, size);
343 const auto cmd_id = forwarded_hdr_v2.id;
344
345 fwd_handler->process_forwarded(ctx);
346
347 // frame_format is deliberately unset, the forwarder ignores it
348 // and expects the same format they forwarded.
349 ForwardedHeader_v2 response_header{
351
352 LOG_DEBUG_FMT("Sending forwarded response to {}", from);
353
355 ctx->get_session_context()->client_session_id,
356 from,
357 response_header,
358 ctx->serialise_response());
359 break;
360 }
361
363 {
364 auto ctx = recv_forwarded_command<ForwardedCommandHeader_v3>(
365 from, data, size);
366
367 auto fwd_handler = get_forwarder_handler(ctx);
368 if (fwd_handler == nullptr)
369 {
370 return;
371 }
372
373 const auto forwarded_hdr_v3 =
374 serialized::peek<ForwardedCommandHeader_v3>(data, size);
375 const auto cmd_id = forwarded_hdr_v3.id;
376
377 fwd_handler->process_forwarded(ctx);
378
379 // frame_format is deliberately unset, the forwarder ignores it
380 // and expects the same format they forwarded.
381 ForwardedResponseHeader_v3 response_header(
382 cmd_id, ctx->terminate_session);
383
384 LOG_DEBUG_FMT("Sending forwarded response to {}", from);
385
387 ctx->get_session_context()->client_session_id,
388 from,
389 response_header,
390 ctx->serialise_response());
391 break;
392 }
393
396 {
397 const auto forwarded_hdr_v2 =
398 serialized::peek<ForwardedHeader_v2>(data, size);
399 const auto cmd_id = forwarded_hdr_v2.id;
400
401 // Cancel and delete the corresponding timeout task, so it will no
402 // longer trigger a timeout error
403 std::lock_guard<ccf::pal::Mutex> guard(timeout_tasks_lock);
404 auto it = timeout_tasks.find(cmd_id);
405 if (it != timeout_tasks.end())
406 {
407 it->second->cancel_task();
408 it = timeout_tasks.erase(it);
409 }
410 else
411 {
413 "Response for {} received too late - already sent timeout "
414 "error to client",
415 cmd_id);
416 return;
417 }
418 // Deliberate fall-through
419 }
420
422 {
423 std::optional<ForwardedResponseResult> rep;
425 {
426 rep = recv_forwarded_response<ForwardedResponseHeader_v3>(
427 from, data, size);
428 }
430 {
431 rep =
432 recv_forwarded_response<ForwardedHeader_v2>(from, data, size);
433 }
434 else
435 {
436 rep =
437 recv_forwarded_response<ForwardedHeader_v1>(from, data, size);
438 }
439
440 if (!rep.has_value())
441 {
442 return;
443 }
444
446 "Sending forwarded response to RPC endpoint {}",
447 rep->client_session_id);
448
449 auto rpc_responder_shared = rpcresponder.lock();
450 if (
451 rpc_responder_shared &&
452 !rpc_responder_shared->reply_async(
453 rep->client_session_id,
454 rep->should_terminate_session,
455 std::move(rep->response_body)))
456 {
457 return;
458 }
459
460 break;
461 }
462
463 default:
464 {
465 LOG_FAIL_FMT("Unknown frontend msg type: {}", forwarded_msg);
466 break;
467 }
468 }
469 }
470 catch (const std::exception& e)
471 {
472 LOG_FAIL_FMT("Exception in {}", __PRETTY_FUNCTION__);
473 LOG_DEBUG_FMT("Error: {}", e.what());
474 return;
475 }
476 }
477 };
478}
Definition forwarder_types.h:22
Definition forwarder.h:19
virtual ~ForwardedRpcHandler()=default
virtual void process_forwarded(std::shared_ptr< ccf::RpcContextImpl > fwd_ctx)=0
Definition forwarder.h:29
void initialize(const NodeId &self_)
Definition forwarder.h:76
std::shared_ptr<::http::HttpRpcContext > recv_forwarded_command(const NodeId &from, const uint8_t *data, size_t size)
Definition forwarder.h:137
void recv_message(const ccf::NodeId &from, const uint8_t *data, size_t size)
Definition forwarder.h:290
Forwarder(std::weak_ptr< ccf::AbstractRPCResponder > rpcresponder, std::shared_ptr< ChannelProxy > n2n_channels, std::weak_ptr< ccf::RPCMap > rpc_map_)
Definition forwarder.h:67
void send_forwarded_response(size_t client_session_id, const NodeId &from_node, const TFwdHdr &header, const std::vector< uint8_t > &data)
Definition forwarder.h:199
std::shared_ptr< ForwardedRpcHandler > get_forwarder_handler(std::shared_ptr<::http::HttpRpcContext > &ctx)
Definition forwarder.h:259
std::optional< ForwardedResponseResult > recv_forwarded_response(const NodeId &from, const uint8_t *data, size_t size)
Definition forwarder.h:226
bool forward_command(std::shared_ptr< ccf::RpcContextImpl > rpc_ctx, const NodeId &to, const std::vector< uint8_t > &caller_cert, const std::chrono::milliseconds &timeout) override
Definition forwarder.h:81
Definition http_builder.h:200
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
std::mutex Mutex
Definition locking.h:12
Task make_basic_task(Ts &&... ts)
Definition basic_task.h:33
void add_delayed_task(Task task, std::chrono::milliseconds delay)
Definition task_system.cpp:70
Definition app_interface.h:14
std::shared_ptr<::http::HttpRpcContext > make_fwd_rpc_context(std::shared_ptr< ccf::SessionContext > s, const std::vector< uint8_t > &packed, ccf::FrameFormat frame_format)
Definition http_rpc_context.h:402
view
Definition signatures.h:54
@ forwarded_cmd_v3
Definition node_types.h:52
@ forwarded_cmd_v2
Definition node_types.h:46
@ forwarded_response_v1
Definition node_types.h:42
@ forwarded_response_v3
Definition node_types.h:53
@ forwarded_response_v2
Definition node_types.h:47
@ forwarded_cmd_v1
Definition node_types.h:41
uint64_t View
Definition tx_id.h:23
@ forwarded_msg
Definition node_types.h:24
void write(uint8_t *&data, size_t &size, const T &v)
Definition serialized.h:105
T read(const uint8_t *&data, size_t &size)
Definition serialized.h:58
STL namespace.
Definition node_types.h:80
Definition node_types.h:68
Definition node_types.h:74
size_t ForwardedCommandId
Definition node_types.h:75
Definition node_types.h:96
Definition forwarder.h:219
size_t client_session_id
Definition forwarder.h:220
std::vector< uint8_t > response_body
Definition forwarder.h:221
bool should_terminate_session
Definition forwarder.h:222