From 072a752515dec1b257ec3bd31d76dc50ef26154b Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Tue, 5 Dec 2023 18:26:59 +0200 Subject: [PATCH] GH-37857: [Python][Dataset] Expose file size to python dataset (#37868) ### Rationale for this change Allow passing known file sizes to `make_fragment`, to avoid potential network requests. ### What changes are included in this PR? Add an optional keyword-only `file_size` argument to `FileFormat.make_fragment` and `ParquetFileFormat.make_fragment`; when it is given, `_make_file_source` builds the `CFileSource` from a `FileInfo` carrying that size instead of from the bare path. ### Are these changes tested? Yes, an S3-backed test checks that the supplied file size is used. ### Are there any user-facing changes? Yes, a new optional keyword-only `file_size` argument on `make_fragment`. * Closes: #37857 Lead-authored-by: Eero Lihavainen Co-authored-by: Benjamin Kietzman Co-authored-by: Eero Lihavainen Signed-off-by: Antoine Pitrou --- python/pyarrow/_dataset.pxd | 5 +- python/pyarrow/_dataset.pyx | 25 ++++++--- python/pyarrow/_dataset_parquet.pyx | 11 ++-- python/pyarrow/includes/libarrow_dataset.pxd | 1 + python/pyarrow/tests/test_dataset.py | 58 ++++++++++++++++++++ 5 files changed, 84 insertions(+), 16 deletions(-) diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index bee9fc1f0987a..220ab6b19affe 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -22,11 +22,10 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow_dataset cimport * from pyarrow.lib cimport * -from pyarrow._fs cimport FileSystem +from pyarrow._fs cimport FileSystem, FileInfo -cdef CFileSource _make_file_source(object file, FileSystem filesystem=*) - +cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, object file_size=*) cdef class DatasetFactory(_Weakrefable): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 029948a609fc5..b93f71969e8d3 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -32,7 +32,7 @@ from pyarrow.includes.libarrow_dataset cimport * from pyarrow._acero cimport ExecNodeOptions from pyarrow._compute cimport Expression, _bind from pyarrow._compute import _forbid_instantiation -from pyarrow._fs cimport 
FileSystem, FileSelector +from pyarrow._fs cimport FileSystem, FileSelector, FileInfo from pyarrow._csv cimport ( ConvertOptions, ParseOptions, ReadOptions, WriteOptions) from pyarrow.util import _is_iterable, _is_path_like, _stringify_path @@ -96,27 +96,33 @@ def _get_parquet_symbol(name): return _dataset_pq and getattr(_dataset_pq, name) -cdef CFileSource _make_file_source(object file, FileSystem filesystem=None): +cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, object file_size=None): cdef: CFileSource c_source shared_ptr[CFileSystem] c_filesystem + CFileInfo c_info c_string c_path shared_ptr[CRandomAccessFile] c_file shared_ptr[CBuffer] c_buffer + int64_t c_size if isinstance(file, Buffer): c_buffer = pyarrow_unwrap_buffer(file) c_source = CFileSource(move(c_buffer)) - elif _is_path_like(file): if filesystem is None: raise ValueError("cannot construct a FileSource from " "a path without a FileSystem") c_filesystem = filesystem.unwrap() c_path = tobytes(_stringify_path(file)) - c_source = CFileSource(move(c_path), move(c_filesystem)) + if file_size is not None: + c_size = file_size + c_info = FileInfo(c_path, size=c_size).unwrap() + c_source = CFileSource(move(c_info), move(c_filesystem)) + else: + c_source = CFileSource(move(c_path), move(c_filesystem)) elif hasattr(file, 'read'): # Optimistically hope this is file-like c_file = get_native_file(file, False).get_random_access_file() @@ -1230,7 +1236,7 @@ cdef class FileFormat(_Weakrefable): The schema inferred from the file """ cdef: - CFileSource c_source = _make_file_source(file, filesystem) + CFileSource c_source = _make_file_source(file, filesystem, file_size=None) CResult[shared_ptr[CSchema]] c_result with nogil: c_result = self.format.Inspect(c_source) @@ -1238,7 +1244,8 @@ cdef class FileFormat(_Weakrefable): return pyarrow_wrap_schema(move(c_schema)) def make_fragment(self, file, filesystem=None, - Expression partition_expression=None): + Expression partition_expression=None, 
+ *, file_size=None): """ Make a FileFragment from a given file. @@ -1252,6 +1259,9 @@ cdef class FileFormat(_Weakrefable): partition_expression : Expression, optional An expression that is guaranteed true for all rows in the fragment. Allows fragment to be potentially skipped while scanning with a filter. + file_size : int, optional + The size of the file in bytes. Can improve performance with high-latency filesystems + when file size needs to be known before reading. Returns ------- @@ -1260,8 +1270,7 @@ cdef class FileFormat(_Weakrefable): """ if partition_expression is None: partition_expression = _true - - c_source = _make_file_source(file, filesystem) + c_source = _make_file_source(file, filesystem, file_size) c_fragment = GetResultValue( self.format.MakeFragment(move(c_source), partition_expression.unwrap(), diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index f83b78d9336b8..d458ac4ee710d 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat): return f"" def make_fragment(self, file, filesystem=None, - Expression partition_expression=None, row_groups=None): + Expression partition_expression=None, row_groups=None, *, file_size=None): """ Make a FileFragment from a given file. @@ -251,6 +251,9 @@ cdef class ParquetFileFormat(FileFormat): fragment to be potentially skipped while scanning with a filter. row_groups : Iterable, optional The indices of the row groups to include + file_size : int, optional + The size of the file in bytes. Can improve performance with high-latency filesystems + when file size needs to be known before reading. 
Returns ------- @@ -259,15 +262,13 @@ cdef class ParquetFileFormat(FileFormat): """ cdef: vector[int] c_row_groups - if partition_expression is None: partition_expression = _true - if row_groups is None: return super().make_fragment(file, filesystem, - partition_expression) + partition_expression, file_size=file_size) - c_source = _make_file_source(file, filesystem) + c_source = _make_file_source(file, filesystem, file_size) c_row_groups = [ row_group for row_group in set(row_groups)] c_fragment = GetResultValue( diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 8901d763e3998..4566cb5004add 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: const c_string& path() const const shared_ptr[CFileSystem]& filesystem() const const shared_ptr[CBuffer]& buffer() const + const int64_t size() const # HACK: Cython can't handle all the overloads so don't declare them. # This means invalid construction of CFileSource won't be caught in # the C++ generation phase (though it will still be caught when diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d5e7015a5d5b9..f3c25ee8c5c3b 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -988,6 +988,64 @@ def test_make_fragment(multisourcefs): assert row_group_fragment.row_groups == [0] +@pytest.mark.parquet +@pytest.mark.s3 +def test_make_fragment_with_size(s3_example_simple): + """ + Test passing file_size to make_fragment. Not all FS implementations make use + of the file size (by implementing an OpenInputFile that takes a FileInfo), but + s3 does, which is why it's used here. 
+ """ + table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple + + file_format = ds.ParquetFileFormat() + paths = [path] + + fragments = [file_format.make_fragment(path, fs) + for path in paths] + dataset = ds.FileSystemDataset( + fragments, format=file_format, schema=table.schema, filesystem=fs + ) + + tbl = dataset.to_table() + assert tbl.equals(table) + + # true sizes -> works + sizes_true = [dataset.filesystem.get_file_info(x).size for x in dataset.files] + fragments_with_size = [file_format.make_fragment(path, fs, file_size=size) + for path, size in zip(paths, sizes_true)] + dataset_with_size = ds.FileSystemDataset( + fragments_with_size, format=file_format, schema=table.schema, filesystem=fs + ) + tbl = dataset.to_table() + assert tbl.equals(table) + + # too small sizes -> error + sizes_toosmall = [1 for path in paths] + fragments_with_size = [file_format.make_fragment(path, fs, file_size=size) + for path, size in zip(paths, sizes_toosmall)] + + dataset_with_size = ds.FileSystemDataset( + fragments_with_size, format=file_format, schema=table.schema, filesystem=fs + ) + + with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'): + table = dataset_with_size.to_table() + + # too large sizes -> error + sizes_toolarge = [1000000 for path in paths] + fragments_with_size = [file_format.make_fragment(path, fs, file_size=size) + for path, size in zip(paths, sizes_toolarge)] + + dataset_with_size = ds.FileSystemDataset( + fragments_with_size, format=file_format, schema=table.schema, filesystem=fs + ) + + # invalid range + with pytest.raises(OSError, match='HTTP status 416'): + table = dataset_with_size.to_table() + + def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module): content = textwrap.dedent(""" alpha,num,animal