[WIP] New executor API prototype #30222

Closed · wants to merge 26 commits
72 changes: 72 additions & 0 deletions python/ray/data/_internal/execution/bulk_executor.py
@@ -0,0 +1,72 @@
from typing import Dict, List, Iterator

import ray
from ray.data._internal.execution.interfaces import (
Executor,
RefBundle,
PhysicalOperator,
)
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import DatasetStats


class BulkExecutor(Executor):
def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]:
"""Synchronously executes the DAG via bottom-up recursive traversal.

TODO: optimize memory usage by deleting intermediate results and marking
the `owned` field in the ref bundles correctly.
"""

saved_outputs: Dict[PhysicalOperator, List[RefBundle]] = {}

def execute_recursive(node: PhysicalOperator) -> List[RefBundle]:
# Avoid duplicate executions.
if node in saved_outputs:
return saved_outputs[node]

# Compute dependencies.
inputs = [execute_recursive(dep) for dep in node.input_dependencies]

# Fully execute this operator.
for i, ref_bundles in enumerate(inputs):
for r in ref_bundles:
node.add_input(r, input_index=i)
node.inputs_done(i)
output = _naive_run_until_complete(node)
node.release_unused_resources()

# Cache and return output.
saved_outputs[node] = output
return output

return execute_recursive(dag)

    def get_stats(self) -> DatasetStats:
        raise NotImplementedError


def _naive_run_until_complete(node: PhysicalOperator) -> List[RefBundle]:
"""Run this operator until completion, assuming all inputs have been submitted.

Args:
node: The operator to run.

Returns:
The list of output ref bundles for the operator.
"""
    output = []
    tasks = node.get_tasks()
    if tasks:
        bar = ProgressBar(node.name, total=node.num_outputs_total())
        while tasks:
            # Wait for any one task to finish; fetch_local ensures the (small)
            # result, e.g., block metadata, is pulled to the driver.
            [ready], _ = ray.wait(tasks, num_returns=1, fetch_local=True)
            node.notify_task_completed(ready)
            tasks = node.get_tasks()
            # Drain any outputs that became available.
            while node.has_next():
                bar.update(1)
                output.append(node.get_next())
        bar.close()
    # Drain remaining outputs (also covers operators that launch no tasks).
    while node.has_next():
        output.append(node.get_next())
    return output
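
For orientation, here is a minimal sketch of how this bulk executor could be driven end to end. It assumes the interfaces defined below; the InputBuffer operator and make_ref_bundle helper are hypothetical stand-ins for source operators that are not part of this diff.

import ray
from ray.data._internal.execution.bulk_executor import BulkExecutor
from ray.data._internal.execution.interfaces import (
    ExecutionOptions,
    PhysicalOperator,
    RefBundle,
)
from ray.data.block import BlockAccessor

ray.init(ignore_reinit_error=True)


def make_ref_bundle(rows) -> RefBundle:
    # Hypothetical helper: materialize a simple (list) block and derive
    # metadata with known num_rows/size_bytes, as RefBundle requires.
    block = list(rows)
    meta = BlockAccessor.for_block(block).get_metadata([], None)
    return RefBundle([(ray.put(block), meta)])


class InputBuffer(PhysicalOperator):
    # Hypothetical leaf operator that emits a fixed list of bundles and
    # launches no tasks, so the PhysicalOperator defaults suffice.
    def __init__(self, bundles):
        super().__init__("InputBuffer", [])
        self._bundles = list(bundles)

    def inputs_done(self, input_index: int) -> None:
        pass

    def has_next(self) -> bool:
        return bool(self._bundles)

    def get_next(self) -> RefBundle:
        return self._bundles.pop(0)


dag = InputBuffer([make_ref_bundle(range(5)), make_ref_bundle(range(5, 10))])
executor = BulkExecutor(ExecutionOptions())
for bundle in executor.execute(dag):
    print(bundle.num_rows())  # -> 5, then 5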
222 changes: 222 additions & 0 deletions python/ray/data/_internal/execution/interfaces.py
@@ -0,0 +1,222 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Iterator, Tuple, Callable

import ray
from ray.data._internal.stats import DatasetStats
from ray.data.block import Block, BlockMetadata
from ray.types import ObjectRef


@dataclass
class RefBundle:
"""A group of data block references and their metadata.

Operators take in and produce streams of RefBundles.

    Most commonly a RefBundle consists of a single block object reference.
    In some cases, e.g., due to block splitting or for a SortReduce task, a
    bundle may contain more than one block.

    Block bundles have ownership semantics, analogous to shared_ptr vs.
    unique_ptr in C++. This lets operators know whether they can safely
    destroy blocks once they no longer need them. Destroying blocks eagerly
    is more efficient than waiting for Python GC / Ray reference counting to
    kick in.
"""

# The num_rows / size_bytes must be known in the metadata.
blocks: List[Tuple[ObjectRef[Block], BlockMetadata]]

# Serializable extra data passed from upstream operator. This can be
# used to implement per-block behavior, for example, the last task
# for a Limit() operation truncates the block at a certain row.
    input_metadata: Dict[str, Any] = field(default_factory=dict)

# Whether we own the blocks (can safely destroy them).
owns_blocks: bool = False

def __post_init__(self):
for b in self.blocks:
assert isinstance(b, tuple), b
assert len(b) == 2, b
assert isinstance(b[0], ray.ObjectRef), b
assert isinstance(b[1], BlockMetadata), b
assert b[1].num_rows is not None, b
assert b[1].size_bytes is not None, b

def num_rows(self) -> int:
"""Number of rows present in this bundle."""
return sum(b[1].num_rows for b in self.blocks)

def size_bytes(self) -> int:
"""Size of the blocks of this bundle in bytes."""
return sum(b[1].size_bytes for b in self.blocks)

def destroy(self) -> None:
"""Clears the object store memory for these blocks."""
assert self.owns_blocks, "Should not destroy unowned blocks."
raise NotImplementedError


@dataclass
class ExecutionOptions:
"""Common options that should be supported by all Executor implementations."""

    # Max number of in-flight tasks.
parallelism_limit: Optional[int] = None

    # Best-effort limit on object store memory usage, in bytes. For example,
    # set this to 1GB and the executor will try to keep object store usage
    # under 1GB.
memory_limit_bytes: Optional[int] = None

# Set this to prefer running tasks on the same node as the output
# node (node driving the execution).
locality_with_output: bool = False

# Always preserve ordering of blocks, even if using operators that
# don't require it.
preserve_order: bool = True


class PhysicalOperator:
"""Abstract class for physical operators.

An operator transforms one or more input streams of RefBundles into a single
    output stream of RefBundles. There are two types of operators that Executors
    must be aware of in operator DAGs.

Subclasses:
OneToOneOperator: handles one-to-one operations (e.g., map, filter)
ExchangeOperator: handles other types of operations (e.g., shuffle, union)
"""

def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]):
self._name = name
self._input_dependencies = input_dependencies
for x in input_dependencies:
assert isinstance(x, PhysicalOperator), x

@property
def name(self) -> str:
return self._name

@property
def input_dependencies(self) -> List["PhysicalOperator"]:
"""List of operators that provide inputs for this operator."""
assert hasattr(
self, "_input_dependencies"
), "PhysicalOperator.__init__() was not called."
return self._input_dependencies

def __reduce__(self):
raise ValueError("PhysicalOperator is not serializable.")

def num_outputs_total(self) -> Optional[int]:
"""Returns the total number of output bundles of this operator, if known.

This is useful for reporting progress.
"""
if len(self.input_dependencies) == 1:
return self.input_dependencies[0].num_outputs_total()
return None

def add_input(self, refs: RefBundle, input_index: int) -> None:
"""Called when an upstream result is available."""
raise NotImplementedError

def inputs_done(self, input_index: int) -> None:
"""Called when an upstream operator finishes."""
raise NotImplementedError

def has_next(self) -> bool:
"""Returns when a downstream output is available."""
raise NotImplementedError

def get_next(self) -> RefBundle:
"""Get the next downstream output."""
raise NotImplementedError

def get_tasks(self) -> List[ray.ObjectRef]:
"""Get a list of object references the executor should wait on."""
return []

def notify_task_completed(self, task: ray.ObjectRef) -> None:
"""Executor calls this when the given task is completed and local."""
raise NotImplementedError

def release_unused_resources(self) -> None:
"""Release any currently unused operator resources."""
pass


class Executor:
"""Abstract class for executors, which implement physical operator execution.

Subclasses:
BulkExecutor
PipelinedExecutor
"""

def __init__(self, options: ExecutionOptions):
"""Create the executor."""
self._options = options

def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]:
"""Start execution."""
raise NotImplementedError

    def get_stats(self) -> DatasetStats:
"""Return stats for the execution so far."""
raise NotImplementedError


class ExchangeOperator(PhysicalOperator):
"""A streaming operator for more complex parallel transformations.

Subclasses have full control over how to buffer and transform input blocks, which
enables them to implement metadata-only stream transformations (e.g., union),
as well as all-to-all transformations (e.g., shuffle, zip).

Subclasses:
AllToAllOperator
"""

pass


class AllToAllOperator(ExchangeOperator):
"""An ExchangeOperator that doesn't execute until all inputs are available.

    Used to implement all-to-all transformations such as sort / shuffle.

Subclasses:
SortMap
"""

    def __init__(self, input_op: PhysicalOperator,
                 preprocessor: Optional[Callable] = None):
        # Wire up the base class so input_dependencies works for this operator.
        super().__init__("AllToAll", [input_op])
        self._preprocessor = preprocessor
        self._buffer = []
        self._outbox = None

def add_input(self, refs: RefBundle, input_index: int) -> None:
assert input_index == 0, "AllToAll only supports one input."
self._buffer.append(refs)

def inputs_done(self, input_index: int) -> None:
# Note: blocking synchronous execution for now.
self._outbox = self.execute_all(self._buffer)

def has_next(self) -> bool:
return bool(self._outbox)

def get_next(self) -> RefBundle:
return self._outbox.pop(0)

    def execute_all(self, inputs: List[RefBundle]) -> List[RefBundle]:
        """Execute the transformation for all inputs from the driver process.

        This is a synchronous call that blocks until the computation is completed.

        Args:
            inputs: List of input ref bundles.

        Returns:
            The list of output ref bundles.
        """
        raise NotImplementedError
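
To make the streaming operator protocol above concrete, here is a minimal sketch of a metadata-only ExchangeOperator in the spirit of the union example from the docstring. The Concat name is illustrative, not part of this PR: it forwards bundles from any number of inputs as they arrive, launching no Ray tasks, so the default get_tasks() and release_unused_resources() implementations suffice.

from typing import List

from ray.data._internal.execution.interfaces import (
    ExchangeOperator,
    PhysicalOperator,
    RefBundle,
)


class Concat(ExchangeOperator):
    """Illustrative metadata-only operator: streams through bundles from all
    of its inputs without launching any tasks or touching block data."""

    def __init__(self, input_ops: List[PhysicalOperator]):
        super().__init__("Concat", input_ops)
        self._outbox: List[RefBundle] = []

    def add_input(self, refs: RefBundle, input_index: int) -> None:
        # No computation needed; just forward the block references.
        self._outbox.append(refs)

    def inputs_done(self, input_index: int) -> None:
        pass

    def has_next(self) -> bool:
        return bool(self._outbox)

    def get_next(self) -> RefBundle:
        return self._outbox.pop(0)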
48 changes: 48 additions & 0 deletions python/ray/data/_internal/execution/one_to_one_state.py
@@ -0,0 +1,48 @@
from typing import Callable, Tuple, TYPE_CHECKING

import ray
from ray.data._internal.execution.interfaces import (
RefBundle,
)
from ray.data.block import Block, BlockMetadata, BlockAccessor
from ray.types import ObjectRef

if TYPE_CHECKING:
from ray.data._internal.execution.operators import OneToOneOperator


@ray.remote(num_returns=2)
def _transform_one(fn: Callable, block: Block) -> Tuple[Block, BlockMetadata]:
[out] = list(fn([block], {}))
return out, BlockAccessor.for_block(out).get_metadata([], None)


# TODO: handle block splitting?
class _Task:
def __init__(self, block_ref: ObjectRef):
self.block_ref = block_ref


class OneToOneOperatorState:
def __init__(self, op: "OneToOneOperator"):
self._transform_fn = op.get_transform_fn()
self._compute_strategy = op.compute_strategy()
self._ray_remote_args = op.ray_remote_args()
self.outputs = []
self.tasks = {}

    def add_input(self, bundle: RefBundle) -> None:
        for block, _ in bundle.blocks:
            out_b, out_m = _transform_one.remote(self._transform_fn, block)
            # Key pending tasks by the metadata ref: the executor waits on and
            # fetches the small metadata object, never the output block itself.
            self.tasks[out_m] = _Task(out_b)

def task_completed(self, ref: ObjectRef) -> None:
task = self.tasks.pop(ref)
block_meta = ray.get(ref)
self.outputs.append(RefBundle([(task.block_ref, block_meta)]))

def release_unused_resources(self) -> None:
pass
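
The bookkeeping pattern above, keying pending tasks by the metadata ref from a num_returns=2 task, lets the driver wait on and fetch only the small metadata objects while the output blocks stay in the object store. A standalone sketch of the same pattern; the doubling transform and driver loop here are illustrative, not part of this PR:

import ray

ray.init(ignore_reinit_error=True)


@ray.remote(num_returns=2)
def transform(block):
    out = [x * 2 for x in block]
    # Return the block and its small metadata as two separate object refs.
    return out, {"num_rows": len(out)}


# Submit one task per block, keyed by the metadata ref, as above.
tasks = {}
for block in ([1, 2, 3], [4, 5]):
    out_ref, meta_ref = transform.remote(block)
    tasks[meta_ref] = out_ref

outputs = []
while tasks:
    # Wait on / fetch metadata only; blocks remain in the object store.
    [ready], _ = ray.wait(list(tasks), num_returns=1, fetch_local=True)
    outputs.append((tasks.pop(ready), ray.get(ready)))

print(outputs)  # [(ObjectRef(...), {'num_rows': 3}), (ObjectRef(...), {'num_rows': 2})]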