From 73e6625f9c9f93837b4466e6aeedae4ad463fe04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Thu, 26 Sep 2019 16:27:51 +0200 Subject: [PATCH] S3Options --- python/pyarrow/_s3fs.pyx | 122 ++++++++++++++++++++++++++----- python/pyarrow/s3fs.py | 1 + python/pyarrow/tests/conftest.py | 4 +- python/pyarrow/tests/test_fs.py | 40 +++++++++- 4 files changed, 144 insertions(+), 23 deletions(-) diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx index 532a290607d91..09c1c98ac01a0 100644 --- a/python/pyarrow/_s3fs.pyx +++ b/python/pyarrow/_s3fs.pyx @@ -47,6 +47,105 @@ def finalize_s3(): check_status(CFinalizeS3()) +cdef class S3Options: + """Options for S3FileSystem. + + If neither access_key nor secret_key are provided then attempts to + initialize from AWS environment variables, otherwise both access_key and + secret_key must be provided. + + Parameters + ---------- + access_key: str, default None + AWS Access Key ID. Pass None to use the standard AWS environment + variables and/or configuration file. + secret_key: str, default None + AWS Secret Access key. Pass None to use the standard AWS environment + variables and/or configuration file. + region: str, default 'us-east-1' + AWS region to connect to. + scheme: str, default 'https' + S3 connection transport scheme. + endpoint_override: str, default None + Override region with a connect string such as "localhost:9000" + background_writes: boolean, default True + Whether OutputStream writes will be issued in the background, without + blocking. + """ + cdef: + CS3Options options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, access_key=None, secret_key=None, region=None, + scheme=None, endpoint_override=None, background_writes=None): + if access_key is not None and secret_key is None: + raise ValueError( + 'In order to initialize with explicit credentials both ' + 'access_key and secret_key must be provided, ' + '`secret_key` is not set.' + ) + elif access_key is None and secret_key is not None: + raise ValueError( + 'In order to initialize with explicit credentials both ' + 'access_key and secret_key must be provided, ' + '`access_key` is not set.' + ) + elif access_key is not None or secret_key is not None: + self.options = CS3Options.FromAccessKey( + tobytes(access_key), + tobytes(secret_key) + ) + else: + self.options = CS3Options.Defaults() + + if region is not None: + self.region = region + if scheme is not None: + self.scheme = scheme + if endpoint_override is not None: + self.endpoint_override = endpoint_override + if background_writes is not None: + self.background_writes = background_writes + + @property + def region(self): + """AWS region to connect to.""" + return frombytes(self.options.region) + + @region.setter + def region(self, value): + self.options.region = tobytes(value) + + @property + def scheme(self): + """S3 connection transport scheme.""" + return frombytes(self.options.scheme) + + @scheme.setter + def scheme(self, value): + self.options.scheme = tobytes(value) + + @property + def endpoint_override(self): + """Override region with a connect string such as localhost:9000""" + return frombytes(self.options.endpoint_override) + + @endpoint_override.setter + def endpoint_override(self, value): + self.options.endpoint_override = tobytes(value) + + @property + def background_writes(self): + """OutputStream writes will be issued in the background""" + return self.options.background_writes + + @background_writes.setter + def background_writes(self, bint value): + self.options.background_writes = value + + cdef class S3FileSystem(FileSystem): """S3-backed FileSystem implementation @@ -75,25 +174,10 @@ cdef class S3FileSystem(FileSystem): cdef: CS3FileSystem* s3fs - def __init__(self, access_key=None, secret_key=None, region='us-east-1', - scheme='https', endpoint_override=None, - bint background_writes=True): - cdef: - CS3Options options - shared_ptr[CS3FileSystem] wrapped - - options = CS3Options.Defaults() - if access_key is not None or secret_key is not None: - options.ConfigureAccessKey(tobytes(access_key), - tobytes(secret_key)) - - options.region = tobytes(region) - options.scheme = tobytes(scheme) - options.background_writes = background_writes - if endpoint_override is not None: - options.endpoint_override = tobytes(endpoint_override) - - check_status(CS3FileSystem.Make(options, &wrapped)) + def __init__(self, S3Options options=None): + cdef shared_ptr[CS3FileSystem] wrapped + options = options or S3Options() + check_status(CS3FileSystem.Make(options.options, &wrapped)) self.init( wrapped) cdef init(self, const shared_ptr[CFileSystem]& wrapped): diff --git a/python/pyarrow/s3fs.py b/python/pyarrow/s3fs.py index bc0127ae3c4cc..5619e186f9ea4 100644 --- a/python/pyarrow/s3fs.py +++ b/python/pyarrow/s3fs.py @@ -20,6 +20,7 @@ from pyarrow._s3fs import ( # noqa initialize_s3, finalize_s3, + S3Options, S3FileSystem ) diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 85297fe81de68..68e215b3f4223 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -132,7 +132,7 @@ pass try: - from pyarrow.fs import S3FileSystem # noqa + import pyarrow.s3fs # noqa defaults['s3'] = True except ImportError: pass @@ -248,8 +248,8 @@ def __exit__(self, exc_type, exc_value, traceback): shutil.rmtree(self.tmp) -@pytest.fixture(scope='session') @pytest.mark.s3 +@pytest.fixture(scope='session') def minio_server(): host, port = 'localhost', find_free_port() access_key, secret_key = 'arrow', 'apachearrow' diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 6e0e1f5e35111..f6b6bf1d18cdf 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -51,18 +51,20 @@ def subtree_localfs(request, tempdir, localfs): ) +@pytest.mark.s3 @pytest.fixture def s3fs(request, minio_server): - from pyarrow.s3fs import S3FileSystem + from pyarrow.s3fs import S3Options, S3FileSystem address, access_key, secret_key = minio_server bucket = 'pyarrow-filesystem/' - fs = S3FileSystem( + options = S3Options( endpoint_override=address, access_key=access_key, secret_key=secret_key, scheme='http' ) + fs = S3FileSystem(options) fs.create_dir(bucket) return dict( @@ -387,3 +389,37 @@ def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor, else: with pytest.raises(pa.ArrowNotImplementedError): fs.open_append_stream(p, compression, buffer_size) + + +@pytest.mark.s3 +def test_s3_options(minio_server): + from pyarrow.s3fs import S3Options + + options = S3Options() + + assert options.region == 'us-east-1' + options.region = 'us-west-1' + assert options.region == 'us-west-1' + + assert options.scheme == 'https' + options.scheme = 'http' + assert options.scheme == 'http' + + assert options.endpoint_override == '' + options.endpoint_override = 'localhost:8999' + assert options.endpoint_override == 'localhost:8999' + + with pytest.raises(ValueError): + S3Options(access_key='access') + with pytest.raises(ValueError): + S3Options(secret_key='secret') + + address, access_key, secret_key = minio_server + options = S3Options( + access_key=access_key, + secret_key=secret_key, + endpoint_override=address, + scheme='http' + ) + assert options.scheme == 'http' + assert options.endpoint_override == address