From de7cae88edea7e679c88fb06d8c3660867d34b7e Mon Sep 17 00:00:00 2001 From: Jim Pivarski Date: Fri, 29 Apr 2022 16:02:19 -0500 Subject: [PATCH] Fixes `to_layout` with `allow_records=False` and allows single-record writing to Arrow and Parquet (#1456) * First, fix #1453. * Turn Records into length-1 Arrays in v1. * Implemented record_is_scalar in metadata, but no tests yet. * Add a test. * Also to_arrow_table. --- src/awkward/_v2/_connect/pyarrow.py | 21 ++++++++++-- src/awkward/_v2/contents/content.py | 2 ++ src/awkward/_v2/contents/emptyarray.py | 6 +++- src/awkward/_v2/contents/indexedarray.py | 6 +++- src/awkward/_v2/contents/listoffsetarray.py | 12 +++++-- src/awkward/_v2/contents/numpyarray.py | 6 +++- src/awkward/_v2/contents/recordarray.py | 6 +++- src/awkward/_v2/contents/regulararray.py | 2 ++ src/awkward/_v2/contents/unionarray.py | 6 +++- .../_v2/operations/convert/ak_from_arrow.py | 3 ++ .../_v2/operations/convert/ak_from_parquet.py | 2 ++ .../_v2/operations/convert/ak_to_arrow.py | 8 ++++- .../operations/convert/ak_to_arrow_table.py | 14 ++++++-- .../_v2/operations/convert/ak_to_layout.py | 11 +++++-- .../_v2/operations/convert/ak_to_parquet.py | 13 +++++--- src/awkward/operations/convert.py | 33 +++++++++++++++---- ...st_1453-write-single-records-to-parquet.py | 26 +++++++++++++++ 17 files changed, 151 insertions(+), 26 deletions(-) create mode 100644 tests/v2/test_1453-write-single-records-to-parquet.py diff --git a/src/awkward/_v2/_connect/pyarrow.py b/src/awkward/_v2/_connect/pyarrow.py index 3b1879f8f4..6e62a94b35 100644 --- a/src/awkward/_v2/_connect/pyarrow.py +++ b/src/awkward/_v2/_connect/pyarrow.py @@ -93,12 +93,14 @@ def __init__( mask_parameters, node_parameters, record_is_tuple, + record_is_scalar, ): self._mask_type = mask_type self._node_type = node_type self._mask_parameters = mask_parameters self._node_parameters = node_parameters self._record_is_tuple = record_is_tuple + self._record_is_scalar = record_is_scalar super().__init__(storage_type, "awkward") def __str__(self): @@ -127,6 +129,10 @@ def node_parameters(self): def record_is_tuple(self): return self._record_is_tuple + @property + def record_is_scalar(self): + return self._record_is_scalar + def __arrow_ext_class__(self): return AwkwardArrowArray @@ -138,6 +144,7 @@ def __arrow_ext_serialize__(self): "mask_parameters": self._mask_parameters, "node_parameters": self._node_parameters, "record_is_tuple": self._record_is_tuple, + "record_is_scalar": self._record_is_scalar, } ).encode(errors="surrogatescape") @@ -151,6 +158,7 @@ def __arrow_ext_deserialize__(cls, storage_type, serialized): metadata["mask_parameters"], metadata["node_parameters"], metadata["record_is_tuple"], + metadata["record_is_scalar"], ) @property @@ -162,7 +170,7 @@ def num_fields(self): return self.storage_type.num_fields pyarrow.register_extension_type( - AwkwardArrowType(pyarrow.null(), None, None, None, None, None) + AwkwardArrowType(pyarrow.null(), None, None, None, None, None, None) ) # order is important; _string_like[:2] vs _string_like[::2] @@ -861,7 +869,9 @@ def form_popbuffers(awkwardarrow_type, storage_type): ) -def to_awkwardarrow_type(storage_type, use_extensionarray, mask, node): +def to_awkwardarrow_type( + storage_type, use_extensionarray, record_is_scalar, mask, node +): if use_extensionarray: return AwkwardArrowType( storage_type, @@ -870,6 +880,7 @@ def to_awkwardarrow_type(storage_type, use_extensionarray, mask, node): None if mask is None else mask.parameters, None if node is None else node.parameters, node.is_tuple if isinstance(node, ak._v2.contents.RecordArray) else None, + record_is_scalar, ) else: return storage_type @@ -929,6 +940,7 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False): else: record_is_optiontype = False optiontype_fields = [] + record_is_scalar = False optiontype_parameters = None recordtype_parameters = None if ( @@ -940,6 +952,8 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False): (value,) = x.values() if key == "optiontype_fields": optiontype_fields = value + elif key == "record_is_scalar": + record_is_scalar = value elif key in ( "UnmaskedArray", "BitMaskedArray", @@ -975,6 +989,9 @@ def handle_arrow(obj, generate_bitmasks=False, pass_empty_field=False): parameters=recordtype_parameters, ) + if record_is_scalar: + return out._getitem_at(0) + if record_is_optiontype and record_mask is None and generate_bitmasks: record_mask = numpy.zeros(len(out), dtype=np.bool_) diff --git a/src/awkward/_v2/contents/content.py b/src/awkward/_v2/contents/content.py index 4de99589fb..eb88c4784c 100644 --- a/src/awkward/_v2/contents/content.py +++ b/src/awkward/_v2/contents/content.py @@ -1271,6 +1271,7 @@ def to_arrow( categorical_as_dictionary=False, extensionarray=True, count_nulls=True, + record_is_scalar=False, ): import awkward._v2._connect.pyarrow @@ -1288,6 +1289,7 @@ def to_arrow( "categorical_as_dictionary": categorical_as_dictionary, "extensionarray": extensionarray, "count_nulls": count_nulls, + "record_is_scalar": record_is_scalar, }, ) diff --git a/src/awkward/_v2/contents/emptyarray.py b/src/awkward/_v2/contents/emptyarray.py index eba1550000..7041e7811c 100644 --- a/src/awkward/_v2/contents/emptyarray.py +++ b/src/awkward/_v2/contents/emptyarray.py @@ -274,7 +274,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): if options["emptyarray_to"] is None: return pyarrow.Array.from_buffers( ak._v2._connect.pyarrow.to_awkwardarrow_type( - pyarrow.null(), options["extensionarray"], mask_node, self + pyarrow.null(), + options["extensionarray"], + options["record_is_scalar"], + mask_node, + self, ), length, [ diff --git a/src/awkward/_v2/contents/indexedarray.py b/src/awkward/_v2/contents/indexedarray.py index 2b199a5c26..4402e85db3 100644 --- a/src/awkward/_v2/contents/indexedarray.py +++ b/src/awkward/_v2/contents/indexedarray.py @@ -1142,7 +1142,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): if options["extensionarray"]: return ak._v2._connect.pyarrow.AwkwardArrowArray.from_storage( ak._v2._connect.pyarrow.to_awkwardarrow_type( - out.type, options["extensionarray"], mask_node, self + out.type, + options["extensionarray"], + options["record_is_scalar"], + mask_node, + self, ), out, ) diff --git a/src/awkward/_v2/contents/listoffsetarray.py b/src/awkward/_v2/contents/listoffsetarray.py index 427904ad9c..876c3849c7 100644 --- a/src/awkward/_v2/contents/listoffsetarray.py +++ b/src/awkward/_v2/contents/listoffsetarray.py @@ -1953,7 +1953,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): return pyarrow.Array.from_buffers( ak._v2._connect.pyarrow.to_awkwardarrow_type( - string_type, options["extensionarray"], mask_node, self + string_type, + options["extensionarray"], + options["record_is_scalar"], + mask_node, + self, ), length, [ @@ -1979,7 +1983,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): return pyarrow.Array.from_buffers( ak._v2._connect.pyarrow.to_awkwardarrow_type( - list_type, options["extensionarray"], mask_node, self + list_type, + options["extensionarray"], + options["record_is_scalar"], + mask_node, + self, ), length, [ diff --git a/src/awkward/_v2/contents/numpyarray.py b/src/awkward/_v2/contents/numpyarray.py index 43b1b823a7..cd0303382b 100644 --- a/src/awkward/_v2/contents/numpyarray.py +++ b/src/awkward/_v2/contents/numpyarray.py @@ -1233,7 +1233,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): return pyarrow.Array.from_buffers( ak._v2._connect.pyarrow.to_awkwardarrow_type( - storage_type, options["extensionarray"], mask_node, self + storage_type, + options["extensionarray"], + options["record_is_scalar"], + mask_node, + self, ), length, [ diff --git a/src/awkward/_v2/contents/recordarray.py b/src/awkward/_v2/contents/recordarray.py index af0451491c..0555f0f7b3 100644 --- a/src/awkward/_v2/contents/recordarray.py +++ b/src/awkward/_v2/contents/recordarray.py @@ -889,7 +889,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): return pyarrow.Array.from_buffers( ak._v2._connect.pyarrow.to_awkwardarrow_type( - types, options["extensionarray"], mask_node, self + types, + options["extensionarray"], + options["record_is_scalar"], + mask_node, + self, ), length, [ak._v2._connect.pyarrow.to_validbits(validbytes)], diff --git a/src/awkward/_v2/contents/regulararray.py b/src/awkward/_v2/contents/regulararray.py index 94ad099452..6cb7f246ef 100644 --- a/src/awkward/_v2/contents/regulararray.py +++ b/src/awkward/_v2/contents/regulararray.py @@ -1102,6 +1102,7 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): ak._v2._connect.pyarrow.to_awkwardarrow_type( pyarrow.binary(self._size), options["extensionarray"], + options["record_is_scalar"], mask_node, self, ), @@ -1125,6 +1126,7 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): ak._v2._connect.pyarrow.to_awkwardarrow_type( pyarrow.list_(content_type, self._size), options["extensionarray"], + options["record_is_scalar"], mask_node, self, ), diff --git a/src/awkward/_v2/contents/unionarray.py b/src/awkward/_v2/contents/unionarray.py index 555da80fbf..a9a44a3dbc 100644 --- a/src/awkward/_v2/contents/unionarray.py +++ b/src/awkward/_v2/contents/unionarray.py @@ -1260,7 +1260,11 @@ def _to_arrow(self, pyarrow, mask_node, validbytes, length, options): return pyarrow.Array.from_buffers( ak._v2._connect.pyarrow.to_awkwardarrow_type( - types, options["extensionarray"], None, self + types, + options["extensionarray"], + options["record_is_scalar"], + None, + self, ), nptags.shape[0], [ diff --git a/src/awkward/_v2/operations/convert/ak_from_arrow.py b/src/awkward/_v2/operations/convert/ak_from_arrow.py index ba64702f7b..deafdd7386 100644 --- a/src/awkward/_v2/operations/convert/ak_from_arrow.py +++ b/src/awkward/_v2/operations/convert/ak_from_arrow.py @@ -65,4 +65,7 @@ def _impl(array, generate_bitmasks, highlevel, behavior): if awkwardarrow_type.mask_type in (None, "IndexedArray"): out = awkward._v2._connect.pyarrow.remove_optiontype(out) + if awkwardarrow_type.record_is_scalar: + out = out._getitem_at(0) + return ak._v2._util.wrap(out, behavior, highlevel) diff --git a/src/awkward/_v2/operations/convert/ak_from_parquet.py b/src/awkward/_v2/operations/convert/ak_from_parquet.py index 66e616b807..08301d9cd5 100644 --- a/src/awkward/_v2/operations/convert/ak_from_parquet.py +++ b/src/awkward/_v2/operations/convert/ak_from_parquet.py @@ -242,6 +242,8 @@ def _load( return ak._v2.operations.convert.ak_from_buffers._impl( subform, 0, _DictOfEmptyBuffers(), "", numpy, highlevel, behavior ) + elif len(arrays) == 1 and isinstance(arrays[0], ak._v2.record.Record): + return ak._v2._util.wrap(arrays[0], behavior, highlevel) else: return ak._v2.operations.structure.ak_concatenate._impl( arrays, 0, True, True, highlevel, behavior diff --git a/src/awkward/_v2/operations/convert/ak_to_arrow.py b/src/awkward/_v2/operations/convert/ak_to_arrow.py index 57b4244830..aed812c557 100644 --- a/src/awkward/_v2/operations/convert/ak_to_arrow.py +++ b/src/awkward/_v2/operations/convert/ak_to_arrow.py @@ -97,8 +97,13 @@ def _impl( count_nulls, ): layout = ak._v2.operations.convert.to_layout( - array, allow_record=False, allow_other=False + array, allow_record=True, allow_other=False ) + if isinstance(layout, ak._v2.record.Record): + layout = layout.array[layout.at : layout.at + 1] + record_is_scalar = True + else: + record_is_scalar = False return layout.to_arrow( list_to32=list_to32, @@ -108,4 +113,5 @@ def _impl( categorical_as_dictionary=categorical_as_dictionary, extensionarray=extensionarray, count_nulls=count_nulls, + record_is_scalar=record_is_scalar, ) diff --git a/src/awkward/_v2/operations/convert/ak_to_arrow_table.py b/src/awkward/_v2/operations/convert/ak_to_arrow_table.py index 73319d25e0..955bd8cb92 100644 --- a/src/awkward/_v2/operations/convert/ak_to_arrow_table.py +++ b/src/awkward/_v2/operations/convert/ak_to_arrow_table.py @@ -100,8 +100,13 @@ def _impl( from awkward._v2._connect.pyarrow import pyarrow layout = ak._v2.operations.convert.to_layout( - array, allow_record=False, allow_other=False + array, allow_record=True, allow_other=False ) + if isinstance(layout, ak._v2.record.Record): + layout = layout.array[layout.at : layout.at + 1] + record_is_scalar = True + else: + record_is_scalar = False check = [layout] while check[-1].is_OptionType or check[-1].is_IndexedType: @@ -121,6 +126,7 @@ def _impl( categorical_as_dictionary=categorical_as_dictionary, extensionarray=extensionarray, count_nulls=count_nulls, + record_is_scalar=record_is_scalar, ) ) pafields.append( @@ -131,7 +137,10 @@ def _impl( if check[-1].contents[check[-1].field_to_index(name)].is_OptionType: optiontype_fields.append(name) - parameters = [{"optiontype_fields": optiontype_fields}] + parameters = [ + {"optiontype_fields": optiontype_fields}, + {"record_is_scalar": record_is_scalar}, + ] for x in check: parameters.append( {ak._v2._util.direct_Content_subclass(x).__name__: x._parameters} @@ -147,6 +156,7 @@ def _impl( categorical_as_dictionary=categorical_as_dictionary, extensionarray=extensionarray, count_nulls=count_nulls, + record_is_scalar=record_is_scalar, ) ) pafields.append( diff --git a/src/awkward/_v2/operations/convert/ak_to_layout.py b/src/awkward/_v2/operations/convert/ak_to_layout.py index edeac499ba..9c21168d24 100644 --- a/src/awkward/_v2/operations/convert/ak_to_layout.py +++ b/src/awkward/_v2/operations/convert/ak_to_layout.py @@ -51,7 +51,7 @@ def _impl(array, allow_record, allow_other, numpytype): elif isinstance(array, ak._v2.record.Record): if not allow_record: raise ak._v2._util.error( - TypeError("ak._v2.Record objects are not allowed here") + TypeError("ak._v2.Record objects are not allowed in this function") ) else: return array @@ -59,8 +59,13 @@ def _impl(array, allow_record, allow_other, numpytype): elif isinstance(array, ak._v2.highlevel.Array): return array.layout - elif allow_record and isinstance(array, ak._v2.highlevel.Record): - return array.layout + elif isinstance(array, ak._v2.highlevel.Record): + if not allow_record: + raise ak._v2._util.error( + TypeError("ak._v2.Record objects are not allowed in this function") + ) + else: + return array.layout # elif isinstance(array, ak._v2.highlevel.ArrayBuilder): # return array.snapshot().layout diff --git a/src/awkward/_v2/operations/convert/ak_to_parquet.py b/src/awkward/_v2/operations/convert/ak_to_parquet.py index d5e470872b..f41d3d562e 100644 --- a/src/awkward/_v2/operations/convert/ak_to_parquet.py +++ b/src/awkward/_v2/operations/convert/ak_to_parquet.py @@ -40,7 +40,9 @@ def to_parquet( ) fsspec = awkward._v2._connect.pyarrow.import_fsspec("ak.to_parquet") - if isinstance(data, Iterable) and not isinstance(data, Sized): + if isinstance(data, (ak._v2.highlevel.Record, ak._v2.record.Record)): + iterator = iter([data]) + elif isinstance(data, Iterable) and not isinstance(data, Sized): iterator = iter(data) elif isinstance(data, Iterable): iterator = iter([data]) @@ -54,7 +56,7 @@ def to_parquet( row_group = 0 array = next(iterator) layout = ak._v2.operations.convert.ak_to_layout.to_layout( - array, allow_record=False, allow_other=False + array, allow_record=True, allow_other=False ) table = ak._v2.operations.convert.ak_to_arrow_table._impl( layout, @@ -77,7 +79,10 @@ def to_parquet( else: column_prefix = () - form = layout.form + if isinstance(layout, ak._v2.record.Record): + form = layout.array.form + else: + form = layout.form def parquet_columns(specifier, only=None): if specifier is None: @@ -200,7 +205,7 @@ def parquet_columns(specifier, only=None): except StopIteration: break layout = ak._v2.operations.convert.ak_to_layout.to_layout( - array, allow_record=False, allow_other=False + array, allow_record=True, allow_other=False ) table = ak._v2.operations.convert.ak_to_arrow_table._impl( layout, diff --git a/src/awkward/operations/convert.py b/src/awkward/operations/convert.py index 68df28f03d..994f982d97 100644 --- a/src/awkward/operations/convert.py +++ b/src/awkward/operations/convert.py @@ -1882,8 +1882,14 @@ def to_layout( if isinstance(array, ak.highlevel.Array): return array.layout - elif allow_record and isinstance(array, ak.highlevel.Record): - return array.layout + elif isinstance(array, ak.highlevel.Record): + if not allow_record: + raise TypeError( + "ak.Record objects are not allowed in this function" + + ak._util.exception_suffix(__file__) + ) + else: + return array.layout elif isinstance(array, ak.highlevel.ArrayBuilder): return array.snapshot().layout @@ -1894,8 +1900,14 @@ def to_layout( elif isinstance(array, (ak.layout.Content, ak.partition.PartitionedArray)): return array - elif allow_record and isinstance(array, ak.layout.Record): - return array + elif isinstance(array, ak.layout.Record): + if not allow_record: + raise TypeError( + "ak.Record objects are not allowed in this function" + + ak._util.exception_suffix(__file__) + ) + else: + return array elif isinstance(array, (np.ndarray, numpy.ma.MaskedArray)): if not issubclass(array.dtype.type, numpytype): @@ -2014,7 +2026,9 @@ def to_arrow( """ pyarrow = _import_pyarrow("ak.to_arrow") - layout = to_layout(array, allow_record=False, allow_other=False) + layout = to_layout(array, allow_record=True, allow_other=False) + if isinstance(layout, ak.layout.Record): + layout = layout.array[layout.at : layout.at + 1] def recurse(layout, mask, is_option): if isinstance(layout, ak.layout.NumpyArray): @@ -2541,7 +2555,9 @@ def to_arrow_table( """ pyarrow = _import_pyarrow("ak.to_arrow_table") - layout = to_layout(array, allow_record=False, allow_other=False) + layout = to_layout(array, allow_record=True, allow_other=False) + if isinstance(layout, ak.layout.Record): + layout = layout.array[layout.at : layout.at + 1] if explode_records or isinstance( ak.operations.describe.type(layout), ak.types.RecordType @@ -3041,7 +3057,10 @@ def batch_iterator(layout): pa_arrays, schema=pyarrow.schema(pa_fields) ) - layout = to_layout(array, allow_record=False, allow_other=False) + layout = to_layout(array, allow_record=True, allow_other=False) + if isinstance(layout, ak.layout.Record): + layout = layout.array[layout.at : layout.at + 1] + iterator = batch_iterator(layout) first = next(iterator) diff --git a/tests/v2/test_1453-write-single-records-to-parquet.py b/tests/v2/test_1453-write-single-records-to-parquet.py new file mode 100644 index 0000000000..0220148f9f --- /dev/null +++ b/tests/v2/test_1453-write-single-records-to-parquet.py @@ -0,0 +1,26 @@ +# BSD 3-Clause License; see https://github.com/scikit-hep/awkward-1.0/blob/main/LICENSE + +import os + +import pytest # noqa: F401 +import numpy as np # noqa: F401 +import awkward as ak # noqa: F401 + +pytest.importorskip("pyarrow") +pytest.importorskip("pyarrow.parquet") +pytest.importorskip("fsspec") + + +def test(tmp_path): + filename = os.path.join(tmp_path, "whatever.parquet") + + original = ak._v2.Record({"x": 1, "y": [1, 2, 3], "z": "THREE"}) + + assert ak._v2.from_arrow(ak._v2.to_arrow(original)).tolist() == original.tolist() + + assert ( + ak._v2.from_arrow(ak._v2.to_arrow_table(original)).tolist() == original.tolist() + ) + + ak._v2.to_parquet(original, filename) + assert ak._v2.from_parquet(filename).tolist() == original.tolist()