From 5038decfa6d1e1a9655e301f1dc8eff829326065 Mon Sep 17 00:00:00 2001 From: coufon Date: Wed, 27 Dec 2023 05:35:03 +0000 Subject: [PATCH] Support loading external Parquet files into Space --- python/src/space/core/loaders/__init__.py | 13 ++++ python/src/space/core/loaders/array_record.py | 25 ++----- python/src/space/core/loaders/parquet.py | 71 +++++++++++++++++++ python/src/space/core/loaders/utils.py | 32 +++++++++ python/src/space/core/ops/read.py | 2 +- python/src/space/core/runners.py | 28 ++++++-- python/src/space/core/schema/arrow.py | 8 +++ .../loaders/test_array_record.py} | 19 +++-- python/tests/core/loaders/test_parquet.py | 68 ++++++++++++++++++ 9 files changed, 231 insertions(+), 35 deletions(-) create mode 100644 python/src/space/core/loaders/parquet.py create mode 100644 python/src/space/core/loaders/utils.py rename python/tests/{tf/test_conversion.py => core/loaders/test_array_record.py} (84%) create mode 100644 python/tests/core/loaders/test_parquet.py diff --git a/python/src/space/core/loaders/__init__.py b/python/src/space/core/loaders/__init__.py index e69de29..5678014 100644 --- a/python/src/space/core/loaders/__init__.py +++ b/python/src/space/core/loaders/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/src/space/core/loaders/array_record.py b/python/src/space/core/loaders/array_record.py index 8d2d52e..27dbae6 100644 --- a/python/src/space/core/loaders/array_record.py +++ b/python/src/space/core/loaders/array_record.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -"""Loads ArrayRecord files into Space datasets.""" +"""Load ArrayRecord files into Space datasets.""" -import os from typing import Any, Callable, Dict, List, Optional, Tuple import pyarrow as pa from typing_extensions import TypeAlias from space.core.fs.array_record import read_record_file +from space.core.loaders.utils import list_files from space.core.proto import metadata_pb2 as meta from space.core.proto import runtime_pb2 as runtime from space.core.ops import utils @@ -32,15 +32,15 @@ ArrayRecordIndexFn: TypeAlias = Callable[[Dict[str, Any]], Dict[str, Any]] -class LocalLoadArrayRecordOp(StoragePathsMixin): +class LocalArrayRecordLoadOp(StoragePathsMixin): """Load ArrayRecord files into Space without copying data.""" def __init__(self, location: str, metadata: meta.StorageMetadata, - array_record_dir: str, index_fn: ArrayRecordIndexFn): + input_dir: str, index_fn: ArrayRecordIndexFn): StoragePathsMixin.__init__(self, location) self._metadata = metadata - self._array_record_dir = array_record_dir + self._input_dir = input_dir self._index_fn = index_fn record_fields = set(self._metadata.schema.record_fields) @@ -58,17 +58,16 @@ def __init__(self, location: str, metadata: meta.StorageMetadata, self._record_field = self._record_fields[0] self._serializer = DictSerializer(logical_schema) - self._array_record_files = _list_files(array_record_dir) + self._input_files = list_files(input_dir, substr=".array_record") def write(self) -> Optional[runtime.Patch]: """Write index files to load ArrayRecord files to Space dataset.""" - # TODO: to load files in parallel. append_op = LocalAppendOp(self._location, self._metadata, record_address_input=True) total_record_bytes = 0 - for f in self._array_record_files: + for f in self._input_files: index_data, record_bytes = self._build_index_for_array_record(f) total_record_bytes += record_bytes append_op.write(index_data) @@ -99,13 +98,3 @@ def _build_index_for_array_record(self, utils.address_column(file_path, start_row=0, num_rows=len(indxes))) return index_data, record_uncompressed_bytes - - -def _list_files(array_record_dir: str) -> List[str]: - files: List[str] = [] - for f in os.listdir(array_record_dir): - full_path = os.path.join(array_record_dir, f) - if os.path.isfile(full_path) and '.array_record' in f: - files.append(full_path) - - return files diff --git a/python/src/space/core/loaders/parquet.py b/python/src/space/core/loaders/parquet.py new file mode 100644 index 0000000..b0b228c --- /dev/null +++ b/python/src/space/core/loaders/parquet.py @@ -0,0 +1,71 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+"""Load Parquet files into Space datasets."""
+
+from typing import Optional
+
+import pyarrow.parquet as pq
+
+from space.core.manifests import IndexManifestWriter
+from space.core.loaders.utils import list_files
+from space.core.proto import metadata_pb2 as meta
+from space.core.proto import runtime_pb2 as runtime
+from space.core.ops import utils
+from space.core.schema import arrow
+from space.core.utils.paths import StoragePathsMixin
+
+
+class LocalParquetLoadOp(StoragePathsMixin):
+  """Load Parquet files into Space without copying data."""
+
+  def __init__(self, location: str, metadata: meta.StorageMetadata,
+               input_dir: str):
+    StoragePathsMixin.__init__(self, location)
+
+    self._metadata = metadata
+    self._input_dir = input_dir
+
+    assert len(self._metadata.schema.record_fields) == 0
+
+    self._physical_schema = arrow.arrow_schema(self._metadata.schema.fields,
+                                               set(),
+                                               physical=True)
+    self._input_files = list_files(input_dir, suffix=".parquet")
+
+  def write(self) -> Optional[runtime.Patch]:
+    """Write metadata files to load Parquet files to Space dataset."""
+    index_manifest_writer = IndexManifestWriter(
+        self._metadata_dir, self._physical_schema,
+        self._metadata.schema.primary_keys)  # type: ignore[arg-type]
+    patch = runtime.Patch()
+
+    for f in self._input_files:
+      stats = _write_index_manifest(index_manifest_writer, f)
+      utils.update_index_storage_stats(base=patch.storage_statistics_update,
+                                       update=stats)
+
+    index_manifest_full_path = index_manifest_writer.finish()
+    if index_manifest_full_path is not None:
+      patch.addition.index_manifest_files.append(
+          self.short_path(index_manifest_full_path))
+
+    return patch
+
+
+def _write_index_manifest(manifest_writer: IndexManifestWriter,
+                          file_path: str) -> meta.StorageStatistics:
+  # TODO: to verify that file schemas are compatible with dataset.
+  metadata = pq.read_metadata(file_path)
+  return manifest_writer.write(file_path, metadata)
diff --git a/python/src/space/core/loaders/utils.py b/python/src/space/core/loaders/utils.py
new file mode 100644
index 0000000..b2e9af4
--- /dev/null
+++ b/python/src/space/core/loaders/utils.py
@@ -0,0 +1,32 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +"""Utilities for loaders.""" + +import os +from typing import List, Optional + + +def list_files(directory: str, + substr: Optional[str] = None, + suffix: Optional[str] = None) -> List[str]: + """List files in a directory.""" + files: List[str] = [] + for f in os.listdir(directory): + full_path = os.path.join(directory, f) + if (os.path.isfile(full_path) and (substr is None or substr in f) + and (suffix is None or f.endswith(suffix))): + files.append(full_path) + + return files diff --git a/python/src/space/core/ops/read.py b/python/src/space/core/ops/read.py index 733f854..51f9a63 100644 --- a/python/src/space/core/ops/read.py +++ b/python/src/space/core/ops/read.py @@ -118,7 +118,7 @@ def _read_index_and_record(self, index_path: str) -> pa.Table: record_columns: List[Tuple[int, pa.Field]] = [] for column_id, field in enumerate(index_data.schema): field_id = arrow.field_id(field) - if field_id in self._index_field_ids: + if field_id in self._index_field_ids or field_id == arrow.NULL_FIELD_ID: index_column_ids.append(column_id) else: record_columns.append( diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 1435723..23f1800 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -30,7 +30,8 @@ import space.core.proto.runtime_pb2 as runtime from space.core.storage import Storage from space.core.loaders.array_record import ArrayRecordIndexFn -from space.core.loaders.array_record import LocalLoadArrayRecordOp +from space.core.loaders.array_record import LocalArrayRecordLoadOp +from space.core.loaders.parquet import LocalParquetLoadOp class BaseRunner(ABC): @@ -65,17 +66,27 @@ def append_from(self, source: Iterator[InputData]) -> runtime.JobResult: """Append data into the dataset from an iterator source.""" @abstractmethod - def append_array_record(self, array_record_dir: str, + def append_array_record(self, input_dir: str, index_fn: ArrayRecordIndexFn) -> runtime.JobResult: """Append data from ArrayRecord files without copying data. TODO: to support a pattern of files to expand. Args: - array_record_dir: the folder of ArrayRecord files. + input_dir: the folder of ArrayRecord files. index_fn: a function that build index fields from each TFDS record. """ + @abstractmethod + def append_parquet(self, input_dir: str) -> runtime.JobResult: + """Append data from Parquet files without copying data. + + TODO: to support a pattern of files to expand. + + Args: + input_dir: the folder of Parquet files. + """ + def upsert(self, data: InputData) -> runtime.JobResult: """Upsert data into the dataset. 
@@ -136,10 +147,15 @@ def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:
 
     return self._try_commit(op.finish())
 
-  def append_array_record(self, array_record_dir: str,
+  def append_array_record(self, input_dir: str,
                           index_fn: ArrayRecordIndexFn) -> runtime.JobResult:
-    op = LocalLoadArrayRecordOp(self._storage.location, self._storage.metadata,
-                                array_record_dir, index_fn)
+    op = LocalArrayRecordLoadOp(self._storage.location, self._storage.metadata,
+                                input_dir, index_fn)
+    return self._try_commit(op.write())
+
+  def append_parquet(self, input_dir: str) -> runtime.JobResult:
+    op = LocalParquetLoadOp(self._storage.location, self._storage.metadata,
+                            input_dir)
     return self._try_commit(op.write())
 
   def _insert(self, data: InputData,
diff --git a/python/src/space/core/schema/arrow.py b/python/src/space/core/schema/arrow.py
index 33d1220..1bee269 100644
--- a/python/src/space/core/schema/arrow.py
+++ b/python/src/space/core/schema/arrow.py
@@ -25,6 +25,11 @@
 from space.core.schema import utils
 from space.core.utils.constants import UTF_8
 
+# A special field ID representing an unassigned field ID.
+# Used for external Parquet files not created by Space that don't have field
+# IDs. Schema evolution is limited for datasets containing such files.
+NULL_FIELD_ID = -1
+
 _PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
 
 
@@ -35,6 +40,9 @@ def field_metadata(field_id_: int) -> Dict[bytes, bytes]:
 
 def field_id(field: pa.Field) -> int:
   """Return field ID of an Arrow field."""
+  if field.metadata is None or _PARQUET_FIELD_ID_KEY not in field.metadata:
+    return NULL_FIELD_ID
+
   return int(field.metadata[_PARQUET_FIELD_ID_KEY])
 
 
diff --git a/python/tests/tf/test_conversion.py b/python/tests/core/loaders/test_array_record.py
similarity index 84%
rename from python/tests/tf/test_conversion.py
rename to python/tests/core/loaders/test_array_record.py
index eb6a700..c2e01f6 100644
--- a/python/tests/tf/test_conversion.py
+++ b/python/tests/core/loaders/test_array_record.py
@@ -25,7 +25,7 @@
 from space.core.utils.uuids import uuid_
 
 
-class TestLocalLoadArrayRecordOp:
+class TestLocalArrayRecordLoadOp:
 
   @pytest.fixture
   def tf_features(self):
@@ -37,7 +37,7 @@ def tf_features(self):
     })
     return TfFeatures(features_dict)
 
-  def test_write_tfds_to_space(self, tmp_path, tf_features):
+  def test_append_array_record(self, tmp_path, tf_features):
     schema = pa.schema([("id", pa.int64()), ("num_objects", pa.int64()),
                         ("features", tf_features)])
     ds = Dataset.create(str(tmp_path / "dataset"),
@@ -58,13 +58,12 @@ def test_write_tfds_to_space(self, tmp_path, tf_features):
        }
     }]
 
-    # Make a fake TFDS dataset.
-    tfds_path = tmp_path / "tfds"
-    tfds_path.mkdir(parents=True)
+    # Create dummy ArrayRecord files.
+    input_dir = tmp_path / "array_record"
+    input_dir.mkdir(parents=True)
     _write_array_record_files(
-        tfds_path, [tf_features.serialize(r) for r in features_data])
+        input_dir, [tf_features.serialize(r) for r in features_data])
 
-    # Write TFDS into Space.
def index_fn(record): assert len(record['features']) == 1 features = record['features'][0] @@ -74,7 +73,7 @@ def index_fn(record): } runner = ds.local() - response = runner.append_array_record(tfds_path, index_fn) + response = runner.append_array_record(input_dir, index_fn) assert response.storage_statistics_update == meta.StorageStatistics( num_rows=2, index_compressed_bytes=104, @@ -89,9 +88,9 @@ def index_fn(record): }) -def _write_array_record_files(tfds_path, records: List[bytes]): +def _write_array_record_files(input_dir, records: List[bytes]): file_path = f"{uuid_()}.array_record" - writer = ar.ArrayRecordWriter(str(tfds_path / file_path), options="") + writer = ar.ArrayRecordWriter(str(input_dir / file_path), options="") for r in records: writer.write(r) diff --git a/python/tests/core/loaders/test_parquet.py b/python/tests/core/loaders/test_parquet.py new file mode 100644 index 0000000..4c78882 --- /dev/null +++ b/python/tests/core/loaders/test_parquet.py @@ -0,0 +1,68 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pyarrow as pa + +from space import Dataset +import space.core.proto.metadata_pb2 as meta +from space.core.fs.parquet import write_parquet_file + + +class TestLocalParquetLoadOp: + + def test_append_parquet(self, tmp_path): + schema = pa.schema([ + pa.field("int64", pa.int64()), + pa.field("float64", pa.float64()), + pa.field("bool", pa.bool_()), + pa.field("string", pa.string()) + ]) + ds = Dataset.create(str(tmp_path / "dataset"), + schema, + primary_keys=["int64"], + record_fields=[]) + + dummy_data = [{ + "int64": [1, 2, 3], + "float64": [0.1, 0.2, 0.3], + "bool": [True, False, False], + "string": ["a", "b", "c"] + }, { + "int64": [0, 10], + "float64": [-0.1, 100.0], + "bool": [False, False], + "string": ["A", "z"] + }] + + # Create dummy Parquet files. + input_dir = tmp_path / "parquet" + input_dir.mkdir(parents=True) + write_parquet_file(str(input_dir / "file0.parquet"), schema, + [pa.Table.from_pydict(dummy_data[0])]) + write_parquet_file(str(input_dir / "file1.parquet"), schema, + [pa.Table.from_pydict(dummy_data[1])]) + + runner = ds.local() + response = runner.append_parquet(input_dir) + assert response.storage_statistics_update == meta.StorageStatistics( + num_rows=5, + index_compressed_bytes=214, + index_uncompressed_bytes=209, + record_uncompressed_bytes=0) + + index_data = pa.concat_tables( + (list(runner.read()))).combine_chunks().sort_by("int64") + assert index_data == pa.concat_tables([ + pa.Table.from_pydict(d) for d in dummy_data + ]).combine_chunks().sort_by("int64")
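
Usage sketch (not part of the patch): the snippet below illustrates how the new append_parquet
runner API added above might be called, mirroring the test in this change. The schema, field
names, and directory paths are illustrative assumptions only.

    import pyarrow as pa

    from space import Dataset

    # An index-only schema: the Parquet loader asserts the dataset has no record fields.
    # The fields and paths here are hypothetical examples.
    schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    ds = Dataset.create("/path/to/dataset", schema,
                        primary_keys=["id"], record_fields=[])

    # Register existing Parquet files in place: only index manifests are written to the
    # dataset's metadata; the file data itself is not copied.
    runner = ds.local()
    response = runner.append_parquet("/path/to/parquet_dir")
    print(response.storage_statistics_update)

Because columns in external files without Parquet field-ID metadata are recorded under
NULL_FIELD_ID, schema evolution is limited for datasets that contain such loaded files.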