CCF
Loading...
Searching...
No Matches
curl.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
6#include "ccf/ds/nonstd.h"
7#include "ccf/rest_verb.h"
8#include "host/proxy.h"
9
10#include <cstddef>
11#include <cstdint>
12#include <curl/curl.h>
13#include <curl/multi.h>
14#include <memory>
15#include <mutex>
16#include <optional>
17#include <regex>
18#include <span>
19#include <stdexcept>
20#include <uv.h>
21
// Invokes the libcurl easy-interface function `fn` with the remaining
// arguments. Any result other than CURLE_OK is turned into a
// std::runtime_error whose message names the function, the numeric code and
// the text from curl_easy_strerror.
#define CHECK_CURL_EASY(fn, ...) \
  do \
  { \
    const auto res = fn(__VA_ARGS__); \
    if (res == CURLE_OK) \
    { \
      break; \
    } \
    throw std::runtime_error(fmt::format( \
      "Error calling " #fn ": {} ({})", res, curl_easy_strerror(res))); \
  } while (0)

// Checked form of curl_easy_setopt: throws std::runtime_error on failure.
#define CHECK_CURL_EASY_SETOPT(handle, info, arg) \
  CHECK_CURL_EASY(curl_easy_setopt, handle, info, arg)
// Checked form of curl_easy_getinfo: throws std::runtime_error on failure.
#define CHECK_CURL_EASY_GETINFO(handle, info, arg) \
  CHECK_CURL_EASY(curl_easy_getinfo, handle, info, arg)
37
// Invokes the libcurl multi-interface function `fn` with the remaining
// arguments. Any result other than CURLM_OK is turned into a
// std::runtime_error whose message names the function, the numeric code and
// the text from curl_multi_strerror.
#define CHECK_CURL_MULTI(fn, ...) \
  do \
  { \
    const auto res = fn(__VA_ARGS__); \
    if (res == CURLM_OK) \
    { \
      break; \
    } \
    throw std::runtime_error(fmt::format( \
      "Error calling " #fn ": {} ({})", res, curl_multi_strerror(res))); \
  } while (0)
48
49namespace ccf::curl
50{
51
53 {
54 private:
55 std::unique_ptr<CURL, void (*)(CURL*)> p;
56
57 public:
58 UniqueCURL() : p(curl_easy_init(), [](auto x) { curl_easy_cleanup(x); })
59 {
60 if (p == nullptr)
61 {
62 throw std::runtime_error("Error initialising curl easy request");
63 }
64 }
65
66 operator CURL*() const
67 {
68 return p.get();
69 }
70
71 void set_blob_opt(auto option, const uint8_t* data, size_t length)
72 {
73 if (data == nullptr || length == 0)
74 {
75 throw std::invalid_argument(
76 "Data pointer cannot be null or length zero");
77 }
78
79 if (p == nullptr)
80 {
81 throw std::logic_error("Cannot set option on a null CURL handle");
82 }
83
84 struct curl_blob blob
85 {
86 .data = const_cast<uint8_t*>(data), .len = length,
87 .flags = CURL_BLOB_COPY,
88 };
89
90 CHECK_CURL_EASY_SETOPT(p.get(), option, &blob);
91 }
92
93 void set_opt(auto option, auto value)
94 {
95 CHECK_CURL_EASY_SETOPT(p.get(), option, value);
96 }
97 };
98
100 {
101 protected:
102 std::unique_ptr<CURLM, void (*)(CURLM*)> p;
103
104 public:
105 UniqueCURLM() : p(curl_multi_init(), [](auto x) { curl_multi_cleanup(x); })
106 {
107 if (p == nullptr)
108 {
109 throw std::runtime_error("Error initialising curl multi request");
110 }
111 }
112
113 ~UniqueCURLM() = default;
114 UniqueCURLM(const UniqueCURLM&) = delete;
116 UniqueCURLM(UniqueCURLM&& other) noexcept : p(std::move(other.p)) {}
118 {
119 p = std::move(other.p);
120 return *this;
121 }
122
123 [[nodiscard]] CURLM* release()
124 {
125 return p.release();
126 }
127
128 operator CURLM*() const
129 {
130 return p.get();
131 }
132 };
133
135 {
136 private:
137 std::unique_ptr<curl_slist, void (*)(curl_slist*)> p;
138
139 public:
140 UniqueSlist() : p(nullptr, [](auto x) { curl_slist_free_all(x); }) {}
141 ~UniqueSlist() = default;
142 UniqueSlist(const UniqueSlist&) = delete;
144 UniqueSlist(UniqueSlist&& other) noexcept : p(std::move(other.p)) {}
146 {
147 p = std::move(other.p);
148 return *this;
149 }
150
151 void append(const char* str)
152 {
153 p.reset(curl_slist_append(p.release(), str));
154 }
155
156 void append(const std::string& key, const std::string& value)
157 {
158 append(fmt::format("{}: {}", key, value).c_str());
159 }
160
161 [[nodiscard]] curl_slist* get() const
162 {
163 return p.get();
164 }
165 };
166
168 {
169 std::vector<uint8_t> buffer;
170 std::span<const uint8_t> unsent;
171
172 public:
173 RequestBody(std::vector<uint8_t>& buffer) : buffer(buffer)
174 {
175 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
176 }
177
178 RequestBody(std::vector<uint8_t>&& buffer) : buffer(std::move(buffer))
179 {
180 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
181 }
182
183 RequestBody(nlohmann::json json)
184 {
185 auto json_str = json.dump();
186 buffer = std::vector<uint8_t>(
187 json_str.begin(), json_str.end()); // Convert to vector of bytes
188 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
189 }
190
191 static size_t send_data(
192 char* ptr, size_t size, size_t nitems, RequestBody* data)
193 {
194 if (data == nullptr)
195 {
196 LOG_FAIL_FMT("send_data called with null userdata");
197 return 0;
198 }
199 auto bytes_to_copy = std::min(data->unsent.size(), size * nitems);
200 memcpy(ptr, data->unsent.data(), bytes_to_copy);
201 data->unsent = data->unsent.subspan(bytes_to_copy);
202 return bytes_to_copy;
203 }
204
205 void attach_to_curl(CURL* curl)
206 {
207 if (curl == nullptr)
208 {
209 throw std::logic_error(
210 "Cannot attach request body to a null CURL handle");
211 }
212 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_READDATA, this);
213 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_READFUNCTION, send_data);
214 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_INFILESIZE, unsent.size());
215 }
216 };
217
219 {
220 public:
221 std::vector<uint8_t> buffer;
223
224 // Ensure there is always a maximum size set
225 ResponseBody() = delete;
226
227 // _max_size is the maximum size of the response body
228 ResponseBody(size_t max_size_) : maximum_size(max_size_) {}
229
230 static size_t write_response_chunk(
231 uint8_t* ptr, size_t size, size_t nmemb, ResponseBody* response)
232 {
233 if (response == nullptr)
234 {
236 "write_response_chunk called with a null response pointer");
237 return CURL_WRITEFUNC_ERROR;
238 }
239 auto bytes_to_copy = size * nmemb;
240 if (response->buffer.size() + bytes_to_copy > response->maximum_size)
241 {
243 "Response size limit exceeded: {} bytes, maximum is {} bytes",
244 response->buffer.size() + bytes_to_copy,
245 response->maximum_size);
246 return CURL_WRITEFUNC_ERROR;
247 }
248
249 response->buffer.insert(response->buffer.end(), ptr, ptr + bytes_to_copy);
250 return bytes_to_copy;
251 }
252
253 void attach_to_curl(CURL* curl)
254 {
255 if (curl == nullptr)
256 {
257 throw std::logic_error("Cannot attach response to a null CURL handle");
258 }
259 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEDATA, this);
260 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEFUNCTION, write_response_chunk);
261 }
262
263 static size_t noop_write_function(
264 uint8_t* ptr, size_t size, size_t nmemb, ResponseBody* response)
265 {
266 (void)ptr;
267 (void)response;
268 return size * nmemb;
269 }
270
271 static void attach_noop_response(CURL* curl)
272 {
273 if (curl == nullptr)
274 {
275 throw std::logic_error(
276 "Cannot attach noop response to a null CURL handle");
277 }
278 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEDATA, nullptr);
279 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEFUNCTION, noop_write_function);
280 }
281 };
282
284 {
285 public:
286 using HeaderMap = std::unordered_map<std::string, std::string>;
287 bool is_first_header = true;
289
290 static size_t recv_header_line(
291 char* buffer, size_t size, size_t nitems, ResponseHeaders* response)
292 {
293 if (response == nullptr)
294 {
295 LOG_FAIL_FMT("recv_header_line called with a null response pointer");
296 return 0;
297 }
298 auto bytes_to_read = size * nitems;
299 std::string_view header(buffer, bytes_to_read);
300
301 // strip \r\n etc
302 header = ccf::nonstd::trim(header);
303
304 // Ignore the http status line (e.g. "HTTP/1.1 200") which should be the
305 // first header
306 static const std::regex http_status_line_regex(R"(^HTTP\/[1-9]+.*)");
307 if (response->is_first_header)
308 {
309 response->is_first_header = false;
310 if (!std::regex_match(std::string(header), http_status_line_regex))
311 {
313 "Expected HTTP status line as first header, got '{}'", header);
314 return bytes_to_read;
315 }
316 }
317 else
318 {
319 // ignore empty headers
320 if (!header.empty())
321 {
322 const auto [field, value] = ccf::nonstd::split_1(header, ": ");
323 if (!value.empty())
324 {
325 std::string field_str(field);
326 nonstd::to_lower(field_str);
327 if (response->data.contains(field_str))
328 {
329 auto current = response->data[field_str];
331 "Duplicate header for '{}', current = '{}', new = '{}'",
332 field_str,
333 current,
334 value);
335 }
336 response->data[field_str] = ccf::nonstd::trim(value);
337 }
338 else
339 {
340 LOG_INFO_FMT("Ignoring invalid-looking HTTP Header '{}'", header);
341 }
342 }
343 }
344
345 return bytes_to_read;
346 }
347
348 void attach_to_curl(CURL* curl)
349 {
350 if (curl == nullptr)
351 {
352 throw std::logic_error("Cannot attach response to a null CURL handle");
353 }
354 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERDATA, this);
355 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERFUNCTION, recv_header_line);
356 }
357 };
358
360 {
361 public:
362 using ResponseCallback = std::function<void(
363 CurlRequest& request, CURLcode curl_response_code, long status_code)>;
364
365 private:
366 UniqueCURL curl_handle;
367 RESTVerb method;
368 std::string url;
370 std::unique_ptr<ccf::curl::RequestBody> request_body;
371 std::unique_ptr<ccf::curl::ResponseBody> response;
372 ResponseHeaders response_headers;
373 std::optional<ResponseCallback> response_callback;
374
375 public:
377 UniqueCURL&& curl_handle_,
378 RESTVerb method_,
379 std::string&& url_,
380 UniqueSlist&& headers_,
381 std::unique_ptr<RequestBody>&& request_body_,
382 std::unique_ptr<ccf::curl::ResponseBody>&& response_,
383 std::optional<ResponseCallback>&& response_callback_) :
384 curl_handle(std::move(curl_handle_)),
385 method(method_),
386 url(std::move(url_)),
387 headers(std::move(headers_)),
388 request_body(std::move(request_body_)),
389 response(std::move(response_)),
390 response_callback(std::move(response_callback_))
391 {
392 if (url.empty())
393 {
394 throw std::invalid_argument("URL cannot be empty");
395 }
396 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_URL, url.c_str());
397
398 if (!method.get_http_method().has_value())
399 {
400 throw std::logic_error(
401 fmt::format("Unsupported HTTP method: {}", method.c_str()));
402 }
403
404 switch (method.get_http_method().value())
405 {
406 case HTTP_GET:
407 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_HTTPGET, 1L);
408 break;
409 case HTTP_HEAD:
410 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_NOBODY, 1L);
411 break;
412 case HTTP_PUT:
413 {
414 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_UPLOAD, 1L);
415 if (request_body == nullptr)
416 {
417 // If no request body is provided, curl will try reading from
418 // stdin, which causes a blockage
419 request_body =
420 std::make_unique<RequestBody>(std::vector<uint8_t>());
421 }
422 }
423 break;
424 case HTTP_POST:
425 // libcurl sets the post verb when CURLOPT_POSTFIELDS is set, so we
426 // skip doing so here, and we assume that the user has already set
427 // these fields
428 break;
429 default:
430 throw std::logic_error(
431 fmt::format("Unsupported HTTP method: {}", method.c_str()));
432 }
433
434 if (request_body != nullptr)
435 {
436 request_body->attach_to_curl(curl_handle);
437 }
438
439 if (response != nullptr)
440 {
441 response->attach_to_curl(curl_handle);
442 }
443 else
444 {
446 }
447
448 response_headers.attach_to_curl(curl_handle);
449
450 if (headers.get() != nullptr)
451 {
452 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_HTTPHEADER, headers.get());
453 }
454 }
455
456 void handle_response(CURLcode curl_response_code)
457 {
458 LOG_TRACE_FMT("Handling response for {}", url);
459 if (response_callback.has_value())
460 {
461 long status_code = 0;
463 curl_handle, CURLINFO_RESPONSE_CODE, &status_code);
464 response_callback.value()(*this, curl_response_code, status_code);
465 }
466 }
467
468 void synchronous_perform(CURLcode& curl_code, long& status_code)
469 {
470 if (curl_handle == nullptr)
471 {
472 throw std::logic_error(
473 "Cannot curl_easy_perform on a null CURL handle");
474 }
475
476 curl_code = curl_easy_perform(curl_handle);
477
478 handle_response(curl_code); // handle the response callback if set
479
481 curl_handle, CURLINFO_RESPONSE_CODE, &status_code);
482 }
483
484 [[nodiscard]] CURL* get_easy_handle() const
485 {
486 return curl_handle;
487 }
488
489 [[nodiscard]] RESTVerb get_method() const
490 {
491 return method;
492 }
493
494 [[nodiscard]] std::string get_url() const
495 {
496 return url;
497 }
498
500 {
501 return response.get();
502 }
503
504 [[nodiscard]] std::unique_ptr<ResponseBody>& get_response_ptr()
505 {
506 return response;
507 }
508
510 {
511 return response_headers;
512 }
513 };
514
516 {
517 public:
518 void attach_curl_request(std::unique_ptr<CurlRequest>&& request)
519 {
520 if (p == nullptr)
521 {
522 throw std::logic_error(
523 "Cannot attach CurlRequest to a null CURLM handle");
524 }
525 if (request == nullptr)
526 {
527 throw std::logic_error("Cannot attach a null CurlRequest");
528 }
529 LOG_TRACE_FMT("Attaching CurlRequest to {} to Curlm", request->get_url());
530 CURL* curl_handle = request->get_easy_handle();
531 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_PRIVATE, request.release());
532 CHECK_CURL_MULTI(curl_multi_add_handle, p.get(), curl_handle);
533 }
534
536 {
537 if (p == nullptr)
538 {
539 throw std::logic_error("Cannot perform on a null CURLM handle");
540 }
541
542 int running_handles = 0;
543 CHECK_CURL_MULTI(curl_multi_perform, p.get(), &running_handles);
544
545 // handle all completed curl requests
546 int msgq = 0;
547 CURLMsg* msg = nullptr;
548 do
549 {
550 msg = curl_multi_info_read(p.get(), &msgq);
551
552 if ((msg != nullptr) && msg->msg == CURLMSG_DONE)
553 {
554 auto* easy = msg->easy_handle;
555 auto result = msg->data.result;
556
557 // retrieve the request data and attach a lifetime to it
558 ccf::curl::CurlRequest* request = nullptr;
559 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
560 if (request == nullptr)
561 {
562 curl_multi_remove_handle(p.get(), easy);
563 throw std::runtime_error(
564 "CURLMSG_DONE received with no associated request data");
565 }
566 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
567
568 // detach the easy handle such that it can be cleaned up with the
569 // destructor of CurlRequest
570 curl_multi_remove_handle(p.get(), easy);
571 request->handle_response(result);
572 }
573 } while (msgq > 0);
574 return running_handles;
575 }
576 };
577
578 // Must be created on the same thread as the uv loop is running
580 {
581 /* Very high level:
582 * CURLM triggers timeout callback with some delay for libuv
583 * libuv calls the timeout callback which then triggers the curl socket
584 * action
585 * curl calls the socket callback to register the libuv polling
586 * libuv waits on the socket events and calls the socket poll callback
587 * socket poll callback triggers relevant libuv action
588 * etc.
589 *
590 * Example flow:
591 *
592 * Initially a CURL* is attached to the curl_multi CURLM* handle
593 * This calls the curl_multi's timeout function curl_timeout_callback with
594 * 0 delay which then registers the libuv timeout callback with 0 delay
595 * libuv_timeout_callback then registers a timeout socket_action with curl
596 * which then registers the socket polling at the libuv level
597 *
598 * At this point, either the relevant timeout will fire and call the
599 * relevant timeout callbacks, or the socket polling will trigger allowing
600 * data to be sent/received
601 */
602 private:
603 uv_loop_t* loop;
604 uv_timer_t uv_handle{};
605 CurlRequestCURLM curl_request_curlm;
606 std::atomic<bool> is_stopping = false;
607
608 class SocketContextImpl : public asynchost::with_uv_handle<uv_poll_t>
609 {
610 friend class CurlmLibuvContextImpl;
611
612 public:
613 curl_socket_t socket{};
614 CurlmLibuvContextImpl* context = nullptr;
615 };
616
618
619 uv_async_t async_requests_handle{};
620 std::mutex requests_mutex;
621 std::deque<std::unique_ptr<CurlRequest>> pending_requests;
622
623 static void async_requests_callback(uv_async_t* handle)
624 {
625 auto* self = static_cast<CurlmLibuvContextImpl*>(handle->data);
626 if (self == nullptr)
627 {
628 throw std::logic_error(
629 "async_requests_callback called with null self pointer");
630 }
631
632 if (self->is_stopping)
633 {
634 LOG_FAIL_FMT("async_requests_callback called while stopping");
635 return;
636 }
637
638 LOG_TRACE_FMT("Libuv: processing pending curl requests");
639
640 std::deque<std::unique_ptr<CurlRequest>> requests_to_add;
641 {
642 std::lock_guard<std::mutex> requests_lock(self->requests_mutex);
643 requests_to_add.swap(self->pending_requests);
644 }
645
646 for (auto& req : requests_to_add)
647 {
648 self->curl_request_curlm.attach_curl_request(std::move(req));
649 }
650 }
651
652 public:
653 static void libuv_timeout_callback(uv_timer_t* handle)
654 {
655 auto* self = static_cast<CurlmLibuvContextImpl*>(handle->data);
656 if (self == nullptr)
657 {
658 throw std::logic_error(
659 "libuv_timeout_callback called with null self pointer");
660 }
661
662 if (self->is_stopping)
663 {
664 LOG_FAIL_FMT("libuv_timeout_callback called while stopping");
665 return;
666 }
667
668 LOG_TRACE_FMT("Libuv timeout");
669
670 int running_handles = 0;
672 curl_multi_socket_action,
673 self->curl_request_curlm,
674 CURL_SOCKET_TIMEOUT,
675 0,
676 &running_handles);
677 self->curl_request_curlm.perform();
678 }
679
681 CURLM* multi, long timeout_ms, CurlmLibuvContextImpl* self)
682 {
683 (void)multi;
684 if (self == nullptr)
685 {
686 throw std::logic_error(
687 "libuv_timeout_callback called with null self pointer");
688 }
689
690 if (self->is_stopping)
691 {
692 LOG_FAIL_FMT("curl_timeout_callback called while stopping");
693 return 0;
694 }
695
696 LOG_TRACE_FMT("Curl timeout {}ms", timeout_ms);
697
698 if (timeout_ms < 0)
699 {
700 // No timeout set, stop the timer
701 uv_timer_stop(&self->uv_handle);
702 }
703 else
704 {
705 // If timeout is zero, this will trigger immediately, possibly within a
706 // callback so clamp it to at least 1ms
707 timeout_ms = std::max(timeout_ms, 1L);
708 uv_timer_start(&self->uv_handle, libuv_timeout_callback, timeout_ms, 0);
709 }
710 return 0;
711 }
712
713 // Called when libuv detects a socket event
715 uv_poll_t* req, int status, int events)
716 {
717 if (status < 0)
718 {
719 LOG_FAIL_FMT("Socket poll error: {}", uv_strerror(status));
720 return;
721 }
722
723 auto* socket_context = static_cast<SocketContextImpl*>(req->data);
724 if (socket_context == nullptr)
725 {
726 throw std::logic_error(
727 "libuv_socket_poll_callback called with null request context");
728 }
729
730 auto* self = socket_context->context;
731 if (self == nullptr)
732 {
733 throw std::logic_error(
734 "libuv_socket_poll_callback called with null self pointer");
735 }
736
737 if (self->is_stopping)
738 {
739 LOG_FAIL_FMT("libuv_socket_poll_callback called while stopping");
740 return;
741 }
742
744 "Libuv socket poll callback on {}: {}",
745 static_cast<int>(socket_context->socket),
746 static_cast<int>(events));
747
748 int action = 0;
749 action |= ((events & UV_READABLE) != 0) ? CURL_CSELECT_IN : 0;
750 action |= ((events & UV_WRITABLE) != 0) ? CURL_CSELECT_OUT : 0;
751 int running_handles = 0;
753 curl_multi_socket_action,
754 self->curl_request_curlm,
755 socket_context->socket,
756 action,
757 &running_handles);
758 self->curl_request_curlm.perform();
759 }
760
761 // Called when the status of a socket changes (creation/deletion)
763 CURL* easy,
764 curl_socket_t s,
765 int action,
767 SocketContextImpl* socket_context)
768 {
769 if (self == nullptr)
770 {
771 throw std::logic_error(
772 "curl_socket_callback called with null self pointer");
773 }
774 (void)easy;
775
776 switch (action)
777 {
778 case CURL_POLL_IN:
779 case CURL_POLL_OUT:
780 case CURL_POLL_INOUT:
781 {
782 // Possibly called during shutdown
783 if (self->is_stopping)
784 {
785 LOG_FAIL_FMT("curl_socket_callback called while stopping");
786 return 0;
787 }
788
790 "Curl socket callback: listen on socket {}", static_cast<int>(s));
791 if (socket_context == nullptr)
792 {
793 auto socket_context_ptr = std::make_unique<SocketContextImpl>();
794 socket_context_ptr->context = self;
795 socket_context_ptr->socket = s;
796 uv_poll_init_socket(self->loop, &socket_context_ptr->uv_handle, s);
797 socket_context_ptr->uv_handle.data =
798 socket_context_ptr.get(); // Attach the context
799 // attach the lifetime to the socket handle
800 socket_context = socket_context_ptr.release();
802 curl_multi_assign, self->curl_request_curlm, s, socket_context);
803 }
804
805 int events = 0;
806 events |= (action != CURL_POLL_IN) ? UV_WRITABLE : 0;
807 events |= (action != CURL_POLL_OUT) ? UV_READABLE : 0;
808
809 uv_poll_start(
810 &socket_context->uv_handle, events, libuv_socket_poll_callback);
811 break;
812 }
813 case CURL_POLL_REMOVE:
814 if (socket_context != nullptr)
815 {
817 "CurlmLibuv: curl socket callback: remove socket {}",
818 static_cast<int>(s));
819 SocketContext socket_context_ptr(socket_context);
820 uv_poll_stop(&socket_context->uv_handle);
822 curl_multi_assign, self->curl_request_curlm, s, nullptr);
823 }
824 break;
825 default:
826 throw std::runtime_error("Unknown action in curl_socket_callback");
827 }
828 return 0;
829 }
830
831 CurlmLibuvContextImpl(uv_loop_t* loop) : loop(loop)
832 {
833 uv_timer_init(loop, &uv_handle);
834 uv_handle.data = this; // Attach this instance to the timer
835
836 uv_async_init(loop, &async_requests_handle, async_requests_callback);
837 async_requests_handle.data = this;
838 uv_unref(reinterpret_cast<uv_handle_t*>(
839 &async_requests_handle)); // allow the loop to exit if this is the only
840 // active handle
841
842 // attach timeouts
844 curl_multi_setopt, curl_request_curlm, CURLMOPT_TIMERDATA, this);
846 curl_multi_setopt,
847 curl_request_curlm,
848 CURLMOPT_TIMERFUNCTION,
850
851 // attach socket events
853 curl_multi_setopt, curl_request_curlm, CURLMOPT_SOCKETDATA, this);
855 curl_multi_setopt,
856 curl_request_curlm,
857 CURLMOPT_SOCKETFUNCTION,
859 }
860
861 void attach_request(std::unique_ptr<CurlRequest>&& request)
862 {
863 if (is_stopping)
864 {
865 LOG_FAIL_FMT("CurlmLibuvContext already closed, cannot attach request");
866 return;
867 }
868 LOG_INFO_FMT("Adding request to {} to queue", request->get_url());
869 std::lock_guard<std::mutex> requests_lock(requests_mutex);
870 pending_requests.push_back(std::move(request));
871 uv_async_send(&async_requests_handle);
872 }
873
874 private:
875 // Interface to allow the proxy pointer to close and delete this safely
876 // Make the templated asynchost::close_ptr a friend so it can call close()
877 template <typename T>
878 friend class ::asynchost::close_ptr;
879 size_t closed_uv_handle_count = 0;
880
881 // called by the close_ptr within the destructor of the proxy_ptr
882 void close()
883 {
884 LOG_TRACE_FMT("Closing CurlmLibuvContext");
885
886 // Prevent multiple close calls
887 if (is_stopping)
888 {
890 "CurlmLibuvContext already closed, nothing to stop or remove");
891 return;
892 }
893 is_stopping = true;
894
895 // remove, stop and cleanup all curl easy handles
896 std::unique_ptr<CURL*, void (*)(CURL**)> easy_handles(
897 curl_multi_get_handles(curl_request_curlm),
898 [](CURL** handles) { curl_free(handles); });
899 // curl_multi_get_handles returns the handles as a null-terminated array
900 for (size_t i = 0; easy_handles.get()[i] != nullptr; ++i)
901 {
902 auto* easy = easy_handles.get()[i];
903 curl_multi_remove_handle(curl_request_curlm, easy);
904 if (easy != nullptr)
905 {
906 // attach a lifetime to the request
907 ccf::curl::CurlRequest* request = nullptr;
908 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
909 if (request == nullptr)
910 {
912 "CURLMSG_DONE received with no associated request data");
913 }
914 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
915 curl_easy_cleanup(easy);
916 }
917 }
918 // Drain the deque rather than letting it destruct
919 std::deque<std::unique_ptr<CurlRequest>> requests_to_cleanup;
920 {
921 std::lock_guard<std::mutex> requests_lock(requests_mutex);
922 requests_to_cleanup.swap(pending_requests);
923 }
924 // Dispatch uv_close to asynchronously close the timer handle
925 uv_close(
926 reinterpret_cast<uv_handle_t*>(&async_requests_handle), on_close);
927 uv_close(reinterpret_cast<uv_handle_t*>(&uv_handle), on_close);
928 }
929 static void on_close(uv_handle_t* handle)
930 {
931 auto& close_count = static_cast<CurlmLibuvContextImpl*>(handle->data)
932 ->closed_uv_handle_count;
933 close_count++;
934 if (close_count >= 2)
935 {
936 static_cast<CurlmLibuvContextImpl*>(handle->data)->on_close();
937 }
938 }
939 void on_close()
940 {
941 // We are being notified asynchronously that libuv has finished closing
942 delete this;
943 }
944 };
945
946 // Required destructor sequence triggered by proxy_ptr calling close
947 // 1. Detach CURLM handle from this object and clean up all easy handles.
948 // Detaching prevents new easy handles being added.
949 // curl_multi_cleanup detaches all sockets from libuv
950 // 2. Close the libuv timer handle.
951 // Prevents any further callbacks from the libuv timer
952 // 3. Delete CurlmLibuvContextImpl via the on_close callback
954
956 {
957 private:
958 static std::unique_ptr<CurlmLibuvContext>& instance()
959 {
960 static std::unique_ptr<CurlmLibuvContext> curlm_libuv_context_instance =
961 nullptr;
962 return curlm_libuv_context_instance;
963 }
964
965 public:
967 {
968 if (instance() == nullptr)
969 {
970 throw std::logic_error(
971 "CurlmLibuvContextSingleton instance not initialized");
972 }
973 return *instance();
974 }
976 {
977 if (instance() != nullptr)
978 {
979 throw std::logic_error(
980 "CurlmLibuvContextSingleton instance already initialized");
981 }
982 instance() = std::make_unique<CurlmLibuvContext>(loop);
983 }
985 {
986 instance().reset(); // Clean up the instance
987 }
988
991 delete;
994 default;
995 };
996} // namespace ccf::curl
Definition proxy.h:51
Definition proxy.h:82
Definition rest_verb.h:45
std::optional< llhttp_method > get_http_method() const
Definition rest_verb.h:57
const char * c_str() const
Definition rest_verb.h:62
Definition curl.h:516
void attach_curl_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:518
int perform()
Definition curl.h:535
Definition curl.h:360
void handle_response(CURLcode curl_response_code)
Definition curl.h:456
CurlRequest(UniqueCURL &&curl_handle_, RESTVerb method_, std::string &&url_, UniqueSlist &&headers_, std::unique_ptr< RequestBody > &&request_body_, std::unique_ptr< ccf::curl::ResponseBody > &&response_, std::optional< ResponseCallback > &&response_callback_)
Definition curl.h:376
CURL * get_easy_handle() const
Definition curl.h:484
std::function< void(CurlRequest &request, CURLcode curl_response_code, long status_code)> ResponseCallback
Definition curl.h:363
RESTVerb get_method() const
Definition curl.h:489
void synchronous_perform(CURLcode &curl_code, long &status_code)
Definition curl.h:468
std::string get_url() const
Definition curl.h:494
ResponseHeaders & get_response_headers()
Definition curl.h:509
std::unique_ptr< ResponseBody > & get_response_ptr()
Definition curl.h:504
ResponseBody * get_response_body()
Definition curl.h:499
static void libuv_timeout_callback(uv_timer_t *handle)
Definition curl.h:653
static void libuv_socket_poll_callback(uv_poll_t *req, int status, int events)
Definition curl.h:714
CurlmLibuvContextImpl(uv_loop_t *loop)
Definition curl.h:831
static int curl_socket_callback(CURL *easy, curl_socket_t s, int action, CurlmLibuvContextImpl *self, SocketContextImpl *socket_context)
Definition curl.h:762
static int curl_timeout_callback(CURLM *multi, long timeout_ms, CurlmLibuvContextImpl *self)
Definition curl.h:680
void attach_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:861
~CurlmLibuvContextSingleton()
Definition curl.h:984
CurlmLibuvContextSingleton(CurlmLibuvContextSingleton &&)=default
static CurlmLibuvContext & get_instance()
Definition curl.h:966
CurlmLibuvContextSingleton(uv_loop_t *loop)
Definition curl.h:975
CurlmLibuvContextSingleton & operator=(CurlmLibuvContextSingleton &&)=default
CurlmLibuvContextSingleton(const CurlmLibuvContextSingleton &)=delete
CurlmLibuvContextSingleton & operator=(const CurlmLibuvContextSingleton &)=delete
Definition curl.h:168
RequestBody(nlohmann::json json)
Definition curl.h:183
void attach_to_curl(CURL *curl)
Definition curl.h:205
static size_t send_data(char *ptr, size_t size, size_t nitems, RequestBody *data)
Definition curl.h:191
RequestBody(std::vector< uint8_t > &buffer)
Definition curl.h:173
RequestBody(std::vector< uint8_t > &&buffer)
Definition curl.h:178
Definition curl.h:219
void attach_to_curl(CURL *curl)
Definition curl.h:253
std::vector< uint8_t > buffer
Definition curl.h:221
static void attach_noop_response(CURL *curl)
Definition curl.h:271
static size_t write_response_chunk(uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:230
static size_t noop_write_function(uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:263
ResponseBody(size_t max_size_)
Definition curl.h:228
size_t maximum_size
Definition curl.h:222
Definition curl.h:284
HeaderMap data
Definition curl.h:288
static size_t recv_header_line(char *buffer, size_t size, size_t nitems, ResponseHeaders *response)
Definition curl.h:290
bool is_first_header
Definition curl.h:287
std::unordered_map< std::string, std::string > HeaderMap
Definition curl.h:286
void attach_to_curl(CURL *curl)
Definition curl.h:348
Definition curl.h:100
UniqueCURLM & operator=(UniqueCURLM &&other) noexcept
Definition curl.h:117
UniqueCURLM()
Definition curl.h:105
UniqueCURLM & operator=(const UniqueCURLM &)=delete
UniqueCURLM(const UniqueCURLM &)=delete
std::unique_ptr< CURLM, void(*)(CURLM *)> p
Definition curl.h:102
UniqueCURLM(UniqueCURLM &&other) noexcept
Definition curl.h:116
CURLM * release()
Definition curl.h:123
Definition curl.h:53
void set_blob_opt(auto option, const uint8_t *data, size_t length)
Definition curl.h:71
void set_opt(auto option, auto value)
Definition curl.h:93
UniqueCURL()
Definition curl.h:58
Definition curl.h:135
UniqueSlist(UniqueSlist &&other) noexcept
Definition curl.h:144
UniqueSlist(const UniqueSlist &)=delete
curl_slist * get() const
Definition curl.h:161
void append(const char *str)
Definition curl.h:151
UniqueSlist & operator=(const UniqueSlist &)=delete
UniqueSlist()
Definition curl.h:140
void append(const std::string &key, const std::string &value)
Definition curl.h:156
UniqueSlist & operator=(UniqueSlist &&other) noexcept
Definition curl.h:145
#define CHECK_CURL_EASY_SETOPT(handle, info, arg)
Definition curl.h:33
#define CHECK_CURL_MULTI(fn,...)
Definition curl.h:38
#define CHECK_CURL_EASY_GETINFO(handle, info, arg)
Definition curl.h:35
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_FAIL_FMT
Definition logger.h:363
Definition curl.h:50
Definition json_schema.h:15
STL namespace.