Skip to content

Commit

Permalink
apacheGH-43535;only expose one sse_customer_key to the S3Options
Browse files Browse the repository at this point in the history
  • Loading branch information
Hang Zheng committed Sep 20, 2024
1 parent cdcbdd1 commit b5d1c0a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 76 deletions.
134 changes: 68 additions & 66 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <cctype>

#ifdef _WIN32
// Undefine preprocessor macros that interfere with AWS function / method names
Expand Down Expand Up @@ -170,50 +171,10 @@ static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL";
static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3";
static constexpr const char kAwsDirectoryContentType[] = "application/x-directory";

/// \brief Copy SSE-C parameters onto an S3 request.
///
/// Each parameter is applied independently, in the order algorithm, key, key MD5,
/// and only when it is non-empty; an empty value leaves the corresponding request
/// field untouched.
///
/// \param request any request type exposing SetSSECustomerAlgorithm,
///   SetSSECustomerKey and SetSSECustomerKeyMD5
/// \param sse_customer_algorithm value forwarded to SetSSECustomerAlgorithm
/// \param sse_customer_key value forwarded to SetSSECustomerKey
/// \param sse_customer_key_md5 value forwarded to SetSSECustomerKeyMD5
template <typename S3RequestType>
void SetSSECustomerKey(S3RequestType& request, const std::string& sse_customer_algorithm,
                       const std::string& sse_customer_key,
                       const std::string& sse_customer_key_md5) {
  const bool has_algorithm = !sse_customer_algorithm.empty();
  const bool has_key = !sse_customer_key.empty();
  const bool has_key_md5 = !sse_customer_key_md5.empty();

  if (has_algorithm) request.SetSSECustomerAlgorithm(sse_customer_algorithm);
  if (has_key) request.SetSSECustomerKey(sse_customer_key);
  if (has_key_md5) request.SetSSECustomerKeyMD5(sse_customer_key_md5);
}

// -----------------------------------------------------------------------
// S3ProxyOptions implementation

std::string ComputeMD5Base64(const std::string& base64EncodedKey) {
// Decode the Base64-encoded key to get the raw binary key
Aws::Utils::ByteBuffer rawKey = Aws::Utils::HashingUtils::Base64Decode(base64EncodedKey);

// Convert the raw binary key to an Aws::String
Aws::String rawKeyStr(reinterpret_cast<const char*>(rawKey.GetUnderlyingData()),
rawKey.GetLength());

// Compute the MD5 hash of the raw binary key
Aws::Utils::ByteBuffer md5Hash = Aws::Utils::HashingUtils::CalculateMD5(rawKeyStr);

// Base64-encode the MD5 hash
Aws::String awsEncodedHash = Aws::Utils::HashingUtils::Base64Encode(md5Hash);

// Return the Base64-encoded MD5 hash as a std::string
return std::string(awsEncodedHash.begin(), awsEncodedHash.end());
}

/// Set the SSE-C customized key.
void S3Options::SetSSECKey(const std::string& c_key)
{
sse_customer_algorithm = "AES256";
sse_customer_key = c_key;
sse_customer_key_md5 = ComputeMD5Base64(c_key);
}

Result<S3ProxyOptions> S3ProxyOptions::FromUri(const Uri& uri) {
S3ProxyOptions options;

Expand Down Expand Up @@ -482,16 +443,68 @@ bool S3Options::Equals(const S3Options& other) const {
background_writes == other.background_writes &&
allow_bucket_creation == other.allow_bucket_creation &&
allow_bucket_deletion == other.allow_bucket_deletion &&
sse_customer_key == other.GetSSECKey() &&
sse_customer_algorithm == other.GetSSECAlgorithm() &&
sse_customer_key_md5 == other.GetSSECKeyMD5() &&
sse_customer_key == other.sse_customer_key &&
default_metadata_equals && GetAccessKey() == other.GetAccessKey() &&
GetSecretKey() == other.GetSecretKey() &&
GetSessionToken() == other.GetSessionToken());
}

namespace {

/// \brief Validate a base64-encoded SSE-C key and compute the base64-encoded MD5
/// digest of its raw bytes.
///
/// \param[in] base64EncodedKey the customer key in base64 form
/// \param[out] base64DecodedResult despite its name, receives the *base64-encoded*
///   MD5 digest of the decoded key; left unmodified when the function returns false
/// \return true on success; false if the input is shorter than two characters,
///   contains a character outside the base64 alphabet, or does not decode to
///   exactly 32 bytes
bool ComputeMD5Base64(const std::string& base64EncodedKey,
                      std::string& base64DecodedResult) {
  if (base64EncodedKey.size() < 2) {
    return false;
  }

  // Check that the string contains only valid Base64 characters. The test is
  // spelled out for ASCII explicitly: passing a plain (possibly negative) `char`
  // to std::isalnum is undefined behaviour, and its answer depends on the
  // current C locale.
  for (const char c : base64EncodedKey) {
    const bool is_upper = (c >= 'A' && c <= 'Z');
    const bool is_lower = (c >= 'a' && c <= 'z');
    const bool is_digit = (c >= '0' && c <= '9');
    const bool is_symbol = (c == '+' || c == '/' || c == '=');
    if (!(is_upper || is_lower || is_digit || is_symbol)) {
      return false;
    }
  }

  // Decode the Base64-encoded key to get the raw binary key
  Aws::Utils::ByteBuffer rawKey =
      Aws::Utils::HashingUtils::Base64Decode(base64EncodedKey);

  // The key needs to be 256 bits (32 bytes) according to
  // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ServerSideEncryptionCustomerKeys.html#specifying-s3-c-encryption
  if (rawKey.GetLength() != 32) {
    return false;
  }

  // Convert the raw binary key to an Aws::String (explicit length, since the
  // key bytes may contain embedded NULs)
  Aws::String rawKeyStr(reinterpret_cast<const char*>(rawKey.GetUnderlyingData()),
                        rawKey.GetLength());

  // Compute the MD5 hash of the raw binary key
  Aws::Utils::ByteBuffer md5Hash = Aws::Utils::HashingUtils::CalculateMD5(rawKeyStr);

  // Base64-encode the MD5 hash
  Aws::String awsEncodedHash = Aws::Utils::HashingUtils::Base64Encode(md5Hash);

  // Hand the Base64-encoded MD5 hash back as a std::string
  base64DecodedResult = std::string(awsEncodedHash.begin(), awsEncodedHash.end());
  return true;
}

/// \brief Apply SSE-C (customer-provided key) settings to an S3 request.
///
/// When a key is configured, the request receives the key itself, the
/// base64-encoded MD5 digest derived from it, and the fixed algorithm "AES256".
///
/// \param request any request type exposing SetSSECustomerKeyMD5,
///   SetSSECustomerKey and SetSSECustomerAlgorithm
/// \param sse_customer_key the base64-encoded customer key; when empty the
///   request is left untouched
/// \return Status::OK() if the key is empty or was applied; Status::Invalid if
///   ComputeMD5Base64 rejects the key
template <typename S3RequestType>
Status SetSSECustomerKey(S3RequestType& request, const std::string& sse_customer_key) {
  if (sse_customer_key.empty()) {
    return Status::OK();  // do nothing if the sse_customer_key is not configured
  }

  std::string sse_customer_key_md5;
  if (!ComputeMD5Base64(sse_customer_key, sse_customer_key_md5)) {
    return Status::Invalid(
        "sse_customer_key is not a valid 256-bit base64-encoded encryption key");
  }

  request.SetSSECustomerKeyMD5(sse_customer_key_md5);
  request.SetSSECustomerKey(sse_customer_key);
  request.SetSSECustomerAlgorithm("AES256");
  return Status::OK();
}

/// \brief Build the Invalid status carrying the message "S3 subsystem is finalized".
Status ErrorS3Finalized() {
  return Status::Invalid("S3 subsystem is finalized");
}

Status CheckS3Initialized() {
Expand Down Expand Up @@ -1340,14 +1353,12 @@ Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
const S3Path& path,
const std::string& sse_customer_key,
const std::string& sse_customer_key_md5,
const std::string& sse_customer_algorithm,
int64_t start,
int64_t length, void* out) {
S3Model::GetObjectRequest req;
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));
SetSSECustomerKey(req, sse_customer_algorithm, sse_customer_key, sse_customer_key_md5);
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key));
req.SetRange(ToAwsString(FormatRange(start, length)));
req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
return OutcomeToResult("GetObject", client->GetObject(req));
Expand Down Expand Up @@ -1484,14 +1495,12 @@ bool IsDirectory(std::string_view key, const S3Model::HeadObjectResult& result)
class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(std::shared_ptr<S3ClientHolder> holder, const io::IOContext& io_context,
const S3Path& path, int64_t size = kNoSize, const std::string& c_algorithm = "AES256", const std::string& c_key = "", const std::string& c_key_md5 = "")
const S3Path& path, int64_t size = kNoSize, const std::string& c_key = "")
: holder_(std::move(holder)),
io_context_(io_context),
path_(path),
content_length_(size),
sse_customer_algorithm(c_algorithm),
sse_customer_key(c_key),
sse_customer_key_md5(c_key_md5) {}
sse_customer_key(c_key) {}

Status Init() {
// Issue a HEAD Object to get the content-length and ensure any
Expand All @@ -1504,7 +1513,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
S3Model::HeadObjectRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
SetSSECustomerKey(req, sse_customer_algorithm, sse_customer_key, sse_customer_key_md5);
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key));

ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = client_lock.Move()->HeadObject(req);
Expand Down Expand Up @@ -1591,7 +1600,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
ARROW_ASSIGN_OR_RAISE(
S3Model::GetObjectResult result,
GetObjectRange(client_lock.get(), path_,sse_customer_key, sse_customer_key_md5, sse_customer_algorithm, position, nbytes, out));
GetObjectRange(client_lock.get(), path_,sse_customer_key, position, nbytes, out));

auto& stream = result.GetBody();
stream.ignore(nbytes);
Expand Down Expand Up @@ -1639,9 +1648,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
std::string sse_customer_algorithm;
std::string sse_customer_key;
std::string sse_customer_key_md5;
};

// Upload size per part. While AWS and Minio support different sizes for each
Expand Down Expand Up @@ -1679,9 +1686,7 @@ class ObjectOutputStream final : public io::OutputStream {
default_metadata_(options.default_metadata),
background_writes_(options.background_writes),
allow_delayed_open_(options.allow_delayed_open),
sse_customer_algorithm(options.GetSSECAlgorithm()),
sse_customer_key(options.GetSSECKey()),
sse_customer_key_md5(options.GetSSECKeyMD5()) {}
sse_customer_key(options.sse_customer_key) {}

~ObjectOutputStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
Expand Down Expand Up @@ -1729,7 +1734,7 @@ class ObjectOutputStream final : public io::OutputStream {
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
SetSSECustomerKey(req, sse_customer_algorithm, sse_customer_key, sse_customer_key_md5);
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key));

RETURN_NOT_OK(SetMetadataInRequest(&req));

Expand Down Expand Up @@ -1832,7 +1837,7 @@ class ObjectOutputStream final : public io::OutputStream {
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
SetSSECustomerKey(req, sse_customer_algorithm, sse_customer_key, sse_customer_key_md5);
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key));


req.SetUploadId(multipart_upload_id_);
Expand Down Expand Up @@ -2016,7 +2021,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetKey(ToAwsString(path_.key));
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
req.SetContentLength(nbytes);
SetSSECustomerKey(req, sse_customer_algorithm, sse_customer_key, sse_customer_key_md5);
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key));


if (!background_writes_) {
Expand Down Expand Up @@ -2239,9 +2244,7 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> pending_uploads_completed = Future<>::MakeFinished(Status::OK());
};
std::shared_ptr<UploadState> upload_state_;
std::string sse_customer_algorithm;
std::string sse_customer_key;
std::string sse_customer_key_md5;
};

// This function assumes info->path() is already set
Expand Down Expand Up @@ -3043,8 +3046,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

RETURN_NOT_OK(CheckS3Initialized());

auto ptr = std::make_shared<ObjectInputFile>(holder_, fs->io_context(), path, kNoSize, fs->options().GetSSECAlgorithm(),
fs->options().GetSSECKey(), fs->options().GetSSECKeyMD5());
auto ptr = std::make_shared<ObjectInputFile>(holder_, fs->io_context(), path, kNoSize,
fs->options().sse_customer_key);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand All @@ -3065,8 +3068,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
RETURN_NOT_OK(CheckS3Initialized());

auto ptr =
std::make_shared<ObjectInputFile>(holder_, fs->io_context(), path, info.size(), fs->options().GetSSECAlgorithm(),
fs->options().GetSSECKey(), fs->options().GetSSECKeyMD5());
std::make_shared<ObjectInputFile>(holder_, fs->io_context(), path, info.size(), fs->options().sse_customer_key);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand Down
13 changes: 3 additions & 10 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ struct ARROW_EXPORT S3Options {
/// delay between retries.
std::shared_ptr<S3RetryStrategy> retry_strategy;

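/// The SSE-C customer-provided encryption key, as a base64-encoded 256-bit key.
/// When empty, no SSE-C parameters are set on requests.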
std::string sse_customer_key;

S3Options();

/// Configure with the default AWS credentials provider chain.
Expand Down Expand Up @@ -257,16 +260,6 @@ struct ARROW_EXPORT S3Options {
std::string* out_path = NULLPTR);
static Result<S3Options> FromUri(const std::string& uri,
std::string* out_path = NULLPTR);
/// Set the SSE-C customized key.
void SetSSECKey(const std::string& sse_customer_key);
std::string GetSSECKey() const { return sse_customer_key; }
std::string GetSSECAlgorithm() const { return sse_customer_algorithm; }
std::string GetSSECKeyMD5() const { return sse_customer_key_md5; }

private:
std::string sse_customer_algorithm;
std::string sse_customer_key;
std::string sse_customer_key_md5;
};

/// S3-backed FileSystem implementation.
Expand Down

0 comments on commit b5d1c0a

Please sign in to comment.