test requirements; flake8
kszucs committed Sep 24, 2019
1 parent 44aedfd commit c0b9162
Showing 8 changed files with 107 additions and 53 deletions.
2 changes: 2 additions & 0 deletions ci/conda_env_python.yml
@@ -19,9 +19,11 @@ cython=0.29.7
cloudpickle
hypothesis
numpy>=1.14
minio
pandas
pytest
pytest-faulthandler
pytest-lazy-fixture
pytz
setuptools
setuptools_scm=3.2.0
3 changes: 3 additions & 0 deletions ci/travis_script_python.sh
@@ -135,6 +135,7 @@ cmake -GNinja \
-DARROW_TENSORFLOW=on \
-DARROW_PYTHON=on \
-DARROW_ORC=on \
-DARROW_S3=on \
-DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
-DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
$ARROW_CPP_DIR
@@ -164,6 +165,7 @@ export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE
export PYARROW_WITH_PARQUET=1
export PYARROW_WITH_PLASMA=1
export PYARROW_WITH_ORC=1
export PYARROW_WITH_S3=1
if [ "$ARROW_TRAVIS_FLIGHT" == "1" ]; then
export PYARROW_WITH_FLIGHT=1
fi
@@ -177,6 +179,7 @@ python setup.py develop
python -c "import pyarrow.parquet"
python -c "import pyarrow.plasma"
python -c "import pyarrow.orc"
python -c "import pyarrow.fs"

# Ensure we do not eagerly import pandas (or other expensive imports)
python < scripts/test_imports.py
2 changes: 1 addition & 1 deletion python/pyarrow/_fs.pxd
@@ -65,4 +65,4 @@ cdef class SubTreeFileSystem(FileSystem):
cdef:
CSubTreeFileSystem* subtreefs

cdef init(self, const shared_ptr[CFileSystem]& wrapped)
cdef init(self, const shared_ptr[CFileSystem]& wrapped)
4 changes: 2 additions & 2 deletions python/pyarrow/fs.py
@@ -17,7 +17,7 @@

from __future__ import absolute_import

from pyarrow._fs import (
from pyarrow._fs import ( # noqa
Selector,
FileType,
FileStats,
@@ -27,6 +27,6 @@
)
from pyarrow._s3 import S3FileSystem
try:
from pyarrow._s3 import S3FileSystem, initialize_s3, finalize_s3
from pyarrow._s3 import S3FileSystem, initialize_s3, finalize_s3 # noqa
except ImportError:
pass
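
The try/except guard above makes the S3 bindings optional: pyarrow.fs always exposes the local filesystem classes, while S3FileSystem, initialize_s3 and finalize_s3 are only importable when pyarrow was built with S3 support. A minimal sketch of feature-detecting this from downstream code, mirroring the defaults['s3'] pattern in conftest.py below (the have_s3 name is illustrative, not part of this commit):

try:
    from pyarrow.fs import S3FileSystem, initialize_s3
    have_s3 = True
except ImportError:  # pyarrow built without -DARROW_S3=on
    have_s3 = False

if have_s3:
    initialize_s3()  # the s3fs fixture below also calls this first
    fs = S3FileSystem()  # assumes credentials/region come from defaults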
21 changes: 11 additions & 10 deletions python/pyarrow/includes/libarrow.pxd
@@ -811,7 +811,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
int file_descriptor()

cdef cppclass CMemoryMappedFile \
" arrow::io::MemoryMappedFile"(ReadWriteFileInterface):
"arrow::io::MemoryMappedFile"(ReadWriteFileInterface):

@staticmethod
CStatus Create(const c_string& path, int64_t size,
@@ -826,7 +826,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
int file_descriptor()

cdef cppclass CCompressedInputStream \
" arrow::io::CompressedInputStream"(CInputStream):
"arrow::io::CompressedInputStream"(CInputStream):
@staticmethod
CStatus Make(CMemoryPool* pool, CCodec* codec,
shared_ptr[CInputStream] raw,
@@ -837,7 +837,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
shared_ptr[CCompressedInputStream]* out)

cdef cppclass CCompressedOutputStream \
" arrow::io::CompressedOutputStream"(COutputStream):
"arrow::io::CompressedOutputStream"(COutputStream):
@staticmethod
CStatus Make(CMemoryPool* pool, CCodec* codec,
shared_ptr[COutputStream] raw,
@@ -848,7 +848,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
shared_ptr[CCompressedOutputStream]* out)

cdef cppclass CBufferedInputStream \
" arrow::io::BufferedInputStream"(CInputStream):
"arrow::io::BufferedInputStream"(CInputStream):

@staticmethod
CStatus Create(int64_t buffer_size, CMemoryPool* pool,
@@ -858,7 +858,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
shared_ptr[CInputStream] Detach()

cdef cppclass CBufferedOutputStream \
" arrow::io::BufferedOutputStream"(COutputStream):
"arrow::io::BufferedOutputStream"(COutputStream):

@staticmethod
CStatus Create(int64_t buffer_size, CMemoryPool* pool,
@@ -903,7 +903,8 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
cdef cppclass HdfsOutputStream(COutputStream):
pass

cdef cppclass CHadoopFileSystem" arrow::io::HadoopFileSystem"(CIOFileSystem):
cdef cppclass CHadoopFileSystem \
"arrow::io::HadoopFileSystem"(CIOFileSystem):
@staticmethod
CStatus Connect(const HdfsConnectionConfig* config,
shared_ptr[CHadoopFileSystem]* client)
@@ -939,21 +940,21 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
shared_ptr[HdfsOutputStream]* handle)

cdef cppclass CBufferReader \
" arrow::io::BufferReader"(CRandomAccessFile):
"arrow::io::BufferReader"(CRandomAccessFile):
CBufferReader(const shared_ptr[CBuffer]& buffer)
CBufferReader(const uint8_t* data, int64_t nbytes)

cdef cppclass CBufferOutputStream \
" arrow::io::BufferOutputStream"(COutputStream):
"arrow::io::BufferOutputStream"(COutputStream):
CBufferOutputStream(const shared_ptr[CResizableBuffer]& buffer)

cdef cppclass CMockOutputStream \
" arrow::io::MockOutputStream"(COutputStream):
"arrow::io::MockOutputStream"(COutputStream):
CMockOutputStream()
int64_t GetExtentBytesWritten()

cdef cppclass CFixedSizeBufferWriter \
" arrow::io::FixedSizeBufferWriter"(WritableFile):
"arrow::io::FixedSizeBufferWriter"(WritableFile):
CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer)

void set_memcopy_threads(int num_threads)
3 changes: 2 additions & 1 deletion python/pyarrow/includes/libarrow_s3.pxd
@@ -44,6 +44,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
void ConfigureDefaultCredentials()
void ConfigureAccessKey(const c_string& access_key,
const c_string& secret_key)

@staticmethod
CS3Options Defaults()
@staticmethod
@@ -56,4 +57,4 @@

cdef CStatus CInitializeS3 "arrow::fs::InitializeS3"(
const CS3GlobalOptions& options)
cdef CStatus CFinalizeS3 "arrow::fs::FinalizeS3"()
cdef CStatus CFinalizeS3 "arrow::fs::FinalizeS3"()
2 changes: 1 addition & 1 deletion python/pyarrow/tests/conftest.py
@@ -127,7 +127,7 @@
pass

try:
from pyarrow.fs import S3FileSystem
from pyarrow.fs import S3FileSystem # noqa
defaults['s3'] = True
except ImportError:
pass
123 changes: 85 additions & 38 deletions python/pyarrow/tests/test_fs.py
@@ -22,20 +22,65 @@
import tempfile
from datetime import datetime
try:
import pathlib
except ImportError:
import pathlib2 as pathlib # py2 compat

import pytest

import pyarrow as pa
from pyarrow import ArrowIOError
from pyarrow.tests.test_io import gzip_compress, gzip_decompress
from pyarrow.fs import (FileType, Selector, FileSystem, LocalFileSystem,
SubTreeFileSystem)


class Local:
class FileSystemWrapper:

# Whether the filesystem may "implicitly" create intermediate directories
have_implicit_directories = False
# Whether the filesystem may allow writing a file "over" a directory
allow_write_file_over_dir = False
# Whether the filesystem allows moving a directory
allow_move_dir = True
# Whether the filesystem allows appending to a file
allow_append_to_file = False
# Whether the filesystem supports directory modification times
have_directory_mtimes = True

@property
def impl(self):
return self._impl

@impl.setter
def impl(self, impl):
self._impl = impl

def pathpair(self, p):
raise NotImplementedError()

def mkdir(self, p):
raise NotImplementedError()

def touch(self, p):
raise NotImplementedError()

def exists(self, p):
raise NotImplementedError()

def mtime(self, p):
raise NotImplementedError()

def write_bytes(self, p, data):
raise NotImplementedError()

def read_bytes(self, p):
raise NotImplementedError()


class LocalWrapper(FileSystemWrapper):

allow_append_to_file = True

def __init__(self, tempdir):
self.impl = LocalFileSystem()
@@ -67,7 +112,7 @@ def read_bytes(self, p):
return pathlib.Path(p).read_bytes()


class SubTreeLocal(Local):
class SubTreeLocalWrapper(LocalWrapper):

def __init__(self, tempdir, prefix='local/prefix'):
prefix_absolute = tempdir / prefix
@@ -86,11 +131,12 @@ def pathpair(self, p):
return (path_for_wrapper, path_for_impl)


class S3:
class S3Wrapper(FileSystemWrapper):

allow_move_dir = False

def __init__(self, minio_client, bucket='test-bucket', **kwargs):
from pyarrow.fs import S3FileSystem, initialize_s3
initialize_s3()
from pyarrow.fs import S3FileSystem
self.impl = S3FileSystem(**kwargs)
self.client = minio_client
self.bucket = bucket
@@ -156,7 +202,7 @@ def read_bytes(self, p):
return data.read()


class SubTreeS3(S3):
class SubTreeS3Wrapper(S3Wrapper):

def __init__(self, minio_client, bucket='test-bucket', prefix='s3/prefix',
**kwargs):
@@ -215,18 +261,20 @@ def minio_client(minio_server):


@pytest.fixture(params=[
Local,
SubTreeLocal
LocalWrapper,
SubTreeLocalWrapper
])
def localfs(request, tempdir):
return request.param(tempdir)


@pytest.fixture(params=[
S3,
SubTreeS3
S3Wrapper,
SubTreeS3Wrapper
])
def s3fs(request, minio_server, minio_client):
from pyarrow.fs import initialize_s3
initialize_s3()
address, access_key, secret_key = minio_server
client, bucket = minio_client
return request.param(
@@ -263,11 +311,6 @@ class Path:
fs.impl.create_dir(path)


def test_file_stat_repr():
# TODO(kszucs)
pass


def test_get_target_stats(fs):
_aaa, aaa = fs.pathpair('a/aa/aaa/')
_bb, bb = fs.pathpair('a/bb')
@@ -287,11 +330,11 @@ def mtime_almost_equal(a, b):

assert aaa_stat.path == aaa
assert 'aaa' in repr(aaa_stat)
assert aaa_stat.extension == ''
assert mtime_almost_equal(aaa_stat.mtime, fs.mtime(_aaa))
# type is inconsistent; base_name has a trailing slash for 'aaa' and 'aaa/'
# assert aaa_stat.base_name == 'aaa'
assert aaa_stat.extension == ''
# assert aaa_stat.type == FileType.Directory
assert mtime_almost_equal(aaa_stat.mtime, fs.mtime(_aaa))
# assert aaa_stat is None

assert bb_stat.path == str(bb)
@@ -372,17 +415,21 @@ def test_copy_file(fs):
assert fs.exists(_t)


def test_move_directory(localfs):
def test_move_directory(fs):
# move directory (doesn't work with S3)
_s, s = localfs.pathpair('source-dir/')
_t, t = localfs.pathpair('target-dir/')
localfs.mkdir(_s)
_s, s = fs.pathpair('source-dir/')
_t, t = fs.pathpair('target-dir/')
fs.mkdir(_s)

assert localfs.exists(_s)
assert not localfs.exists(_t)
localfs.impl.move(s, t)
assert not localfs.exists(_s)
assert localfs.exists(_t)
if fs.allow_move_dir:
assert fs.exists(_s)
assert not fs.exists(_t)
fs.impl.move(s, t)
assert not fs.exists(_s)
assert fs.exists(_t)
else:
with pytest.raises(pa.ArrowIOError):
fs.impl.move(s, t)


def test_move_file(fs):
@@ -487,18 +534,18 @@ def test_open_output_stream(fs, compression, buffer_size, decompressor):
('gzip', 256, gzip_compress, gzip_decompress),
]
)
def test_open_append_stream(localfs, compression, buffer_size, compressor,
def test_open_append_stream(fs, compression, buffer_size, compressor,
decompressor):
_p, p = localfs.pathpair('open-append-stream')
_p, p = fs.pathpair('open-append-stream')

data = compressor(b'already existing')
localfs.write_bytes(_p, data)

with localfs.impl.open_append_stream(p, compression, buffer_size) as f:
f.write(b'\nnewly added')

result = decompressor(localfs.read_bytes(_p))
assert result == b'already existing\nnewly added'

fs.write_bytes(_p, data)

# TODO(kszucs): test that open_append_stream raises for s3
if fs.allow_append_to_file:
with fs.impl.open_append_stream(p, compression, buffer_size) as f:
f.write(b'\nnewly added')
result = decompressor(fs.read_bytes(_p))
assert result == b'already existing\nnewly added'
else:
with pytest.raises(pa.ArrowNotImplementedError):
fs.impl.open_append_stream(p, compression, buffer_size)
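
The wrapper hierarchy above encodes per-backend capabilities (allow_move_dir, allow_append_to_file, and so on) as class attributes, so a single parametrized test body can assert success on the local filesystem and the matching error on S3. A hypothetical sketch, not part of this diff, of how a further backend would plug into that pattern:

class DummyObjectStoreWrapper(FileSystemWrapper):
    # Object stores generally cannot move directories or append to
    # existing objects, so the shared tests assert these raise instead.
    allow_move_dir = False
    allow_append_to_file = False

    def __init__(self, impl):
        # impl: any pyarrow FileSystem instance for the backend under test
        self.impl = impl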
