add basic pipelined executor
Signed-off-by: Eric Liang <[email protected]>
ericl committed Nov 12, 2022
1 parent ed36a06 commit 99212fe
Showing 3 changed files with 155 additions and 20 deletions.
17 changes: 11 additions & 6 deletions python/ray/data/_internal/execution/bulk_executor.py
@@ -14,6 +14,16 @@
from ray.data._internal.stats import DatasetStats


@ray.remote(num_returns=2)
def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata):
    print("Processing", block)
    import time

    time.sleep(1)
    [out] = list(op.execute_one([block], {}))
    return out, BlockAccessor.for_block(out).get_metadata([], None)


def _naive_task_execute(
    inputs: List[RefBundle], op: OneToOneOperator
) -> List[RefBundle]:
@@ -22,19 +32,14 @@ def _naive_task_execute(
    TODO: This should be reconciled with ComputeStrategy.
    """

-    @ray.remote(num_returns=2)
-    def transform_one(block: Block) -> (Block, BlockMetadata):
-        [out] = list(op.execute_one([block], {}))
-        return out, BlockAccessor.for_block(out).get_metadata([], None)
-
    input_blocks = []
    for bundle in inputs:
        for block, _ in bundle.blocks:
            input_blocks.append(block)

    out_blocks, out_meta = [], []
    for in_b in input_blocks:
-        out_b, out_m = transform_one.remote(in_b)
+        out_b, out_m = _transform_one.remote(op, in_b)
        out_blocks.append(out_b)
        out_meta.append(out_m)
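A note on the `num_returns=2` pattern used by `_transform_one`: a Ray task declared this way hands back one `ObjectRef` per declared output at submission time, which is why the caller can unpack the block ref and metadata ref without blocking. A minimal standalone sketch (`split_pair` is an illustrative name, not part of this change):

```python
import ray

ray.init()

@ray.remote(num_returns=2)
def split_pair(x: int):
    # Returning a 2-tuple fills the two ObjectRefs declared above.
    return x, x + 1

# .remote() yields two refs immediately; nothing is fetched yet.
a_ref, b_ref = split_pair.remote(41)
assert ray.get(a_ref) == 41 and ray.get(b_ref) == 42
```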
126 changes: 126 additions & 0 deletions python/ray/data/_internal/execution/pipelined_executor.py
@@ -0,0 +1,126 @@
from typing import Dict, List, Iterator, Optional

import ray
from ray.data.block import Block, BlockMetadata
from ray.data._internal.execution.interfaces import (
    Executor,
    RefBundle,
    PhysicalOperator,
    OneToOneOperator,
    AllToAllOperator,
    BufferOperator,
)
from ray.data._internal.execution.bulk_executor import _transform_one
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import DatasetStats
from ray.types import ObjectRef


class _OpState:
    def __init__(self, num_inputs: int):
        self.inqueues: List[List[RefBundle]] = [[] for _ in range(num_inputs)]
        self.outqueue: List[RefBundle] = []


# TODO: reconcile with ComputeStrategy
class _OneToOneTask:
    def __init__(self, op: OneToOneOperator, state: _OpState, inputs: RefBundle):
        self._op: OneToOneOperator = op
        self._state: _OpState = state
        self._inputs: RefBundle = inputs
        self._block_ref: Optional[ObjectRef[Block]] = None
        self._meta_ref: Optional[ObjectRef[BlockMetadata]] = None

    def execute(self) -> ObjectRef:
        if len(self._inputs.blocks) != 1:
            raise NotImplementedError("TODO: multi-block inputs")
        self._block_ref, self._meta_ref = _transform_one.remote(
            self._op, self._inputs.blocks[0][0]
        )
        return self._meta_ref

    def completed(self):
        meta = ray.get(self._meta_ref)
        self._state.outqueue.append(RefBundle([(self._block_ref, meta)]))


class PipelinedExecutor(Executor):
    def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]:
        """Executes the DAG using a fully pipelined strategy.

        TODO: optimize memory usage by deleting intermediate results and marking
        the `owned` field in the ref bundles correctly.
        TODO: implement order preservation.
        """

        # TODO: implement parallelism control and autoscaling strategies.
        PARALLELISM_LIMIT = 2

        # TODO: make these class members so we can unit test this.
        operator_state: Dict[PhysicalOperator, _OpState] = {}
        candidate_tasks: Dict[PhysicalOperator, _OneToOneTask] = {}
        active_tasks: Dict[ObjectRef, _OneToOneTask] = {}

        # Setup the streaming topology.
        def setup_state(node) -> _OpState:
            if node in operator_state:
                return operator_state[node]

            # Create state if it doesn't exist.
            state = _OpState(len(node.input_dependencies))
            operator_state[node] = state

            # Wire up the input outqueues to this node's inqueues.
            for i, parent in enumerate(node.input_dependencies):
                parent_state = setup_state(parent)
                state.inqueues[i] = parent_state.outqueue

            return state

        setup_state(dag)
        buffer_state_change = True

        while candidate_tasks or active_tasks or buffer_state_change:
            buffer_state_change = False

            # Process completed tasks.
            if active_tasks:
                [ref], _ = ray.wait(list(active_tasks), num_returns=1, fetch_local=True)
                task = active_tasks.pop(ref)
                task.completed()

            # Generate new tasks.
            for op, state in operator_state.items():
                if isinstance(op, OneToOneOperator):
                    assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input"
                    inqueue = state.inqueues[0]
                    if inqueue and op not in candidate_tasks:
                        candidate_tasks[op] = _OneToOneTask(op, state, inqueue.pop(0))
                elif isinstance(op, AllToAllOperator):
                    assert len(state.inqueues) == 1, "AllToAll takes exactly 1 input"
                    raise NotImplementedError
                elif isinstance(op, BufferOperator):
                    for i, inqueue in enumerate(state.inqueues):
                        while inqueue:
                            op.add_next(inqueue.pop(0), input_index=i)
                            buffer_state_change = True
                    while op.has_next():
                        state.outqueue.append(op.get_next())
                        buffer_state_change = True
                else:
                    assert False, "Unknown operator type: {}".format(op)

            # Yield outputs.
            output = operator_state[dag]
            while output.outqueue:
                yield output.outqueue.pop(0)

            # Dispatch new tasks.
            for op, task in list(candidate_tasks.items()):
                if len(active_tasks) < PARALLELISM_LIMIT:
                    active_tasks[task.execute()] = task
                    del candidate_tasks[op]

    def get_stats(self) -> DatasetStats:
        raise NotImplementedError
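The scheduling core of `PipelinedExecutor.execute` is a `ray.wait`-driven loop: block until one in-flight task finishes, refill the queues, then dispatch up to `PARALLELISM_LIMIT` new tasks. A stripped-down sketch of the same pattern outside the operator machinery (the `work` task and variable names are illustrative, not part of this change):

```python
import ray

ray.init()

@ray.remote
def work(x: int) -> int:
    return x * 2

PARALLELISM_LIMIT = 2
pending = list(range(10))
active = {}  # ObjectRef -> submitted input
results = []

while pending or active:
    # Dispatch new tasks up to the parallelism cap.
    while pending and len(active) < PARALLELISM_LIMIT:
        x = pending.pop(0)
        active[work.remote(x)] = x
    # Block until one in-flight task completes, then collect it.
    [done], _ = ray.wait(list(active), num_returns=1)
    del active[done]
    results.append(ray.get(done))

assert sorted(results) == [x * 2 for x in range(10)]
```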
32 changes: 18 additions & 14 deletions python/ray/data/tests/test_execution.py
@@ -6,6 +6,7 @@
from ray.data.block import BlockAccessor
from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle
from ray.data._internal.execution.bulk_executor import BulkExecutor
from ray.data._internal.execution.pipelined_executor import PipelinedExecutor
from ray.data._internal.execution.operators import InputDataBuffer, MapOperator


@@ -30,28 +31,31 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]:
    for bundle in bundles:
        for block, _ in bundle.blocks:
            output.append(ray.get(block))
            print("Output", output[-1])
    return output


def test_basic_bulk():
    executor = BulkExecutor(ExecutionOptions())
-    inputs = make_ref_bundles(
-        [
-            [1, 2, 3],
-            [4, 5, 6],
-            [7, 8, 9],
-        ]
-    )
+    inputs = make_ref_bundles([[x] for x in range(10)])
    o1 = InputDataBuffer(inputs)
-    o2 = MapOperator(lambda block: [b * 2 for b in block], o1)
-    o3 = MapOperator(lambda block: [b * -1 for b in block], o2)
+    o2 = MapOperator(lambda block: [b * -1 for b in block], o1)
+    o3 = MapOperator(lambda block: [b * 2 for b in block], o2)
    it = executor.execute(o3)
    output = ref_bundles_to_list(it)
-    expected = [
-        [-2, -4, -6],
-        [-8, -10, -12],
-        [-14, -16, -18],
-    ]
+    expected = [[x * -2] for x in range(10)]
    assert output == expected, (output, expected)


def test_basic_pipelined():
    executor = PipelinedExecutor(ExecutionOptions())
    inputs = make_ref_bundles([[x] for x in range(10)])
    o1 = InputDataBuffer(inputs)
    o2 = MapOperator(lambda block: [b * -1 for b in block], o1)
    o3 = MapOperator(lambda block: [b * 2 for b in block], o2)
    it = executor.execute(o3)
    output = ref_bundles_to_list(it)
    expected = [[x * -2] for x in range(10)]
    assert output == expected, (output, expected)
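One property the new test exercises only implicitly: `execute` returns an `Iterator[RefBundle]`, so a caller can consume blocks as they are produced while upstream tasks are still running. A consumption sketch, reusing `executor` and `o3` as constructed in `test_basic_pipelined`:

```python
# Blocks stream out as tasks finish; each bundle holds (block_ref, metadata) pairs.
for bundle in executor.execute(o3):
    for block_ref, _meta in bundle.blocks:
        print(ray.get(block_ref))
```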


