add bindings to parquet file format properties in R and python
bkietz committed Feb 23, 2020
1 parent 5540eb9 commit 7b6d619
Showing 9 changed files with 5,385 additions and 3,481 deletions.
44 changes: 41 additions & 3 deletions python/pyarrow/_dataset.pyx
@@ -79,12 +79,50 @@ cdef class ParquetFileFormat(FileFormat):
         CParquetFileFormat* parquet_format
 
     def __init__(self):
-        self.init(shared_ptr[CFileFormat](new CParquetFileFormat()))
+        self.init(<shared_ptr[CFileFormat]> CParquetFileFormat.Make())
+        self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
 
     @property
     def use_buffered_stream(self):
-        """The arrow Schema describing the partition scheme."""
-        return self.wrapped.
+        """Read files through buffered input streams rather than
+        loading entire chunks at a time."""
+        return self.parquet_format.use_buffered_stream
+
+    @use_buffered_stream.setter
+    def use_buffered_stream(self, bint value):
+        self.parquet_format.use_buffered_stream = value
+
+    @property
+    def buffer_size(self):
+        """Size of buffered stream, if enabled."""
+        return self.parquet_format.buffer_size
+
+    @buffer_size.setter
+    def buffer_size(self, int value):
+        self.parquet_format.buffer_size = value
+
+    @property
+    def read_dict_indices(self):
+        """Indices of columns which should be read as dictionaries."""
+        return self.parquet_format.read_dict_indices
+
+    @read_dict_indices.setter
+    def read_dict_indices(self, set values):
+        self.parquet_format.read_dict_indices.clear()
+        for value in values:
+            self.read_dict_index(int(value))
+
+    def read_dict_index(self, int value):
+        self.parquet_format.read_dict_indices.insert(value)
+
+    @property
+    def batch_size(self):
+        """Maximum number of rows in read record batches."""
+        return self.parquet_format.batch_size
+
+    @batch_size.setter
+    def batch_size(self, int value):
+        self.parquet_format.batch_size = value
 
 
 cdef class IpcFileFormat(FileFormat):
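Taken together, these bindings expose the C++ reader options as plain Python properties on ParquetFileFormat. A minimal usage sketch, assuming a pyarrow build that includes the dataset module; the property names come straight from the diff above, while the values are purely illustrative:

    import pyarrow.dataset as ds

    fmt = ds.ParquetFileFormat()
    fmt.use_buffered_stream = True   # stream reads rather than loading whole chunks
    fmt.buffer_size = 64 * 1024      # buffered stream size, in bytes (illustrative)
    fmt.batch_size = 1024            # maximum rows per read record batch
    fmt.read_dict_indices = {2}      # replace the set of dictionary-read columns
    fmt.read_dict_index(3)           # or add one column index at a time
    assert fmt.read_dict_indices == {2, 3}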
2 changes: 1 addition & 1 deletion python/pyarrow/dataset.py
@@ -293,7 +293,7 @@ def dataset(sources, filesystem=None, partitioning=None, format=None):
         case, the additional keywords will be ignored).
     filesystem : FileSystem, default None
         By default will be inferred from the path.
-    partitioning : Partitioning(Factory), str, list of str
+    partitioning : Partitioning, PartitioningFactory, str, list of str
         The partitioning scheme specified with the ``partitioning()``
         function. A flavor string can be used as shortcut, and with a list of
         field names a DirectoryPartitioning will be inferred.
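As the amended docstring says, partitioning accepts a constructed Partitioning, a PartitioningFactory, a flavor string, or a list of field names. A hedged sketch of the two shortcuts, with a hypothetical path:

    import pyarrow.dataset as ds

    # 'hive' infers key=value directory names; a list of field names
    # infers a DirectoryPartitioning over those directory levels.
    data = ds.dataset('/data/events', partitioning='hive')
    data = ds.dataset('/data/events', partitioning=['group', 'key'])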
4 changes: 3 additions & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
@@ -285,9 +285,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
+        @staticmethod
+        shared_ptr[CParquetFileFormat] Make()
         c_bool use_buffered_stream
         int64_t buffer_size
-        unordered_set[c_int] read_dict_indices
+        unordered_set[int] read_dict_indices
         int64_t batch_size
 
     cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"(
22 changes: 15 additions & 7 deletions python/pyarrow/tests/test_dataset.py
@@ -75,11 +75,13 @@ def mockfs():
 
     data = [
         list(range(5)),
-        list(map(float, range(5)))
+        list(map(float, range(5))),
+        list(map(str, range(5)))
     ]
     schema = pa.schema([
        pa.field('i64', pa.int64()),
-       pa.field('f64', pa.float64())
+       pa.field('f64', pa.float64()),
+       pa.field('str', pa.string())
     ])
     batch = pa.record_batch(data, schema=schema)
     table = pa.Table.from_batches([batch])
@@ -432,6 +434,7 @@ def test_expression_ergonomics():
 ])
 def test_file_system_factory(mockfs, paths_or_selector):
     format = ds.ParquetFileFormat()
+    format.read_dict_indices = {2}
 
     options = ds.FileSystemFactoryOptions('subdir')
     options.partitioning = ds.DirectoryPartitioning(
@@ -464,23 +467,27 @@ def test_file_system_factory(mockfs, paths_or_selector):
     scanner = ds.Scanner(dataset)
     expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64())
     expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64())
+    expected_str = pa.DictionaryArray.from_arrays(
+        pa.array([0, 1, 2, 3, 4], type=pa.int32()),
+        pa.array("0 1 2 3 4".split(), type=pa.string()))
     for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']):
         expected_group_column = pa.array([group] * 5, type=pa.int32())
         expected_key_column = pa.array([key] * 5, type=pa.string())
         for batch in task.execute():
-            assert batch.num_columns == 4
+            assert batch.num_columns == 5
             assert batch[0].equals(expected_i64)
             assert batch[1].equals(expected_f64)
-            assert batch[2].equals(expected_group_column)
-            assert batch[3].equals(expected_key_column)
+            assert batch[2].equals(expected_str)
+            assert batch[3].equals(expected_group_column)
+            assert batch[4].equals(expected_key_column)
 
     table = dataset.to_table()
     assert isinstance(table, pa.Table)
     assert len(table) == 10
-    assert table.num_columns == 4
+    assert table.num_columns == 5
 
 
-def test_paritioning_factory(mockfs):
+def test_partitioning_factory(mockfs):
     paths_or_selector = fs.FileSelector('subdir', recursive=True)
     format = ds.ParquetFileFormat()
 
@@ -497,6 +504,7 @@ def test_paritioning_factory(mockfs):
     expected_schema = pa.schema([
         ("i64", pa.int64()),
         ("f64", pa.float64()),
+        ("str", pa.string()),
         ("group", pa.int32()),
         ("key", pa.string()),
     ])
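The reworked test pins down what read_dict_indices = {2} buys: the third column ('str') now comes back dictionary-encoded. A small standalone sketch mirroring the expected value the test builds:

    import pyarrow as pa

    indices = pa.array([0, 1, 2, 3, 4], type=pa.int32())
    dictionary = pa.array('0 1 2 3 4'.split(), type=pa.string())
    expected_str = pa.DictionaryArray.from_arrays(indices, dictionary)
    # The column's type is dictionary<values=string, indices=int32>.
    assert expected_str.type == pa.dictionary(pa.int32(), pa.string())
    assert expected_str.dictionary.equals(dictionary)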
8 changes: 4 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

12 changes: 7 additions & 5 deletions r/R/dataset.R
@@ -254,7 +254,11 @@ SourceFactory$create <- function(path,
     recursive = recursive
   )
 
-  format <- FileFormat$create(match.arg(format))
+  if (is.character(format)) {
+    format <- FileFormat$create(match.arg(format))
+  } else {
+    assert_is(format, "FileFormat")
+  }
 
   if (!is.null(partitioning)) {
     if (inherits(partitioning, "Schema")) {
@@ -376,12 +380,10 @@ FileFormat <- R6Class("FileFormat", inherit = Object,
   )
 )
 FileFormat$create <- function(format, ...) {
-  # TODO: pass list(...) options to the initializers
-  # https://issues.apache.org/jira/browse/ARROW-7547
   if (format == "parquet") {
-    shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make())
+    shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make(list(...)))
   } else if (format %in% c("ipc", "arrow")) { # These are aliases for the same thing
-    shared_ptr(IpcFileFormat, dataset___IpcFileFormat__Make())
+    shared_ptr(IpcFileFormat, dataset___IpcFileFormat__Make(list(...)))
   } else {
     stop("Unsupported file format: ", format, call. = FALSE)
   }