Support loading external Parquet files into Space
coufon committed Dec 27, 2023
1 parent 9a87356 commit 5038dec
Showing 9 changed files with 231 additions and 35 deletions.
13 changes: 13 additions & 0 deletions python/src/space/core/loaders/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
25 changes: 7 additions & 18 deletions python/src/space/core/loaders/array_record.py
@@ -12,15 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Loads ArrayRecord files into Space datasets."""
"""Load ArrayRecord files into Space datasets."""

import os
from typing import Any, Callable, Dict, List, Optional, Tuple

import pyarrow as pa
from typing_extensions import TypeAlias

from space.core.fs.array_record import read_record_file
from space.core.loaders.utils import list_files
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as runtime
from space.core.ops import utils
@@ -32,15 +32,15 @@
ArrayRecordIndexFn: TypeAlias = Callable[[Dict[str, Any]], Dict[str, Any]]


class LocalLoadArrayRecordOp(StoragePathsMixin):
class LocalArrayRecordLoadOp(StoragePathsMixin):
"""Load ArrayRecord files into Space without copying data."""

def __init__(self, location: str, metadata: meta.StorageMetadata,
array_record_dir: str, index_fn: ArrayRecordIndexFn):
input_dir: str, index_fn: ArrayRecordIndexFn):
StoragePathsMixin.__init__(self, location)

self._metadata = metadata
self._array_record_dir = array_record_dir
self._input_dir = input_dir
self._index_fn = index_fn

record_fields = set(self._metadata.schema.record_fields)
@@ -58,17 +58,16 @@ def __init__(self, location: str, metadata: meta.StorageMetadata,
self._record_field = self._record_fields[0]

self._serializer = DictSerializer(logical_schema)
self._array_record_files = _list_files(array_record_dir)
self._input_files = list_files(input_dir, substr=".array_record")

def write(self) -> Optional[runtime.Patch]:
"""Write index files to load ArrayRecord files to Space dataset."""
# TODO: to load files in parallel.
append_op = LocalAppendOp(self._location,
self._metadata,
record_address_input=True)

total_record_bytes = 0
for f in self._array_record_files:
for f in self._input_files:
index_data, record_bytes = self._build_index_for_array_record(f)
total_record_bytes += record_bytes
append_op.write(index_data)
@@ -99,13 +98,3 @@ def _build_index_for_array_record(self,
utils.address_column(file_path, start_row=0, num_rows=len(indxes)))

return index_data, record_uncompressed_bytes


def _list_files(array_record_dir: str) -> List[str]:
files: List[str] = []
for f in os.listdir(array_record_dir):
full_path = os.path.join(array_record_dir, f)
if os.path.isfile(full_path) and '.array_record' in f:
files.append(full_path)

return files
71 changes: 71 additions & 0 deletions python/src/space/core/loaders/parquet.py
@@ -0,0 +1,71 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Load Parquet files into Space datasets."""

from typing import Optional

import pyarrow.parquet as pq

from space.core.manifests import IndexManifestWriter
from space.core.loaders.utils import list_files
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as runtime
from space.core.ops import utils
from space.core.schema import arrow
from space.core.utils.paths import StoragePathsMixin


class LocalParquetLoadOp(StoragePathsMixin):
"""Load ArrayRecord files into Space without copying data."""

def __init__(self, location: str, metadata: meta.StorageMetadata,
input_dir: str):
StoragePathsMixin.__init__(self, location)

self._metadata = metadata
self._input_dir = input_dir

assert len(self._metadata.schema.record_fields) == 0

self._physical_schema = arrow.arrow_schema(self._metadata.schema.fields,
set(),
physical=True)
self._input_files = list_files(input_dir, suffix=".parquet")

def write(self) -> Optional[runtime.Patch]:
"""Write metadata files to load Parquet files to Space dataset."""
index_manifest_writer = IndexManifestWriter(
self._metadata_dir, self._physical_schema,
self._metadata.schema.primary_keys) # type: ignore[arg-type]
patch = runtime.Patch()

for f in self._input_files:
stats = _write_index_manifest(index_manifest_writer, f)
utils.update_index_storage_stats(base=patch.storage_statistics_update,
update=stats)

index_manifest_full_path = index_manifest_writer.finish()
if index_manifest_full_path is not None:
patch.addition.index_manifest_files.append(
self.short_path(index_manifest_full_path))

return patch


def _write_index_manifest(manifest_writer: IndexManifestWriter,
file_path: str) -> meta.StorageStatistics:
# TODO: verify that file schemas are compatible with the dataset.
metadata = pq.read_metadata(file_path)
return manifest_writer.write(file_path, metadata)
32 changes: 32 additions & 0 deletions python/src/space/core/loaders/utils.py
@@ -0,0 +1,32 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Utilities for loaders."""

import os
from typing import List, Optional


def list_files(directory: str,
substr: Optional[str] = None,
suffix: Optional[str] = None) -> List[str]:
"""List files in a directory."""
files: List[str] = []
for f in os.listdir(directory):
full_path = os.path.join(directory, f)
if (os.path.isfile(full_path) and (substr is None or substr in f)
and (suffix is None or f.endswith(suffix))):
files.append(full_path)

return files
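
For illustration, a minimal sketch of how the two loaders above call this helper (the input directory below is hypothetical):

from space.core.loaders.utils import list_files

input_dir = "/data/external_files"  # hypothetical directory

# ArrayRecord loader: keep files whose name contains ".array_record".
array_record_files = list_files(input_dir, substr=".array_record")

# Parquet loader: keep files whose name ends with ".parquet".
parquet_files = list_files(input_dir, suffix=".parquet")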
2 changes: 1 addition & 1 deletion python/src/space/core/ops/read.py
@@ -118,7 +118,7 @@ def _read_index_and_record(self, index_path: str) -> pa.Table:
record_columns: List[Tuple[int, pa.Field]] = []
for column_id, field in enumerate(index_data.schema):
field_id = arrow.field_id(field)
if field_id in self._index_field_ids:
if field_id in self._index_field_ids or field_id == arrow.NULL_FIELD_ID:
index_column_ids.append(column_id)
else:
record_columns.append(
28 changes: 22 additions & 6 deletions python/src/space/core/runners.py
@@ -30,7 +30,8 @@
import space.core.proto.runtime_pb2 as runtime
from space.core.storage import Storage
from space.core.loaders.array_record import ArrayRecordIndexFn
from space.core.loaders.array_record import LocalLoadArrayRecordOp
from space.core.loaders.array_record import LocalArrayRecordLoadOp
from space.core.loaders.parquet import LocalParquetLoadOp


class BaseRunner(ABC):
@@ -65,17 +66,27 @@ def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:
"""Append data into the dataset from an iterator source."""

@abstractmethod
def append_array_record(self, array_record_dir: str,
def append_array_record(self, input_dir: str,
index_fn: ArrayRecordIndexFn) -> runtime.JobResult:
"""Append data from ArrayRecord files without copying data.
TODO: support a file pattern to expand.
Args:
array_record_dir: the folder of ArrayRecord files.
input_dir: the folder of ArrayRecord files.
index_fn: a function that builds index fields from each TFDS record.
"""

@abstractmethod
def append_parquet(self, input_dir: str) -> runtime.JobResult:
"""Append data from Parquet files without copying data.
TODO: support a file pattern to expand.
Args:
input_dir: the folder of Parquet files.
"""

def upsert(self, data: InputData) -> runtime.JobResult:
"""Upsert data into the dataset.
@@ -136,10 +147,15 @@ def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:

return self._try_commit(op.finish())

def append_array_record(self, array_record_dir: str,
def append_array_record(self, input_dir: str,
index_fn: ArrayRecordIndexFn) -> runtime.JobResult:
op = LocalLoadArrayRecordOp(self._storage.location, self._storage.metadata,
array_record_dir, index_fn)
op = LocalArrayRecordLoadOp(self._storage.location, self._storage.metadata,
input_dir, index_fn)
return self._try_commit(op.write())

def append_parquet(self, input_dir: str) -> runtime.JobResult:
op = LocalParquetLoadOp(self._storage.location, self._storage.metadata,
input_dir)
return self._try_commit(op.write())

def _insert(self, data: InputData,
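
A hedged end-to-end sketch of the new append_parquet entry point, modeled on the ArrayRecord loader test further below; the dataset location, schema, and input directory are illustrative, and the top-level Dataset import path and create() signature are assumed rather than shown in this diff:

import pyarrow as pa
from space import Dataset  # assumed public import path

# An index-only schema: LocalParquetLoadOp requires no record fields.
schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
ds = Dataset.create("/path/to/space_dataset", schema,
                    primary_keys=["id"], record_fields=[])

# Register existing Parquet files in place; only metadata is written.
runner = ds.local()
response = runner.append_parquet("/path/to/external_parquet_dir")
print(response.storage_statistics_update)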
8 changes: 8 additions & 0 deletions python/src/space/core/schema/arrow.py
@@ -25,6 +25,11 @@
from space.core.schema import utils
from space.core.utils.constants import UTF_8

# A special field ID representing an unassigned field ID.
# Used for external Parquet files not created by Space, which don't have
# field IDs. Schema evolution is limited for datasets containing such files.
NULL_FIELD_ID = -1

_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"


Expand All @@ -35,6 +40,9 @@ def field_metadata(field_id_: int) -> Dict[bytes, bytes]:

def field_id(field: pa.Field) -> int:
"""Return field ID of an Arrow field."""
if field.metadata is None or _PARQUET_FIELD_ID_KEY not in field.metadata:
return NULL_FIELD_ID

return int(field.metadata[_PARQUET_FIELD_ID_KEY])


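
To illustrate the new fallback (the fields below are hypothetical): a column written by Space carries the Parquet field-ID metadata and resolves normally, while a column from an external Parquet file has no such metadata and now maps to NULL_FIELD_ID instead of failing:

import pyarrow as pa
from space.core.schema import arrow

# A field written by Space carries the Parquet field-ID metadata key.
space_field = pa.field("id", pa.int64(),
                       metadata={b"PARQUET:field_id": b"1"})
assert arrow.field_id(space_field) == 1

# A field read from an external Parquet file typically has no metadata.
external_field = pa.field("name", pa.string())
assert arrow.field_id(external_field) == arrow.NULL_FIELD_ID  # -1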
Changes to tests for the ArrayRecord loader (TestLocalArrayRecordLoadOp)
@@ -25,7 +25,7 @@
from space.core.utils.uuids import uuid_


class TestLocalLoadArrayRecordOp:
class TestLocalArrayRecordLoadOp:

@pytest.fixture
def tf_features(self):
@@ -37,7 +37,7 @@ def tf_features(self):
})
return TfFeatures(features_dict)

def test_write_tfds_to_space(self, tmp_path, tf_features):
def test_append_array_record(self, tmp_path, tf_features):
schema = pa.schema([("id", pa.int64()), ("num_objects", pa.int64()),
("features", tf_features)])
ds = Dataset.create(str(tmp_path / "dataset"),
@@ -58,13 +58,12 @@
}
}]

# Make a fake TFDS dataset.
tfds_path = tmp_path / "tfds"
tfds_path.mkdir(parents=True)
# Create dummy ArrayRecord files.
input_dir = tmp_path / "array_record"
input_dir.mkdir(parents=True)
_write_array_record_files(
tfds_path, [tf_features.serialize(r) for r in features_data])
input_dir, [tf_features.serialize(r) for r in features_data])

# Write TFDS into Space.
def index_fn(record):
assert len(record['features']) == 1
features = record['features'][0]
@@ -74,7 +73,7 @@ def index_fn(record):
}

runner = ds.local()
response = runner.append_array_record(tfds_path, index_fn)
response = runner.append_array_record(input_dir, index_fn)
assert response.storage_statistics_update == meta.StorageStatistics(
num_rows=2,
index_compressed_bytes=104,
@@ -89,9 +88,9 @@
})


def _write_array_record_files(tfds_path, records: List[bytes]):
def _write_array_record_files(input_dir, records: List[bytes]):
file_path = f"{uuid_()}.array_record"
writer = ar.ArrayRecordWriter(str(tfds_path / file_path), options="")
writer = ar.ArrayRecordWriter(str(input_dir / file_path), options="")
for r in records:
writer.write(r)
