diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index d33d88e9966fe..3c760b299f478 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -54,7 +54,11 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable { : file_info_(std::move(path)), filesystem_(std::move(filesystem)), compression_(compression) {} - + FileSource(std::string path, int64_t size, std::shared_ptr filesystem, + Compression::type compression = Compression::UNCOMPRESSED) + : file_info_(std::move(path), std::move(size)), + filesystem_(std::move(filesystem)), + compression_(compression) {} FileSource(fs::FileInfo info, std::shared_ptr filesystem, Compression::type compression = Compression::UNCOMPRESSED) : file_info_(std::move(info)), diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 559f1335f12c1..3f233c74d5a1f 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -60,6 +60,8 @@ struct ARROW_EXPORT FileInfo : public util::EqualityComparable { explicit FileInfo(std::string path, FileType type = FileType::Unknown) : path_(std::move(path)), type_(type) {} + explicit FileInfo(std::string path, int64_t size, FileType type = FileType::Unknown) + : path_(std::move(path)), type_(type), size_(size) {} /// The file type FileType type() const { return type_; } diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd index 210e5558009ec..67074961ae90b 100644 --- a/python/pyarrow/_dataset.pxd +++ b/python/pyarrow/_dataset.pxd @@ -25,7 +25,7 @@ from pyarrow.lib cimport * from pyarrow._fs cimport FileSystem -cdef CFileSource _make_file_source(object file, FileSystem filesystem=*) +cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int size=*) cdef class DatasetFactory(_Weakrefable): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 48ee676915311..968887f7715f3 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -96,7 +96,7 @@ def _get_parquet_symbol(name): return _dataset_pq and getattr(_dataset_pq, name) -cdef CFileSource _make_file_source(object file, FileSystem filesystem=None): +cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int size=-1): cdef: CFileSource c_source @@ -108,14 +108,14 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None): if isinstance(file, Buffer): c_buffer = pyarrow_unwrap_buffer(file) c_source = CFileSource(move(c_buffer)) - elif _is_path_like(file): if filesystem is None: raise ValueError("cannot construct a FileSource from " "a path without a FileSystem") c_filesystem = filesystem.unwrap() c_path = tobytes(_stringify_path(file)) - c_source = CFileSource(move(c_path), move(c_filesystem)) + c_size = size + c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem)) elif hasattr(file, 'read'): # Optimistically hope this is file-like @@ -1230,7 +1230,7 @@ cdef class FileFormat(_Weakrefable): The schema inferred from the file """ cdef: - CFileSource c_source = _make_file_source(file, filesystem) + CFileSource c_source = _make_file_source(file, filesystem=filesystem, size=-1) CResult[shared_ptr[CSchema]] c_result with nogil: c_result = self.format.Inspect(c_source) @@ -1238,7 +1238,8 @@ cdef class FileFormat(_Weakrefable): return pyarrow_wrap_schema(move(c_schema)) def make_fragment(self, file, filesystem=None, - Expression partition_expression=None): + Expression partition_expression=None, + size=-1): """ Make a FileFragment from a given file. @@ -1252,6 +1253,9 @@ cdef class FileFormat(_Weakrefable): partition_expression : Expression, optional An expression that is guaranteed true for all rows in the fragment. Allows fragment to be potentially skipped while scanning with a filter. + size : int, optional + The size of the file in bytes. Can improve performance with high-latency filesystems + when file size needs to be known before reading. Returns ------- @@ -1260,8 +1264,7 @@ cdef class FileFormat(_Weakrefable): """ if partition_expression is None: partition_expression = _true - - c_source = _make_file_source(file, filesystem) + c_source = _make_file_source(file, filesystem=filesystem, size=size) c_fragment = GetResultValue( self.format.MakeFragment(move(c_source), partition_expression.unwrap(), diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index cf5c44c1c964a..8687c00ddb77b 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -226,7 +226,7 @@ cdef class ParquetFileFormat(FileFormat): return f"" def make_fragment(self, file, filesystem=None, - Expression partition_expression=None, row_groups=None): + Expression partition_expression=None, row_groups=None, size=-1): """ Make a FileFragment from a given file. @@ -242,6 +242,9 @@ cdef class ParquetFileFormat(FileFormat): fragment to be potentially skipped while scanning with a filter. row_groups : Iterable, optional The indices of the row groups to include + size : int, optional + The size of the file in bytes. Can improve performance with high-latency filesystems + when file size needs to be known before reading. Returns ------- @@ -256,9 +259,9 @@ cdef class ParquetFileFormat(FileFormat): if row_groups is None: return super().make_fragment(file, filesystem, - partition_expression) + partition_expression, size) - c_source = _make_file_source(file, filesystem) + c_source = _make_file_source(file, filesystem, size=size) c_row_groups = [ row_group for row_group in set(row_groups)] c_fragment = GetResultValue( diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 8901d763e3998..063dc2f6d7a85 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: const c_string& path() const const shared_ptr[CFileSystem]& filesystem() const const shared_ptr[CBuffer]& buffer() const + const int size() const # HACK: Cython can't handle all the overloads so don't declare them. # This means invalid construction of CFileSource won't be caught in # the C++ generation phase (though it will still be caught when diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 39c3c43daea37..e2627eee2a047 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -981,6 +981,49 @@ def test_make_fragment(multisourcefs): assert row_group_fragment.row_groups == [0] +@pytest.mark.parquet +@pytest.mark.s3 +def test_make_fragment_with_size(s3_example_simple): + table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple + + file_format = ds.ParquetFileFormat() + paths = [path] + + fragments = [file_format.make_fragment(path, fs) + for path in paths] + + dataset = ds.FileSystemDataset( + fragments, format=file_format, schema=table.schema, + filesystem = fs + ) + + tbl = dataset.to_table() + assert tbl.equals(table) + + sizes_toosmall = [1] + fragments_with_size = [file_format.make_fragment(path, fs, size=size) + for path, size in zip(paths, sizes_toosmall)] + + dataset_with_size = ds.FileSystemDataset( + fragments_with_size, format=file_format, schema=table.schema, + filesystem = fs + ) + + with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'): + table = dataset_with_size.to_table() + + sizes_toolarge = [1000000] + fragments_with_size = [file_format.make_fragment(path, fs, size=size) + for path, size in zip(paths, sizes_toolarge)] + + dataset_with_size = ds.FileSystemDataset( + fragments_with_size, format=file_format, schema=table.schema, + filesystem = fs + ) + + with pytest.raises(OSError, match='ExceptionName: InvalidRange'): + table = dataset_with_size.to_table() + def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module): content = textwrap.dedent(""" alpha,num,animal