Add writing record fields in Append op.
coufon committed Dec 22, 2023
1 parent 67e879e commit 80ceb28
Showing 18 changed files with 529 additions and 57 deletions.
4 changes: 4 additions & 0 deletions python/src/space/core/manifests/__init__.py
@@ -12,3 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Manifest files writer and reader implementation."""

from space.core.manifests.index import IndexManifestWriter
from space.core.manifests.record import RecordManifestWriter
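
With the re-exports in place, downstream code can import both writers from the package root, as append.py below now does:

from space.core.manifests import IndexManifestWriter, RecordManifestWriter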
23 changes: 7 additions & 16 deletions python/src/space/core/manifests/index.py
@@ -19,13 +19,13 @@
import pyarrow as pa
import pyarrow.parquet as pq

from space.core.manifests.utils import write_index_manifest
import space.core.proto.metadata_pb2 as meta
from space.core.schema import constants
from space.core.schema.arrow import field_id, field_id_to_column_id_dict
from space.core.utils import paths

# Manifest file fields.
_FILE_PATH_FIELD = '_FILE'
_NUM_ROWS_FIELD = '_NUM_ROWS'
_INDEX_COMPRESSED_BYTES_FIELD = '_INDEX_COMPRESSED_BYTES'
_INDEX_UNCOMPRESSED_BYTES_FIELD = '_INDEX_UNCOMPRESSED_BYTES'

@@ -57,7 +57,8 @@ def _manifest_schema(
"""Build the index manifest file schema, based on storage schema."""
primary_keys_ = set(primary_keys)

fields = [(_FILE_PATH_FIELD, pa.utf8()), (_NUM_ROWS_FIELD, pa.int64()),
fields = [(constants.FILE_PATH_FIELD, pa.utf8()),
(constants.NUM_ROWS_FIELD, pa.int64()),
(_INDEX_COMPRESSED_BYTES_FIELD, pa.int64()),
(_INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())]

@@ -209,16 +210,6 @@ def finish(self) -> Optional[str]:
if manifest_data.num_rows == 0:
return None

return _write_index_manifest(self._metadata_dir, self._manifest_schema,
manifest_data)


def _write_index_manifest(metadata_dir: str, schema: pa.Schema,
data: pa.Table) -> str:
# TODO: currently assume this file is small, so always write a single file.
file_path = paths.new_index_manifest_path(metadata_dir)
writer = pq.ParquetWriter(file_path, schema)
writer.write_table(data)

writer.close()
return file_path
file_path = paths.new_index_manifest_path(self._metadata_dir)
write_index_manifest(file_path, self._manifest_schema, manifest_data)
return file_path
68 changes: 68 additions & 0 deletions python/src/space/core/manifests/record.py
@@ -0,0 +1,68 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Record manifest files writer and reader implementation."""

from typing import List, Optional

import pyarrow as pa

from space.core.manifests.utils import write_index_manifest
import space.core.proto.metadata_pb2 as meta
from space.core.schema import constants
from space.core.utils import paths


def _manifest_schema() -> pa.Schema:
  fields = [(constants.FILE_PATH_FIELD, pa.utf8()),
            (constants.FIELD_ID_FIELD, pa.int32()),
            (constants.NUM_ROWS_FIELD, pa.int64()),
            (constants.UNCOMPRESSED_BYTES_FIELD, pa.int64())]
  return pa.schema(fields)


class RecordManifestWriter:
  """Writer of record manifest files."""

  def __init__(self, metadata_dir: str):
    self._metadata_dir = metadata_dir
    self._manifest_schema = _manifest_schema()

    self._file_paths: List[str] = []
    self._field_ids: List[int] = []
    self._num_rows: List[int] = []
    self._uncompressed_bytes: List[int] = []

  def write(self, file_path: str, field_id: int,
            storage_statistics: meta.StorageStatistics) -> None:
    self._file_paths.append(file_path)
    self._field_ids.append(field_id)
    self._num_rows.append(storage_statistics.num_rows)
    self._uncompressed_bytes.append(
        storage_statistics.record_uncompressed_bytes)

  def finish(self) -> Optional[str]:
    if not self._file_paths:
      return None

    arrays = [
        self._file_paths, self._field_ids, self._num_rows,
        self._uncompressed_bytes
    ]
    manifest_table = pa.Table.from_arrays(arrays, schema=self._manifest_schema)

    file_path = paths.new_record_manifest_path(self._metadata_dir)
    write_index_manifest(file_path, self._manifest_schema, manifest_table)
    return file_path
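
A minimal usage sketch of the new writer; the directory, record file path, field ID, and statistics below are hypothetical:

import space.core.proto.metadata_pb2 as meta
from space.core.manifests import RecordManifestWriter

writer = RecordManifestWriter(metadata_dir="/tmp/space/metadata")

# One manifest row per (record file, field) pair.
stats = meta.StorageStatistics(num_rows=100, record_uncompressed_bytes=4096)
writer.write("data/images_0.array_record", field_id=3,
             storage_statistics=stats)

manifest_path = writer.finish()  # Parquet file path, or None if nothing written.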
27 changes: 27 additions & 0 deletions python/src/space/core/manifests/utils.py
@@ -0,0 +1,27 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Manifest utilities."""

import pyarrow as pa
import pyarrow.parquet as pq


def write_index_manifest(file_path: str, schema: pa.Schema,
                         data: pa.Table) -> str:
  """Write a manifest table to a Parquet file and return the file path."""
  # TODO: currently assume this file is small, so always write a single file.
  writer = pq.ParquetWriter(file_path, schema)
  writer.write_table(data)

  writer.close()
  return file_path
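
Despite its index-specific name, this helper now backs both manifest writers (RecordManifestWriter.finish above calls it too). A direct-use sketch; the schema mirrors the _FILE/_NUM_ROWS manifest constants and the paths are illustrative:

import pyarrow as pa
from space.core.manifests.utils import write_index_manifest

schema = pa.schema([("_FILE", pa.utf8()), ("_NUM_ROWS", pa.int64())])
data = pa.Table.from_pydict(
    {"_FILE": ["data/file0.parquet"], "_NUM_ROWS": [10]}, schema=schema)
write_index_manifest("/tmp/space/metadata/manifest0.parquet", schema, data)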
114 changes: 107 additions & 7 deletions python/src/space/core/ops/append.py
@@ -17,18 +17,20 @@
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from typing import Optional
from typing import Dict, Optional, Tuple

import pyarrow as pa
import pyarrow.parquet as pq

from space.core.manifests.index import IndexManifestWriter
from space.core.manifests import IndexManifestWriter
from space.core.manifests import RecordManifestWriter
from space.core.ops import utils
from space.core.ops.base import BaseOp, InputData
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as runtime
from space.core.schema.arrow import arrow_schema
from space.core.schema import arrow
from space.core.utils import paths
from space.core.utils.lazy_imports_utils import array_record_module as ar
from space.core.utils.paths import StoragePaths

# TODO: to obtain the values from user provided options.
@@ -52,11 +54,25 @@ def finish(self) -> Optional[runtime.Patch]:

@dataclass
class _IndexWriterInfo:
"""Contain information of index file writer."""
"""Information of index file writer."""
writer: pq.ParquetWriter
file_path: str


@dataclass
class _RecordWriterInfo:
"""Information of record file writer."""
writer: ar.ArrayRecordWriter
file_path: str
file_id: int = 0
next_row_id: int = 0
storage_statistics: Optional[meta.StorageStatistics] = None

def __post_init__(self):
if self.storage_statistics is None:
self.storage_statistics = meta.StorageStatistics()


class LocalAppendOp(BaseAppendOp, StoragePaths):
"""Append operation running locally.
@@ -70,11 +86,19 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):
StoragePaths.__init__(self, location)

self._metadata = metadata
self._schema = arrow_schema(self._metadata.schema.fields)
self._schema = arrow.arrow_schema(self._metadata.schema.fields)

self._index_fields, self._record_fields = arrow.classify_fields(
self._schema,
set(self._metadata.schema.record_fields),
selected_fields=None)

# Data file writers.
self._index_writer_info: Optional[_IndexWriterInfo] = None

# Key is field name.
self._record_writers: Dict[str, _RecordWriterInfo] = {}

# Local runtime caches.
self._cached_index_data: Optional[pa.Table] = None
self._cached_index_file_bytes = 0
@@ -83,6 +107,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):
self._index_manifest_writer = IndexManifestWriter(
self._metadata_dir, self._schema,
self._metadata.schema.primary_keys) # type: ignore[arg-type]
self._record_manifest_writer = RecordManifestWriter(self._metadata_dir)

self._patch = runtime.Patch()

@@ -98,6 +123,11 @@ def finish(self) -> Optional[runtime.Patch]:
Returns:
A patch to the storage or None if no actual storage modification happens.
"""
# Flush all cached record data.
for f in self._record_fields:
if f.name in self._record_writers:
self._finish_record_writer(f, self._record_writers[f.name])

# Flush all cached index data.
if self._cached_index_data is not None:
self._maybe_create_index_writer()
@@ -107,11 +137,17 @@ def finish(self) -> Optional[runtime.Patch]:
if self._index_writer_info is not None:
self._finish_index_writer()

# Write manifest files.
index_manifest_full_path = self._index_manifest_writer.finish()
if index_manifest_full_path is not None:
self._patch.added_index_manifest_files.append(
self.short_path(index_manifest_full_path))

record_manifest_path = self._record_manifest_writer.finish()
if record_manifest_path:
self._patch.added_record_manifest_files.append(
self.short_path(record_manifest_path))

if self._patch.storage_statistics_update.num_rows == 0:
return None

@@ -127,6 +163,20 @@ def _append_arrow(self, data: pa.Table) -> None:
index_data = data
self._maybe_create_index_writer()

index_data = data.select(arrow.field_names(self._index_fields))

# Write record fields into files.
record_addresses = [
self._write_record_column(f, data.column(f.name))
for f in self._record_fields
]

# TODO: to preserve the field order in schema.
for field_name, address_column in record_addresses:
# TODO: the field/column added must have field ID.
index_data = index_data.append_column(field_name, address_column)

# Write index fields into files.
self._cached_index_file_bytes += index_data.nbytes

if self._cached_index_data is None:
@@ -154,7 +204,7 @@ def _maybe_create_index_writer(self) -> None:
writer, self.short_path(full_file_path))

def _finish_index_writer(self) -> None:
"""Materialize a new index file, update metadata and stats."""
"""Materialize a new index file (Parquet), update metadata and stats."""
if self._index_writer_info is None:
return

@@ -164,8 +214,58 @@ def _finish_index_writer(self) -> None:
stats = self._index_manifest_writer.write(
self._index_writer_info.file_path,
self._index_writer_info.writer.writer.metadata)
utils.update_index_storage_statistics(
utils.update_index_storage_stats(
base=self._patch.storage_statistics_update, update=stats)

self._index_writer_info = None
self._cached_index_file_bytes = 0

def _write_record_column(
self, field: arrow.Field,
column: pa.ChunkedArray) -> Tuple[str, pa.StructArray]:
"""Write record field into files.
Returns:
A tuple (field_name, address_column).
"""
field_name = field.name

# TODO: this section needs to be locked when supporting threaded execution.
if field_name in self._record_writers:
writer_info = self._record_writers[field_name]
else:
file_path = paths.new_record_file_path(self._data_dir, field_name)
writer = ar.ArrayRecordWriter(file_path, options="")
writer_info = _RecordWriterInfo(writer, self.short_path(file_path))
self._record_writers[field_name] = writer_info

num_rows = column.length()
writer_info.storage_statistics.num_rows += num_rows
writer_info.storage_statistics.record_uncompressed_bytes += column.nbytes

for chunk in column.chunks:
for v in chunk:
writer_info.writer.write(v.as_py())

# Generate record address field values to return.
next_row_id = writer_info.next_row_id + num_rows
address_column = utils.address_column(writer_info.file_path,
writer_info.next_row_id, num_rows)
writer_info.next_row_id = next_row_id

# Materialize the file when size is over threshold.
if writer_info.storage_statistics.record_uncompressed_bytes > _MAX_ARRAY_RECORD_BYTES:
self._finish_record_writer(field, writer_info)

return field_name, address_column

def _finish_record_writer(self, field: arrow.Field,
writer_info: _RecordWriterInfo) -> None:
"""Materialize a new record file (ArrayRecord), update metadata and stats."""
writer_info.writer.close()
self._record_manifest_writer.write(writer_info.file_path, field.field_id,
writer_info.storage_statistics)
utils.update_record_stats_bytes(self._patch.storage_statistics_update,
writer_info.storage_statistics)

del self._record_writers[field.name]
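
To make the record path concrete: _write_record_column flushes a record field's values to an ArrayRecord file and returns a struct column of (file, row ID) addresses, which _append_arrow splices into the index table in place of the raw values. A self-contained sketch; the column name, address field names, and values are assumptions:

import pyarrow as pa

index_data = pa.table({"id": [1, 2, 3]})
# Address struct: which record file and row each value lives at. The real
# field names come from arrow.record_address_types().
address_column = pa.StructArray.from_arrays(
    [pa.array(["data/images_0.array_record"] * 3),
     pa.array([0, 1, 2], type=pa.int32())],
    names=["_FILE", "_ROW_ID"])
index_data = index_data.append_column("images", address_column)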
20 changes: 19 additions & 1 deletion python/src/space/core/ops/utils.py
@@ -14,14 +14,32 @@
#
"""Utilities for operation classes."""

import numpy as np
import pyarrow as pa

from space.core.proto import metadata_pb2 as meta
from space.core.schema import arrow


def update_index_storage_statistics(
def update_index_storage_stats(
base: meta.StorageStatistics,
update: meta.StorageStatistics,
) -> None:
"""Update index storage statistics."""
base.num_rows += update.num_rows
base.index_compressed_bytes += update.index_compressed_bytes
base.index_uncompressed_bytes += update.index_uncompressed_bytes


def update_record_stats_bytes(base: meta.StorageStatistics,
update: meta.StorageStatistics) -> None:
"""Update record storage statistics."""
base.record_uncompressed_bytes += update.record_uncompressed_bytes


def address_column(file_path: str, start_row: int,
num_rows: int) -> pa.StructArray:
"""Construct an record address column by a file path and row ID range."""
return pa.StructArray.from_arrays(
[[file_path] * num_rows,
np.arange(start_row, start_row + num_rows, dtype=np.int32)],
fields=arrow.record_address_types())
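
For intuition, addressing rows 5..7 of a single record file yields a three-row struct array (a sketch; the struct field names come from arrow.record_address_types() and are not shown in this diff):

col = address_column("data/chunk0.array_record", start_row=5, num_rows=3)
# -> ("data/chunk0.array_record", 5), ("data/chunk0.array_record", 6),
#    ("data/chunk0.array_record", 7)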