Skip to content

Commit

Permalink
Move Ray to lazy import (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhou Fang authored Jan 13, 2024
1 parent e9bf757 commit 88c2b7a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 20 deletions.
4 changes: 2 additions & 2 deletions python/src/space/core/utils/lazy_imports_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def __getattr__(self, name: str) -> Any:
)
except ImportError as exception:
if self.error_callback is not None:
self.error_callback(exception=exception,
module_name=self.module_name)
self.error_callback(exception=exception, module_name=self.module_name)
raise exception
return getattr(self.module, name)

Expand Down Expand Up @@ -193,3 +192,4 @@ def array_record_error_callback(**kwargs):

with lazy_imports():
  # Ray is an optional/heavy dependency: importing it inside lazy_imports()
  # defers the real import until an attribute is first accessed, so plain
  # `space` installs do not need ray at import time.
  import ray  # type: ignore[import-untyped] # pylint: disable=unused-import
  from ray.data.datasource import datasource as ray_datasource  # pylint: disable=unused-import
34 changes: 16 additions & 18 deletions python/src/space/ray/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,39 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, TYPE_CHECKING

from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, Reader, ReadTask
from ray.data.datasource.datasource import WriteResult
from ray.types import ObjectRef

from space.core.ops.read import FileSetReadOp
from space.core.options import ReadOptions
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as rt
from space.core.utils.lazy_imports_utils import ray, ray_datasource

if TYPE_CHECKING:
from space.core.storage import Storage


class SpaceDataSource(ray_datasource.Datasource):
  """A Ray data source for a Space dataset.

  All Ray types are referenced through the lazily imported `ray` /
  `ray_datasource` modules so that this module can be imported without
  Ray installed.
  """

  # pylint: disable=arguments-differ,too-many-arguments
  def create_reader(  # type: ignore[override]
      self, storage: Storage,
      read_options: ReadOptions) -> ray_datasource.Reader:
    """Return a reader that plans read tasks over `storage`."""
    return _SpaceDataSourceReader(storage, read_options)

  def do_write(
      self, blocks: List[ray.types.ObjectRef[ray.data.block.Block]],
      metadata: List[ray.data.block.BlockMetadata],
      ray_remote_args: Optional[Dict[str, Any]],
      location: str) -> List[ray.types.ObjectRef[ray_datasource.WriteResult]]:
    """Write a Ray dataset into Space datasets."""
    raise NotImplementedError("Write from a Ray dataset is not supported")

  def on_write_complete(  # type: ignore[override]
      self, write_results: List[ray_datasource.WriteResult]) -> None:
    # Writing is unsupported, so completing a write is unreachable in
    # practice; raise the same error for a consistent contract.
    raise NotImplementedError("Write from a Ray dataset is not supported")


class _SpaceDataSourceReader(Reader):
class _SpaceDataSourceReader(ray_datasource.Reader):

def __init__(self, storage: Storage, read_options: ReadOptions):
self._storage = storage
Expand All @@ -66,8 +64,8 @@ def estimate_inmemory_data_size(self) -> Optional[int]:
# Note: The `parallelism` which is supposed to indicate how many `ReadTask` to
# return will have no effect here, since we map each query into a `ReadTask`.
# TODO: to properly handle the error that returned list is empty.
def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
read_tasks: List[ReadTask] = []
def get_read_tasks(self, parallelism: int) -> List[ray_datasource.ReadTask]:
read_tasks: List[ray_datasource.ReadTask] = []
file_set = self._storage.data_files(self._read_options.filter_,
self._read_options.snapshot_id)

Expand All @@ -77,7 +75,7 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
# The metadata about the block that we know prior to actually executing
# the read task.
# TODO: to populate the storage values.
block_metadata = BlockMetadata(
block_metadata = ray.data.block.BlockMetadata(
num_rows=1,
size_bytes=1,
schema=None,
Expand All @@ -90,7 +88,7 @@ def _read_fn(location=self._storage.location,
file_set=task_file_set):
return _read_file_set(location, metadata, file_set, self._read_options)

read_tasks.append(ReadTask(_read_fn, block_metadata))
read_tasks.append(ray_datasource.ReadTask(_read_fn, block_metadata))

return read_tasks

Expand All @@ -99,5 +97,5 @@ def _read_fn(location=self._storage.location,
# whether row group granularity is needed.
def _read_file_set(location: str, metadata: meta.StorageMetadata,
                   file_set: rt.FileSet,
                   read_options: ReadOptions) -> List[ray.data.block.Block]:
  """Read all data in `file_set` and materialize it as a list of blocks.

  Delegates to FileSetReadOp, which iterates over the files covered by
  `file_set` under `location`, applying `read_options`.
  """
  return list(FileSetReadOp(location, metadata, file_set, read_options))

0 comments on commit 88c2b7a

Please sign in to comment.