diff --git a/python/src/space/core/ops/insert.py b/python/src/space/core/ops/insert.py
index 9d8b646..88cc674 100644
--- a/python/src/space/core/ops/insert.py
+++ b/python/src/space/core/ops/insert.py
@@ -81,7 +81,7 @@ def _write_arrow(self, data: pa.Table) -> Optional[runtime.Patch]:
     data_files = self._storage.data_files(pk_filter)
 
     mode = self._options.mode
-    patches: List[runtime.Patch] = []
+    patches: List[Optional[runtime.Patch]] = []
     if data_files.index_files:
       if mode == InsertOptions.Mode.INSERT:
         read_op = FileSetReadOp(
@@ -105,14 +105,15 @@ def _write_arrow(self, data: pa.Table) -> Optional[runtime.Patch]:
     return utils.merge_patches(patches)
 
 
-def _try_delete_data(op: BaseDeleteOp, patches: List[runtime.Patch]) -> None:
+def _try_delete_data(op: BaseDeleteOp,
+                     patches: List[Optional[runtime.Patch]]) -> None:
   patch = op.delete()
   if patch is not None:
     patches.append(patch)
 
 
 def _try_append_data(op: BaseAppendOp, data: pa.Table,
-                     patches: List[runtime.Patch]) -> None:
+                     patches: List[Optional[runtime.Patch]]) -> None:
   op.write(data)
   patch = op.finish()
   if patch is not None:
diff --git a/python/src/space/core/ops/utils.py b/python/src/space/core/ops/utils.py
index c0e56d4..2ae8be3 100644
--- a/python/src/space/core/ops/utils.py
+++ b/python/src/space/core/ops/utils.py
@@ -89,13 +89,17 @@ def primary_key_filter(primary_keys: List[str],
   return filter_
 
 
-def merge_patches(patches: List[runtime.Patch]) -> Optional[runtime.Patch]:
+def merge_patches(
+    patches: List[Optional[runtime.Patch]]) -> Optional[runtime.Patch]:
   """Merge multiple patches into one."""
   patch = runtime.Patch()
   stats_update = meta.StorageStatistics()
 
   empty = True
   for p in patches:
+    if p is None:
+      continue
+
     if empty:
       empty = False
 
diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py
index b251cda..cdda9ed 100644
--- a/python/src/space/core/runners.py
+++ b/python/src/space/core/runners.py
@@ -65,12 +65,25 @@ def diff(self, start_version: Union[int],
     """
 
 
-class BaseReadWriteRunner(BaseReadOnlyRunner):
-  """Abstract base runner class."""
+class StorageCommitMixin:
+  """Provide storage commit utilities."""
 
   def __init__(self, storage: Storage):
     self._storage = storage
 
+  def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult:
+    if patch is not None:
+      self._storage.commit(patch)
+
+    return _job_result(patch)
+
+
+class BaseReadWriteRunner(StorageCommitMixin, BaseReadOnlyRunner):
+  """Abstract base runner class."""
+
+  def __init__(self, storage: Storage):
+    StorageCommitMixin.__init__(self, storage)
+
   @abstractmethod
   def append(self, data: InputData) -> runtime.JobResult:
     """Append data into the dataset."""
@@ -124,12 +137,6 @@ def _insert(self, data: InputData,
   def delete(self, filter_: pc.Expression) -> runtime.JobResult:
     """Delete data matching the filter from the dataset."""
 
-  def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult:
-    if patch is not None:
-      self._storage.commit(patch)
-
-    return _job_result(patch)
-
 
 class LocalRunner(BaseReadWriteRunner):
   """A runner that runs operations locally."""
diff --git a/python/src/space/core/views.py b/python/src/space/core/views.py
index e8d7a96..218500f 100644
--- a/python/src/space/core/views.py
+++ b/python/src/space/core/views.py
@@ -29,7 +29,8 @@
 from space.core.utils.lazy_imports_utils import ray  # pylint: disable=unused-import
 from space.core.utils.paths import UDF_DIR, metadata_dir
 from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn
-from space.ray.runners import RayReadOnlyRunner
+from space.core.runners import LocalRunner
+from space.ray.runners import RayMaterializedViewRunner, RayReadOnlyRunner
 
 if TYPE_CHECKING:
   from space.core.datasets import Dataset
@@ -159,6 +160,17 @@ def view(self) -> View:
     """Return view of the materialized view."""
     return self._view
 
+  def ray(self) -> RayMaterializedViewRunner:
+    """Return a Ray runner for the materialized view."""
+    return RayMaterializedViewRunner(self)
+
+  def local(self) -> LocalRunner:
+    """Get a runner that runs operations locally.
+
+    TODO: should use a read-only local runner.
+    """
+    return LocalRunner(self._storage)
+
   @classmethod
   def create(cls, location: str, view: View, logical_plan: meta.LogicalPlan,
              udfs: Dict[str, UserDefinedFn]) -> MaterializedView:
diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py
index 65a81dd..31b4a32 100644
--- a/python/src/space/ray/runners.py
+++ b/python/src/space/ray/runners.py
@@ -21,13 +21,17 @@
 import pyarrow as pa
 import pyarrow.compute as pc
 
-from space.core.runners import BaseReadOnlyRunner
+from space.core.runners import BaseReadOnlyRunner, StorageCommitMixin
+from space.core.ops import utils
+from space.core.ops.append import LocalAppendOp
 from space.core.ops.change_data import ChangeType, read_change_data
+from space.core.ops.delete import FileSetDeleteOp
+import space.core.proto.runtime_pb2 as runtime
 from space.core.utils.lazy_imports_utils import ray
 from space.core.versions.utils import version_to_snapshot_id
 
 if TYPE_CHECKING:
-  from space.core.views import View
+  from space.core.views import MaterializedView, View
 
 
 class RayReadOnlyRunner(BaseReadOnlyRunner):
@@ -58,3 +62,41 @@ def diff(self, start_version: Union[int],
       processed_ray_data = self._view.process_source(data)
       processed_data = ray.get(processed_ray_data.to_arrow_refs())
       yield change_type, pa.concat_tables(processed_data)
+
+
+class RayMaterializedViewRunner(RayReadOnlyRunner, StorageCommitMixin):
+  """Ray runner for materialized views."""
+
+  def __init__(self, mv: MaterializedView):
+    RayReadOnlyRunner.__init__(self, mv.view)
+    StorageCommitMixin.__init__(self, mv.storage)
+
+  def refresh(self, target_version: Union[int]) -> runtime.JobResult:
+    """Refresh the materialized view by synchronizing from source dataset."""
+    start_snapshot_id = self._storage.metadata.current_snapshot_id
+    end_snapshot_id = version_to_snapshot_id(target_version)
+
+    patches: List[Optional[runtime.Patch]] = []
+    for change_type, data in self.diff(start_snapshot_id, end_snapshot_id):
+      if change_type == ChangeType.DELETE:
+        patches.append(self._process_delete(data))
+      elif change_type == ChangeType.ADD:
+        patches.append(self._process_append(data))
+      else:
+        raise NotImplementedError(f"Change type {change_type} not supported")
+
+    return self._try_commit(utils.merge_patches(patches))
+
+  def _process_delete(self, data: pa.Table) -> Optional[runtime.Patch]:
+    filter_ = utils.primary_key_filter(self._storage.primary_keys, data)
+    if filter_ is None:
+      return None
+
+    op = FileSetDeleteOp(self._storage.location, self._storage.metadata,
+                         self._storage.data_files(filter_), filter_)
+    return op.delete()
+
+  def _process_append(self, data: pa.Table) -> Optional[runtime.Patch]:
+    op = LocalAppendOp(self._storage.location, self._storage.metadata)
+    op.write(data)
+    return op.finish()
diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py
index a77b293..f2308de 100644
--- a/python/tests/ray/test_runners.py
+++ b/python/tests/ray/test_runners.py
@@ -48,7 +48,7 @@ def sample_dataset(tmp_path, sample_schema):
 
 class TestRayReadOnlyRunner:
 
-  def test_diff_map_batches(self, sample_dataset):
+  def test_diff_map_batches(self, tmp_path, sample_dataset):
     # A sample UDF for testing.
     def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
       batch["float64"] = batch["float64"] + 1
@@ -62,6 +62,7 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
                                       input_fields=["int64", "binary"],
                                       output_schema=view_schema,
                                       output_record_fields=["binary"])
+    mv = view.materialize(str(tmp_path / "mv"))
 
     ds_runner = sample_dataset.local()
     view_runner = view.ray()
@@ -92,6 +93,19 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
     # Test several changes.
     assert list(view_runner.diff(0, 2)) == [expected_change0, expected_change1]
 
+    # Test materialized views.
+    ray_runner = mv.ray()
+    local = mv.local()
+
+    ray_runner.refresh(1)
+    assert local.read_all() == expected_change0[1]
+
+    ray_runner.refresh(2)
+    assert local.read_all() == pa.Table.from_pydict({
+        "int64": [1, 3],
+        "float64": [1.1, 1.3],
+    })
+
  def test_diff_filter(self, sample_dataset):
     # A sample UDF for testing.
     def _sample_filter_udf(row: Dict[str, Any]) -> Dict[str, Any]:
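For reference, a minimal usage sketch of the refresh flow added above. Only `materialize()`, `mv.ray().refresh()`, and `mv.local().read_all()` come from this patch; `Dataset.create`, `map_batches`, the `from space import Dataset` entry point, and `local().append()` are assumed from the existing library, and the paths, schema, and values are illustrative only:

import pyarrow as pa
import ray

from space import Dataset  # assumed entry point, as used elsewhere in the project

ray.init()  # or connect to an existing Ray cluster

# Source dataset keyed by "int64"; location, schema, and values are illustrative.
schema = pa.schema([("int64", pa.int64()), ("float64", pa.float64())])
ds = Dataset.create("/tmp/space/source", schema,
                    primary_keys=["int64"], record_fields=[])

def _bump(batch):
  # Same kind of UDF as in the test: shift the float column by 1.
  batch["float64"] = batch["float64"] + 1
  return batch

view = ds.map_batches(fn=_bump,
                      input_fields=["int64", "float64"],
                      output_schema=schema,
                      output_record_fields=[])
mv = view.materialize("/tmp/space/mv")

# Append to the source, then synchronize the materialized view.
ds.local().append({"int64": [1, 2], "float64": [0.1, 0.2]})
mv.ray().refresh(1)            # target the snapshot created by the append (illustrative id)
print(mv.local().read_all())   # read back through the local runner added above

Because refresh() merges the per-change patches with utils.merge_patches() and commits them through StorageCommitMixin._try_commit(), one refresh call lands as a single snapshot in the materialized view's storage.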