CCF
Loading...
Searching...
No Matches
curl.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
6#include "ccf/rest_verb.h"
8#include "host/proxy.h"
9
10#include <cstddef>
11#include <cstdint>
12#include <curl/curl.h>
13#include <curl/multi.h>
14#include <memory>
15#include <mutex>
16#include <optional>
17#include <regex>
18#include <span>
19#include <stdexcept>
20#include <uv.h>
21
// Calls the libcurl easy-interface function `fn` with the remaining arguments
// and throws std::runtime_error (naming the function, the numeric result and
// the curl_easy_strerror text) when the result is not CURLE_OK.
// The local variable carries a macro-specific name so that an argument
// expression mentioning an identifier called `res` is not captured by the
// macro's own variable.
#define CHECK_CURL_EASY(fn, ...) \
  do \
  { \
    const auto check_curl_easy_res_ = fn(__VA_ARGS__); \
    if (check_curl_easy_res_ != CURLE_OK) \
    { \
      throw std::runtime_error(fmt::format( \
        "Error calling " #fn ": {} ({})", \
        check_curl_easy_res_, \
        curl_easy_strerror(check_curl_easy_res_))); \
    } \
  } while (0)
32
33#define CHECK_CURL_EASY_SETOPT(handle, info, arg) \
34 CHECK_CURL_EASY(curl_easy_setopt, handle, info, arg)
35#define CHECK_CURL_EASY_GETINFO(handle, info, arg) \
36 CHECK_CURL_EASY(curl_easy_getinfo, handle, info, arg)
37
// Calls the libcurl multi-interface function `fn` with the remaining
// arguments and throws std::runtime_error (naming the function, the numeric
// result and the curl_multi_strerror text) when the result is not CURLM_OK.
// The local variable carries a macro-specific name so that an argument
// expression mentioning an identifier called `res` is not captured by the
// macro's own variable.
#define CHECK_CURL_MULTI(fn, ...) \
  do \
  { \
    const auto check_curl_multi_res_ = fn(__VA_ARGS__); \
    if (check_curl_multi_res_ != CURLM_OK) \
    { \
      throw std::runtime_error(fmt::format( \
        "Error calling " #fn ": {} ({})", \
        check_curl_multi_res_, \
        curl_multi_strerror(check_curl_multi_res_))); \
    } \
  } while (0)
48
49namespace ccf::curl
50{
51
53 {
54 private:
55 std::unique_ptr<CURL, void (*)(CURL*)> p;
56
57 public:
58 UniqueCURL() : p(curl_easy_init(), [](auto x) { curl_easy_cleanup(x); })
59 {
60 if (p == nullptr)
61 {
62 throw std::runtime_error("Error initialising curl easy request");
63 }
64 }
65
66 // No implicit copying: unique ownership of the CURL handle
67 UniqueCURL(const UniqueCURL&) = delete;
68 UniqueCURL& operator=(const UniqueCURL&) = delete;
69
70 // Move semantics
71 UniqueCURL(UniqueCURL&& other) noexcept : p(std::move(other.p)) {}
72 UniqueCURL& operator=(UniqueCURL&& other) noexcept
73 {
74 p = std::move(other.p);
75 return *this;
76 }
77
78 ~UniqueCURL() = default;
79
80 operator CURL*() const
81 {
82 return p.get();
83 }
84
85 void set_blob_opt(auto option, const uint8_t* data, size_t length)
86 {
87 if (data == nullptr || length == 0)
88 {
89 throw std::invalid_argument(
90 "Data pointer cannot be null or length zero");
91 }
92
93 if (p == nullptr)
94 {
95 throw std::logic_error("Cannot set option on a null CURL handle");
96 }
97
98 struct curl_blob blob
99 {
100 .data = const_cast<uint8_t*>(data), .len = length,
101 .flags = CURL_BLOB_COPY,
102 };
103
104 CHECK_CURL_EASY_SETOPT(p.get(), option, &blob);
105 }
106
107 void set_opt(auto option, auto value)
108 {
109 CHECK_CURL_EASY_SETOPT(p.get(), option, value);
110 }
111 };
112
114 {
115 protected:
116 std::unique_ptr<CURLM, void (*)(CURLM*)> p;
117
118 public:
119 UniqueCURLM() : p(curl_multi_init(), [](auto x) { curl_multi_cleanup(x); })
120 {
121 if (p == nullptr)
122 {
123 throw std::runtime_error("Error initialising curl multi request");
124 }
125 }
126
127 ~UniqueCURLM() = default;
128 UniqueCURLM(const UniqueCURLM&) = delete;
130 UniqueCURLM(UniqueCURLM&& other) noexcept : p(std::move(other.p)) {}
132 {
133 p = std::move(other.p);
134 return *this;
135 }
136
137 [[nodiscard]] CURLM* release()
138 {
139 return p.release();
140 }
141
142 operator CURLM*() const
143 {
144 return p.get();
145 }
146 };
147
149 {
150 private:
151 std::unique_ptr<curl_slist, void (*)(curl_slist*)> p;
152
153 public:
154 UniqueSlist() : p(nullptr, [](auto x) { curl_slist_free_all(x); }) {}
155 ~UniqueSlist() = default;
156 UniqueSlist(const UniqueSlist&) = delete;
158 UniqueSlist(UniqueSlist&& other) noexcept : p(std::move(other.p)) {}
160 {
161 p = std::move(other.p);
162 return *this;
163 }
164
165 void append(const char* str)
166 {
167 p.reset(curl_slist_append(p.release(), str));
168 }
169
170 void append(const std::string& key, const std::string& value)
171 {
172 append(fmt::format("{}: {}", key, value).c_str());
173 }
174
175 [[nodiscard]] curl_slist* get() const
176 {
177 return p.get();
178 }
179 };
180
182 {
183 std::vector<uint8_t> buffer;
184 std::span<const uint8_t> unsent;
185
186 public:
187 RequestBody(std::vector<uint8_t>& buffer) : buffer(buffer)
188 {
189 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
190 }
191
192 RequestBody(std::vector<uint8_t>&& buffer_) : buffer(std::move(buffer_))
193 {
194 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
195 }
196
197 RequestBody(nlohmann::json json)
198 {
199 auto json_str = json.dump();
200 buffer = std::vector<uint8_t>(
201 json_str.begin(), json_str.end()); // Convert to vector of bytes
202 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
203 }
204
205 static size_t send_data(
206 char* ptr, size_t size, size_t nitems, RequestBody* data)
207 {
208 if (data == nullptr)
209 {
210 LOG_FAIL_FMT("send_data called with null userdata");
211 return 0;
212 }
213 auto bytes_to_copy = std::min(data->unsent.size(), size * nitems);
214 memcpy(ptr, data->unsent.data(), bytes_to_copy);
215 data->unsent = data->unsent.subspan(bytes_to_copy);
216 return bytes_to_copy;
217 }
218
219 void attach_to_curl(CURL* curl)
220 {
221 if (curl == nullptr)
222 {
223 throw std::logic_error(
224 "Cannot attach request body to a null CURL handle");
225 }
226 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_READDATA, this);
227 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_READFUNCTION, send_data);
228 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_INFILESIZE, unsent.size());
229 }
230 };
231
233 {
234 public:
235 std::vector<uint8_t> buffer;
237
238 // Ensure there is always a maximum size set
239 ResponseBody() = delete;
240
241 // max_size_ is the maximum permitted size of the response body, in bytes
242 ResponseBody(size_t max_size_) : maximum_size(max_size_) {}
243
244 static size_t write_response_chunk(
245 const uint8_t* ptr, size_t size, size_t nmemb, ResponseBody* response)
246 {
247 if (response == nullptr)
248 {
250 "write_response_chunk called with a null response pointer");
251 return CURL_WRITEFUNC_ERROR;
252 }
253 auto bytes_to_copy = size * nmemb;
254 if (response->buffer.size() + bytes_to_copy > response->maximum_size)
255 {
257 "Response size limit exceeded: {} bytes, maximum is {} bytes",
258 response->buffer.size() + bytes_to_copy,
259 response->maximum_size);
260 return CURL_WRITEFUNC_ERROR;
261 }
262
263 response->buffer.insert(response->buffer.end(), ptr, ptr + bytes_to_copy);
264 return bytes_to_copy;
265 }
266
267 void attach_to_curl(CURL* curl)
268 {
269 if (curl == nullptr)
270 {
271 throw std::logic_error("Cannot attach response to a null CURL handle");
272 }
273 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEDATA, this);
274 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEFUNCTION, write_response_chunk);
275 }
276
277 static size_t noop_write_function(
278 const uint8_t* ptr, size_t size, size_t nmemb, ResponseBody* response)
279 {
280 (void)ptr;
281 (void)response;
282 return size * nmemb;
283 }
284
285 static void attach_noop_response(CURL* curl)
286 {
287 if (curl == nullptr)
288 {
289 throw std::logic_error(
290 "Cannot attach noop response to a null CURL handle");
291 }
292 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEDATA, nullptr);
293 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEFUNCTION, noop_write_function);
294 }
295 };
296
298 {
299 public:
300 using HeaderMap = std::unordered_map<std::string, std::string>;
301 bool is_first_header = true;
303
304 static size_t recv_header_line(
305 char* buffer, size_t size, size_t nitems, ResponseHeaders* response)
306 {
307 if (response == nullptr)
308 {
309 LOG_FAIL_FMT("recv_header_line called with a null response pointer");
310 return 0;
311 }
312 auto bytes_to_read = size * nitems;
313 std::string_view header(buffer, bytes_to_read);
314
315 // strip \r\n etc
316 header = ccf::nonstd::trim(header);
317
318 // Ignore the http status line (e.g. "HTTP/1.1 200") which should be the
319 // first header
320 static const std::regex http_status_line_regex(R"(^HTTP\/[1-9]+.*)");
321 if (response->is_first_header)
322 {
323 response->is_first_header = false;
324 if (!std::regex_match(std::string(header), http_status_line_regex))
325 {
327 "Expected HTTP status line as first header, got '{}'", header);
328 return bytes_to_read;
329 }
330 }
331 else
332 {
333 // ignore empty headers
334 if (!header.empty())
335 {
336 const auto [field, value] = ccf::nonstd::split_1(header, ": ");
337 if (!value.empty())
338 {
339 std::string field_str(field);
340 nonstd::to_lower(field_str);
341 if (response->data.contains(field_str))
342 {
343 auto current = response->data[field_str];
345 "Duplicate header for '{}', current = '{}', new = '{}'",
346 field_str,
347 current,
348 value);
349 }
350 response->data[field_str] = ccf::nonstd::trim(value);
351 }
352 else
353 {
354 LOG_DEBUG_FMT("Ignoring invalid-looking HTTP Header '{}'", header);
355 }
356 }
357 }
358
359 return bytes_to_read;
360 }
361
362 void attach_to_curl(CURL* curl)
363 {
364 if (curl == nullptr)
365 {
366 throw std::logic_error("Cannot attach response to a null CURL handle");
367 }
368 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERDATA, this);
369 CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERFUNCTION, recv_header_line);
370 }
371 };
372
374 {
375 public:
376 using ResponseCallback = std::function<void(
377 std::unique_ptr<CurlRequest>&& request,
378 CURLcode curl_response_code,
379 long status_code)>;
380
381 private:
382 UniqueCURL curl_handle;
383 RESTVerb method;
384 std::string url;
386 std::unique_ptr<ccf::curl::RequestBody> request_body;
387 std::unique_ptr<ccf::curl::ResponseBody> response;
388 ResponseHeaders response_headers;
389 std::optional<ResponseCallback> response_callback;
390
391 public:
393 UniqueCURL&& curl_handle_,
394 RESTVerb method_,
395 std::string url_,
396 UniqueSlist&& headers_,
397 std::unique_ptr<RequestBody>&& request_body_,
398 std::unique_ptr<ccf::curl::ResponseBody>&& response_,
399 std::optional<ResponseCallback>&& response_callback_) :
400 curl_handle(std::move(curl_handle_)),
401 method(method_),
402 url(std::move(url_)),
403 headers(std::move(headers_)),
404 request_body(std::move(request_body_)),
405 response(std::move(response_)),
406 response_callback(std::move(response_callback_))
407 {
408 if (url.empty())
409 {
410 throw std::invalid_argument("URL cannot be empty");
411 }
412 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_URL, url.c_str());
413
414 auto http_method = method.get_http_method();
415 if (!http_method.has_value())
416 {
417 throw std::logic_error(
418 fmt::format("Unsupported HTTP method: {}", method.c_str()));
419 }
420
421 switch (http_method.value())
422 {
423 case HTTP_GET:
424 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_HTTPGET, 1L);
425 break;
426 case HTTP_HEAD:
427 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_NOBODY, 1L);
428 break;
429 case HTTP_PUT:
430 {
431 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_UPLOAD, 1L);
432 if (request_body == nullptr)
433 {
434 // If no request body is provided, curl will try reading from
435 // stdin, which causes a blockage
436 request_body =
437 std::make_unique<RequestBody>(std::vector<uint8_t>());
438 }
439 }
440 break;
441 case HTTP_POST:
442 // libcurl sets the post verb when CURLOPT_POSTFIELDS is set, so we
443 // skip doing so here, and we assume that the user has already set
444 // these fields
445 break;
446 default:
447 throw std::logic_error(
448 fmt::format("Unsupported HTTP method: {}", method.c_str()));
449 }
450
451 if (request_body != nullptr)
452 {
453 request_body->attach_to_curl(curl_handle);
454 }
455
456 if (response != nullptr)
457 {
458 response->attach_to_curl(curl_handle);
459 }
460 else
461 {
463 }
464
465 response_headers.attach_to_curl(curl_handle);
466
467 if (headers.get() != nullptr)
468 {
469 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_HTTPHEADER, headers.get());
470 }
471 }
472
473 static void handle_response(
474 std::unique_ptr<CurlRequest>&& request, CURLcode curl_response_code)
475 {
476 LOG_TRACE_FMT("Handling response for {}", request->url);
477 auto& callback = request->response_callback;
478 if (callback.has_value())
479 {
480 if (callback.value() != nullptr)
481 {
482 long status_code = 0;
484 request->curl_handle, CURLINFO_RESPONSE_CODE, &status_code);
485 callback.value()(std::move(request), curl_response_code, status_code);
486 }
487 }
488 }
489
490 static void synchronous_perform(std::unique_ptr<CurlRequest>&& request)
491 {
492 if (request == nullptr)
493 {
494 throw std::logic_error("Cannot perform a null CurlRequest");
495 }
496 if (request->curl_handle == nullptr)
497 {
498 throw std::logic_error(
499 "Cannot curl_easy_perform on a null CURL handle");
500 }
501
502 auto curl_code = curl_easy_perform(request->curl_handle);
503
505 std::move(request),
506 curl_code); // handle the response callback if set
507 }
508
509 [[nodiscard]] CURL* get_easy_handle() const
510 {
511 return curl_handle;
512 }
513
515 {
516 return curl_handle;
517 }
518
519 [[nodiscard]] RESTVerb get_method() const
520 {
521 return method;
522 }
523
524 [[nodiscard]] std::string get_url() const
525 {
526 return url;
527 }
528
530 {
531 return response.get();
532 }
533
534 [[nodiscard]] std::unique_ptr<ResponseBody>& get_response_ptr()
535 {
536 return response;
537 }
538
540 {
541 return response_headers.data;
542 }
543 };
544
546 {
547 public:
548 void attach_curl_request(std::unique_ptr<CurlRequest>&& request)
549 {
550 if (p == nullptr)
551 {
552 throw std::logic_error(
553 "Cannot attach CurlRequest to a null CURLM handle");
554 }
555 if (request == nullptr)
556 {
557 throw std::logic_error("Cannot attach a null CurlRequest");
558 }
559 LOG_DEBUG_FMT("Attaching CurlRequest to {} to Curlm", request->get_url());
560 CURL* curl_handle = request->get_easy_handle();
561 CHECK_CURL_EASY_SETOPT(curl_handle, CURLOPT_PRIVATE, request.release());
562 CHECK_CURL_MULTI(curl_multi_add_handle, p.get(), curl_handle);
563 }
564
566 {
567 if (p == nullptr)
568 {
569 throw std::logic_error("Cannot perform on a null CURLM handle");
570 }
571
572 int running_handles = 0;
573 CHECK_CURL_MULTI(curl_multi_perform, p.get(), &running_handles);
574
575 // handle all completed curl requests
576 int msgq = 0;
577 CURLMsg* msg = nullptr;
578 do
579 {
580 msg = curl_multi_info_read(p.get(), &msgq);
581
582 if ((msg != nullptr) && msg->msg == CURLMSG_DONE)
583 {
584 auto* easy = msg->easy_handle;
585 auto result = msg->data.result;
586
587 // retrieve the request data and attach a lifetime to it
588 ccf::curl::CurlRequest* request = nullptr;
589 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
590 if (request == nullptr)
591 {
592 curl_multi_remove_handle(p.get(), easy);
593 throw std::runtime_error(
594 "CURLMSG_DONE received with no associated request data");
595 }
596 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
597
598 // detach the easy handle such that it can be cleaned up with the
599 // destructor of CurlRequest
600 curl_multi_remove_handle(p.get(), easy);
601
602 // handle response inline. Note that if this is expensive, it should
603 // defer its work to a task
604 CurlRequest::handle_response(std::move(request_data_ptr), result);
605 }
606 } while (msgq > 0);
607 return running_handles;
608 }
609 };
610
611 // Must be created on the same thread that runs the uv loop
613 {
614 /* Very high level:
615 * CURLM triggers timeout callback with some delay for libuv
616 * libuv calls the timeout callback which then triggers the curl socket
617 * action
618 * curl calls the socket callback to register the libuv polling
619 * libuv waits on the socket events and calls the socket poll callback
620 * socket poll callback triggers relevant libuv action
621 * etc.
622 *
623 * Example flow:
624 *
625 * Initially a CURL* is attached to the curl_multi CURLM* handle
626 * This calls the curl_multi's timeout function curl_timeout_callback with
627 * 0 delay which then registers the libuv timeout callback with 0 delay
628 * libuv_timeout_callback then registers a timeout socket_action with curl
629 * which then registers the socket polling at the libuv level
630 *
631 * At this point, either the relevant timeout will fire and call the
632 * relevant timeout callbacks, or the socket polling will trigger allowing
633 * data to be sent/received
634 */
635 private:
636 uv_loop_t* loop;
637 uv_timer_t uv_handle{};
638 CurlRequestCURLM curl_request_curlm;
639 std::atomic<bool> is_stopping = false;
640
641 class SocketContextImpl : public asynchost::with_uv_handle<uv_poll_t>
642 {
643 friend class CurlmLibuvContextImpl;
644
645 public:
646 curl_socket_t socket{};
647 CurlmLibuvContextImpl* context = nullptr;
648 };
649
651
652 uv_async_t async_requests_handle{};
653 std::mutex requests_mutex;
654 std::deque<std::unique_ptr<CurlRequest>> pending_requests;
655
656 static void async_requests_callback(uv_async_t* handle)
657 {
658 auto* self = static_cast<CurlmLibuvContextImpl*>(handle->data);
659 if (self == nullptr)
660 {
661 throw std::logic_error(
662 "async_requests_callback called with null self pointer");
663 }
664
665 if (self->is_stopping)
666 {
667 LOG_FAIL_FMT("async_requests_callback called while stopping");
668 return;
669 }
670
671 LOG_DEBUG_FMT("Libuv: processing pending curl requests");
672
673 std::deque<std::unique_ptr<CurlRequest>> requests_to_add;
674 {
675 std::lock_guard<std::mutex> requests_lock(self->requests_mutex);
676 requests_to_add.swap(self->pending_requests);
677 }
678
679 for (auto& req : requests_to_add)
680 {
681 self->curl_request_curlm.attach_curl_request(std::move(req));
682 }
683 }
684
685 public:
686 static void libuv_timeout_callback(uv_timer_t* handle)
687 {
688 auto* self = static_cast<CurlmLibuvContextImpl*>(handle->data);
689 if (self == nullptr)
690 {
691 throw std::logic_error(
692 "libuv_timeout_callback called with null self pointer");
693 }
694
695 if (self->is_stopping)
696 {
697 LOG_FAIL_FMT("libuv_timeout_callback called while stopping");
698 return;
699 }
700
701 LOG_DEBUG_FMT("Libuv timeout");
702
703 int running_handles = 0;
705 curl_multi_socket_action,
706 self->curl_request_curlm,
707 CURL_SOCKET_TIMEOUT,
708 0,
709 &running_handles);
710 self->curl_request_curlm.perform();
711 }
712
714 CURLM* multi, long timeout_ms, CurlmLibuvContextImpl* self)
715 {
716 (void)multi;
717 if (self == nullptr)
718 {
719 throw std::logic_error(
720 "libuv_timeout_callback called with null self pointer");
721 }
722
723 if (self->is_stopping)
724 {
725 LOG_FAIL_FMT("curl_timeout_callback called while stopping");
726 return 0;
727 }
728
729 LOG_DEBUG_FMT("Curl timeout {}ms", timeout_ms);
730
731 if (timeout_ms < 0)
732 {
733 // No timeout set, stop the timer
734 uv_timer_stop(&self->uv_handle);
735 }
736 else
737 {
738 // If timeout is zero, this will trigger immediately, possibly within a
739 // callback so clamp it to at least 1ms
740 timeout_ms = std::max(timeout_ms, 1L);
741 uv_timer_start(&self->uv_handle, libuv_timeout_callback, timeout_ms, 0);
742 }
743 return 0;
744 }
745
746 // Called when libuv detects a socket event
748 uv_poll_t* req, int status, int events)
749 {
750 auto* socket_context = static_cast<SocketContextImpl*>(req->data);
751 if (socket_context == nullptr)
752 {
753 throw std::logic_error(
754 "libuv_socket_poll_callback called with null request context");
755 }
756
757 auto* self = socket_context->context;
758 if (self == nullptr)
759 {
760 throw std::logic_error(
761 "libuv_socket_poll_callback called with null self pointer");
762 }
763
764 if (self->is_stopping)
765 {
767 "libuv_socket_poll_callback called on {} while stopped",
768 socket_context->socket);
769 return;
770 }
771
772 if (status < 0)
773 {
774 if (status == UV_EBADF)
775 {
776 // Thrown when POLLERR is thrown by the epoll socket, such as when a
777 // TCP socket received a reset at a bad time
778 // https://docs.libuv.org/en/v1.x/poll.html#c.uv_poll_start
779 // https://github.com/libuv/libuv/issues/3796
781 "Socket poll error on {}: {}",
782 socket_context->socket,
783 uv_strerror(status));
784 }
785 else
786 {
788 "Socket poll error on {}: {}",
789 socket_context->socket,
790 uv_strerror(status));
791 }
792
793 // Notify curl of the error
794 int running_handles = 0;
796 curl_multi_socket_action,
797 self->curl_request_curlm,
798 socket_context->socket,
799 CURL_CSELECT_ERR,
800 &running_handles);
801 self->curl_request_curlm.perform();
802 return;
803 }
804
806 "Libuv socket poll callback on {}: {}",
807 static_cast<int>(socket_context->socket),
808 static_cast<int>(events));
809
810 int action = 0;
811 action |= ((events & UV_READABLE) != 0) ? CURL_CSELECT_IN : 0;
812 action |= ((events & UV_WRITABLE) != 0) ? CURL_CSELECT_OUT : 0;
813 int running_handles = 0;
815 curl_multi_socket_action,
816 self->curl_request_curlm,
817 socket_context->socket,
818 action,
819 &running_handles);
820 self->curl_request_curlm.perform();
821 }
822
823 // Called when the status of a socket changes (creation/deletion)
825 CURL* easy,
826 curl_socket_t s,
827 int action,
829 SocketContextImpl* socket_context)
830 {
831 if (self == nullptr)
832 {
833 throw std::logic_error(
834 "curl_socket_callback called with null self pointer");
835 }
836 (void)easy;
837
838 switch (action)
839 {
840 case CURL_POLL_IN:
841 case CURL_POLL_OUT:
842 case CURL_POLL_INOUT:
843 {
845 "Curl socket callback: listen on socket {}, {}",
846 static_cast<int>(s),
847 static_cast<int>(action));
848
849 // During shutdown ignore requests to add new sockets
850 if (self->is_stopping)
851 {
852 return 0;
853 }
854
855 if (socket_context == nullptr)
856 {
857 auto socket_context_ptr = std::make_unique<SocketContextImpl>();
858 socket_context_ptr->context = self;
859 socket_context_ptr->socket = s;
860 uv_poll_init_socket(self->loop, &socket_context_ptr->uv_handle, s);
861 socket_context_ptr->uv_handle.data =
862 socket_context_ptr.get(); // Attach the context
863 // attach the lifetime to the socket handle
864 socket_context = socket_context_ptr.release();
866 curl_multi_assign, self->curl_request_curlm, s, socket_context);
867 }
868
869 int events = 0;
870 events |= (action != CURL_POLL_IN) ? UV_WRITABLE : 0;
871 events |= (action != CURL_POLL_OUT) ? UV_READABLE : 0;
872
873 uv_poll_start(
874 &socket_context->uv_handle, events, libuv_socket_poll_callback);
875 break;
876 }
877 case CURL_POLL_REMOVE:
878 if (socket_context != nullptr)
879 {
881 "CurlmLibuv: curl socket callback: remove socket {}",
882 static_cast<int>(s));
883 SocketContext socket_context_ptr(socket_context);
884 uv_poll_stop(&socket_context->uv_handle);
886 curl_multi_assign, self->curl_request_curlm, s, nullptr);
887 }
888 break;
889 default:
890 throw std::runtime_error("Unknown action in curl_socket_callback");
891 }
892 return 0;
893 }
894
895 CurlmLibuvContextImpl(uv_loop_t* loop) : loop(loop)
896 {
897 uv_timer_init(loop, &uv_handle);
898 uv_handle.data = this; // Attach this instance to the timer
899
900 uv_async_init(loop, &async_requests_handle, async_requests_callback);
901 async_requests_handle.data = this;
902 uv_unref(reinterpret_cast<uv_handle_t*>(
903 &async_requests_handle)); // allow the loop to exit if this is the only
904 // active handle
905
906 // attach timeouts
908 curl_multi_setopt, curl_request_curlm, CURLMOPT_TIMERDATA, this);
910 curl_multi_setopt,
911 curl_request_curlm,
912 CURLMOPT_TIMERFUNCTION,
914
915 // attach socket events
917 curl_multi_setopt, curl_request_curlm, CURLMOPT_SOCKETDATA, this);
919 curl_multi_setopt,
920 curl_request_curlm,
921 CURLMOPT_SOCKETFUNCTION,
923 }
924
925 void attach_request(std::unique_ptr<CurlRequest>&& request)
926 {
927 if (is_stopping)
928 {
929 LOG_FAIL_FMT("CurlmLibuvContext already closed, cannot attach request");
930 return;
931 }
932 LOG_DEBUG_FMT("Adding request to {} to queue", request->get_url());
933 std::lock_guard<std::mutex> requests_lock(requests_mutex);
934 pending_requests.push_back(std::move(request));
935 uv_async_send(&async_requests_handle);
936 }
937
938 private:
939 // Interface to allow the proxy pointer to close and delete this safely
940 // Make the templated asynchost::close_ptr a friend so it can call close()
941 template <typename T>
942 friend class ::asynchost::close_ptr;
943 size_t closed_uv_handle_count = 0;
944
945 // called by the close_ptr within the destructor of the proxy_ptr
946 void close()
947 {
948 LOG_TRACE_FMT("Closing CurlmLibuvContext");
949
950 // Prevent multiple close calls
951 if (is_stopping)
952 {
954 "CurlmLibuvContext already closed, nothing to stop or remove");
955 return;
956 }
957 is_stopping = true;
958
959 // remove, stop and cleanup all curl easy handles
960 std::unique_ptr<CURL*, void (*)(CURL**)> easy_handles(
961 curl_multi_get_handles(curl_request_curlm),
962 // NOLINTNEXTLINE(bugprone-multi-level-implicit-pointer-conversion)
963 [](CURL** handles) { curl_free(static_cast<void*>(handles)); });
964 // curl_multi_get_handles returns the handles as a null-terminated array
965 for (size_t i = 0; easy_handles.get()[i] != nullptr; ++i)
966 {
967 auto* easy = easy_handles.get()[i];
968 curl_multi_remove_handle(curl_request_curlm, easy);
969 if (easy != nullptr)
970 {
971 // attach a lifetime to the request
972 ccf::curl::CurlRequest* request = nullptr;
973 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
974 if (request == nullptr)
975 {
977 "CURLMSG_DONE received with no associated request data");
978 }
979 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
980 curl_easy_cleanup(easy);
981 }
982 }
983 // Drain the deque rather than letting it destruct
984 std::deque<std::unique_ptr<CurlRequest>> requests_to_cleanup;
985 {
986 std::lock_guard<std::mutex> requests_lock(requests_mutex);
987 requests_to_cleanup.swap(pending_requests);
988 }
989 // Dispatch uv_close to asynchronously close the timer handle
990 uv_close(
991 reinterpret_cast<uv_handle_t*>(&async_requests_handle), on_close);
992 uv_close(reinterpret_cast<uv_handle_t*>(&uv_handle), on_close);
993 }
994 static void on_close(uv_handle_t* handle)
995 {
996 auto& close_count = static_cast<CurlmLibuvContextImpl*>(handle->data)
997 ->closed_uv_handle_count;
998 close_count++;
999 if (close_count >= 2)
1000 {
1001 static_cast<CurlmLibuvContextImpl*>(handle->data)->on_close();
1002 }
1003 }
1004 void on_close()
1005 {
1006 // We are being notified asynchronously that libuv has finished closing
1007 delete this;
1008 }
1009 };
1010
1011 // Required destructor sequence triggered by proxy_ptr calling close
1012 // 1. Detach CURLM handle from this object and clean up all easy handles.
1013 // Detaching prevents new easy handles being added.
1014 // curl_multi_cleanup detaches all sockets from libuv
1015 // 2. Close the libuv timer handle.
1016 // Prevents any further callbacks from the libuv timer
1017 // 3. Delete CurlmLibuvContextImpl via the on_close callback
1019
1021 {
1022 private:
1023 static std::unique_ptr<CurlmLibuvContext>& instance()
1024 {
1025 static std::unique_ptr<CurlmLibuvContext> curlm_libuv_context_instance =
1026 nullptr;
1027 return curlm_libuv_context_instance;
1028 }
1029
1030 public:
1032 {
1033 if (instance() == nullptr)
1034 {
1035 throw std::logic_error(
1036 "CurlmLibuvContextSingleton instance not initialized");
1037 }
1038 return *instance();
1039 }
1041 {
1042 if (instance() != nullptr)
1043 {
1044 throw std::logic_error(
1045 "CurlmLibuvContextSingleton instance already initialized");
1046 }
1047 instance() = std::make_unique<CurlmLibuvContext>(loop);
1048 }
1050 {
1051 instance().reset(); // Clean up the instance
1052 }
1053
1056 delete;
1059 default;
1060 };
1061} // namespace ccf::curl
Definition proxy.h:51
Definition proxy.h:84
Definition rest_verb.h:45
std::optional< llhttp_method > get_http_method() const
Definition rest_verb.h:57
const char * c_str() const
Definition rest_verb.h:62
Definition curl.h:546
void attach_curl_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:548
int perform()
Definition curl.h:565
Definition curl.h:374
CurlRequest(UniqueCURL &&curl_handle_, RESTVerb method_, std::string url_, UniqueSlist &&headers_, std::unique_ptr< RequestBody > &&request_body_, std::unique_ptr< ccf::curl::ResponseBody > &&response_, std::optional< ResponseCallback > &&response_callback_)
Definition curl.h:392
static void synchronous_perform(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:490
CURL * get_easy_handle() const
Definition curl.h:509
UniqueCURL & get_easy_handle_ptr()
Definition curl.h:514
const ResponseHeaders::HeaderMap & get_response_headers() const
Definition curl.h:539
RESTVerb get_method() const
Definition curl.h:519
static void handle_response(std::unique_ptr< CurlRequest > &&request, CURLcode curl_response_code)
Definition curl.h:473
std::string get_url() const
Definition curl.h:524
std::function< void(std::unique_ptr< CurlRequest > &&request, CURLcode curl_response_code, long status_code)> ResponseCallback
Definition curl.h:379
std::unique_ptr< ResponseBody > & get_response_ptr()
Definition curl.h:534
ResponseBody * get_response_body()
Definition curl.h:529
static void libuv_timeout_callback(uv_timer_t *handle)
Definition curl.h:686
static void libuv_socket_poll_callback(uv_poll_t *req, int status, int events)
Definition curl.h:747
CurlmLibuvContextImpl(uv_loop_t *loop)
Definition curl.h:895
static int curl_socket_callback(CURL *easy, curl_socket_t s, int action, CurlmLibuvContextImpl *self, SocketContextImpl *socket_context)
Definition curl.h:824
static int curl_timeout_callback(CURLM *multi, long timeout_ms, CurlmLibuvContextImpl *self)
Definition curl.h:713
void attach_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:925
~CurlmLibuvContextSingleton()
Definition curl.h:1049
CurlmLibuvContextSingleton(CurlmLibuvContextSingleton &&)=default
static CurlmLibuvContext & get_instance()
Definition curl.h:1031
CurlmLibuvContextSingleton(uv_loop_t *loop)
Definition curl.h:1040
CurlmLibuvContextSingleton & operator=(CurlmLibuvContextSingleton &&)=default
CurlmLibuvContextSingleton(const CurlmLibuvContextSingleton &)=delete
CurlmLibuvContextSingleton & operator=(const CurlmLibuvContextSingleton &)=delete
Definition curl.h:182
RequestBody(std::vector< uint8_t > &&buffer_)
Definition curl.h:192
RequestBody(nlohmann::json json)
Definition curl.h:197
void attach_to_curl(CURL *curl)
Definition curl.h:219
static size_t send_data(char *ptr, size_t size, size_t nitems, RequestBody *data)
Definition curl.h:205
RequestBody(std::vector< uint8_t > &buffer)
Definition curl.h:187
Definition curl.h:233
void attach_to_curl(CURL *curl)
Definition curl.h:267
static size_t write_response_chunk(const uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:244
std::vector< uint8_t > buffer
Definition curl.h:235
static size_t noop_write_function(const uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:277
static void attach_noop_response(CURL *curl)
Definition curl.h:285
ResponseBody(size_t max_size_)
Definition curl.h:242
size_t maximum_size
Definition curl.h:236
Definition curl.h:298
HeaderMap data
Definition curl.h:302
static size_t recv_header_line(char *buffer, size_t size, size_t nitems, ResponseHeaders *response)
Definition curl.h:304
bool is_first_header
Definition curl.h:301
std::unordered_map< std::string, std::string > HeaderMap
Definition curl.h:300
void attach_to_curl(CURL *curl)
Definition curl.h:362
Definition curl.h:114
UniqueCURLM & operator=(UniqueCURLM &&other) noexcept
Definition curl.h:131
UniqueCURLM()
Definition curl.h:119
UniqueCURLM & operator=(const UniqueCURLM &)=delete
UniqueCURLM(const UniqueCURLM &)=delete
std::unique_ptr< CURLM, void(*)(CURLM *)> p
Definition curl.h:116
UniqueCURLM(UniqueCURLM &&other) noexcept
Definition curl.h:130
CURLM * release()
Definition curl.h:137
Definition curl.h:53
void set_blob_opt(auto option, const uint8_t *data, size_t length)
Definition curl.h:85
UniqueCURL(const UniqueCURL &)=delete
void set_opt(auto option, auto value)
Definition curl.h:107
UniqueCURL(UniqueCURL &&other) noexcept
Definition curl.h:71
UniqueCURL & operator=(UniqueCURL &&other) noexcept
Definition curl.h:72
UniqueCURL & operator=(const UniqueCURL &)=delete
UniqueCURL()
Definition curl.h:58
Definition curl.h:149
UniqueSlist(UniqueSlist &&other) noexcept
Definition curl.h:158
UniqueSlist(const UniqueSlist &)=delete
curl_slist * get() const
Definition curl.h:175
void append(const char *str)
Definition curl.h:165
UniqueSlist & operator=(const UniqueSlist &)=delete
UniqueSlist()
Definition curl.h:154
void append(const std::string &key, const std::string &value)
Definition curl.h:170
UniqueSlist & operator=(UniqueSlist &&other) noexcept
Definition curl.h:159
#define CHECK_CURL_EASY_SETOPT(handle, info, arg)
Definition curl.h:33
#define CHECK_CURL_MULTI(fn,...)
Definition curl.h:38
#define CHECK_CURL_EASY_GETINFO(handle, info, arg)
Definition curl.h:35
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition curl.h:50
Definition json_schema.h:15
STL namespace.