13#include <curl/multi.h>
22#define CHECK_CURL_EASY(fn, ...) \
25 const auto res = fn(__VA_ARGS__); \
26 if (res != CURLE_OK) \
28 throw std::runtime_error(fmt::format( \
29 "Error calling " #fn ": {} ({})", res, curl_easy_strerror(res))); \
33#define CHECK_CURL_EASY_SETOPT(handle, info, arg) \
34 CHECK_CURL_EASY(curl_easy_setopt, handle, info, arg)
35#define CHECK_CURL_EASY_GETINFO(handle, info, arg) \
36 CHECK_CURL_EASY(curl_easy_getinfo, handle, info, arg)
38#define CHECK_CURL_MULTI(fn, ...) \
41 const auto res = fn(__VA_ARGS__); \
42 if (res != CURLM_OK) \
44 throw std::runtime_error(fmt::format( \
45 "Error calling " #fn ": {} ({})", res, curl_multi_strerror(res))); \
55 std::unique_ptr<CURL, void (*)(CURL*)> p;
58 UniqueCURL() : p(curl_easy_init(), [](auto x) { curl_easy_cleanup(x); })
62 throw std::runtime_error(
"Error initialising curl easy request");
74 p = std::move(other.p);
80 operator CURL*()
const
87 if (data ==
nullptr || length == 0)
89 throw std::invalid_argument(
90 "Data pointer cannot be null or length zero");
95 throw std::logic_error(
"Cannot set option on a null CURL handle");
100 .data =
const_cast<uint8_t*
>(data), .len = length,
101 .flags = CURL_BLOB_COPY,
116 std::unique_ptr<CURLM, void (*)(CURLM*)>
p;
119 UniqueCURLM() :
p(curl_multi_init(), [](auto x) { curl_multi_cleanup(x); })
123 throw std::runtime_error(
"Error initialising curl multi request");
133 p = std::move(other.p);
142 operator CURLM*()
const
151 std::unique_ptr<curl_slist, void (*)(curl_slist*)> p;
154 UniqueSlist() : p(nullptr, [](auto x) { curl_slist_free_all(x); }) {}
161 p = std::move(other.p);
167 p.reset(curl_slist_append(p.release(), str));
170 void append(
const std::string& key,
const std::string& value)
172 append(fmt::format(
"{}: {}", key, value).c_str());
175 [[nodiscard]] curl_slist*
get()
const
183 std::vector<uint8_t> buffer;
184 std::span<const uint8_t> unsent;
189 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
194 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
199 auto json_str =
json.dump();
200 buffer = std::vector<uint8_t>(
201 json_str.begin(), json_str.end());
202 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
206 char* ptr,
size_t size,
size_t nitems,
RequestBody* data)
213 auto bytes_to_copy = std::min(data->unsent.size(), size * nitems);
214 memcpy(ptr, data->unsent.data(), bytes_to_copy);
215 data->unsent = data->unsent.subspan(bytes_to_copy);
216 return bytes_to_copy;
223 throw std::logic_error(
224 "Cannot attach request body to a null CURL handle");
245 const uint8_t* ptr,
size_t size,
size_t nmemb,
ResponseBody* response)
247 if (response ==
nullptr)
250 "write_response_chunk called with a null response pointer");
251 return CURL_WRITEFUNC_ERROR;
253 auto bytes_to_copy = size * nmemb;
257 "Response size limit exceeded: {} bytes, maximum is {} bytes",
258 response->
buffer.size() + bytes_to_copy,
260 return CURL_WRITEFUNC_ERROR;
263 response->
buffer.insert(response->
buffer.end(), ptr, ptr + bytes_to_copy);
264 return bytes_to_copy;
271 throw std::logic_error(
"Cannot attach response to a null CURL handle");
278 const uint8_t* ptr,
size_t size,
size_t nmemb,
ResponseBody* response)
289 throw std::logic_error(
290 "Cannot attach noop response to a null CURL handle");
300 using HeaderMap = std::unordered_map<std::string, std::string>;
307 if (response ==
nullptr)
309 LOG_FAIL_FMT(
"recv_header_line called with a null response pointer");
312 auto bytes_to_read = size * nitems;
313 std::string_view header(buffer, bytes_to_read);
316 header = ccf::nonstd::trim(header);
320 static const std::regex http_status_line_regex(R
"(^HTTP\/[1-9]+.*)");
324 if (!std::regex_match(std::string(header), http_status_line_regex))
327 "Expected HTTP status line as first header, got '{}'", header);
328 return bytes_to_read;
336 const auto [field, value] = ccf::nonstd::split_1(header,
": ");
339 std::string field_str(field);
340 nonstd::to_lower(field_str);
341 if (response->
data.contains(field_str))
343 auto current = response->
data[field_str];
345 "Duplicate header for '{}', current = '{}', new = '{}'",
350 response->
data[field_str] = ccf::nonstd::trim(value);
354 LOG_DEBUG_FMT(
"Ignoring invalid-looking HTTP Header '{}'", header);
359 return bytes_to_read;
366 throw std::logic_error(
"Cannot attach response to a null CURL handle");
377 std::unique_ptr<CurlRequest>&& request,
378 CURLcode curl_response_code,
386 std::unique_ptr<ccf::curl::RequestBody> request_body;
387 std::unique_ptr<ccf::curl::ResponseBody> response;
389 std::optional<ResponseCallback> response_callback;
397 std::unique_ptr<RequestBody>&& request_body_,
398 std::unique_ptr<ccf::curl::ResponseBody>&& response_,
399 std::optional<ResponseCallback>&& response_callback_) :
400 curl_handle(
std::move(curl_handle_)),
402 url(
std::move(url_)),
403 headers(
std::move(headers_)),
404 request_body(
std::move(request_body_)),
405 response(
std::move(response_)),
406 response_callback(
std::move(response_callback_))
410 throw std::invalid_argument(
"URL cannot be empty");
415 if (!http_method.has_value())
417 throw std::logic_error(
418 fmt::format(
"Unsupported HTTP method: {}", method.
c_str()));
421 switch (http_method.value())
432 if (request_body ==
nullptr)
437 std::make_unique<RequestBody>(std::vector<uint8_t>());
447 throw std::logic_error(
448 fmt::format(
"Unsupported HTTP method: {}", method.
c_str()));
451 if (request_body !=
nullptr)
453 request_body->attach_to_curl(curl_handle);
456 if (response !=
nullptr)
458 response->attach_to_curl(curl_handle);
467 if (headers.
get() !=
nullptr)
474 std::unique_ptr<CurlRequest>&& request, CURLcode curl_response_code)
477 auto& callback = request->response_callback;
478 if (callback.has_value())
480 if (callback.value() !=
nullptr)
482 long status_code = 0;
484 request->curl_handle, CURLINFO_RESPONSE_CODE, &status_code);
485 callback.value()(std::move(request), curl_response_code, status_code);
492 if (request ==
nullptr)
494 throw std::logic_error(
"Cannot perform a null CurlRequest");
496 if (request->curl_handle ==
nullptr)
498 throw std::logic_error(
499 "Cannot curl_easy_perform on a null CURL handle");
502 auto curl_code = curl_easy_perform(request->curl_handle);
531 return response.get();
541 return response_headers.
data;
552 throw std::logic_error(
553 "Cannot attach CurlRequest to a null CURLM handle");
555 if (request ==
nullptr)
557 throw std::logic_error(
"Cannot attach a null CurlRequest");
559 LOG_DEBUG_FMT(
"Attaching CurlRequest to {} to Curlm", request->get_url());
560 CURL* curl_handle = request->get_easy_handle();
569 throw std::logic_error(
"Cannot perform on a null CURLM handle");
572 int running_handles = 0;
577 CURLMsg* msg =
nullptr;
580 msg = curl_multi_info_read(
p.get(), &msgq);
582 if ((msg !=
nullptr) && msg->msg == CURLMSG_DONE)
584 auto* easy = msg->easy_handle;
585 auto result = msg->data.result;
589 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
590 if (request ==
nullptr)
592 curl_multi_remove_handle(
p.get(), easy);
593 throw std::runtime_error(
594 "CURLMSG_DONE received with no associated request data");
596 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
600 curl_multi_remove_handle(
p.get(), easy);
607 return running_handles;
637 uv_timer_t uv_handle{};
639 std::atomic<bool> is_stopping =
false;
646 curl_socket_t socket{};
652 uv_async_t async_requests_handle{};
653 std::mutex requests_mutex;
654 std::deque<std::unique_ptr<CurlRequest>> pending_requests;
656 static void async_requests_callback(uv_async_t* handle)
661 throw std::logic_error(
662 "async_requests_callback called with null self pointer");
665 if (self->is_stopping)
667 LOG_FAIL_FMT(
"async_requests_callback called while stopping");
673 std::deque<std::unique_ptr<CurlRequest>> requests_to_add;
675 std::lock_guard<std::mutex> requests_lock(self->requests_mutex);
676 requests_to_add.swap(self->pending_requests);
679 for (
auto& req : requests_to_add)
681 self->curl_request_curlm.attach_curl_request(std::move(req));
691 throw std::logic_error(
692 "libuv_timeout_callback called with null self pointer");
695 if (self->is_stopping)
697 LOG_FAIL_FMT(
"libuv_timeout_callback called while stopping");
703 int running_handles = 0;
705 curl_multi_socket_action,
706 self->curl_request_curlm,
710 self->curl_request_curlm.perform();
719 throw std::logic_error(
720 "libuv_timeout_callback called with null self pointer");
723 if (self->is_stopping)
725 LOG_FAIL_FMT(
"curl_timeout_callback called while stopping");
734 uv_timer_stop(&self->uv_handle);
740 timeout_ms = std::max(timeout_ms, 1L);
748 uv_poll_t* req,
int status,
int events)
750 auto* socket_context =
static_cast<SocketContextImpl*
>(req->data);
751 if (socket_context ==
nullptr)
753 throw std::logic_error(
754 "libuv_socket_poll_callback called with null request context");
757 auto* self = socket_context->context;
760 throw std::logic_error(
761 "libuv_socket_poll_callback called with null self pointer");
764 if (self->is_stopping)
767 "libuv_socket_poll_callback called on {} while stopped",
768 socket_context->socket);
774 if (status == UV_EBADF)
781 "Socket poll error on {}: {}",
782 socket_context->socket,
783 uv_strerror(status));
788 "Socket poll error on {}: {}",
789 socket_context->socket,
790 uv_strerror(status));
794 int running_handles = 0;
796 curl_multi_socket_action,
797 self->curl_request_curlm,
798 socket_context->socket,
801 self->curl_request_curlm.perform();
806 "Libuv socket poll callback on {}: {}",
807 static_cast<int>(socket_context->socket),
808 static_cast<int>(events));
811 action |= ((events & UV_READABLE) != 0) ? CURL_CSELECT_IN : 0;
812 action |= ((events & UV_WRITABLE) != 0) ? CURL_CSELECT_OUT : 0;
813 int running_handles = 0;
815 curl_multi_socket_action,
816 self->curl_request_curlm,
817 socket_context->socket,
820 self->curl_request_curlm.perform();
829 SocketContextImpl* socket_context)
833 throw std::logic_error(
834 "curl_socket_callback called with null self pointer");
842 case CURL_POLL_INOUT:
845 "Curl socket callback: listen on socket {}, {}",
847 static_cast<int>(action));
850 if (self->is_stopping)
855 if (socket_context ==
nullptr)
857 auto socket_context_ptr = std::make_unique<SocketContextImpl>();
858 socket_context_ptr->context = self;
859 socket_context_ptr->socket = s;
860 uv_poll_init_socket(self->loop, &socket_context_ptr->uv_handle, s);
861 socket_context_ptr->uv_handle.data =
862 socket_context_ptr.get();
864 socket_context = socket_context_ptr.release();
866 curl_multi_assign, self->curl_request_curlm, s, socket_context);
870 events |= (action != CURL_POLL_IN) ? UV_WRITABLE : 0;
871 events |= (action != CURL_POLL_OUT) ? UV_READABLE : 0;
877 case CURL_POLL_REMOVE:
878 if (socket_context !=
nullptr)
881 "CurlmLibuv: curl socket callback: remove socket {}",
882 static_cast<int>(s));
884 uv_poll_stop(&socket_context->uv_handle);
886 curl_multi_assign, self->curl_request_curlm, s,
nullptr);
890 throw std::runtime_error(
"Unknown action in curl_socket_callback");
897 uv_timer_init(loop, &uv_handle);
898 uv_handle.data =
this;
900 uv_async_init(loop, &async_requests_handle, async_requests_callback);
901 async_requests_handle.data =
this;
902 uv_unref(
reinterpret_cast<uv_handle_t*
>(
903 &async_requests_handle));
908 curl_multi_setopt, curl_request_curlm, CURLMOPT_TIMERDATA,
this);
912 CURLMOPT_TIMERFUNCTION,
917 curl_multi_setopt, curl_request_curlm, CURLMOPT_SOCKETDATA,
this);
921 CURLMOPT_SOCKETFUNCTION,
929 LOG_FAIL_FMT(
"CurlmLibuvContext already closed, cannot attach request");
932 LOG_DEBUG_FMT(
"Adding request to {} to queue", request->get_url());
933 std::lock_guard<std::mutex> requests_lock(requests_mutex);
934 pending_requests.push_back(std::move(request));
935 uv_async_send(&async_requests_handle);
941 template <
typename T>
942 friend class ::asynchost::close_ptr;
943 size_t closed_uv_handle_count = 0;
954 "CurlmLibuvContext already closed, nothing to stop or remove");
960 std::unique_ptr<CURL*, void (*)(CURL**)> easy_handles(
961 curl_multi_get_handles(curl_request_curlm),
963 [](CURL** handles) { curl_free(
static_cast<void*
>(handles)); });
965 for (
size_t i = 0; easy_handles.get()[i] !=
nullptr; ++i)
967 auto* easy = easy_handles.get()[i];
968 curl_multi_remove_handle(curl_request_curlm, easy);
973 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
974 if (request ==
nullptr)
977 "CURLMSG_DONE received with no associated request data");
979 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
980 curl_easy_cleanup(easy);
984 std::deque<std::unique_ptr<CurlRequest>> requests_to_cleanup;
986 std::lock_guard<std::mutex> requests_lock(requests_mutex);
987 requests_to_cleanup.swap(pending_requests);
991 reinterpret_cast<uv_handle_t*
>(&async_requests_handle), on_close);
992 uv_close(
reinterpret_cast<uv_handle_t*
>(&uv_handle), on_close);
994 static void on_close(uv_handle_t* handle)
997 ->closed_uv_handle_count;
999 if (close_count >= 2)
1023 static std::unique_ptr<CurlmLibuvContext>& instance()
1025 static std::unique_ptr<CurlmLibuvContext> curlm_libuv_context_instance =
1027 return curlm_libuv_context_instance;
1033 if (instance() ==
nullptr)
1035 throw std::logic_error(
1036 "CurlmLibuvContextSingleton instance not initialized");
1042 if (instance() !=
nullptr)
1044 throw std::logic_error(
1045 "CurlmLibuvContextSingleton instance already initialized");
1047 instance() = std::make_unique<CurlmLibuvContext>(loop);
Definition rest_verb.h:45
std::optional< llhttp_method > get_http_method() const
Definition rest_verb.h:57
const char * c_str() const
Definition rest_verb.h:62
void attach_curl_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:548
int perform()
Definition curl.h:565
CurlRequest(UniqueCURL &&curl_handle_, RESTVerb method_, std::string url_, UniqueSlist &&headers_, std::unique_ptr< RequestBody > &&request_body_, std::unique_ptr< ccf::curl::ResponseBody > &&response_, std::optional< ResponseCallback > &&response_callback_)
Definition curl.h:392
static void synchronous_perform(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:490
CURL * get_easy_handle() const
Definition curl.h:509
UniqueCURL & get_easy_handle_ptr()
Definition curl.h:514
const ResponseHeaders::HeaderMap & get_response_headers() const
Definition curl.h:539
RESTVerb get_method() const
Definition curl.h:519
static void handle_response(std::unique_ptr< CurlRequest > &&request, CURLcode curl_response_code)
Definition curl.h:473
std::string get_url() const
Definition curl.h:524
std::function< void(std::unique_ptr< CurlRequest > &&request, CURLcode curl_response_code, long status_code)> ResponseCallback
Definition curl.h:379
std::unique_ptr< ResponseBody > & get_response_ptr()
Definition curl.h:534
ResponseBody * get_response_body()
Definition curl.h:529
static void libuv_timeout_callback(uv_timer_t *handle)
Definition curl.h:686
static void libuv_socket_poll_callback(uv_poll_t *req, int status, int events)
Definition curl.h:747
CurlmLibuvContextImpl(uv_loop_t *loop)
Definition curl.h:895
static int curl_socket_callback(CURL *easy, curl_socket_t s, int action, CurlmLibuvContextImpl *self, SocketContextImpl *socket_context)
Definition curl.h:824
static int curl_timeout_callback(CURLM *multi, long timeout_ms, CurlmLibuvContextImpl *self)
Definition curl.h:713
void attach_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:925
~CurlmLibuvContextSingleton()
Definition curl.h:1049
CurlmLibuvContextSingleton(CurlmLibuvContextSingleton &&)=default
static CurlmLibuvContext & get_instance()
Definition curl.h:1031
CurlmLibuvContextSingleton(uv_loop_t *loop)
Definition curl.h:1040
CurlmLibuvContextSingleton & operator=(CurlmLibuvContextSingleton &&)=default
CurlmLibuvContextSingleton(const CurlmLibuvContextSingleton &)=delete
CurlmLibuvContextSingleton & operator=(const CurlmLibuvContextSingleton &)=delete
RequestBody(std::vector< uint8_t > &&buffer_)
Definition curl.h:192
RequestBody(nlohmann::json json)
Definition curl.h:197
void attach_to_curl(CURL *curl)
Definition curl.h:219
static size_t send_data(char *ptr, size_t size, size_t nitems, RequestBody *data)
Definition curl.h:205
RequestBody(std::vector< uint8_t > &buffer)
Definition curl.h:187
void attach_to_curl(CURL *curl)
Definition curl.h:267
static size_t write_response_chunk(const uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:244
std::vector< uint8_t > buffer
Definition curl.h:235
static size_t noop_write_function(const uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:277
static void attach_noop_response(CURL *curl)
Definition curl.h:285
ResponseBody(size_t max_size_)
Definition curl.h:242
size_t maximum_size
Definition curl.h:236
UniqueCURLM & operator=(UniqueCURLM &&other) noexcept
Definition curl.h:131
UniqueCURLM()
Definition curl.h:119
UniqueCURLM & operator=(const UniqueCURLM &)=delete
UniqueCURLM(const UniqueCURLM &)=delete
std::unique_ptr< CURLM, void(*)(CURLM *)> p
Definition curl.h:116
UniqueCURLM(UniqueCURLM &&other) noexcept
Definition curl.h:130
CURLM * release()
Definition curl.h:137
void set_blob_opt(auto option, const uint8_t *data, size_t length)
Definition curl.h:85
UniqueCURL(const UniqueCURL &)=delete
void set_opt(auto option, auto value)
Definition curl.h:107
UniqueCURL(UniqueCURL &&other) noexcept
Definition curl.h:71
UniqueCURL & operator=(UniqueCURL &&other) noexcept
Definition curl.h:72
UniqueCURL & operator=(const UniqueCURL &)=delete
UniqueCURL()
Definition curl.h:58
UniqueSlist(UniqueSlist &&other) noexcept
Definition curl.h:158
UniqueSlist(const UniqueSlist &)=delete
curl_slist * get() const
Definition curl.h:175
void append(const char *str)
Definition curl.h:165
UniqueSlist & operator=(const UniqueSlist &)=delete
UniqueSlist()
Definition curl.h:154
void append(const std::string &key, const std::string &value)
Definition curl.h:170
UniqueSlist & operator=(UniqueSlist &&other) noexcept
Definition curl.h:159
#define CHECK_CURL_EASY_SETOPT(handle, info, arg)
Definition curl.h:33
#define CHECK_CURL_MULTI(fn,...)
Definition curl.h:38
#define CHECK_CURL_EASY_GETINFO(handle, info, arg)
Definition curl.h:35
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition json_schema.h:15