13#include <curl/multi.h>
22#define CHECK_CURL_EASY(fn, ...) \
25 const auto res = fn(__VA_ARGS__); \
26 if (res != CURLE_OK) \
28 throw std::runtime_error(fmt::format( \
29 "Error calling " #fn ": {} ({})", res, curl_easy_strerror(res))); \
33#define CHECK_CURL_EASY_SETOPT(handle, info, arg) \
34 CHECK_CURL_EASY(curl_easy_setopt, handle, info, arg)
35#define CHECK_CURL_EASY_GETINFO(handle, info, arg) \
36 CHECK_CURL_EASY(curl_easy_getinfo, handle, info, arg)
38#define CHECK_CURL_MULTI(fn, ...) \
41 const auto res = fn(__VA_ARGS__); \
42 if (res != CURLM_OK) \
44 throw std::runtime_error(fmt::format( \
45 "Error calling " #fn ": {} ({})", res, curl_multi_strerror(res))); \
55 std::unique_ptr<CURL, void (*)(CURL*)> p;
58 UniqueCURL() : p(curl_easy_init(), [](auto x) { curl_easy_cleanup(x); })
62 throw std::runtime_error(
"Error initialising curl easy request");
66 operator CURL*()
const
73 if (data ==
nullptr || length == 0)
75 throw std::invalid_argument(
76 "Data pointer cannot be null or length zero");
81 throw std::logic_error(
"Cannot set option on a null CURL handle");
86 .data =
const_cast<uint8_t*
>(data), .len = length,
87 .flags = CURL_BLOB_COPY,
102 std::unique_ptr<CURLM, void (*)(CURLM*)>
p;
105 UniqueCURLM() :
p(curl_multi_init(), [](auto x) { curl_multi_cleanup(x); })
109 throw std::runtime_error(
"Error initialising curl multi request");
119 p = std::move(other.p);
128 operator CURLM*()
const
137 std::unique_ptr<curl_slist, void (*)(curl_slist*)> p;
140 UniqueSlist() : p(nullptr, [](auto x) { curl_slist_free_all(x); }) {}
147 p = std::move(other.p);
153 p.reset(curl_slist_append(p.release(), str));
156 void append(
const std::string& key,
const std::string& value)
158 append(fmt::format(
"{}: {}", key, value).c_str());
161 [[nodiscard]] curl_slist*
get()
const
169 std::vector<uint8_t> buffer;
170 std::span<const uint8_t> unsent;
175 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
180 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
185 auto json_str =
json.dump();
186 buffer = std::vector<uint8_t>(
187 json_str.begin(), json_str.end());
188 unsent = std::span<const uint8_t>(buffer.data(), buffer.size());
192 char* ptr,
size_t size,
size_t nitems,
RequestBody* data)
199 auto bytes_to_copy = std::min(data->unsent.size(), size * nitems);
200 memcpy(ptr, data->unsent.data(), bytes_to_copy);
201 data->unsent = data->unsent.subspan(bytes_to_copy);
202 return bytes_to_copy;
209 throw std::logic_error(
210 "Cannot attach request body to a null CURL handle");
231 uint8_t* ptr,
size_t size,
size_t nmemb,
ResponseBody* response)
233 if (response ==
nullptr)
236 "write_response_chunk called with a null response pointer");
237 return CURL_WRITEFUNC_ERROR;
239 auto bytes_to_copy = size * nmemb;
243 "Response size limit exceeded: {} bytes, maximum is {} bytes",
244 response->
buffer.size() + bytes_to_copy,
246 return CURL_WRITEFUNC_ERROR;
249 response->
buffer.insert(response->
buffer.end(), ptr, ptr + bytes_to_copy);
250 return bytes_to_copy;
257 throw std::logic_error(
"Cannot attach response to a null CURL handle");
264 uint8_t* ptr,
size_t size,
size_t nmemb,
ResponseBody* response)
275 throw std::logic_error(
276 "Cannot attach noop response to a null CURL handle");
286 using HeaderMap = std::unordered_map<std::string, std::string>;
293 if (response ==
nullptr)
295 LOG_FAIL_FMT(
"recv_header_line called with a null response pointer");
298 auto bytes_to_read = size * nitems;
299 std::string_view header(buffer, bytes_to_read);
302 header = ccf::nonstd::trim(header);
306 static const std::regex http_status_line_regex(R
"(^HTTP\/[1-9]+.*)");
310 if (!std::regex_match(std::string(header), http_status_line_regex))
313 "Expected HTTP status line as first header, got '{}'", header);
314 return bytes_to_read;
322 const auto [field, value] = ccf::nonstd::split_1(header,
": ");
325 std::string field_str(field);
326 nonstd::to_lower(field_str);
327 if (response->
data.contains(field_str))
329 auto current = response->
data[field_str];
331 "Duplicate header for '{}', current = '{}', new = '{}'",
336 response->
data[field_str] = ccf::nonstd::trim(value);
340 LOG_INFO_FMT(
"Ignoring invalid-looking HTTP Header '{}'", header);
345 return bytes_to_read;
352 throw std::logic_error(
"Cannot attach response to a null CURL handle");
363 CurlRequest& request, CURLcode curl_response_code,
long status_code)>;
370 std::unique_ptr<ccf::curl::RequestBody> request_body;
371 std::unique_ptr<ccf::curl::ResponseBody> response;
373 std::optional<ResponseCallback> response_callback;
381 std::unique_ptr<RequestBody>&& request_body_,
382 std::unique_ptr<ccf::curl::ResponseBody>&& response_,
383 std::optional<ResponseCallback>&& response_callback_) :
384 curl_handle(
std::move(curl_handle_)),
386 url(
std::move(url_)),
387 headers(
std::move(headers_)),
388 request_body(
std::move(request_body_)),
389 response(
std::move(response_)),
390 response_callback(
std::move(response_callback_))
394 throw std::invalid_argument(
"URL cannot be empty");
400 throw std::logic_error(
401 fmt::format(
"Unsupported HTTP method: {}", method.
c_str()));
415 if (request_body ==
nullptr)
420 std::make_unique<RequestBody>(std::vector<uint8_t>());
430 throw std::logic_error(
431 fmt::format(
"Unsupported HTTP method: {}", method.
c_str()));
434 if (request_body !=
nullptr)
436 request_body->attach_to_curl(curl_handle);
439 if (response !=
nullptr)
441 response->attach_to_curl(curl_handle);
450 if (headers.
get() !=
nullptr)
459 if (response_callback.has_value())
461 long status_code = 0;
463 curl_handle, CURLINFO_RESPONSE_CODE, &status_code);
464 response_callback.value()(*
this, curl_response_code, status_code);
470 if (curl_handle ==
nullptr)
472 throw std::logic_error(
473 "Cannot curl_easy_perform on a null CURL handle");
476 curl_code = curl_easy_perform(curl_handle);
481 curl_handle, CURLINFO_RESPONSE_CODE, &status_code);
501 return response.get();
511 return response_headers;
522 throw std::logic_error(
523 "Cannot attach CurlRequest to a null CURLM handle");
525 if (request ==
nullptr)
527 throw std::logic_error(
"Cannot attach a null CurlRequest");
529 LOG_TRACE_FMT(
"Attaching CurlRequest to {} to Curlm", request->get_url());
530 CURL* curl_handle = request->get_easy_handle();
539 throw std::logic_error(
"Cannot perform on a null CURLM handle");
542 int running_handles = 0;
547 CURLMsg* msg =
nullptr;
550 msg = curl_multi_info_read(
p.get(), &msgq);
552 if ((msg !=
nullptr) && msg->msg == CURLMSG_DONE)
554 auto* easy = msg->easy_handle;
555 auto result = msg->data.result;
559 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
560 if (request ==
nullptr)
562 curl_multi_remove_handle(
p.get(), easy);
563 throw std::runtime_error(
564 "CURLMSG_DONE received with no associated request data");
566 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
570 curl_multi_remove_handle(
p.get(), easy);
574 return running_handles;
604 uv_timer_t uv_handle{};
606 std::atomic<bool> is_stopping =
false;
613 curl_socket_t socket{};
619 uv_async_t async_requests_handle{};
620 std::mutex requests_mutex;
621 std::deque<std::unique_ptr<CurlRequest>> pending_requests;
623 static void async_requests_callback(uv_async_t* handle)
628 throw std::logic_error(
629 "async_requests_callback called with null self pointer");
632 if (self->is_stopping)
634 LOG_FAIL_FMT(
"async_requests_callback called while stopping");
640 std::deque<std::unique_ptr<CurlRequest>> requests_to_add;
642 std::lock_guard<std::mutex> requests_lock(self->requests_mutex);
643 requests_to_add.swap(self->pending_requests);
646 for (
auto& req : requests_to_add)
648 self->curl_request_curlm.attach_curl_request(std::move(req));
658 throw std::logic_error(
659 "libuv_timeout_callback called with null self pointer");
662 if (self->is_stopping)
664 LOG_FAIL_FMT(
"libuv_timeout_callback called while stopping");
670 int running_handles = 0;
672 curl_multi_socket_action,
673 self->curl_request_curlm,
677 self->curl_request_curlm.perform();
686 throw std::logic_error(
687 "libuv_timeout_callback called with null self pointer");
690 if (self->is_stopping)
692 LOG_FAIL_FMT(
"curl_timeout_callback called while stopping");
701 uv_timer_stop(&self->uv_handle);
707 timeout_ms = std::max(timeout_ms, 1L);
715 uv_poll_t* req,
int status,
int events)
719 LOG_FAIL_FMT(
"Socket poll error: {}", uv_strerror(status));
723 auto* socket_context =
static_cast<SocketContextImpl*
>(req->data);
724 if (socket_context ==
nullptr)
726 throw std::logic_error(
727 "libuv_socket_poll_callback called with null request context");
730 auto* self = socket_context->context;
733 throw std::logic_error(
734 "libuv_socket_poll_callback called with null self pointer");
737 if (self->is_stopping)
739 LOG_FAIL_FMT(
"libuv_socket_poll_callback called while stopping");
744 "Libuv socket poll callback on {}: {}",
745 static_cast<int>(socket_context->socket),
746 static_cast<int>(events));
749 action |= ((events & UV_READABLE) != 0) ? CURL_CSELECT_IN : 0;
750 action |= ((events & UV_WRITABLE) != 0) ? CURL_CSELECT_OUT : 0;
751 int running_handles = 0;
753 curl_multi_socket_action,
754 self->curl_request_curlm,
755 socket_context->socket,
758 self->curl_request_curlm.perform();
767 SocketContextImpl* socket_context)
771 throw std::logic_error(
772 "curl_socket_callback called with null self pointer");
780 case CURL_POLL_INOUT:
783 if (self->is_stopping)
785 LOG_FAIL_FMT(
"curl_socket_callback called while stopping");
790 "Curl socket callback: listen on socket {}",
static_cast<int>(s));
791 if (socket_context ==
nullptr)
793 auto socket_context_ptr = std::make_unique<SocketContextImpl>();
794 socket_context_ptr->context = self;
795 socket_context_ptr->socket = s;
796 uv_poll_init_socket(self->loop, &socket_context_ptr->uv_handle, s);
797 socket_context_ptr->uv_handle.data =
798 socket_context_ptr.get();
800 socket_context = socket_context_ptr.release();
802 curl_multi_assign, self->curl_request_curlm, s, socket_context);
806 events |= (action != CURL_POLL_IN) ? UV_WRITABLE : 0;
807 events |= (action != CURL_POLL_OUT) ? UV_READABLE : 0;
813 case CURL_POLL_REMOVE:
814 if (socket_context !=
nullptr)
817 "CurlmLibuv: curl socket callback: remove socket {}",
818 static_cast<int>(s));
820 uv_poll_stop(&socket_context->uv_handle);
822 curl_multi_assign, self->curl_request_curlm, s,
nullptr);
826 throw std::runtime_error(
"Unknown action in curl_socket_callback");
833 uv_timer_init(loop, &uv_handle);
834 uv_handle.data =
this;
836 uv_async_init(loop, &async_requests_handle, async_requests_callback);
837 async_requests_handle.data =
this;
838 uv_unref(
reinterpret_cast<uv_handle_t*
>(
839 &async_requests_handle));
844 curl_multi_setopt, curl_request_curlm, CURLMOPT_TIMERDATA,
this);
848 CURLMOPT_TIMERFUNCTION,
853 curl_multi_setopt, curl_request_curlm, CURLMOPT_SOCKETDATA,
this);
857 CURLMOPT_SOCKETFUNCTION,
865 LOG_FAIL_FMT(
"CurlmLibuvContext already closed, cannot attach request");
868 LOG_INFO_FMT(
"Adding request to {} to queue", request->get_url());
869 std::lock_guard<std::mutex> requests_lock(requests_mutex);
870 pending_requests.push_back(std::move(request));
871 uv_async_send(&async_requests_handle);
877 template <
typename T>
878 friend class ::asynchost::close_ptr;
879 size_t closed_uv_handle_count = 0;
890 "CurlmLibuvContext already closed, nothing to stop or remove");
896 std::unique_ptr<CURL*, void (*)(CURL**)> easy_handles(
897 curl_multi_get_handles(curl_request_curlm),
898 [](CURL** handles) { curl_free(handles); });
900 for (
size_t i = 0; easy_handles.get()[i] !=
nullptr; ++i)
902 auto* easy = easy_handles.get()[i];
903 curl_multi_remove_handle(curl_request_curlm, easy);
908 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &request);
909 if (request ==
nullptr)
912 "CURLMSG_DONE received with no associated request data");
914 std::unique_ptr<ccf::curl::CurlRequest> request_data_ptr(request);
915 curl_easy_cleanup(easy);
919 std::deque<std::unique_ptr<CurlRequest>> requests_to_cleanup;
921 std::lock_guard<std::mutex> requests_lock(requests_mutex);
922 requests_to_cleanup.swap(pending_requests);
926 reinterpret_cast<uv_handle_t*
>(&async_requests_handle), on_close);
927 uv_close(
reinterpret_cast<uv_handle_t*
>(&uv_handle), on_close);
929 static void on_close(uv_handle_t* handle)
932 ->closed_uv_handle_count;
934 if (close_count >= 2)
958 static std::unique_ptr<CurlmLibuvContext>& instance()
960 static std::unique_ptr<CurlmLibuvContext> curlm_libuv_context_instance =
962 return curlm_libuv_context_instance;
968 if (instance() ==
nullptr)
970 throw std::logic_error(
971 "CurlmLibuvContextSingleton instance not initialized");
977 if (instance() !=
nullptr)
979 throw std::logic_error(
980 "CurlmLibuvContextSingleton instance already initialized");
982 instance() = std::make_unique<CurlmLibuvContext>(loop);
Definition rest_verb.h:45
std::optional< llhttp_method > get_http_method() const
Definition rest_verb.h:57
const char * c_str() const
Definition rest_verb.h:62
void attach_curl_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:518
int perform()
Definition curl.h:535
void handle_response(CURLcode curl_response_code)
Definition curl.h:456
CurlRequest(UniqueCURL &&curl_handle_, RESTVerb method_, std::string &&url_, UniqueSlist &&headers_, std::unique_ptr< RequestBody > &&request_body_, std::unique_ptr< ccf::curl::ResponseBody > &&response_, std::optional< ResponseCallback > &&response_callback_)
Definition curl.h:376
CURL * get_easy_handle() const
Definition curl.h:484
std::function< void(CurlRequest &request, CURLcode curl_response_code, long status_code)> ResponseCallback
Definition curl.h:363
RESTVerb get_method() const
Definition curl.h:489
void synchronous_perform(CURLcode &curl_code, long &status_code)
Definition curl.h:468
std::string get_url() const
Definition curl.h:494
ResponseHeaders & get_response_headers()
Definition curl.h:509
std::unique_ptr< ResponseBody > & get_response_ptr()
Definition curl.h:504
ResponseBody * get_response_body()
Definition curl.h:499
static void libuv_timeout_callback(uv_timer_t *handle)
Definition curl.h:653
static void libuv_socket_poll_callback(uv_poll_t *req, int status, int events)
Definition curl.h:714
CurlmLibuvContextImpl(uv_loop_t *loop)
Definition curl.h:831
static int curl_socket_callback(CURL *easy, curl_socket_t s, int action, CurlmLibuvContextImpl *self, SocketContextImpl *socket_context)
Definition curl.h:762
static int curl_timeout_callback(CURLM *multi, long timeout_ms, CurlmLibuvContextImpl *self)
Definition curl.h:680
void attach_request(std::unique_ptr< CurlRequest > &&request)
Definition curl.h:861
~CurlmLibuvContextSingleton()
Definition curl.h:984
CurlmLibuvContextSingleton(CurlmLibuvContextSingleton &&)=default
static CurlmLibuvContext & get_instance()
Definition curl.h:966
CurlmLibuvContextSingleton(uv_loop_t *loop)
Definition curl.h:975
CurlmLibuvContextSingleton & operator=(CurlmLibuvContextSingleton &&)=default
CurlmLibuvContextSingleton(const CurlmLibuvContextSingleton &)=delete
CurlmLibuvContextSingleton & operator=(const CurlmLibuvContextSingleton &)=delete
RequestBody(nlohmann::json json)
Definition curl.h:183
void attach_to_curl(CURL *curl)
Definition curl.h:205
static size_t send_data(char *ptr, size_t size, size_t nitems, RequestBody *data)
Definition curl.h:191
RequestBody(std::vector< uint8_t > &buffer)
Definition curl.h:173
RequestBody(std::vector< uint8_t > &&buffer)
Definition curl.h:178
void attach_to_curl(CURL *curl)
Definition curl.h:253
std::vector< uint8_t > buffer
Definition curl.h:221
static void attach_noop_response(CURL *curl)
Definition curl.h:271
static size_t write_response_chunk(uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:230
static size_t noop_write_function(uint8_t *ptr, size_t size, size_t nmemb, ResponseBody *response)
Definition curl.h:263
ResponseBody(size_t max_size_)
Definition curl.h:228
size_t maximum_size
Definition curl.h:222
UniqueCURLM & operator=(UniqueCURLM &&other) noexcept
Definition curl.h:117
UniqueCURLM()
Definition curl.h:105
UniqueCURLM & operator=(const UniqueCURLM &)=delete
UniqueCURLM(const UniqueCURLM &)=delete
std::unique_ptr< CURLM, void(*)(CURLM *)> p
Definition curl.h:102
UniqueCURLM(UniqueCURLM &&other) noexcept
Definition curl.h:116
CURLM * release()
Definition curl.h:123
void set_blob_opt(auto option, const uint8_t *data, size_t length)
Definition curl.h:71
void set_opt(auto option, auto value)
Definition curl.h:93
UniqueCURL()
Definition curl.h:58
UniqueSlist(UniqueSlist &&other) noexcept
Definition curl.h:144
UniqueSlist(const UniqueSlist &)=delete
curl_slist * get() const
Definition curl.h:161
void append(const char *str)
Definition curl.h:151
UniqueSlist & operator=(const UniqueSlist &)=delete
UniqueSlist()
Definition curl.h:140
void append(const std::string &key, const std::string &value)
Definition curl.h:156
UniqueSlist & operator=(UniqueSlist &&other) noexcept
Definition curl.h:145
#define CHECK_CURL_EASY_SETOPT(handle, info, arg)
Definition curl.h:33
#define CHECK_CURL_MULTI(fn,...)
Definition curl.h:38
#define CHECK_CURL_EASY_GETINFO(handle, info, arg)
Definition curl.h:35
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_FAIL_FMT
Definition logger.h:363
Definition json_schema.h:15