From 96e15ebb55762550e2e61df906b5df5af5eb577c Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 19 Jul 2022 13:21:40 -0400 Subject: [PATCH] parquet redux (#1476) * stop * get some tests going * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Selection * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * a fix * questionable fix * tidy * update exception calls * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * correct to_parq (thanks flake) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Jim Pivarski --- src/awkward/_v2/operations/ak_from_parquet.py | 285 ++++++++++-------- .../operations/ak_metadata_from_parquet.py | 91 +++--- src/awkward/_v2/operations/ak_to_parquet.py | 147 +++++---- tests/v2/test_1294-to-and-from_parquet.py | 93 ++++++ tests/v2/test_1440-start-v2-to_parquet.py | 10 +- 5 files changed, 373 insertions(+), 253 deletions(-) diff --git a/src/awkward/_v2/operations/ak_from_parquet.py b/src/awkward/_v2/operations/ak_from_parquet.py index d1ceabb808..aa92bbeda2 100644 --- a/src/awkward/_v2/operations/ak_from_parquet.py +++ b/src/awkward/_v2/operations/ak_from_parquet.py @@ -65,18 +65,15 @@ def from_parquet( ): import awkward._v2._connect.pyarrow # noqa: F401 - parquet_columns, subform, actual_paths, fs, subrg, meta = _metadata( + parquet_columns, subform, actual_paths, fs, subrg, row_counts, meta = metadata( path, storage_options, row_groups, columns, - max_gap, - max_block, - footer_sample_size, ) return _load( actual_paths, - parquet_columns, + parquet_columns if columns is not None else None, subrg, max_gap, max_block, @@ -86,127 +83,122 @@ def from_parquet( highlevel, behavior, fs, - meta, ) -def _metadata( - path, storage_options, row_groups, columns, max_gap, max_block, footer_sample_size +def metadata( + path, + storage_options=None, + row_groups=None, + columns=None, + ignore_metadata=False, + scan_files=True, ): - import pyarrow.parquet as pyarrow_parquet + import awkward._v2._connect.pyarrow + + # early exit if missing deps + pyarrow_parquet = awkward._v2._connect.pyarrow.import_pyarrow_parquet( + "ak._v2.from_parquet" + ) import fsspec.parquet if row_groups is not None: if not all(ak._v2._util.isint(x) and x >= 0 for x in row_groups): raise ak._v2._util.error( - TypeError("row_groups must be a set of non-negative integers") + ValueError("row_groups must be a set of non-negative integers") ) + if len(set(row_groups)) < len(row_groups): + raise ak._v2._util.error(ValueError("row group indices must not repeat")) fs, _, paths = fsspec.get_fs_token_paths( path, mode="rb", storage_options=storage_options ) - all_paths, path_for_metadata = _all_and_metadata_paths(path, fs, paths) + all_paths, path_for_schema, can_sub = _all_and_metadata_paths( + path, fs, paths, ignore_metadata, scan_files + ) - parquet_columns = None - subform = None subrg = [None] * len(all_paths) actual_paths = all_paths - with fsspec.parquet.open_parquet_file( - path_for_metadata, - fs=fs, - engine="pyarrow", - row_groups=[], - storage_options=storage_options, - max_gap=max_gap, - max_block=max_block, - footer_sample_size=footer_sample_size, + with fs.open( + path_for_schema, ) as file_for_metadata: parquetfile_for_metadata = pyarrow_parquet.ParquetFile(file_for_metadata) - if columns is not None: - list_indicator = "list.item" - for column_metadata in 
parquetfile_for_metadata.schema: - if ( - column_metadata.max_repetition_level > 0 - and ".list.element." in column_metadata.path - ): - list_indicator = "list.element" - break - - form = ak._v2._connect.pyarrow.form_handle_arrow( - parquetfile_for_metadata.schema_arrow, pass_empty_field=True + list_indicator = "list.item" + for column_metadata in parquetfile_for_metadata.schema: + if ( + column_metadata.max_repetition_level > 0 + and ".list.element." in column_metadata.path + ): + list_indicator = "list.element" + break + if columns is not None: + + form = ak._v2._connect.pyarrow.form_handle_arrow( + parquetfile_for_metadata.schema_arrow, pass_empty_field=True + ) + subform = form.select_columns(columns) + else: + subform = ak._v2._connect.pyarrow.form_handle_arrow( + parquetfile_for_metadata.schema_arrow, pass_empty_field=True + ) + + metadata = parquetfile_for_metadata.metadata + if scan_files and not path_for_schema.endswith("/_metadata"): + if path_for_schema in all_paths: + scan_paths = all_paths[1:] + else: + scan_paths = all_paths + for apath in scan_paths: + with fs.open(apath, "rb") as f: + md = pyarrow_parquet.ParquetFile(f).metadata + # TODO: not nested directory structure yet + md.set_file_path(apath.rsplit("/", 1)[-1]) + metadata.append_row_groups(md) + if row_groups is not None: + if any(_ >= metadata.num_row_groups for _ in row_groups): + raise ak._v2._util.error( + ValueError( + f"Row group selection out of bounds 0..{metadata.num_row_groups - 1}" + ) ) - subform = form.select_columns(columns) - parquet_columns = subform.columns(list_indicator=list_indicator) - if parquetfile_for_metadata.schema_arrow.names == [""]: - parquet_columns = ["." + x for x in parquet_columns] - - metadata = parquetfile_for_metadata.metadata - if row_groups is not None: - eoln = "\n " - if any(not 0 <= rg < metadata.num_row_groups for rg in row_groups): - raise ak._v2._util.error( - ValueError( - f"one of the requested row_groups is out of range " - f"(must be less than {metadata.num_row_groups})" - ) + if not can_sub: + raise ak._v2._util.error( + TypeError( + "Requested selection of row-groups, but not scanning metadata" ) - - split_paths = [p.split("/") for p in all_paths] - prev_index = None - prev_i = 0 - actual_paths = [] - subrg = [] - for i in range(metadata.num_row_groups): - unsplit_path = metadata.row_group(i).column(0).file_path - if unsplit_path == "": - if len(all_paths) == 1: - index = 0 - else: - raise ak._v2._util.error( - LookupError( - f"""path from metadata is {unsplit_path!r} but more - than one path matches: - - {eoln.join(all_paths)}""" - ) - ) - - else: - split_path = unsplit_path.split("/") - index = None - for j, compare in enumerate(split_paths): - if split_path == compare[-len(split_path) :]: - index = j - break - if index is None: - raise ak._v2._util.error( - LookupError( - f"""path {'/'.join(split_path)!r} from metadata not found - in path matches: - - {eoln.join(all_paths)}""" - ) - ) - - if prev_index != index: - prev_index = index - prev_i = i - actual_paths.append(all_paths[index]) - subrg.append([]) - - if i in row_groups: - subrg[-1].append(i - prev_i) - - for k in range(len(subrg) - 1, -1, -1): - if len(subrg[k]) == 0: - del actual_paths[k] - del subrg[k] - if subform is None: - subform = ak._v2._connect.pyarrow.form_handle_arrow( - parquetfile_for_metadata.schema_arrow, pass_empty_field=True ) - return parquet_columns, subform, actual_paths, fs, subrg, metadata + + path_rgs = {} + rgs_path = {} + subrg = [] + col_counts = [] + for i in 
range(metadata.num_row_groups):
+                fp = metadata.row_group(i).column(0).file_path
+                path_rgs.setdefault(fp, []).append(i)
+                rgs_path[i] = fp
+
+            actual_paths = []
+            for select in row_groups:
+                path = rgs_path[select]
+                path2 = [_ for _ in all_paths if _.endswith(path)][0]
+                if path2 not in actual_paths:
+                    actual_paths.append(path2)
+                    subrg.append([path_rgs[path].index(select)])
+                else:
+                    subrg[-1].append(path_rgs[path].index(select))
+                col_counts.append(metadata.row_group(select).num_rows)
+        else:
+            if can_sub:
+                col_counts = [
+                    metadata.row_group(i).num_rows for i in range(metadata.num_row_groups)
+                ]
+            else:
+                col_counts = None
+
+        parquet_columns = subform.columns(list_indicator=list_indicator)
+
+    return parquet_columns, subform, actual_paths, fs, subrg, col_counts, metadata
 
 
 def _load(
@@ -221,7 +213,7 @@ def _load(
     highlevel,
     behavior,
     fs,
-    meta,
+    metadata=None,
 ):
     arrays = []
     for i, p in enumerate(actual_paths):
@@ -235,7 +227,7 @@ def _load(
                 max_block=max_block,
                 footer_sample_size=footer_sample_size,
                 generate_bitmasks=generate_bitmasks,
-                metadata=meta,
+                metadata=metadata,
             )
         )
 
@@ -244,14 +236,41 @@ def _load(
         return ak._v2.operations.ak_from_buffers._impl(
             subform, 0, _DictOfEmptyBuffers(), "", numpy, highlevel, behavior
         )
-    elif len(arrays) == 1 and isinstance(arrays[0], ak._v2.record.Record):
-        return ak._v2._util.wrap(arrays[0], behavior, highlevel)
+    elif len(arrays) == 1:
+        # make high-level
+        if isinstance(arrays[0], ak._v2.record.Record):
+            return ak._v2.Record(arrays[0])
+        return ak._v2.Array(arrays[0])
     else:
+        # TODO: what if each array is a record?
         return ak._v2.operations.ak_concatenate._impl(
             arrays, 0, True, True, highlevel, behavior
         )
 
 
+def _open_file(
+    path, fs, columns, row_groups, max_gap, max_block, footer_sample_size, metadata
+):
+    """Picks between fsspec.parquet and normal fs.open"""
+    import fsspec.parquet
+
+    # the condition should really be whether columns and row_groups select less than all the possible ones
+    if (columns or row_groups) and getattr(fs, "async_impl", False):
+        return fsspec.parquet.open_parquet_file(
+            path,
+            fs=fs,
+            engine="pyarrow",
+            columns=columns,
+            row_groups=row_groups,
+            max_gap=max_gap,
+            metadata=metadata,
+            max_block=max_block,
+            footer_sample_size=footer_sample_size,
+        )
+    else:
+        return fs.open(path, "rb")
+
+
 def _read_parquet_file(
     path,
     fs,
@@ -260,21 +279,20 @@ def _read_parquet_file(
     footer_sample_size,
     max_gap,
     max_block,
-    metadata,
     generate_bitmasks,
+    metadata=None,
 ):
-    import fsspec.parquet
     import pyarrow.parquet as pyarrow_parquet
 
-    with fsspec.parquet.open_parquet_file(
+    with _open_file(
         path,
-        fs=fs,
-        engine="pyarrow",
-        columns=parquet_columns,
-        row_groups=row_groups,
-        max_gap=max_gap,
-        max_block=max_block,
-        footer_sample_size=footer_sample_size,
+        fs,
+        parquet_columns,
+        row_groups,
+        max_gap,
+        max_block,
+        footer_sample_size,
+        metadata,
     ) as file:
         parquetfile = pyarrow_parquet.ParquetFile(file)
 
@@ -286,6 +304,7 @@ def _read_parquet_file(
    return ak._v2.operations.ak_from_arrow._impl(
        arrow_table,
        generate_bitmasks,
+        # why is high-level False here?
         False,
         None,
     )
@@ -296,25 +315,33 @@ def __getitem__(self, where):
         return b"\x00\x00\x00\x00\x00\x00\x00\x00"
 
 
-def _all_and_metadata_paths(path, fs, paths):
+def _all_and_metadata_paths(path, fs, paths, ignore_metadata=False, scan_files=True):
     all_paths = []
     for x in paths:
         if fs.isfile(x):
-            is_meta = x.split("/")[-1] == "_metadata"
-            is_comm = x.split("/")[-1] == "_common_metadata"
+            is_meta = x.rsplit("/", 1)[-1] == "_metadata"
+            if is_meta and ignore_metadata:
+                continue
+            is_comm = x.rsplit("/", 1)[-1] == "_common_metadata"
+            if is_comm and scan_files:
+                continue
             all_paths.append((x, is_meta, is_comm))
         elif fs.isdir(x):
-            for prefix, _, files in fs.walk(x):
-                for f in files:
-                    is_meta = f == "_metadata"
-                    is_comm = f == "_common_metadata"
-                    if f.endswith((".parq", ".parquet")) or is_meta or is_comm:
-                        if fs.isfile("/".join((prefix, f))):
-                            all_paths.append(("/".join((prefix, f)), is_meta, is_comm))
+            for f, fdata in fs.find(x, detail=True).items():
+                is_meta = f.rsplit("/", 1)[-1] == "_metadata"
+                if is_meta and ignore_metadata:
+                    continue
+                is_comm = f.rsplit("/", 1)[-1] == "_common_metadata"
+                if is_comm and scan_files:
+                    continue
+                if f.endswith((".parq", ".parquet")) or is_meta or is_comm:
+                    if fdata["type"] == "file":
+                        all_paths.append((f, is_meta, is_comm))
 
     path_for_metadata = [x for x, is_meta, is_comm in all_paths if is_meta]
     if len(path_for_metadata) != 0:
         path_for_metadata = path_for_metadata[0]
+        can_sub = True
     else:
         path_for_metadata = [x for x, is_meta, is_comm in all_paths if is_comm]
         if len(path_for_metadata) != 0:
@@ -322,6 +349,8 @@ def _all_and_metadata_paths(path, fs, paths):
         else:
             if len(all_paths) != 0:
                 path_for_metadata = all_paths[0][0]
+        # if we scan the files, we will still know row-groups and counts, so we can sub-select
+        can_sub = scan_files or len(all_paths) == 1
 
     all_paths = [x for x, is_meta, is_comm in all_paths if not is_meta and not is_comm]
 
@@ -330,4 +359,4 @@ def _all_and_metadata_paths(path, fs, paths):
             ValueError(f"no *.parquet or *.parq matches for path {path!r}")
         )
 
-    return all_paths, path_for_metadata
+    return all_paths, path_for_metadata, can_sub
diff --git a/src/awkward/_v2/operations/ak_metadata_from_parquet.py b/src/awkward/_v2/operations/ak_metadata_from_parquet.py
index 8ea36346b2..4936dc5c41 100644
--- a/src/awkward/_v2/operations/ak_metadata_from_parquet.py
+++ b/src/awkward/_v2/operations/ak_metadata_from_parquet.py
@@ -14,22 +14,24 @@
 
 
 def metadata_from_parquet(
-    path,
-    storage_options=None,
-    max_gap=64_000,
-    max_block=256_000_000,
-    footer_sample_size=1_000_000,
+    path, storage_options=None, row_groups=None, ignore_metadata=False, scan_files=True
 ):
     """
+    This function differs from ak.from_parquet.metadata as follows:
+
+    * this function will always use a _metadata file, if present
+    * if there is no _metadata, the schema comes from _common_metadata or the first
+      data file
+    * the total number of rows is always known  # TODO: is this true?
+
     Args:
         path (str): Local filename or remote URL, passed to fsspec for resolution.
-            May contain glob patterns.
-        storage_options: Passed to `fsspec.parquet.open_parquet_file`.
-        max_gap (int): Passed to `fsspec.parquet.open_parquet_file`.
-        max_block (int): Passed to `fsspec.parquet.open_parquet_file`.
-        footer_sample_size (int): Passed to `fsspec.parquet.open_parquet_file`.
+            May contain glob patterns. A list of paths is also allowed, but they
+            must be data files, not directories.
+        storage_options: Passed to `fsspec`.
-    Returns a named tuple containing
+    Returns a dict containing
 
     * `form`: an Awkward Form representing the low-level type of the data
       (use `.type` to get a high-level type),
@@ -42,63 +44,43 @@
     See also #ak.from_parquet, #ak.to_parquet.
     """
+    import awkward._v2._connect.pyarrow  # noqa: F401
+
     with ak._v2._util.OperationErrorContext(
         "ak._v2.metadata_from_parquet",
         dict(
             path=path,
             storage_options=storage_options,
-            max_gap=max_gap,
-            max_block=max_block,
-            footer_sample_size=footer_sample_size,
         ),
     ):
         return _impl(
             path,
             storage_options,
-            max_gap,
-            max_block,
-            footer_sample_size,
+            row_groups=row_groups,
+            ignore_metadata=ignore_metadata,
+            scan_files=scan_files,
         )
 
 
 def _impl(
-    path,
-    storage_options,
-    max_gap,
-    max_block,
-    footer_sample_size,
+    path, storage_options, row_groups=None, ignore_metadata=False, scan_files=True
 ):
-    import awkward._v2._connect.pyarrow  # noqa: F401
-
-    name = "ak._v2.from_parquet"
-    pyarrow_parquet = ak._v2._connect.pyarrow.import_pyarrow_parquet(name)
-    fsspec = ak._v2._connect.pyarrow.import_fsspec(name)
-
-    import fsspec.parquet
-
-    fs, _, paths = fsspec.get_fs_token_paths(
-        path, mode="rb", storage_options=storage_options
+    results = ak._v2.operations.ak_from_parquet.metadata(
+        path, storage_options, row_groups, None, ignore_metadata, scan_files
     )
-
-    (
-        all_paths,
-        path_for_metadata,
-    ) = ak._v2.operations.ak_from_parquet._all_and_metadata_paths(path, fs, paths)
-
-    with fsspec.parquet.open_parquet_file(
-        path_for_metadata,
-        fs=fs,
-        engine="pyarrow",
-        row_groups=[],
-        storage_options=storage_options,
-        max_gap=max_gap,
-        max_block=max_block,
-        footer_sample_size=footer_sample_size,
-    ) as file_for_metadata:
-        parquetfile_for_metadata = pyarrow_parquet.ParquetFile(file_for_metadata)
-
-    form = ak._v2._connect.pyarrow.form_handle_arrow(
-        parquetfile_for_metadata.schema_arrow, pass_empty_field=True
-    )
-
-    return ParquetMetadata(form, fs, all_paths, parquetfile_for_metadata.metadata)
+    parquet_columns, subform, actual_paths, fs, subrg, col_counts, metadata = results
+
+    out = {
+        "form": subform,
+        "paths": actual_paths,
+        "col_counts": col_counts,
+        "columns": parquet_columns,
+    }
+    if col_counts:
+        out["num_row_groups"] = len(col_counts)
+        out["col_counts"] = col_counts
+        out["num_rows"] = sum(col_counts)
+    else:
+        out["num_rows"] = None
+        out["num_row_groups"] = None
+    return out
diff --git a/src/awkward/_v2/operations/ak_to_parquet.py b/src/awkward/_v2/operations/ak_to_parquet.py
index ecbb7c8206..d6f9e75c36 100644
--- a/src/awkward/_v2/operations/ak_to_parquet.py
+++ b/src/awkward/_v2/operations/ak_to_parquet.py
@@ -1,6 +1,6 @@
 # BSD 3-Clause License; see https://github.com/scikit-hep/awkward-1.0/blob/main/LICENSE
 
-from collections.abc import Iterable, Sized, Mapping, Sequence
+from collections.abc import Mapping, Sequence
 
 import numpy as np
 
@@ -32,7 +32,40 @@ def to_parquet(
     parquet_compliant_nested=False,  # https://issues.apache.org/jira/browse/ARROW-16348
     parquet_extra_options=None,
     hook_after_write=None,
+    storage_options=None,
 ):
+    """
+
+    Args:
+        data: ak.Array or ak.Record
+        destination: str
+        list_to32: bool
+        string_to32: bool
+        bytestring_to32: bool
+        emptyarray_to:
+        categorical_as_dictionary: bool
+        extensionarray: bool
+        count_nulls:
+        compression: str | dict
+        compression_level:
+        row_group_size: int
+        data_page_size:
+        parquet_flavor:
+        parquet_version:
+        parquet_page_version:
+        parquet_metadata_statistics:
+        parquet_dictionary_encoding:
+        parquet_byte_stream_split:
+        parquet_coerce_timestamps:
+        
parquet_old_int96_timestamps: + parquet_compliant_nested: + parquet_extra_options: + hook_after_write: callable + + Returns: + + ``pyarrow._parquet.FileMetaData`` instance + """ import awkward._v2._connect.pyarrow pyarrow_parquet = awkward._v2._connect.pyarrow.import_pyarrow_parquet( @@ -40,23 +73,8 @@ def to_parquet( ) fsspec = awkward._v2._connect.pyarrow.import_fsspec("ak.to_parquet") - if isinstance(data, (ak._v2.highlevel.Record, ak._v2.record.Record)): - iterator = iter([data]) - elif isinstance(data, Iterable) and not isinstance(data, Sized): - iterator = iter(data) - elif isinstance(data, Iterable): - iterator = iter([data]) - else: - raise ak._v2._util.error( - TypeError( - "'data' must be an array (one row group) or iterable of arrays (row group per array)" - ) - ) - - row_group = 0 - array = next(iterator) layout = ak._v2.operations.ak_to_layout.to_layout( - array, allow_record=True, allow_other=False + data, allow_record=True, allow_other=False ) table = ak._v2.operations.ak_to_arrow_table._impl( layout, @@ -79,7 +97,7 @@ def to_parquet( else: column_prefix = () - if isinstance(layout, ak._v2.record.Record): + if isinstance(data, ak._v2.Record): form = layout.array.form else: form = layout.form @@ -169,51 +187,48 @@ def parquet_columns(specifier, only=None): if parquet_extra_options is None: parquet_extra_options = {} - with fsspec.open(destination, "wb") as file: - with pyarrow_parquet.ParquetWriter( - destination, - table.schema, - filesystem=file.fs, - flavor=parquet_flavor, - version=parquet_version, - use_dictionary=parquet_dictionary_encoding, - compression=compression, - write_statistics=parquet_metadata_statistics, - use_deprecated_int96_timestamps=parquet_old_int96_timestamps, - compression_level=compression_level, - use_byte_stream_split=parquet_byte_stream_split, - data_page_version=parquet_page_version, - use_compliant_nested_type=parquet_compliant_nested, - data_page_size=data_page_size, - coerce_timestamps=parquet_coerce_timestamps, - **parquet_extra_options, - ) as writer: - while True: - writer.write_table(table, row_group_size=row_group_size) - if hook_after_write is not None: - hook_after_write( - row_group=row_group, - array=array, - layout=layout, - table=table, - writer=writer, - ) - - row_group += 1 - try: - array = next(iterator) - except StopIteration: - break - layout = ak._v2.operations.ak_to_layout.to_layout( - array, allow_record=True, allow_other=False - ) - table = ak._v2.operations.ak_to_arrow_table._impl( - layout, - list_to32, - string_to32, - bytestring_to32, - emptyarray_to, - categorical_as_dictionary, - extensionarray, - count_nulls, - ) + fs, destination = fsspec.core.url_to_fs(destination, **(storage_options or {})) + metalist = [] + with pyarrow_parquet.ParquetWriter( + destination, + table.schema, + filesystem=fs, + flavor=parquet_flavor, + version=parquet_version, + use_dictionary=parquet_dictionary_encoding, + compression=compression, + write_statistics=parquet_metadata_statistics, + use_deprecated_int96_timestamps=parquet_old_int96_timestamps, + compression_level=compression_level, + use_byte_stream_split=parquet_byte_stream_split, + data_page_version=parquet_page_version, + use_compliant_nested_type=parquet_compliant_nested, + data_page_size=data_page_size, + coerce_timestamps=parquet_coerce_timestamps, + metadata_collector=metalist, + **parquet_extra_options, + ) as writer: + writer.write_table(table, row_group_size=row_group_size) + if hook_after_write is not None: + hook_after_write( + array=data, + layout=layout, + table=table, + 
writer=writer,
+            )
+    meta = metalist[0]
+    meta.set_file_path(destination.rsplit("/", 1)[-1])
+    return meta
+
+
+def write_metadata(dir_path, fs, *metas, global_metadata=True):
+    """Generate metadata file(s) from a list of arrow metadata instances"""
+    assert metas
+    md = metas[0]
+    with fs.open("/".join([dir_path, "_common_metadata"]), "wb") as fil:
+        md.write_metadata_file(fil)
+    if global_metadata:
+        for meta in metas[1:]:
+            md.append_row_groups(meta)
+        with fs.open("/".join([dir_path, "_metadata"]), "wb") as fil:
+            md.write_metadata_file(fil)
diff --git a/tests/v2/test_1294-to-and-from_parquet.py b/tests/v2/test_1294-to-and-from_parquet.py
index 7e491f8b6a..4dace853f8 100644
--- a/tests/v2/test_1294-to-and-from_parquet.py
+++ b/tests/v2/test_1294-to-and-from_parquet.py
@@ -827,3 +827,96 @@ def test_unionarray(tmp_path, through, extensionarray):
         schema_arrow, pass_empty_field=True
     )
     assert predicted_form == array_form
+
+
+# Test cases
+# - list of data files, scanned
+# - list of data files, not scanned
+# - list of directories -> exception
+# - directory with _metadata, used
+# - directory with _metadata, not used, files scanned
+# - directory with _metadata, not used, files not scanned
+# - directory without _metadata but with _common_metadata
+# - directory with only data files, scanned
+# - directory with only data files, not scanned
+
+
+@pytest.fixture()
+def generate_datafiles(tmp_path):
+    import fsspec
+
+    fs = fsspec.filesystem("file")
+    data1 = ak.from_iter([[1, 2, 3], [4, 5]])
+    data2 = data1 + 1
+    md1 = ak._v2.to_parquet(data1, os.path.join(tmp_path, "data1.parq"))
+    md2 = ak._v2.to_parquet(data2, os.path.join(tmp_path, "data2.parq"))
+    return str(tmp_path), [md1, md2], fs
+
+
+@pytest.fixture()
+def with_common_metadata(generate_datafiles):
+    path, mdlist, fs = generate_datafiles
+    ak._v2.operations.ak_to_parquet.write_metadata(
+        path, fs, *mdlist, global_metadata=False
+    )
+    return path
+
+
+@pytest.fixture()
+def with_global_metadata(generate_datafiles):
+    path, mdlist, fs = generate_datafiles
+    ak._v2.operations.ak_to_parquet.write_metadata(
+        path, fs, *mdlist, global_metadata=True
+    )
+    return path
+
+
+@pytest.fixture()
+def with_corrupted_global_metadata(generate_datafiles):
+    path, mdlist, fs = generate_datafiles
+    ak._v2.operations.ak_to_parquet.write_metadata(path, fs, *mdlist)
+    with open(os.path.join(path, "_metadata"), "wb") as f:
+        f.write(b"not parquet")
+    return path
+
+
+def test_defaults_global(with_global_metadata):
+    arr = ak._v2.metadata_from_parquet(with_global_metadata)
+    assert arr["num_rows"] == 4
+    assert arr["col_counts"] == [2, 2]
+
+
+def test_defaults_common(with_common_metadata):
+    arr = ak._v2.metadata_from_parquet(with_common_metadata)
+    assert arr["num_rows"] == 4
+    assert arr["col_counts"] == [2, 2]
+
+
+def test_dont_scan(with_global_metadata):
+    arr = ak._v2.metadata_from_parquet(
+        with_global_metadata, ignore_metadata=True, scan_files=False
+    )
+    assert arr["col_counts"] is None
+
+
+def test_cant_select(with_common_metadata):
+    # strictly, row_groups=[0] could be allowed, since that file is first and may be scanned
+    # anyway
+    with pytest.raises(ValueError):
+        ak._v2.metadata_from_parquet(
+            with_common_metadata, scan_files=False, row_groups=[1]
+        )
+
+
+def test_select(with_global_metadata):
+    arr = ak._v2.metadata_from_parquet(with_global_metadata, row_groups=[1])
+    assert arr["col_counts"] == [2]
+
+    with pytest.raises(ValueError):
+        ak._v2.metadata_from_parquet(with_global_metadata, row_groups=[1, 1])
+
+    with 
pytest.raises(ValueError): + ak._v2.metadata_from_parquet(with_global_metadata, row_groups=[-1]) + + with pytest.raises(ValueError): + ak._v2.metadata_from_parquet(with_global_metadata, row_groups=[4]) diff --git a/tests/v2/test_1440-start-v2-to_parquet.py b/tests/v2/test_1440-start-v2-to_parquet.py index ad55ba105e..731cec35d6 100644 --- a/tests/v2/test_1440-start-v2-to_parquet.py +++ b/tests/v2/test_1440-start-v2-to_parquet.py @@ -1,5 +1,6 @@ # BSD 3-Clause License; see https://github.com/scikit-hep/awkward-1.0/blob/main/LICENSE +import io import os import pytest # noqa: F401 import numpy as np # noqa: F401 @@ -17,11 +18,12 @@ def parquet_round_trip(akarray, extensionarray, tmp_path): akarray2 = ak._v2.from_parquet(filename) assert to_list(akarray2) == to_list(akarray) + str_type2 = io.StringIO() + str_type = io.StringIO() if extensionarray: - print("read back") - akarray2.type.show() - print("original") - akarray.type.show() + akarray2.type.show(stream=str_type2) + akarray.type.show(stream=str_type) + assert str_type.getvalue() == str_type2.getvalue() assert akarray2.type == akarray.type
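
Usage sketch: a minimal end-to-end example of the API this patch settles on, assembled
from the fixtures in tests/v2/test_1294-to-and-from_parquet.py above. The local directory
"/tmp/demo" is hypothetical, and the ak._v2 entry points are the transitional ones used
throughout this patch.

    import os
    import fsspec
    import awkward as ak

    os.makedirs("/tmp/demo", exist_ok=True)

    # ak._v2.to_parquet now returns the pyarrow FileMetaData of the file it wrote
    data1 = ak._v2.Array([[1, 2, 3], [4, 5]])
    data2 = data1 + 1
    md1 = ak._v2.to_parquet(data1, "/tmp/demo/data1.parq")
    md2 = ak._v2.to_parquet(data2, "/tmp/demo/data2.parq")

    # write _common_metadata and (optionally) a global _metadata for the directory
    fs = fsspec.filesystem("file")
    ak._v2.operations.ak_to_parquet.write_metadata("/tmp/demo", fs, md1, md2)

    # inspect the dataset without reading any data ...
    info = ak._v2.metadata_from_parquet("/tmp/demo")
    assert info["num_rows"] == 4
    assert info["col_counts"] == [2, 2]

    # ... then read back only the second row group
    arr = ak._v2.from_parquet("/tmp/demo", row_groups=[1])
    assert arr.tolist() == [[2, 3, 4], [5, 6]]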