From 99212fe034b00b016840510ebf1ce1b9361a0c30 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Fri, 11 Nov 2022 18:19:18 -0800
Subject: [PATCH] add basic pipelined executor

Signed-off-by: Eric Liang
---
 .../data/_internal/execution/bulk_executor.py |  17 ++-
 .../_internal/execution/pipelined_executor.py | 126 ++++++++++++++++++
 python/ray/data/tests/test_execution.py       |  32 +++--
 3 files changed, 155 insertions(+), 20 deletions(-)
 create mode 100644 python/ray/data/_internal/execution/pipelined_executor.py

diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py
index fbf515418543..ee96163da83d 100644
--- a/python/ray/data/_internal/execution/bulk_executor.py
+++ b/python/ray/data/_internal/execution/bulk_executor.py
@@ -14,6 +14,16 @@
 from ray.data._internal.stats import DatasetStats
 
 
+@ray.remote(num_returns=2)
+def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata):
+    print("Processing", block)  # Debug: trace per-block execution order.
+    import time
+
+    time.sleep(1)  # Debug: simulate a slow transform so pipelining is observable.
+    [out] = list(op.execute_one([block], {}))
+    return out, BlockAccessor.for_block(out).get_metadata([], None)
+
+
 def _naive_task_execute(
     inputs: List[RefBundle], op: OneToOneOperator
 ) -> List[RefBundle]:
@@ -22,11 +32,6 @@
 
     TODO: This should be reconciled with ComputeStrategy.
     """
 
-    @ray.remote(num_returns=2)
-    def transform_one(block: Block) -> (Block, BlockMetadata):
-        [out] = list(op.execute_one([block], {}))
-        return out, BlockAccessor.for_block(out).get_metadata([], None)
-
     input_blocks = []
     for bundle in inputs:
         for block, _ in bundle.blocks:
@@ -34,7 +39,7 @@ def transform_one(block: Block) -> (Block, BlockMetadata):
 
     out_blocks, out_meta = [], []
     for in_b in input_blocks:
-        out_b, out_m = transform_one.remote(in_b)
+        out_b, out_m = _transform_one.remote(op, in_b)
         out_blocks.append(out_b)
         out_meta.append(out_m)
 
diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py
new file mode 100644
index 000000000000..78d1535bfcea
--- /dev/null
+++ b/python/ray/data/_internal/execution/pipelined_executor.py
@@ -0,0 +1,126 @@
+from typing import Dict, List, Iterator, Optional
+
+import ray
+from ray.data.block import Block, BlockMetadata
+from ray.data._internal.execution.interfaces import (
+    Executor,
+    RefBundle,
+    PhysicalOperator,
+    OneToOneOperator,
+    AllToAllOperator,
+    BufferOperator,
+)
+from ray.data._internal.execution.bulk_executor import _transform_one
+from ray.data._internal.progress_bar import ProgressBar
+from ray.data._internal.stats import DatasetStats
+from ray.types import ObjectRef
+
+
+class _OpState:
+    def __init__(self, num_inputs: int):
+        self.inqueues: List[List[RefBundle]] = [[] for _ in range(num_inputs)]
+        self.outqueue: List[RefBundle] = []
+
+
+# TODO: reconcile with ComputeStrategy
+class _OneToOneTask:
+    def __init__(self, op: OneToOneOperator, state: _OpState, inputs: RefBundle):
+        self._op: OneToOneOperator = op
+        self._state: _OpState = state
+        self._inputs: RefBundle = inputs
+        self._block_ref: Optional[ObjectRef[Block]] = None
+        self._meta_ref: Optional[ObjectRef[BlockMetadata]] = None
+
+    def execute(self) -> ObjectRef:
+        if len(self._inputs.blocks) != 1:
+            raise NotImplementedError("TODO: multi-block inputs")
+        self._block_ref, self._meta_ref = _transform_one.remote(
+            self._op, self._inputs.blocks[0][0]
+        )
+        return self._meta_ref
+
+    def completed(self):
+        meta = ray.get(self._meta_ref)
+        self._state.outqueue.append(RefBundle([(self._block_ref, meta)]))
+
+
+class PipelinedExecutor(Executor):
+    def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]:
+        """Executes the DAG using a fully pipelined strategy.
+
+        TODO: optimize memory usage by deleting intermediate results and marking
+        the `owned` field in the ref bundles correctly.
+
+        TODO: implement order preservation.
+        """
+
+        # TODO: implement parallelism control and autoscaling strategies.
+        PARALLELISM_LIMIT = 2
+
+        # TODO: make these class members so we can unit test this.
+        operator_state: Dict[PhysicalOperator, _OpState] = {}
+        candidate_tasks: Dict[PhysicalOperator, _OneToOneTask] = {}
+        active_tasks: Dict[ObjectRef, _OneToOneTask] = {}
+
+        # Set up the streaming topology.
+        def setup_state(node) -> _OpState:
+            if node in operator_state:
+                return operator_state[node]
+
+            # Create state if it doesn't exist.
+            state = _OpState(len(node.input_dependencies))
+            operator_state[node] = state
+
+            # Wire up the parents' outqueues to this node's inqueues.
+            for i, parent in enumerate(node.input_dependencies):
+                parent_state = setup_state(parent)
+                state.inqueues[i] = parent_state.outqueue
+
+            return state
+
+        setup_state(dag)
+        buffer_state_change = True
+
+        while candidate_tasks or active_tasks or buffer_state_change:
+            buffer_state_change = False
+
+            # Process completed tasks.
+            if active_tasks:
+                [ref], _ = ray.wait(list(active_tasks), num_returns=1, fetch_local=True)
+                task = active_tasks.pop(ref)
+                task.completed()
+
+            # Generate new tasks.
+            for op, state in operator_state.items():
+                if isinstance(op, OneToOneOperator):
+                    assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input"
+                    inqueue = state.inqueues[0]
+                    if inqueue and op not in candidate_tasks:
+                        candidate_tasks[op] = _OneToOneTask(op, state, inqueue.pop(0))
+                elif isinstance(op, AllToAllOperator):
+                    assert len(state.inqueues) == 1, "AllToAll takes exactly 1 input"
+                    raise NotImplementedError
+                elif isinstance(op, BufferOperator):
+                    for i, inqueue in enumerate(state.inqueues):
+                        while inqueue:
+                            op.add_next(inqueue.pop(0), input_index=i)
+                            buffer_state_change = True
+                    while op.has_next():
+                        state.outqueue.append(op.get_next())
+                        buffer_state_change = True
+                else:
+                    assert False, "Unknown operator type: {}".format(op)
+
+            # Yield outputs.
+            output = operator_state[dag]
+            while output.outqueue:
+                yield output.outqueue.pop(0)
+
+            # Dispatch new tasks.
+            for op, task in list(candidate_tasks.items()):
+                if len(active_tasks) < PARALLELISM_LIMIT:
+                    active_tasks[task.execute()] = task
+                    del candidate_tasks[op]
+
+    def get_stats(self) -> DatasetStats:
+        raise NotImplementedError
diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py
index f4cc247a0bee..ca9edc4d78d0 100644
--- a/python/ray/data/tests/test_execution.py
+++ b/python/ray/data/tests/test_execution.py
@@ -6,6 +6,7 @@
 from ray.data.block import BlockAccessor
 from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle
 from ray.data._internal.execution.bulk_executor import BulkExecutor
+from ray.data._internal.execution.pipelined_executor import PipelinedExecutor
 from ray.data._internal.execution.operators import InputDataBuffer, MapOperator
 
 
@@ -30,28 +31,31 @@
 def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]:
     output = []
     for bundle in bundles:
         for block, _ in bundle.blocks:
             output.append(ray.get(block))
+            print("Output", output[-1])
     return output
 
 
 def test_basic_bulk():
     executor = BulkExecutor(ExecutionOptions())
-    inputs = make_ref_bundles(
-        [
-            [1, 2, 3],
-            [4, 5, 6],
-            [7, 8, 9],
-        ]
-    )
+    inputs = make_ref_bundles([[x] for x in range(10)])
     o1 = InputDataBuffer(inputs)
-    o2 = MapOperator(lambda block: [b * 2 for b in block], o1)
-    o3 = MapOperator(lambda block: [b * -1 for b in block], o2)
+    o2 = MapOperator(lambda block: [b * -1 for b in block], o1)
+    o3 = MapOperator(lambda block: [b * 2 for b in block], o2)
     it = executor.execute(o3)
     output = ref_bundles_to_list(it)
-    expected = [
-        [-2, -4, -6],
-        [-8, -10, -12],
-        [-14, -16, -18],
-    ]
+    expected = [[x * -2] for x in range(10)]
     assert output == expected, (output, expected)
+
+
+def test_basic_pipelined():
+    executor = PipelinedExecutor(ExecutionOptions())
+    inputs = make_ref_bundles([[x] for x in range(10)])
+    o1 = InputDataBuffer(inputs)
+    o2 = MapOperator(lambda block: [b * -1 for b in block], o1)
+    o3 = MapOperator(lambda block: [b * 2 for b in block], o2)
+    it = executor.execute(o3)
+    output = ref_bundles_to_list(it)
+    expected = [[x * -2] for x in range(10)]
+    assert output == expected, (output, expected)
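
Note for reviewers: the heart of PipelinedExecutor is the wait-and-dispatch
loop in execute(). Below is a minimal standalone sketch of that loop against
plain Ray tasks, independent of the operator interfaces in this patch; the
names (slow_transform, the limit of 2) and the sleep are illustrative only,
not part of the patch:

import time

import ray

ray.init()


@ray.remote
def slow_transform(x):
    # Stand-in for real per-block work; the sleep makes pipelining visible.
    time.sleep(1)
    return x * -2


PARALLELISM_LIMIT = 2
pending = list(range(10))  # Inputs waiting to be dispatched.
active = {}  # In-flight ObjectRef -> input value.

while pending or active:
    # Dispatch new tasks up to the parallelism limit.
    while pending and len(active) < PARALLELISM_LIMIT:
        x = pending.pop(0)
        active[slow_transform.remote(x)] = x
    # Take whichever task finishes first and stream its result immediately,
    # rather than gathering all results at the end as BulkExecutor does.
    [ref], _ = ray.wait(list(active), num_returns=1)
    del active[ref]
    print("Output", ray.get(ref))

With the one-second sleep in _transform_one, test_basic_pipelined should print
its "Output ..." lines incrementally as blocks finish, whereas test_basic_bulk
prints them only after every stage has run to completion.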