Add logical plan support #18

Merged: 2 commits, Dec 29, 2023
2 changes: 1 addition & 1 deletion .github/workflows/python-ci.yml
@@ -48,7 +48,7 @@ jobs:
working-directory: ./python
run: |
pylint tests/**/* \
--disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code"
--disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code,redefined-outer-name"
- name: Checking type with mypy
working-directory: ./python/src
run: |
1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -16,6 +16,7 @@ requires-python = ">=3.8"
dependencies = [
"absl-py",
"array-record",
"cloudpickle",
"numpy",
"protobuf",
"pyarrow >= 14.0.0",
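The new cloudpickle dependency appears to back the UDF registry added later in this PR (the LogicalPlan message stores UDFs as pickle file paths). A minimal sketch of that round trip, assuming only cloudpickle itself; the function name and file path are illustrative:

import pickle

import cloudpickle

def add_one(value: int) -> int:
    """A toy UDF standing in for a real view transform."""
    return value + 1

# Serialize the callable to a file whose path could be recorded in a UDF
# registry; cloudpickle also handles lambdas and closures that the stdlib
# pickle cannot serialize.
with open("/tmp/add_one.pkl", "wb") as f:
    cloudpickle.dump(add_one, f)

# cloudpickle output loads back with the stdlib pickle module.
with open("/tmp/add_one.pkl", "rb") as f:
    restored = pickle.load(f)

assert restored(1) == 2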
1 change: 1 addition & 0 deletions python/src/space/__init__.py
@@ -17,3 +17,4 @@
from space.core.datasets import Dataset
from space.core.runners import LocalRunner
from space.core.schema.types import TfFeatures
from space.core.views import MaterializedView
24 changes: 22 additions & 2 deletions python/src/space/core/datasets.py
@@ -15,16 +15,19 @@
"""Space dataset is the interface to interact with underlying storage."""

from __future__ import annotations
from typing import List
from typing import Dict, List

import pyarrow as pa
from substrait.algebra_pb2 import ReadRel, Rel

from space.core.runners import LocalRunner
from space.core.serializers.base import DictSerializer
from space.core.storage import Storage
from space.core.utils.plans import LogicalPlanBuilder
from space.core.views import View


class Dataset:
class Dataset(View):
"""Dataset is the interface to interact with Space storage."""

def __init__(self, storage: Storage):
@@ -54,6 +57,14 @@ def schema(self) -> pa.Schema:
"""Return the dataset schema."""
return self._storage.logical_schema

@property
def primary_keys(self) -> List[str]:
return self._storage.primary_keys

@property
def record_fields(self) -> List[str]:
return self._storage.record_fields

def serializer(self) -> DictSerializer:
"""Return a serializer (deserializer) for the dataset."""
return DictSerializer(self.schema)
@@ -71,3 +82,12 @@ def index_files(self) -> List[str]:
def snapshot_ids(self) -> List[int]:
"""A list of all alive snapshot IDs in the dataset."""
return self._storage.snapshot_ids

@property
def sources(self) -> Dict[str, Dataset]:
return {self._storage.location: self}

def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
location = self._storage.location
return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]),
base_schema=self._storage.metadata.schema.fields))
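For reference, the read relation emitted by the new Dataset.to_relation can be reproduced standalone with the substrait protos already imported above. This is only an illustrative sketch: the location path is made up, and base_schema is omitted here, whereas the change above fills it from the storage metadata.

from substrait.algebra_pb2 import ReadRel, Rel

# Hypothetical storage location; Dataset.to_relation uses the real one as the
# single name of the named table.
location = "/tmp/space/my_dataset"

rel = Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location])))
print(rel)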
21 changes: 19 additions & 2 deletions python/src/space/core/proto/metadata.proto
@@ -17,6 +17,7 @@ syntax = "proto3";
package space.proto;

import "google/protobuf/timestamp.proto";
import "substrait/plan.proto";
import "substrait/type.proto";

// Record the current storage metadata path in a static local file.
@@ -32,7 +33,7 @@ message EntryPoint {
// Metadata persisting the current status of a storage, including logical
// metadata such as schema, and physical metadata persisted as a history of
// snapshots
// NEXT_ID: 7
// NEXT_ID: 8
message StorageMetadata {
// Create time of the storage.
google.protobuf.Timestamp create_time = 1;
@@ -43,8 +44,10 @@ message StorageMetadata {
// The storage type.
enum Type {
TYPE_UNSPECIFIED = 0;
// The dataset type supports fully managed storage features.
// Dataset type supports fully managed storage features.
DATASET = 1;
// Materialized view type supports synchronizing changes from sources.
MATERIALIZED_VIEW = 2;
}
Type type = 3;

@@ -56,6 +59,9 @@

// All alive snapshots with snapshot ID as key.
map<int64, Snapshot> snapshots = 6;

// Store the logical plan for materialized views.
LogicalPlan logical_plan = 7;
}

// The storage logical schema where user provided types are persisted instead
@@ -148,3 +154,14 @@ message RowBitmap {
bytes roaring_bitmap = 3;
}
}

// Store the logical plan of a transform.
// NEXT_ID: 3
message LogicalPlan {
// Stores the logical plan.
substrait.Plan logical_plan = 1;

// Registry of user defined functions.
// Key is UDF name; value is pickle file path.
map<string, string> udfs = 2;
}
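The regenerated Python bindings for this message appear in metadata_pb2.py below. A hedged sketch of populating it, assuming the module path matches this PR's file layout; the plan is left empty and the UDF name and pickle path are illustrative:

from substrait.plan_pb2 import Plan

from space.core.proto import metadata_pb2 as meta

logical_plan = meta.LogicalPlan(logical_plan=Plan())
# Register a UDF by name, pointing at a pickle file (hypothetical path).
logical_plan.udfs["add_one"] = "metadata/udfs/add_one.pkl"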
49 changes: 28 additions & 21 deletions python/src/space/core/proto/metadata_pb2.py

Some generated files are not rendered by default.

65 changes: 60 additions & 5 deletions python/src/space/core/proto/metadata_pb2.pyi
@@ -22,6 +22,7 @@ import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import google.protobuf.timestamp_pb2
import substrait.plan_pb2
import substrait.type_pb2
import sys
import typing
@@ -61,7 +62,7 @@ class StorageMetadata(google.protobuf.message.Message):
"""Metadata persisting the current status of a storage, including logical
metadata such as schema, and physical metadata persisted as a history of
snapshots
NEXT_ID: 7
NEXT_ID: 8
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -74,14 +75,18 @@
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
TYPE_UNSPECIFIED: StorageMetadata._Type.ValueType # 0
DATASET: StorageMetadata._Type.ValueType # 1
"""The dataset type supports fully managed storage features."""
"""Dataset type supports fully managed storage features."""
MATERIALIZED_VIEW: StorageMetadata._Type.ValueType # 2
"""Materialized view type supports synchronizing changes from sources."""

class Type(_Type, metaclass=_TypeEnumTypeWrapper):
"""The storage type."""

TYPE_UNSPECIFIED: StorageMetadata.Type.ValueType # 0
DATASET: StorageMetadata.Type.ValueType # 1
"""The dataset type supports fully managed storage features."""
"""Dataset type supports fully managed storage features."""
MATERIALIZED_VIEW: StorageMetadata.Type.ValueType # 2
"""Materialized view type supports synchronizing changes from sources."""

@typing_extensions.final
class SnapshotsEntry(google.protobuf.message.Message):
@@ -107,6 +112,7 @@ class StorageMetadata(google.protobuf.message.Message):
SCHEMA_FIELD_NUMBER: builtins.int
CURRENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int
SNAPSHOTS_FIELD_NUMBER: builtins.int
LOGICAL_PLAN_FIELD_NUMBER: builtins.int
@property
def create_time(self) -> google.protobuf.timestamp_pb2.Timestamp:
"""Create time of the storage."""
@@ -122,6 +128,9 @@
@property
def snapshots(self) -> google.protobuf.internal.containers.MessageMap[builtins.int, global___Snapshot]:
"""All alive snapshots with snapshot ID as key."""
@property
def logical_plan(self) -> global___LogicalPlan:
"""Store the logical plan for materialized views."""
def __init__(
self,
*,
@@ -131,9 +140,10 @@
schema: global___Schema | None = ...,
current_snapshot_id: builtins.int = ...,
snapshots: collections.abc.Mapping[builtins.int, global___Snapshot] | None = ...,
logical_plan: global___LogicalPlan | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "logical_plan", b"logical_plan", "schema", b"schema"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "logical_plan", b"logical_plan", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ...

global___StorageMetadata = StorageMetadata

@@ -327,3 +337,48 @@ class RowBitmap(google.protobuf.message.Message):
def WhichOneof(self, oneof_group: typing_extensions.Literal["bitmap", b"bitmap"]) -> typing_extensions.Literal["roaring_bitmap"] | None: ...

global___RowBitmap = RowBitmap

@typing_extensions.final
class LogicalPlan(google.protobuf.message.Message):
"""Store the logical plan of a transform.
NEXT_ID: 3
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

@typing_extensions.final
class UdfsEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

KEY_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
key: builtins.str
value: builtins.str
def __init__(
self,
*,
key: builtins.str = ...,
value: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ...

LOGICAL_PLAN_FIELD_NUMBER: builtins.int
UDFS_FIELD_NUMBER: builtins.int
@property
def logical_plan(self) -> substrait.plan_pb2.Plan:
"""Stores the logical plan."""
@property
def udfs(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
"""Registry of user defined functions.
Key is UDF name; value is pickle file path.
"""
def __init__(
self,
*,
logical_plan: substrait.plan_pb2.Plan | None = ...,
udfs: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["logical_plan", b"logical_plan"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["logical_plan", b"logical_plan", "udfs", b"udfs"]) -> None: ...

global___LogicalPlan = LogicalPlan
5 changes: 5 additions & 0 deletions python/src/space/core/schema/arrow.py
@@ -153,6 +153,11 @@ def field_name_to_id_dict(schema: pa.Schema) -> Dict[str, int]:
return {f.name: field_id(f) for f in schema}


def field_id_to_name_dict(schema: pa.Schema) -> Dict[int, str]:
"""Return a dict with field ID as key and field name as value."""
return {field_id(f): f.name for f in schema}


def field_id_to_column_id_dict(schema: pa.Schema) -> Dict[int, int]:
"""Return a dict with field ID as key and column ID as value."""
field_id_dict = field_name_to_id_dict(schema)
22 changes: 19 additions & 3 deletions python/src/space/core/storage.py
@@ -67,6 +67,11 @@ def primary_keys(self) -> List[str]:
"""Return the storage primary keys."""
return list(self._metadata.schema.primary_keys)

@property
def record_fields(self) -> List[str]:
"""Return record field names."""
return list(self._metadata.schema.record_fields)

@property
def logical_schema(self) -> pa.Schema:
"""Return the user specified schema."""
@@ -89,17 +94,24 @@ def snapshot(self, snapshot_id: Optional[int] = None) -> meta.Snapshot:

raise RuntimeError(f"Snapshot {snapshot_id} is not found")

# pylint: disable=too-many-arguments
@classmethod
def create(
cls, location: str, schema: pa.Schema, primary_keys: List[str],
record_fields: List[str]) -> Storage: # pylint: disable=unused-argument
cls,
location: str,
schema: pa.Schema,
primary_keys: List[str],
record_fields: List[str],
logical_plan: Optional[meta.LogicalPlan] = None
) -> Storage: # pylint: disable=unused-argument
"""Create a new empty storage.

Args:
location: the directory path to the storage.
schema: the schema of the storage.
primary_keys: un-enforced primary keys.
record_fields: fields stored in row format (ArrayRecord).
logical_plan: logical plan of materialized view.
"""
# TODO: to verify that location is an empty directory.
# TODO: to verify primary key fields and record_fields (and types) are
@@ -122,6 +134,10 @@ def create(
current_snapshot_id=_INIT_SNAPSHOT_ID,
type=meta.StorageMetadata.DATASET)

if logical_plan is not None:
metadata.type = meta.StorageMetadata.MATERIALIZED_VIEW
metadata.logical_plan.CopyFrom(logical_plan)

new_metadata_path = paths.new_metadata_path(paths.metadata_dir(location))

snapshot = meta.Snapshot(snapshot_id=_INIT_SNAPSHOT_ID, create_time=now)
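Taken together, passing a logical_plan into Storage.create marks the storage as a materialized view and persists the plan alongside the schema. A hedged sketch of the extended call; module paths follow this PR's layout, and the location, schema, and empty plan are illustrative rather than taken from the repository:

import pyarrow as pa

from space.core.proto import metadata_pb2 as meta
from space.core.storage import Storage

schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

storage = Storage.create(
    location="/tmp/space/my_mv",      # hypothetical, should be a fresh directory
    schema=schema,
    primary_keys=["id"],
    record_fields=[],                 # no row-format (ArrayRecord) fields in this sketch
    logical_plan=meta.LogicalPlan(),  # presence switches the type to MATERIALIZED_VIEW
)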