Skip to content

Commit

Permalink
[fix] Add the curl wrapper to avoid inconsistent curl options (apache…
Browse files Browse the repository at this point in the history
…#313)

### Motivation

When libcurl is used in `AuthOauth2`, the `CURLOPT_NOSIGNAL` option is
not set, i.e. it will be the default value so that the
`Curl_resolv_timeout` function might crash in multi-threading
environment.

```
apache#2 0xf630 in _L_unlock_13 from /lib64/libpthread.so.0 (0x34)
apache#3 0x2e6c7f in Curl_failf from /usr/local/bin/***/libpulsar.so (0x6f)
apache#4 0x30a285 in Curl_resolv_timeout from /usr/local/bin/***/libpulsar.so (0x95)
```

Since there are many duplicated code when calling curl C APIs, it's hard to
notice that `CURLOPT_NOSIGNAL` is not configured in `AuthOauth2`.

### Modifications

Introduce a `CurlWrapper` class that sets the same options to reduce the
duplicated code and adapting consistent behaviors unless a few options.

(cherry picked from commit 787bfd0)
  • Loading branch information
BewareMyPower committed Sep 14, 2023
1 parent 7424d56 commit 57f848f
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 239 deletions.
180 changes: 180 additions & 0 deletions lib/CurlWrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <assert.h>
#include <curl/curl.h>

#include <string>

namespace pulsar {

struct CurlInitializer {
CurlInitializer() { curl_global_init(CURL_GLOBAL_ALL); }
~CurlInitializer() { curl_global_cleanup(); }
};
static CurlInitializer curlInitializer;

class CurlWrapper {
public:
CurlWrapper() noexcept {}
~CurlWrapper() {
if (handle_) {
curl_easy_cleanup(handle_);
}
}

char* escape(const std::string& s) const {
assert(handle_);
return curl_easy_escape(handle_, s.c_str(), s.length());
}

// It must be called before calling other methods
bool init() {
handle_ = curl_easy_init();
return handle_ != nullptr;
}

struct Options {
std::string method;
std::string postFields;
std::string userAgent;
int timeoutInSeconds{0};
int maxLookupRedirects{-1};
};

struct TlsContext {
std::string trustCertsFilePath;
bool validateHostname{true};
bool allowInsecure{false};
std::string certPath;
std::string keyPath;
};

struct Result {
CURLcode code;
std::string responseData;
long responseCode;
std::string redirectUrl;
std::string error;
std::string serverError;
};

Result get(const std::string& url, const std::string& header, const Options& options,
const TlsContext* tlsContext) const;

private:
CURL* handle_;

struct CurlListGuard {
curl_slist*& headers;

CurlListGuard(curl_slist*& headers) : headers(headers) {}
~CurlListGuard() {
if (headers) {
curl_slist_free_all(headers);
}
}
};
};

inline CurlWrapper::Result CurlWrapper::get(const std::string& url, const std::string& header,
const Options& options, const TlsContext* tlsContext) const {
assert(handle_);
curl_easy_setopt(handle_, CURLOPT_URL, url.c_str());

if (!options.postFields.empty()) {
curl_easy_setopt(handle_, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(handle_, CURLOPT_POSTFIELDS, options.postFields.c_str());
}

// Write response
curl_easy_setopt(
handle_, CURLOPT_WRITEFUNCTION,
+[](char* buffer, size_t size, size_t nitems, void* outstream) -> size_t {
static_cast<std::string*>(outstream)->append(buffer, size * nitems);
return size * nitems;
});
std::string response;
curl_easy_setopt(handle_, CURLOPT_WRITEDATA, &response);

// New connection is made for each call
curl_easy_setopt(handle_, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle_, CURLOPT_FORBID_REUSE, 1L);

// Skipping signal handling - results in timeouts not honored during the DNS lookup
// Without this config, Curl_resolv_timeout might crash in multi-threads environment
curl_easy_setopt(handle_, CURLOPT_NOSIGNAL, 1L);

curl_easy_setopt(handle_, CURLOPT_TIMEOUT, options.timeoutInSeconds);
if (!options.userAgent.empty()) {
curl_easy_setopt(handle_, CURLOPT_USERAGENT, options.userAgent.c_str());
}
curl_easy_setopt(handle_, CURLOPT_FAILONERROR, 1L);

// Redirects
curl_easy_setopt(handle_, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle_, CURLOPT_MAXREDIRS, options.maxLookupRedirects);

char errorBuffer[CURL_ERROR_SIZE] = "";
curl_easy_setopt(handle_, CURLOPT_ERRORBUFFER, errorBuffer);

curl_slist* headers = nullptr;
CurlListGuard headersGuard{headers};
if (!header.empty()) {
headers = curl_slist_append(headers, header.c_str());
curl_easy_setopt(handle_, CURLOPT_HTTPHEADER, headers);
}

if (tlsContext) {
CURLcode code;
code = curl_easy_setopt(handle_, CURLOPT_SSLENGINE, nullptr);
if (code != CURLE_OK) {
return {code, "", -1, "",
"Unable to load SSL engine for url " + url + ": " + curl_easy_strerror(code)};
}
code = curl_easy_setopt(handle_, CURLOPT_SSLENGINE_DEFAULT, 1L);
if (code != CURLE_OK) {
return {code, "", -1, "",
"Unable to load SSL engine as default for url " + url + ": " + curl_easy_strerror(code)};
}
curl_easy_setopt(handle_, CURLOPT_SSL_VERIFYHOST, tlsContext->validateHostname ? 1L : 0L);
curl_easy_setopt(handle_, CURLOPT_SSL_VERIFYPEER, tlsContext->allowInsecure ? 0L : 1L);
if (!tlsContext->trustCertsFilePath.empty()) {
curl_easy_setopt(handle_, CURLOPT_CAINFO, tlsContext->trustCertsFilePath.c_str());
}
if (!tlsContext->certPath.empty() && !tlsContext->keyPath.empty()) {
curl_easy_setopt(handle_, CURLOPT_SSLCERT, tlsContext->certPath.c_str());
curl_easy_setopt(handle_, CURLOPT_SSLKEY, tlsContext->keyPath.c_str());
}
}

auto res = curl_easy_perform(handle_);
long responseCode;
curl_easy_getinfo(handle_, CURLINFO_RESPONSE_CODE, &responseCode);
Result result{res, response, responseCode, "", "", std::string(errorBuffer)};
if (responseCode == 307 || responseCode == 302 || responseCode == 301) {
char* url;
curl_easy_getinfo(handle_, CURLINFO_REDIRECT_URL, &url);
result.redirectUrl = url;
}
return result;
}

} // namespace pulsar
137 changes: 33 additions & 104 deletions lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
#include "HTTPLookupService.h"

#include <curl/curl.h>
#include <pulsar/Version.h>

#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

#include "CurlWrapper.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "NamespaceName.h"
Expand All @@ -45,16 +45,6 @@ const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;

static inline bool needRedirection(long code) { return (code == 307 || code == 302 || code == 301); }

HTTPLookupService::CurlInitializer::CurlInitializer() {
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
curl_global_init(CURL_GLOBAL_ALL);
}
HTTPLookupService::CurlInitializer::~CurlInitializer() { curl_global_cleanup(); }

HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;

HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
Expand Down Expand Up @@ -143,11 +133,6 @@ Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
return promise.getFuture();
}

static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
((std::string *)responseDataPtr)->append((char *)contents, size * nmemb);
return size * nmemb;
}

void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
const std::string completeUrl) {
std::string responseData;
Expand All @@ -164,115 +149,60 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
uint16_t reqCount = 0;
Result retResult = ResultOk;
while (++reqCount <= MAX_HTTP_REDIRECTS) {
CURL *handle;
CURLcode res;
std::string version = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
handle = curl_easy_init();

if (!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
// No curl_easy_cleanup required since handle not initialized
return ResultLookupError;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());

// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);

// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);

// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);

// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);

// Set User Agent
curl_easy_setopt(handle, CURLOPT_USERAGENT, version.c_str());

// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);

// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return authResult;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());

CurlWrapper curl;
if (!curl.init()) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
return ResultLookupError;
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);

// TLS
std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
if (isUseTls_) {
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE, NULL) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
}
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE_DEFAULT, 1L) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine as default, for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
}
curl_easy_setopt(handle, CURLOPT_SSLCERTTYPE, "PEM");

if (tlsAllowInsecure_) {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
} else {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
}

if (!tlsTrustCertsFilePath_.empty()) {
curl_easy_setopt(handle, CURLOPT_CAINFO, tlsTrustCertsFilePath_.c_str());
}

curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, tlsValidateHostname_ ? 1L : 0L);

tlsContext.reset(new CurlWrapper::TlsContext);
tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
tlsContext->validateHostname = tlsValidateHostname_;
tlsContext->allowInsecure = tlsAllowInsecure_;
if (authDataContent->hasDataForTls()) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, authDataContent->getTlsCertificates().c_str());
curl_easy_setopt(handle, CURLOPT_SSLKEY, authDataContent->getTlsPrivateKey().c_str());
tlsContext->certPath = authDataContent->getTlsCertificates();
tlsContext->keyPath = authDataContent->getTlsPrivateKey();
} else {
if (!tlsPrivateFilePath_.empty() && !tlsCertificateFilePath_.empty()) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, tlsCertificateFilePath_.c_str());
curl_easy_setopt(handle, CURLOPT_SSLKEY, tlsPrivateFilePath_.c_str());
}
tlsContext->certPath = tlsCertificateFilePath_;
tlsContext->keyPath = tlsPrivateFilePath_;
}
}

LOG_INFO("Curl [" << reqCount << "] Lookup Request sent for " << completeUrl);
CurlWrapper::Options options;
options.timeoutInSeconds = lookupTimeoutInSeconds_;
options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
options.maxLookupRedirects = 1; // redirection is implemented by the outer loop
auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get());
const auto &error = result.error;
if (!error.empty()) {
LOG_ERROR(completeUrl << " failed: " << error);
return ResultConnectError;
}

// Make get call to server
res = curl_easy_perform(handle);

long response_code = -1;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " response_code " << response_code
<< " curl res " << res);

// Free header list
curl_slist_free_all(list);
responseData = result.responseData;
long response_code = result.responseCode;
auto res = result.code;

const auto &redirectUrl = result.redirectUrl;
switch (res) {
case CURLE_OK:
long response_code;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " code " << response_code);
if (response_code == 200) {
retResult = ResultOk;
} else if (needRedirection(response_code)) {
char *url = NULL;
curl_easy_getinfo(handle, CURLINFO_REDIRECT_URL, &url);
LOG_INFO("Response from url " << completeUrl << " to new url " << url);
completeUrl = url;
} else if (!redirectUrl.empty()) {
LOG_INFO("Response from url " << completeUrl << " to new url " << redirectUrl);
completeUrl = redirectUrl;
retResult = ResultLookupError;
} else {
retResult = ResultLookupError;
Expand Down Expand Up @@ -301,8 +231,7 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
retResult = ResultLookupError;
break;
}
curl_easy_cleanup(handle);
if (!needRedirection(response_code)) {
if (redirectUrl.empty()) {
break;
}
}
Expand Down
7 changes: 0 additions & 7 deletions lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;

class HTTPLookupService : public LookupService, public std::enable_shared_from_this<HTTPLookupService> {
class CurlInitializer {
public:
CurlInitializer();
~CurlInitializer();
};
static CurlInitializer curlInitializer;

enum RequestType
{
Lookup,
Expand Down
Loading

0 comments on commit 57f848f

Please sign in to comment.