ARROW-17057: [Python] S3FileSystem has no parameter for retry strategy

3dbrows authored Aug 10, 2022
1 parent 74f221c commit ae071bb

Showing 6 changed files with 132 additions and 5 deletions.
48 changes: 48 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.cc
@@ -209,6 +209,54 @@ bool S3ProxyOptions::Equals(const S3ProxyOptions& other) const {
          username == other.username && password == other.password);
}

// -----------------------------------------------------------------------
// AwsRetryStrategy implementation

class AwsRetryStrategy : public S3RetryStrategy {
 public:
  explicit AwsRetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy)
      : retry_strategy_(std::move(retry_strategy)) {}

  bool ShouldRetry(const AWSErrorDetail& detail, int64_t attempted_retries) override {
    Aws::Client::AWSError<Aws::Client::CoreErrors> error = DetailToError(detail);
    return retry_strategy_->ShouldRetry(
        error, static_cast<long>(attempted_retries));  // NOLINT: runtime/int
  }

  int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& detail,
                                        int64_t attempted_retries) override {
    Aws::Client::AWSError<Aws::Client::CoreErrors> error = DetailToError(detail);
    return retry_strategy_->CalculateDelayBeforeNextRetry(
        error, static_cast<long>(attempted_retries));  // NOLINT: runtime/int
  }

 private:
  std::shared_ptr<Aws::Client::RetryStrategy> retry_strategy_;
  static Aws::Client::AWSError<Aws::Client::CoreErrors> DetailToError(
      const S3RetryStrategy::AWSErrorDetail& detail) {
    auto exception_name = ToAwsString(detail.exception_name);
    auto message = ToAwsString(detail.message);
    auto errors = Aws::Client::AWSError<Aws::Client::CoreErrors>(
        static_cast<Aws::Client::CoreErrors>(detail.error_type), exception_name, message,
        detail.should_retry);
    return errors;
  }
};

std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsDefaultRetryStrategy(
    int64_t max_attempts) {
  return std::make_shared<AwsRetryStrategy>(
      std::make_shared<Aws::Client::DefaultRetryStrategy>(
          static_cast<long>(max_attempts)));  // NOLINT: runtime/int
}

std::shared_ptr<S3RetryStrategy> S3RetryStrategy::GetAwsStandardRetryStrategy(
    int64_t max_attempts) {
  return std::make_shared<AwsRetryStrategy>(
      std::make_shared<Aws::Client::StandardRetryStrategy>(
          static_cast<long>(max_attempts)));  // NOLINT: runtime/int
}

// -----------------------------------------------------------------------
// S3Options implementation

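The adapter above simply forwards Arrow's retry interface to an AWS SDK strategy, and the two factories wrap the SDK's stock DefaultRetryStrategy and StandardRetryStrategy. As a rough illustration of the delay math an exponential-backoff strategy like the Default one applies, here is a minimal Python sketch; the class and method names are invented for this sketch, and the 25 ms scale factor mirrors the AWS SDK's documented default rather than anything in this commit:

class ExponentialBackoffSketch:
    # Illustrative stand-in for an AWS-style exponential-backoff strategy.
    def __init__(self, max_attempts=3, scale_factor_ms=25):
        self.max_attempts = max_attempts
        self.scale_factor_ms = scale_factor_ms  # assumed AWS SDK default

    def should_retry(self, error_is_retryable, attempted_retries):
        # Retry only errors marked retryable, up to the attempt budget.
        return error_is_retryable and attempted_retries < self.max_attempts

    def calculate_delay_before_next_retry(self, attempted_retries):
        # Exponential backoff: scale_factor * 2^retries, in milliseconds.
        return (1 << attempted_retries) * self.scale_factor_ms
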
8 changes: 7 additions & 1 deletion cpp/src/arrow/filesystem/s3fs.h
@@ -70,7 +70,7 @@ enum class S3CredentialsKind : int8_t {
};

/// Pure virtual class for describing custom S3 retry strategies
-class S3RetryStrategy {
+class ARROW_EXPORT S3RetryStrategy {
 public:
  virtual ~S3RetryStrategy() = default;

@@ -90,6 +90,12 @@ class S3RetryStrategy {
  /// Returns the time in milliseconds the S3 client should sleep for until retrying.
  virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
                                                int64_t attempted_retries) = 0;
  /// Returns a stock AWS Default retry strategy.
  static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
      int64_t max_attempts);
  /// Returns a stock AWS Standard retry strategy.
  static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
      int64_t max_attempts);
};

/// Options for the S3FileSystem implementation.
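The header now exposes the two stock factories alongside the pure-virtual interface. The contract the interface implies: after each failed request the client asks ShouldRetry, and if the answer is true, sleeps for the number of milliseconds returned by CalculateDelayBeforeNextRetry. A hedged Python sketch of that loop — the exception type and snake_case method names are hypothetical, invented only to show the shape:

import time


class RetryableS3Error(Exception):
    # Hypothetical stand-in for an AWS error surfaced to the strategy.
    def __init__(self, detail):
        super().__init__(detail)
        self.detail = detail


def run_with_retries(operation, strategy):
    # Sketch of the loop implied by the S3RetryStrategy contract.
    attempted_retries = 0
    while True:
        try:
            return operation()
        except RetryableS3Error as exc:
            if not strategy.should_retry(exc.detail, attempted_retries):
                raise
            delay_ms = strategy.calculate_delay_before_next_retry(
                exc.detail, attempted_retries)
            time.sleep(delay_ms / 1000.0)  # delay is given in milliseconds
            attempted_retries += 1
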
53 changes: 52 additions & 1 deletion python/pyarrow/_s3fs.pyx
@@ -88,6 +88,44 @@ def resolve_s3_region(bucket):
    return frombytes(c_region)


class S3RetryStrategy:
    """
    Base class for AWS retry strategies for use with S3.

    Parameters
    ----------
    max_attempts : int, default 3
        The maximum number of retry attempts before failing.
    """

    def __init__(self, max_attempts=3):
        self.max_attempts = max_attempts


class AwsStandardS3RetryStrategy(S3RetryStrategy):
    """
    Represents an AWS Standard retry strategy for use with S3.

    Parameters
    ----------
    max_attempts : int, default 3
        The maximum number of retry attempts before failing.
    """
    pass


class AwsDefaultS3RetryStrategy(S3RetryStrategy):
    """
    Represents an AWS Default retry strategy for use with S3.

    Parameters
    ----------
    max_attempts : int, default 3
        The maximum number of retry attempts before failing.
    """
    pass


cdef class S3FileSystem(FileSystem):
"""
S3-backed FileSystem implementation
Expand Down Expand Up @@ -173,6 +211,9 @@ cdef class S3FileSystem(FileSystem):
    allow_bucket_deletion : bool, default False
        Whether to allow DeleteDir at the bucket-level. This option may also be
        passed in a URI query parameter.
    retry_strategy : S3RetryStrategy, default AwsStandardS3RetryStrategy(max_attempts=3)
        The retry strategy to use with S3, failing after max_attempts. Available
        strategies are AwsStandardS3RetryStrategy and AwsDefaultS3RetryStrategy.

    Examples
    --------
@@ -195,7 +236,8 @@ cdef class S3FileSystem(FileSystem):
                 bint background_writes=True, default_metadata=None,
                 role_arn=None, session_name=None, external_id=None,
                 load_frequency=900, proxy_options=None,
-                allow_bucket_creation=False, allow_bucket_deletion=False):
+                allow_bucket_creation=False, allow_bucket_deletion=False,
+                retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)):
        cdef:
            CS3Options options
            shared_ptr[CS3FileSystem] wrapped
@@ -300,6 +342,15 @@ cdef class S3FileSystem(FileSystem):
        options.allow_bucket_creation = allow_bucket_creation
        options.allow_bucket_deletion = allow_bucket_deletion

        if isinstance(retry_strategy, AwsStandardS3RetryStrategy):
            options.retry_strategy = CS3RetryStrategy.GetAwsStandardRetryStrategy(
                retry_strategy.max_attempts)
        elif isinstance(retry_strategy, AwsDefaultS3RetryStrategy):
            options.retry_strategy = CS3RetryStrategy.GetAwsDefaultRetryStrategy(
                retry_strategy.max_attempts)
        else:
            raise ValueError(f'Invalid retry_strategy {retry_strategy!r}')

        with nogil:
            wrapped = GetResultValue(CS3FileSystem.Make(options))

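With the wiring above in place, the new parameter can be used from Python as in the following sketch, which mirrors the tests added below; the region value is a placeholder, and constructing S3FileSystem requires a pyarrow build with S3 support:

from pyarrow.fs import (AwsDefaultS3RetryStrategy,
                        AwsStandardS3RetryStrategy, S3FileSystem)

# Default behavior: AWS Standard strategy with max_attempts=3.
fs = S3FileSystem(region='us-east-2')

# Explicitly pick a strategy and attempt budget.
fs = S3FileSystem(region='us-east-2',
                  retry_strategy=AwsStandardS3RetryStrategy(max_attempts=5))

# Or the AWS Default strategy.
fs = S3FileSystem(region='us-east-2',
                  retry_strategy=AwsDefaultS3RetryStrategy(max_attempts=5))

Passing a bare S3RetryStrategy (or any other object) raises ValueError, as exercised in the tests below.
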
5 changes: 3 additions & 2 deletions python/pyarrow/fs.py
@@ -52,8 +52,9 @@

try:
    from pyarrow._s3fs import (  # noqa
-       S3FileSystem, S3LogLevel, initialize_s3, finalize_s3,
-       resolve_s3_region)
+       AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
+       S3FileSystem, S3LogLevel, S3RetryStrategy, finalize_s3,
+       initialize_s3, resolve_s3_region)
except ImportError:
    _not_imported.append("S3FileSystem")
else:
8 changes: 8 additions & 0 deletions python/pyarrow/includes/libarrow_fs.pxd
@@ -150,6 +150,13 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
        CS3CredentialsKind_WebIdentity \
            "arrow::fs::S3CredentialsKind::WebIdentity"

    cdef cppclass CS3RetryStrategy "arrow::fs::S3RetryStrategy":
        @staticmethod
        shared_ptr[CS3RetryStrategy] GetAwsDefaultRetryStrategy(int64_t max_attempts)

        @staticmethod
        shared_ptr[CS3RetryStrategy] GetAwsStandardRetryStrategy(int64_t max_attempts)

    cdef cppclass CS3Options "arrow::fs::S3Options":
        c_string region
        double connect_timeout
@@ -166,6 +173,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
        int load_frequency
        CS3ProxyOptions proxy_options
        CS3CredentialsKind credentials_kind
        shared_ptr[CS3RetryStrategy] retry_strategy
        void ConfigureDefaultCredentials()
        void ConfigureAccessKey(const c_string& access_key,
                                const c_string& secret_key,
15 changes: 14 additions & 1 deletion python/pyarrow/tests/test_fs.py
@@ -1093,7 +1093,9 @@ def test_gcs_options():

@pytest.mark.s3
def test_s3_options():
-   from pyarrow.fs import S3FileSystem
+   from pyarrow.fs import (AwsDefaultS3RetryStrategy,
+                           AwsStandardS3RetryStrategy, S3FileSystem,
+                           S3RetryStrategy)

    fs = S3FileSystem(access_key='access', secret_key='secret',
                      session_token='token', region='us-east-2',
@@ -1107,6 +1109,15 @@ def test_s3_options():
    assert isinstance(fs, S3FileSystem)
    assert pickle.loads(pickle.dumps(fs)) == fs

    # Note that the retry strategy won't survive pickling for now
    fs = S3FileSystem(
        retry_strategy=AwsStandardS3RetryStrategy(max_attempts=5))
    assert isinstance(fs, S3FileSystem)

    fs = S3FileSystem(
        retry_strategy=AwsDefaultS3RetryStrategy(max_attempts=5))
    assert isinstance(fs, S3FileSystem)

    fs2 = S3FileSystem(role_arn='role')
    assert isinstance(fs2, S3FileSystem)
    assert pickle.loads(pickle.dumps(fs2)) == fs2
@@ -1160,6 +1171,8 @@ def test_s3_options():
        S3FileSystem(role_arn="arn", anonymous=True)
    with pytest.raises(ValueError):
        S3FileSystem(default_metadata=["foo", "bar"])
    with pytest.raises(ValueError):
        S3FileSystem(retry_strategy=S3RetryStrategy())


@pytest.mark.s3
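As the comment in the test notes, the retry strategy does not survive pickling for now, which is why the pickle-equality assertion is skipped for those instances. A hedged sketch of the practical consequence (region is a placeholder, and the fallback behavior described in the comment is inferred from the fact that retry_strategy is not serialized with the other options):

import pickle

from pyarrow.fs import AwsStandardS3RetryStrategy, S3FileSystem

fs = S3FileSystem(region='us-east-2',
                  retry_strategy=AwsStandardS3RetryStrategy(max_attempts=5))
restored = pickle.loads(pickle.dumps(fs))
# `restored` is a usable filesystem, but the max_attempts=5 strategy is not
# round-tripped; the restored instance falls back to the default retry
# behavior.
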
