Skip to content

Commit

Permalink
S3Options
Browse files Browse the repository at this point in the history
  • Loading branch information
kszucs committed Sep 26, 2019
1 parent 98bd91a commit 73e6625
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 23 deletions.
122 changes: 103 additions & 19 deletions python/pyarrow/_s3fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,105 @@ def finalize_s3():
check_status(CFinalizeS3())


cdef class S3Options:
"""Options for S3FileSystem.
If neither access_key nor secret_key are provided then attempts to
initialize from AWS environment variables, otherwise both access_key and
secret_key must be provided.
Parameters
----------
access_key: str, default None
AWS Access Key ID. Pass None to use the standard AWS environment
variables and/or configuration file.
secret_key: str, default None
AWS Secret Access key. Pass None to use the standard AWS environment
variables and/or configuration file.
region: str, default 'us-east-1'
AWS region to connect to.
scheme: str, default 'https'
S3 connection transport scheme.
endpoint_override: str, default None
Override region with a connect string such as "localhost:9000"
background_writes: boolean, default True
Whether OutputStream writes will be issued in the background, without
blocking.
"""
cdef:
CS3Options options

# Avoid mistakingly creating attributes
__slots__ = ()

def __init__(self, access_key=None, secret_key=None, region=None,
scheme=None, endpoint_override=None, background_writes=None):
if access_key is not None and secret_key is None:
raise ValueError(
'In order to initialize with explicit credentials both '
'access_key and secret_key must be provided, '
'`secret_key` is not set.'
)
elif access_key is None and secret_key is not None:
raise ValueError(
'In order to initialize with explicit credentials both '
'access_key and secret_key must be provided, '
'`access_key` is not set.'
)
elif access_key is not None or secret_key is not None:
self.options = CS3Options.FromAccessKey(
tobytes(access_key),
tobytes(secret_key)
)
else:
self.options = CS3Options.Defaults()

if region is not None:
self.region = region
if scheme is not None:
self.scheme = scheme
if endpoint_override is not None:
self.endpoint_override = endpoint_override
if background_writes is not None:
self.background_writes = background_writes

@property
def region(self):
"""AWS region to connect to."""
return frombytes(self.options.region)

@region.setter
def region(self, value):
self.options.region = tobytes(value)

@property
def scheme(self):
"""S3 connection transport scheme."""
return frombytes(self.options.scheme)

@scheme.setter
def scheme(self, value):
self.options.scheme = tobytes(value)

@property
def endpoint_override(self):
"""Override region with a connect string such as localhost:9000"""
return frombytes(self.options.endpoint_override)

@endpoint_override.setter
def endpoint_override(self, value):
self.options.endpoint_override = tobytes(value)

@property
def background_writes(self):
"""OutputStream writes will be issued in the background"""
return self.options.background_writes

@background_writes.setter
def background_writes(self, bint value):
self.options.background_writes = value


cdef class S3FileSystem(FileSystem):
"""S3-backed FileSystem implementation
Expand Down Expand Up @@ -75,25 +174,10 @@ cdef class S3FileSystem(FileSystem):
cdef:
CS3FileSystem* s3fs

def __init__(self, access_key=None, secret_key=None, region='us-east-1',
scheme='https', endpoint_override=None,
bint background_writes=True):
cdef:
CS3Options options
shared_ptr[CS3FileSystem] wrapped

options = CS3Options.Defaults()
if access_key is not None or secret_key is not None:
options.ConfigureAccessKey(tobytes(access_key),
tobytes(secret_key))

options.region = tobytes(region)
options.scheme = tobytes(scheme)
options.background_writes = background_writes
if endpoint_override is not None:
options.endpoint_override = tobytes(endpoint_override)

check_status(CS3FileSystem.Make(options, &wrapped))
def __init__(self, S3Options options=None):
cdef shared_ptr[CS3FileSystem] wrapped
options = options or S3Options()
check_status(CS3FileSystem.Make(options.options, &wrapped))
self.init(<shared_ptr[CFileSystem]> wrapped)

cdef init(self, const shared_ptr[CFileSystem]& wrapped):
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/s3fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pyarrow._s3fs import ( # noqa
initialize_s3,
finalize_s3,
S3Options,
S3FileSystem
)

Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
pass

try:
from pyarrow.fs import S3FileSystem # noqa
import pyarrow.s3fs # noqa
defaults['s3'] = True
except ImportError:
pass
Expand Down Expand Up @@ -248,8 +248,8 @@ def __exit__(self, exc_type, exc_value, traceback):
shutil.rmtree(self.tmp)


@pytest.fixture(scope='session')
@pytest.mark.s3
@pytest.fixture(scope='session')
def minio_server():
host, port = 'localhost', find_free_port()
access_key, secret_key = 'arrow', 'apachearrow'
Expand Down
40 changes: 38 additions & 2 deletions python/pyarrow/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@ def subtree_localfs(request, tempdir, localfs):
)


@pytest.mark.s3
@pytest.fixture
def s3fs(request, minio_server):
from pyarrow.s3fs import S3FileSystem
from pyarrow.s3fs import S3Options, S3FileSystem

address, access_key, secret_key = minio_server
bucket = 'pyarrow-filesystem/'
fs = S3FileSystem(
options = S3Options(
endpoint_override=address,
access_key=access_key,
secret_key=secret_key,
scheme='http'
)
fs = S3FileSystem(options)
fs.create_dir(bucket)

return dict(
Expand Down Expand Up @@ -387,3 +389,37 @@ def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor,
else:
with pytest.raises(pa.ArrowNotImplementedError):
fs.open_append_stream(p, compression, buffer_size)


@pytest.mark.s3
def test_s3_options(minio_server):
from pyarrow.s3fs import S3Options

options = S3Options()

assert options.region == 'us-east-1'
options.region = 'us-west-1'
assert options.region == 'us-west-1'

assert options.scheme == 'https'
options.scheme = 'http'
assert options.scheme == 'http'

assert options.endpoint_override == ''
options.endpoint_override = 'localhost:8999'
assert options.endpoint_override == 'localhost:8999'

with pytest.raises(ValueError):
S3Options(access_key='access')
with pytest.raises(ValueError):
S3Options(secret_key='secret')

address, access_key, secret_key = minio_server
options = S3Options(
access_key=access_key,
secret_key=secret_key,
endpoint_override=address,
scheme='http'
)
assert options.scheme == 'http'
assert options.endpoint_override == address

0 comments on commit 73e6625

Please sign in to comment.