Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38364: [Python] Initialize S3 on first use #38375

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion python/pyarrow/_s3fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ cpdef enum S3LogLevel:
Debug = <int8_t> CS3LogLevel_Debug
Trace = <int8_t> CS3LogLevel_Trace

# Prevent registration of multiple `atexit` handlers
_initialized = False


def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal, int num_event_loop_threads=1):
"""
Expand All @@ -63,7 +66,13 @@ def ensure_s3_initialized():
"""
Initialize S3 (with default options) if not already initialized
"""
check_status(CEnsureS3Initialized())
global _initialized

if not _initialized:
check_status(CEnsureS3Initialized())
import atexit
atexit.register(finalize_s3)
_initialized = True


def finalize_s3():
Expand Down Expand Up @@ -93,6 +102,8 @@ def resolve_s3_region(bucket):
c_string c_bucket
c_string c_region

ensure_s3_initialized()

c_bucket = tobytes(bucket)
with nogil:
c_region = GetResultValue(ResolveS3BucketRegion(c_bucket))
Expand Down Expand Up @@ -260,6 +271,26 @@ cdef class S3FileSystem(FileSystem):
load_frequency=900, proxy_options=None,
allow_bucket_creation=False, allow_bucket_deletion=False,
retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)):
ensure_s3_initialized()

self._initialize_s3(access_key=access_key, secret_key=secret_key, session_token=session_token,
anonymous=anonymous, region=region, request_timeout=request_timeout,
connect_timeout=connect_timeout, scheme=scheme, endpoint_override=endpoint_override,
background_writes=background_writes, default_metadata=default_metadata,
role_arn=role_arn, session_name=session_name, external_id=external_id,
load_frequency=load_frequency, proxy_options=proxy_options,
allow_bucket_creation=allow_bucket_creation, allow_bucket_deletion=allow_bucket_deletion,
retry_strategy=retry_strategy)

def _initialize_s3(self, *, access_key=None, secret_key=None, session_token=None,
bint anonymous=False, region=None, request_timeout=None,
connect_timeout=None, scheme=None, endpoint_override=None,
bint background_writes=True, default_metadata=None,
role_arn=None, session_name=None, external_id=None,
load_frequency=900, proxy_options=None,
allow_bucket_creation=False, allow_bucket_deletion=False,
retry_strategy: S3RetryStrategy = AwsStandardS3RetryStrategy(max_attempts=3)):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?
It seems that __init__() already calls ensure_s3_initialized().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I had to move the implementation to the _initialize_s3 method are the cdefs in https://github.com/apache/arrow/pull/38375/files#diff-afa3ea99a387be221ef1f7230aa309b42001aed318cdc6969e700d5eb04d07b2R294-R296, they will instantiate objects that expect S3 to be already initialized (which is only guaranteed after ensure_s3_initialized() is called) and Cython will instantiate them before any code in the function runs, including ensure_s3_initialized().

Another alternative would be to make them unique_ptrs or something that won't instantiate the objects immediately at __init__()'s entry, but I think this would make the code more complex than the solution currently proposed. If a pointer is preferred here without deferring to another method I can work on that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The given cdef's are std::optional and std::shared_ptr, so I don't understand the concern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed they are, thanks for calling my attention to that. Now I see what happened, I initially tested this in the 12.0.1 tag where CS3Options was not a std::optional and that was causing the CS3Options default constructor to be called before ensure_s3_initialized(). I can confirm it works as expected in main without that, and fixed it now via 58097e9 .

cdef:
optional[CS3Options] options
shared_ptr[CS3FileSystem] wrapped
Expand Down
4 changes: 0 additions & 4 deletions python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@
finalize_s3, initialize_s3, resolve_s3_region)
except ImportError:
_not_imported.append("S3FileSystem")
else:
ensure_s3_initialized()
import atexit
atexit.register(finalize_s3)
pitrou marked this conversation as resolved.
Show resolved Hide resolved


def __getattr__(name):
Expand Down