Skip to content

Commit

Permalink
ARROW-14523: [C++] Fix potential data loss in S3 multipart upload
Browse files Browse the repository at this point in the history
Work around a critical bug in the AWS SDK for C++ that fails to detect errors returned by CompleteMultipartUpload in the body of a 200 OK response:
aws/aws-sdk-cpp#658

Closes #11594 from pitrou/ARROW-14523-s3-cmu-error-fix

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pitrou committed Nov 4, 2021
1 parent 0ead7c9 commit 3626a08
Showing 1 changed file with 106 additions and 8 deletions.
114 changes: 106 additions & 8 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <utility>

Expand All @@ -41,10 +44,12 @@
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/auth/STSCredentialsProvider.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/core/utils/xml/XmlSerializer.h>
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
Expand Down Expand Up @@ -563,6 +568,98 @@ class S3Client : public Aws::S3::S3Client {
req.SetBucket(ToAwsString(bucket));
return GetBucketRegion(req);
}

S3Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup(
S3Model::CompleteMultipartUploadRequest&& request) const {
// CompletedMultipartUpload can return a 200 OK response with an error
// encoded in the response body, in which case we should either retry
// or propagate the error to the user (see
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
//
// Unfortunately the AWS SDK doesn't detect such situations but lets them
// return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
//
// We work around the issue by registering a DataReceivedEventHandler
// which parses the XML response for embedded errors.

util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;

auto handler = [&](const Aws::Http::HttpRequest* http_req,
Aws::Http::HttpResponse* http_resp,
long long) { // NOLINT runtime/int
auto& stream = http_resp->GetResponseBody();
const auto pos = stream.tellg();
const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);
// Rewind stream for later
stream.clear();
stream.seekg(pos);

if (doc.WasParseSuccessful()) {
auto root = doc.GetRootElement();
if (!root.IsNull()) {
// Detect something that looks like an abnormal CompletedMultipartUpload
// response.
if (root.GetName() != "CompleteMultipartUploadResult" ||
!root.FirstChild("Error").IsNull() || !root.FirstChild("Errors").IsNull()) {
// Make sure the error marshaller doesn't see a 200 OK
http_resp->SetResponseCode(
Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
aws_error = GetErrorMarshaller()->Marshall(*http_resp);
// Rewind stream for later
stream.clear();
stream.seekg(pos);
}
}
}
};

request.SetDataReceivedEventHandler(std::move(handler));

// We don't have access to the configured AWS retry strategy
// (m_retryStrategy is a private member of AwsClient), so don't use that.
std::unique_ptr<Aws::Client::RetryStrategy> retry_strategy;
if (s3_retry_strategy_) {
retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_));
} else {
// Note that DefaultRetryStrategy, unlike StandardRetryStrategy,
// has empty definitions for RequestBookkeeping() and GetSendToken(),
// which simplifies the code below.
retry_strategy.reset(new Aws::Client::DefaultRetryStrategy());
}

for (int32_t retries = 0;; retries++) {
aws_error.reset();
auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request);
if (!outcome.IsSuccess()) {
// Error returned in HTTP headers (or client failure)
return outcome;
}
if (!aws_error.has_value()) {
// Genuinely successful outcome
return outcome;
}

const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries);

ARROW_LOG(WARNING)
<< "CompletedMultipartUpload got error embedded in a 200 OK response: "
<< aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage()
<< "\"), retry = " << should_retry;

if (!should_retry) {
break;
}
const auto delay = std::chrono::milliseconds(
retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries));
std::this_thread::sleep_for(delay);
}

DCHECK(aws_error.has_value());
auto s3_error = AWSError<S3Errors>(std::move(aws_error).value());
return S3Model::CompleteMultipartUploadOutcome(std::move(s3_error));
}

std::shared_ptr<S3RetryStrategy> s3_retry_strategy_;
};

// In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool.
Expand Down Expand Up @@ -617,7 +714,7 @@ class ClientBuilder {

const bool use_virtual_addressing = options_.endpoint_override.empty();

/// Set proxy options if provided
// Set proxy options if provided
if (!options_.proxy_options.scheme.empty()) {
if (options_.proxy_options.scheme == "http") {
client_config_.proxyScheme = Aws::Http::Scheme::HTTP;
Expand All @@ -641,10 +738,12 @@ class ClientBuilder {
client_config_.proxyPassword = ToAwsString(options_.proxy_options.password);
}

return std::make_shared<S3Client>(
auto client = std::make_shared<S3Client>(
credentials_provider_, client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing);
client->s3_retry_strategy_ = options_.retry_strategy;
return client;
}

const S3Options& options() const { return options_; }
Expand Down Expand Up @@ -1021,9 +1120,8 @@ class ObjectOutputStream final : public io::OutputStream {
struct UploadState;

public:
ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client,
const io::IOContext& io_context, const S3Path& path,
const S3Options& options,
ObjectOutputStream(std::shared_ptr<S3Client> client, const io::IOContext& io_context,
const S3Path& path, const S3Options& options,
const std::shared_ptr<const KeyValueMetadata>& metadata)
: client_(std::move(client)),
io_context_(io_context),
Expand Down Expand Up @@ -1118,7 +1216,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetUploadId(upload_id_);
req.SetMultipartUpload(std::move(completed_upload));

auto outcome = client_->CompleteMultipartUpload(req);
auto outcome = client_->CompleteMultipartUploadWithErrorFixup(std::move(req));
if (!outcome.IsSuccess()) {
return ErrorToStatus(
std::forward_as_tuple("When completing multiple part upload for key '",
Expand Down Expand Up @@ -1314,7 +1412,7 @@ class ObjectOutputStream final : public io::OutputStream {
}

protected:
std::shared_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<S3Client> client_;
const io::IOContext io_context_;
const S3Path path_;
const std::shared_ptr<const KeyValueMetadata> metadata_;
Expand Down Expand Up @@ -1503,7 +1601,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
public:
ClientBuilder builder_;
io::IOContext io_context_;
std::shared_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<S3Client> client_;
util::optional<S3Backend> backend_;

const int32_t kListObjectsMaxKeys = 1000;
Expand Down

0 comments on commit 3626a08

Please sign in to comment.