From 85f3936a7547328b30cc38608acda4a1733e9f57 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 13:47:57 -0800 Subject: [PATCH 01/25] wip Signed-off-by: Eric Liang write docs Signed-off-by: Eric Liang wip Signed-off-by: Eric Liang wip Signed-off-by: Eric Liang --- .../ray/data/_internal/execution/__init__.py | 0 .../data/_internal/execution/bulk_executor.py | 85 ++++++++ .../data/_internal/execution/interfaces.py | 193 ++++++++++++++++++ .../ray/data/_internal/execution/operators.py | 39 ++++ python/ray/data/tests/test_execution.py | 56 +++++ 5 files changed, 373 insertions(+) create mode 100644 python/ray/data/_internal/execution/__init__.py create mode 100644 python/ray/data/_internal/execution/bulk_executor.py create mode 100644 python/ray/data/_internal/execution/interfaces.py create mode 100644 python/ray/data/_internal/execution/operators.py create mode 100644 python/ray/data/tests/test_execution.py diff --git a/python/ray/data/_internal/execution/__init__.py b/python/ray/data/_internal/execution/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py new file mode 100644 index 000000000000..dd5641ed27ea --- /dev/null +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -0,0 +1,85 @@ +from typing import Dict, List, Iterator + +import ray +from ray.data.block import Block, BlockMetadata, BlockAccessor +from ray.data._internal.execution.interfaces import ( + Executor, + RefBundle, + PhysicalOperator, + OneToOneOperator, + AllToAllOperator, + BufferOperator, +) +from ray.data._internal.progress_bar import ProgressBar +from ray.data._internal.stats import DatasetStats + + +def _naive_task_execute( + inputs: List[RefBundle], op: OneToOneOperator +) -> List[RefBundle]: + @ray.remote(num_returns=2) + def transform_one(block: Block) -> (Block, BlockMetadata): + [out] = list(op.execute_one([block], {})) + return out, BlockAccessor.for_block(out).get_metadata([], None) + + input_blocks = [] + for bundle in inputs: + for block, _ in bundle.blocks: + input_blocks.append(block) + + out_blocks, out_meta = [], [] + for in_b in input_blocks: + out_b, out_m = transform_one.remote(in_b) + out_blocks.append(out_b) + out_meta.append(out_m) + + bar = ProgressBar("OneToOne", total=len(out_meta)) + out_meta = bar.fetch_until_complete(out_meta) + + return [RefBundle([(b, m)]) for b, m in zip(out_blocks, out_meta)] + + +class BulkExecutor(Executor): + def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]: + """Synchronously executes the DAG via bottom-up recursive traversal. + + TODO: optimize memory usage by deleting intermediate results and marking + the `owned` field in the ref bundles correctly. + """ + + saved_outputs: Dict[PhysicalOperator, List[RefBundle]] = {} + + def execute_recursive(node: PhysicalOperator) -> List[RefBundle]: + # Avoid duplicate executions. + if node in saved_outputs: + return saved_outputs[node] + + # Compute dependencies. + inputs = [execute_recursive(dep) for dep in node.input_dependencies] + + # Execute this node. 
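+            # Dispatch on the operator type: OneToOne ops are fanned out as
+            # Ray tasks over their input blocks, AllToAll ops run their own
+            # distributed execution, and Buffer ops are drained by pushing in
+            # all inputs and then pulling any buffered outputs.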
+            output = []
+            if isinstance(node, OneToOneOperator):
+                assert len(inputs == 1), "OneToOne takes exactly 1 input stream"
+                _naive_task_execute(inputs[0], node)
+            elif isinstance(node, AllToAllOperator):
+                assert len(inputs == 1), "AllToAll takes exactly 1 input stream"
+                output = node.execute_all(inputs[0])
+            elif isinstance(node, BufferOperator):
+                for i, ref_bundles in enumerate(inputs):
+                    for r in ref_bundles:
+                        node.add_input(r, input_index=i)
+                    node.inputs_done(i)
+                while node.has_next():
+                    output.append(node.get_next())
+            else:
+                assert False, "Unknown operator type: {}".format(node)
+
+            # Cache and return output.
+            saved_outputs[node] = output
+            return output
+
+        return execute_recursive(dag)
+
+    def get_stats() -> DatasetStats:
+        raise NotImplementedError
diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py
new file mode 100644
index 000000000000..260913ff0545
--- /dev/null
+++ b/python/ray/data/_internal/execution/interfaces.py
@@ -0,0 +1,193 @@
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional, Iterator, Tuple
+
+from ray.data._internal.stats import DatasetStats
+from ray.data.block import Block, BlockMetadata
+from ray.types import ObjectRef
+
+
+@dataclass
+class RefBundle:
+    """A group of data block references and their metadata.
+
+    Operators take in and produce streams of RefBundles.
+
+    Most commonly, a RefBundle consists of a single block object reference.
+    In some cases, e.g., due to block splitting, or for a SortReduce task, there may
+    be more than one block.
+
+    Block bundles have ownership semantics, i.e., shared_ptr vs unique_ptr. This
+    allows operators to know whether they can destroy blocks when they don't need
+    them. Destroying blocks eagerly is more efficient than waiting for Python GC /
+    Ray reference counting to kick in.
+    """
+
+    # The num_rows / size_bytes must be known in the metadata.
+    blocks: List[Tuple[ObjectRef[Block], BlockMetadata]]
+
+    # Serializable extra data passed from upstream operator. This can be
+    # used to implement per-block behavior, for example, the last task
+    # for a Limit() operation truncates the block at a certain row.
+    input_metadata: Dict[str, Any] = field(default_factory=lambda: {})
+
+    # Whether we own the blocks (can safely destroy them).
+    owns_blocks: bool = False
+
+    def destroy(self) -> None:
+        """Clears the object store memory for these blocks."""
+        assert self.owns_blocks, "Should not destroy unowned blocks."
+        raise NotImplementedError
+
+
+@dataclass
+class ExecutionOptions:
+    """Common options that should be supported by all Executor implementations."""
+
+    # Example: set to 1GB and the executor will try to limit object store
+    # memory usage to 1GB.
+    memory_limit_bytes: Optional[int] = None
+
+    # Set this to prefer running tasks on the same node as the output
+    # node (node driving the execution).
+    locality_with_output: bool = False
+
+    # Always preserve ordering of blocks, even if using operators that
+    # don't require it.
+    preserve_order: bool = True
+
+
+class PhysicalOperator:
+    """Abstract class for physical operators.
+
+    An operator transforms one or more input streams of RefBundles into a single
+    output stream of RefBundles. There are three types of operators that Executors
+    must be aware of in operator DAGs.
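+
+    Operators are composed into a DAG by passing input dependencies at
+    construction time; an executor then pulls output bundles from the root
+    operator of the DAG.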
+ + Subclasses: + OneToOneOperator + AllToAllOperator + BufferOperator + """ + + def __init__(self, input_dependencies: List["PhysicalOperator"]): + self._input_dependencies = input_dependencies + + @property + def input_dependencies(self) -> List["PhysicalOperator"]: + """List of operators that provide inputs for this operator.""" + assert hasattr( + self, "_input_dependenies" + ), "PhysicalOperator.__init__() was not called." + return self._input_dependencies + + +class Executor: + """Abstract class for executors, wihch implement physical operator execution. + + Subclasses: + BulkExecutor + PipelinedExecutor + """ + + def __init__(self, options: ExecutionOptions): + """Create the executor.""" + self._options = options + + def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]: + """Start execution.""" + raise NotImplementedError + + def get_stats() -> DatasetStats: + """Return stats for the execution so far.""" + raise NotImplementedError + + +class OneToOneOperator(PhysicalOperator): + """Abstract class for operators that run on a single process. + + Used to implement 1:1 transformations. The executor will need to + wrap the operator in Ray tasks or actors for actual execution, + e.g., using TaskPoolStrategy or ActorPoolStrategy. + + Subclasses: + Read + Map + Write + SortReduce + WholeStage + """ + + def execute_one( + self, block_bundle: Iterator[Block], input_metadata: Dict[str, Any] + ) -> Iterator[Block]: + """Execute locally on a worker process. + + Args: + block_bundle: Iterator over input blocks of a RefBundle. Typically, this + will yield only a single block, unless the transformation has multiple + inputs, e.g., in the SortReduce or ZipBlocks cases. It is an iterator + instead of a list for memory efficiency. + input_metadata: Extra metadata provided from the upstream operator. + """ + raise NotImplementedError + + +class AllToAllOperator(PhysicalOperator): + """Abstract class for operators defining their entire distributed execution. + + Used to implement all:all transformations. + + This also defines a barrier between operators in the DAG. Operators + before and after an AllToAllOperator will not run concurrently. + + Subclasses: + SortMap + """ + + def __init__(self, preprocessor: Optional[OneToOneOperator] = None): + self._preprocessor = preprocessor + + def execute_all(self, inputs: List[RefBundle]) -> List[RefBundle]: + """Execute distributedly from a driver process. + + This is a synchronous call that blocks until the computation is completed. + + Args: + inputs: List of ref bundles. + """ + raise NotImplementedError + + +class BufferOperator(PhysicalOperator): + """A streaming operator that buffers blocks for downstream operators. + + For example, this can take two operators and combine their blocks + pairwise for zip, group adjacent blocks for repartition, etc. + + Buffers do not read or transform any block data; they only operate + on block metadata. 
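+
+    Executors drive a buffer by feeding upstream results to add_input() and
+    draining ready outputs via has_next() / get_next().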
+ + Examples: + Zip = ZipBuffer + DoZip + Union = UnionBuffer + Repartition(False) = SplitBuf + Splitter + CombineBuf + Combiner + Cache = CacheBuffer + Limit = LimitBuffer + MaybeTruncate + RandomizeBlockOrder = RandomizeBlockOrderBuffer + """ + + def add_input(self, refs: RefBundle, input_index: int) -> None: + """Called when an upstream result is available.""" + raise NotImplementedError + + def inputs_done(self, input_index: int) -> None: + """Called when an upstream operator finishes.""" + raise NotImplementedError + + def has_next(self) -> bool: + """Returns when a downstream output is available.""" + raise NotImplementedError + + def get_next(self) -> RefBundle: + """Get the next downstream output.""" + raise NotImplementedError diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py new file mode 100644 index 000000000000..388549e682e1 --- /dev/null +++ b/python/ray/data/_internal/execution/operators.py @@ -0,0 +1,39 @@ +from typing import List, Iterator + +from ray.data.block import Block +from ray.data._internal.execution.interfaces import ( + RefBundle, + OneToOneOperator, + BufferOperator, + PhysicalOperator, +) +from ray.data._internal.compute import BlockTransform + + +class InputOperator(BufferOperator): + """Defines the input data for the operator DAG.""" + + def __init__(self, input_data: List[RefBundle]): + self._input_data = input_data + super().__init__([]) + + def has_next(self) -> bool: + return len(self._input_data) > 0 + + def get_next(self) -> RefBundle: + return self._input_data.pop(0) + + +class MapOperator(OneToOneOperator): + """Defines a simple map operation over blocks.""" + + def __init__(self, input_op: PhysicalOperator, block_transform: BlockTransform): + self._block_transform = block_transform + super().__init__([input_op]) + + def execute_one(self, block_bundle: Iterator[Block], _) -> Iterator[Block]: + def apply_transform(fn, block_bundle): + for b in block_bundle: + yield fn(b) + + return apply_transform(self._block_transform, block_bundle) diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py new file mode 100644 index 000000000000..00d31cd57c1e --- /dev/null +++ b/python/ray/data/tests/test_execution.py @@ -0,0 +1,56 @@ +import pytest + +from typing import List, Any + +import ray +from ray.data.block import BlockAccessor +from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle +from ray.data._internal.execution.bulk_executor import BulkExecutor +from ray.data._internal.execution.operators import InputOperator, MapOperator + + +def make_ref_bundles(simple_data: List[List[Any]]) -> List[RefBundle]: + output = [] + for block in simple_data: + output.append( + RefBundle( + ray.put(block), BlockAccessor.for_block(block).get_metadata([], None) + ) + ) + return output + + +def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: + output = [] + for bundle in bundles: + for block, _ in bundle.blocks: + output.extend(block) + return output + + +def test_basic_bulk(): + executor = BulkExecutor(ExecutionOptions()) + inputs = make_ref_bundles( + [ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9], + ] + ) + o1 = InputOperator(inputs) + o2 = MapOperator(lambda block: [b * 2 for b in block], o1) + o3 = MapOperator(lambda block: [b * -1 for b in block], o2) + it = executor.execute(inputs, o3) + output = ref_bundles_to_list(it) + expected = [ + [-2, -4, -6], + [-8, -10, -12], + [-14, -16, -18], + ] + assert output == expected, (output, expected) 
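+
+
+def test_ref_bundle_roundtrip():
+    # A minimal round-trip sketch (an added example, not in the original
+    # patch). It assumes the corrected helper semantics from the follow-up
+    # commit in this series, where make_ref_bundles() emits bundles holding
+    # [(block_ref, metadata)] pairs and ref_bundles_to_list() ray.get()s them.
+    data = [[1, 2, 3], [4, 5, 6]]
+    bundles = make_ref_bundles(data)
+    assert ref_bundles_to_list(bundles) == data, bundles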
+ + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) From ba64aa3a863d2ae8c8f1945998d0b6ef3a7611be Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 15:12:02 -0800 Subject: [PATCH 02/25] add basic execution test Signed-off-by: Eric Liang --- .../ray/data/_internal/execution/bulk_executor.py | 8 ++++---- python/ray/data/_internal/execution/interfaces.py | 12 +++++++++++- python/ray/data/_internal/execution/operators.py | 4 ++-- python/ray/data/tests/test_execution.py | 15 ++++++++++----- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index dd5641ed27ea..8126d7c627eb 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -57,13 +57,13 @@ def execute_recursive(node: PhysicalOperator) -> List[RefBundle]: # Compute dependencies. inputs = [execute_recursive(dep) for dep in node.input_dependencies] - # Execute this node. + # Execute this operator (3 cases). output = [] if isinstance(node, OneToOneOperator): - assert len(inputs == 1), "OneToOne takes exactly 1 input stream" - _naive_task_execute(inputs[0], node) + assert len(inputs) == 1, "OneToOne takes exactly 1 input stream" + output = _naive_task_execute(inputs[0], node) elif isinstance(node, AllToAllOperator): - assert len(inputs == 1), "AllToAll takes exactly 1 input stream" + assert len(inputs) == 1, "AllToAll takes exactly 1 input stream" output = node.execute_all(inputs[0]) elif isinstance(node, BufferOperator): for i, ref_bundles in enumerate(inputs): diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 260913ff0545..025c6f82c53a 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -1,6 +1,7 @@ from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Iterator, Tuple +import ray from ray.data._internal.stats import DatasetStats from ray.data.block import Block, BlockMetadata from ray.types import ObjectRef @@ -33,6 +34,13 @@ class RefBundle: # Whether we own the blocks (can safely destroy them). owns_blocks: bool = False + def __post_init__(self): + for b in self.blocks: + assert isinstance(b, tuple), b + assert len(b) == 2, b + assert isinstance(b[0], ray.ObjectRef), b + assert isinstance(b[1], BlockMetadata), b + def destroy(self) -> None: """Clears the object store memory for these blocks.""" assert self.owns_blocks, "Should not destroy unowned blocks." @@ -71,12 +79,14 @@ class PhysicalOperator: def __init__(self, input_dependencies: List["PhysicalOperator"]): self._input_dependencies = input_dependencies + for x in input_dependencies: + assert isinstance(x, PhysicalOperator), x @property def input_dependencies(self) -> List["PhysicalOperator"]: """List of operators that provide inputs for this operator.""" assert hasattr( - self, "_input_dependenies" + self, "_input_dependencies" ), "PhysicalOperator.__init__() was not called." 
return self._input_dependencies diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index 388549e682e1..1478029bffa0 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -10,7 +10,7 @@ from ray.data._internal.compute import BlockTransform -class InputOperator(BufferOperator): +class InputDataBuffer(BufferOperator): """Defines the input data for the operator DAG.""" def __init__(self, input_data: List[RefBundle]): @@ -27,7 +27,7 @@ def get_next(self) -> RefBundle: class MapOperator(OneToOneOperator): """Defines a simple map operation over blocks.""" - def __init__(self, input_op: PhysicalOperator, block_transform: BlockTransform): + def __init__(self, block_transform: BlockTransform, input_op: PhysicalOperator): self._block_transform = block_transform super().__init__([input_op]) diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index 00d31cd57c1e..f4cc247a0bee 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -6,7 +6,7 @@ from ray.data.block import BlockAccessor from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle from ray.data._internal.execution.bulk_executor import BulkExecutor -from ray.data._internal.execution.operators import InputOperator, MapOperator +from ray.data._internal.execution.operators import InputDataBuffer, MapOperator def make_ref_bundles(simple_data: List[List[Any]]) -> List[RefBundle]: @@ -14,7 +14,12 @@ def make_ref_bundles(simple_data: List[List[Any]]) -> List[RefBundle]: for block in simple_data: output.append( RefBundle( - ray.put(block), BlockAccessor.for_block(block).get_metadata([], None) + [ + ( + ray.put(block), + BlockAccessor.for_block(block).get_metadata([], None), + ) + ] ) ) return output @@ -24,7 +29,7 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: output = [] for bundle in bundles: for block, _ in bundle.blocks: - output.extend(block) + output.append(ray.get(block)) return output @@ -37,10 +42,10 @@ def test_basic_bulk(): [7, 8, 9], ] ) - o1 = InputOperator(inputs) + o1 = InputDataBuffer(inputs) o2 = MapOperator(lambda block: [b * 2 for b in block], o1) o3 = MapOperator(lambda block: [b * -1 for b in block], o2) - it = executor.execute(inputs, o3) + it = executor.execute(o3) output = ref_bundles_to_list(it) expected = [ [-2, -4, -6], From cdf4adc1cde22787dbc8c355d52ac522d5d6f1b9 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 15:24:57 -0800 Subject: [PATCH 03/25] typo Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/interfaces.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 025c6f82c53a..5007f38d40b6 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -72,9 +72,12 @@ class PhysicalOperator: must be aware of in operator DAGs. Subclasses: - OneToOneOperator - AllToAllOperator - BufferOperator + OneToOneOperator: handles one-to-one operations (e.g., map) + AllToAllOperator: handles all-to-all operations (e.g., sort) + BufferOperator: handles stream manipulation operations (e.g., union) + + In summary, OneToOne and AllToAll transform the *data* of a single input stream. + BufferOperators transform the *structure* of one or more input streams. 
""" def __init__(self, input_dependencies: List["PhysicalOperator"]): @@ -92,7 +95,7 @@ def input_dependencies(self) -> List["PhysicalOperator"]: class Executor: - """Abstract class for executors, wihch implement physical operator execution. + """Abstract class for executors, which implement physical operator execution. Subclasses: BulkExecutor From ed36a068ce132314fa4b21126360f187485e0ce5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 15:38:58 -0800 Subject: [PATCH 04/25] doc compute strategy Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/bulk_executor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index 8126d7c627eb..fbf515418543 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -17,6 +17,11 @@ def _naive_task_execute( inputs: List[RefBundle], op: OneToOneOperator ) -> List[RefBundle]: + """Naively execute a 1:1 operation using Ray tasks. + + TODO: This should be reconciled with ComputeStrategy. + """ + @ray.remote(num_returns=2) def transform_one(block: Block) -> (Block, BlockMetadata): [out] = list(op.execute_one([block], {})) From 99212fe034b00b016840510ebf1ce1b9361a0c30 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 18:19:18 -0800 Subject: [PATCH 05/25] add basic pipelined executor Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 17 ++- .../_internal/execution/pipelined_executor.py | 126 ++++++++++++++++++ python/ray/data/tests/test_execution.py | 32 +++-- 3 files changed, 155 insertions(+), 20 deletions(-) create mode 100644 python/ray/data/_internal/execution/pipelined_executor.py diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index fbf515418543..ee96163da83d 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -14,6 +14,16 @@ from ray.data._internal.stats import DatasetStats +@ray.remote(num_returns=2) +def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata): + print("Processing", block) + import time + + time.sleep(1) + [out] = list(op.execute_one([block], {})) + return out, BlockAccessor.for_block(out).get_metadata([], None) + + def _naive_task_execute( inputs: List[RefBundle], op: OneToOneOperator ) -> List[RefBundle]: @@ -22,11 +32,6 @@ def _naive_task_execute( TODO: This should be reconciled with ComputeStrategy. 
""" - @ray.remote(num_returns=2) - def transform_one(block: Block) -> (Block, BlockMetadata): - [out] = list(op.execute_one([block], {})) - return out, BlockAccessor.for_block(out).get_metadata([], None) - input_blocks = [] for bundle in inputs: for block, _ in bundle.blocks: @@ -34,7 +39,7 @@ def transform_one(block: Block) -> (Block, BlockMetadata): out_blocks, out_meta = [], [] for in_b in input_blocks: - out_b, out_m = transform_one.remote(in_b) + out_b, out_m = _transform_one.remote(op, in_b) out_blocks.append(out_b) out_meta.append(out_m) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py new file mode 100644 index 000000000000..78d1535bfcea --- /dev/null +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -0,0 +1,126 @@ +from typing import Dict, List, Iterator, Optional + +import ray +from ray.data.block import Block, BlockMetadata +from ray.data._internal.execution.interfaces import ( + Executor, + RefBundle, + PhysicalOperator, + OneToOneOperator, + AllToAllOperator, + BufferOperator, +) +from ray.data._internal.execution.bulk_executor import _transform_one +from ray.data._internal.progress_bar import ProgressBar +from ray.data._internal.stats import DatasetStats +from ray.types import ObjectRef + + +class _OpState: + def __init__(self, num_inputs: int): + self.inqueues: List[List[RefBundle]] = [[] for _ in range(num_inputs)] + self.outqueue: List[RefBundle] = [] + + +# TODO: reconcile with ComputeStrategy +class _OneToOneTask: + def __init__(self, op: OneToOneOperator, state: _OpState, inputs: RefBundle): + self._op: OneToOneOperator = op + self._state: _OpState = state + self._inputs: RefBundle = inputs + self._block_ref: Optional[ObjectRef[Block]] = None + self._meta_ref: Optional[ObjectRef[BlockMetadata]] = None + + def execute(self) -> ObjectRef: + if len(self._inputs.blocks) != 1: + raise NotImplementedError("TODO: multi-block inputs") + self._block_ref, self._meta_ref = _transform_one.remote( + self._op, self._inputs.blocks[0][0] + ) + return self._meta_ref + + def completed(self): + meta = ray.get(self._meta_ref) + self._state.outqueue.append(RefBundle([(self._block_ref, meta)])) + + +class PipelinedExecutor(Executor): + def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]: + """Executes the DAG using a fully pipelined strategy. + + TODO: optimize memory usage by deleting intermediate results and marking + the `owned` field in the ref bundles correctly. + + TODO: implement order preservation. + """ + + # TODO: implement parallelism control and autoscaling strategies. + PARALLELISM_LIMIT = 2 + + # TODO: make these class members so we can unit test this. + operator_state: Dict[PhysicalOperator, _OpState] = {} + candidate_tasks: Dict[PhysicalOperator, _OneToOneTask] = {} + active_tasks: List[ObjectRef, _OneToOneTask] = {} + + # Setup the streaming topology. + def setup_state(node) -> _OpState: + if node in operator_state: + return operator_state[node] + + # Create state if it doesn't exist. + state = _OpState(len(node.input_dependencies)) + operator_state[node] = state + + # Wire up the input outqueues to this node's inqueues. + for i, parent in enumerate(node.input_dependencies): + parent_state = setup_state(parent) + state.inqueues[i] = parent_state.outqueue + + return state + + setup_state(dag) + buffer_state_change = True + + while candidate_tasks or active_tasks or buffer_state_change: + buffer_state_change = False + + # Process completed tasks. 
+ if active_tasks: + [ref], _ = ray.wait(list(active_tasks), num_returns=1, fetch_local=True) + task = active_tasks.pop(ref) + task.completed() + + # Generate new tasks. + for op, state in operator_state.items(): + if isinstance(op, OneToOneOperator): + assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input" + inqueue = state.inqueues[0] + if inqueue and op not in candidate_tasks: + candidate_tasks[op] = _OneToOneTask(op, state, inqueue.pop(0)) + elif isinstance(op, AllToAllOperator): + assert len(state.inqueues) == 1, "AllToAll takes exactly 1 input" + raise NotImplementedError + elif isinstance(op, BufferOperator): + for i, inqueue in enumerate(state.inqueues): + while inqueue: + op.add_next(state.inqueue.pop(0), input_index=i) + buffer_state_change = True + while op.has_next(): + state.outqueue.append(op.get_next()) + buffer_state_change = True + else: + assert False, "Unknown operator type: {}".format(op) + + # Yield outputs. + output = operator_state[dag] + while output.outqueue: + yield output.outqueue.pop(0) + + # Dispatch new tasks. + for op, task in list(candidate_tasks.items()): + if len(active_tasks) < PARALLELISM_LIMIT: + active_tasks[task.execute()] = task + del candidate_tasks[op] + + def get_stats() -> DatasetStats: + raise NotImplementedError diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index f4cc247a0bee..ca9edc4d78d0 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -6,6 +6,7 @@ from ray.data.block import BlockAccessor from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle from ray.data._internal.execution.bulk_executor import BulkExecutor +from ray.data._internal.execution.pipelined_executor import PipelinedExecutor from ray.data._internal.execution.operators import InputDataBuffer, MapOperator @@ -30,28 +31,31 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: for bundle in bundles: for block, _ in bundle.blocks: output.append(ray.get(block)) + print("Output", output[-1]) return output def test_basic_bulk(): executor = BulkExecutor(ExecutionOptions()) - inputs = make_ref_bundles( - [ - [1, 2, 3], - [4, 5, 6], - [7, 8, 9], - ] - ) + inputs = make_ref_bundles([[x] for x in range(10)]) o1 = InputDataBuffer(inputs) - o2 = MapOperator(lambda block: [b * 2 for b in block], o1) - o3 = MapOperator(lambda block: [b * -1 for b in block], o2) + o2 = MapOperator(lambda block: [b * -1 for b in block], o1) + o3 = MapOperator(lambda block: [b * 2 for b in block], o2) it = executor.execute(o3) output = ref_bundles_to_list(it) - expected = [ - [-2, -4, -6], - [-8, -10, -12], - [-14, -16, -18], - ] + expected = [[x * -2] for x in range(10)] + assert output == expected, (output, expected) + + +def test_basic_pipelined(): + executor = PipelinedExecutor(ExecutionOptions()) + inputs = make_ref_bundles([[x] for x in range(10)]) + o1 = InputDataBuffer(inputs) + o2 = MapOperator(lambda block: [b * -1 for b in block], o1) + o3 = MapOperator(lambda block: [b * 2 for b in block], o2) + it = executor.execute(o3) + output = ref_bundles_to_list(it) + expected = [[x * -2] for x in range(10)] assert output == expected, (output, expected) From 612896517137f93c9a726936f3b001ec7937b2a1 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 20:15:34 -0800 Subject: [PATCH 06/25] refactor to be unit testable Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 2 +- .../_internal/execution/pipelined_executor.py | 175 
++++++++++++------ python/ray/data/tests/test_execution.py | 2 +- 3 files changed, 118 insertions(+), 61 deletions(-) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index ee96163da83d..2c98bca5eec9 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -19,7 +19,7 @@ def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata) print("Processing", block) import time - time.sleep(1) + time.sleep(0.5) [out] = list(op.execute_one([block], {})) return out, BlockAccessor.for_block(out).get_metadata([], None) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index 78d1535bfcea..74067dcaafce 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -4,6 +4,7 @@ from ray.data.block import Block, BlockMetadata from ray.data._internal.execution.interfaces import ( Executor, + ExecutionOptions, RefBundle, PhysicalOperator, OneToOneOperator, @@ -15,8 +16,8 @@ from ray.data._internal.stats import DatasetStats from ray.types import ObjectRef - class _OpState: + """Execution state for a PhysicalOperator.""" def __init__(self, num_inputs: int): self.inqueues: List[List[RefBundle]] = [[] for _ in range(num_inputs)] self.outqueue: List[RefBundle] = [] @@ -24,6 +25,7 @@ def __init__(self, num_inputs: int): # TODO: reconcile with ComputeStrategy class _OneToOneTask: + """Execution state for OneToOneOperator task.""" def __init__(self, op: OneToOneOperator, state: _OpState, inputs: RefBundle): self._op: OneToOneOperator = op self._state: _OpState = state @@ -44,32 +46,65 @@ def completed(self): self._state.outqueue.append(RefBundle([(self._block_ref, meta)])) +# TODO: optimize memory usage by deleting intermediate results. +# TODO: implement order preservation. class PipelinedExecutor(Executor): - def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]: - """Executes the DAG using a fully pipelined strategy. + def __init__(self, options: ExecutionOptions): + # Operator state for the executing pipeline, populated on execution start. + self._operator_state: Dict[PhysicalOperator, _OpState] = {} + self._output_node: Optional[PhysicalOperator] = None + self._active_tasks: List[ObjectRef, _OneToOneTask] = {} + super().__init__(options) - TODO: optimize memory usage by deleting intermediate results and marking - the `owned` field in the ref bundles correctly. + def execute(self, dag: PhysicalOperator) -> Iterator[RefBundle]: + """Executes the DAG using a pipelined execution strategy. - TODO: implement order preservation. + We take an event-loop approach to scheduling. We block on the next scheduling + event using `ray.wait`, updating operator state and dispatching new tasks. """ + self._init_operator_state(dag) + i = 0 + while self._active_tasks or i == 0: + self._scheduling_loop_step() + i += 1 + output = self._operator_state[self._output_node] + while output.outqueue: + yield output.outqueue.pop(0) - # TODO: implement parallelism control and autoscaling strategies. - PARALLELISM_LIMIT = 2 + def get_stats() -> DatasetStats: + raise NotImplementedError - # TODO: make these class members so we can unit test this. 
- operator_state: Dict[PhysicalOperator, _OpState] = {} - candidate_tasks: Dict[PhysicalOperator, _OneToOneTask] = {} - active_tasks: List[ObjectRef, _OneToOneTask] = {} + def _scheduling_loop_step(self) -> None: + """Run one step of the pipeline scheduling loop. + + This runs a few general phases: + 1. Waiting for the next task completion using `ray.wait()`. + 2. Pushing updates through operator inqueues / outqueues. + 3. Selecting and dispatching new operator tasks. + """ + self._process_completed_tasks() + op = self._select_operator_to_run() + while op is not None: + self._dispatch_next_task(op) + op = self._select_operator_to_run() + + def _init_operator_state(self, dag: PhysicalOperator) -> None: + """Initialize operator state for the given DAG. + + This involves creating the operator state for each operator in the DAG, + registering it with this class, and wiring up the inqueues/outqueues of + dependent operator states. + """ + if self._operator_state: + raise ValueError("Cannot init operator state twice.") - # Setup the streaming topology. def setup_state(node) -> _OpState: - if node in operator_state: - return operator_state[node] + if node in self._operator_state: + return self._operator_state[node] # Create state if it doesn't exist. state = _OpState(len(node.input_dependencies)) - operator_state[node] = state + self._operator_state[node] = state # Wire up the input outqueues to this node's inqueues. for i, parent in enumerate(node.input_dependencies): @@ -79,48 +114,70 @@ def setup_state(node) -> _OpState: return state setup_state(dag) - buffer_state_change = True - - while candidate_tasks or active_tasks or buffer_state_change: - buffer_state_change = False - - # Process completed tasks. - if active_tasks: - [ref], _ = ray.wait(list(active_tasks), num_returns=1, fetch_local=True) - task = active_tasks.pop(ref) - task.completed() - - # Generate new tasks. - for op, state in operator_state.items(): - if isinstance(op, OneToOneOperator): - assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input" - inqueue = state.inqueues[0] - if inqueue and op not in candidate_tasks: - candidate_tasks[op] = _OneToOneTask(op, state, inqueue.pop(0)) - elif isinstance(op, AllToAllOperator): - assert len(state.inqueues) == 1, "AllToAll takes exactly 1 input" - raise NotImplementedError - elif isinstance(op, BufferOperator): - for i, inqueue in enumerate(state.inqueues): - while inqueue: - op.add_next(state.inqueue.pop(0), input_index=i) - buffer_state_change = True - while op.has_next(): - state.outqueue.append(op.get_next()) - buffer_state_change = True - else: - assert False, "Unknown operator type: {}".format(op) - - # Yield outputs. - output = operator_state[dag] - while output.outqueue: - yield output.outqueue.pop(0) + self._output_node = dag - # Dispatch new tasks. - for op, task in list(candidate_tasks.items()): - if len(active_tasks) < PARALLELISM_LIMIT: - active_tasks[task.execute()] = task - del candidate_tasks[op] + def _process_completed_tasks(self) -> None: + """Process any newly completed tasks and update operator state. - def get_stats() -> DatasetStats: - raise NotImplementedError + This does not dispatch any new tasks, but pushes RefBundles through the + DAG topology (i.e., operator state inqueues/outqueues). 
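+
+        Buffer operators are drained inline here as well: they launch no tasks
+        of their own and only reorganize bundles that are already computed.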
+ """ + if self._active_tasks: + [ref], _ = ray.wait( + list(self._active_tasks), num_returns=1, fetch_local=True + ) + task = self._active_tasks.pop(ref) + task.completed() + + for op, state in self._operator_state.items(): + if isinstance(op, BufferOperator): + for i, inqueue in enumerate(state.inqueues): + while inqueue: + op.add_next(state.inqueue.pop(0), input_index=i) + while op.has_next(): + state.outqueue.append(op.get_next()) + elif isinstance(op, AllToAllOperator): + pass + elif isinstance(op, OneToOneOperator): + pass + else: + assert False, "Unknown operator type: {}".format(op) + + def _select_operator_to_run(self) -> Optional[PhysicalOperator]: + """Select an operator to run, if possible. + + The objective of this function is to maximize the throughput of the overall + pipeline, subject to defined memory and parallelism limits. + """ + PARALLELISM_LIMIT = 2 + if len(self._active_tasks) >= PARALLELISM_LIMIT: + return None + + # TODO: pipeline scheduling and prioritization. + for op, state in self._operator_state.items(): + if isinstance(op, OneToOneOperator): + assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input" + if state.inqueues[0]: + return op + elif isinstance(op, AllToAllOperator): + assert len(state.inqueues) == 1, "AllToAll takes exactly 1 input" + raise NotImplementedError + elif isinstance(op, BufferOperator): + pass + else: + assert False, "Unknown operator type: {}".format(op) + + def _dispatch_next_task(self, op: PhysicalOperator) -> None: + """Schedule the next task for the given operator. + + It is an error to call this if the given operator has no next tasks. + + Args: + op: The operator to schedule a task for. + """ + if isinstance(op, OneToOneOperator): + state = self._operator_state[op] + task = _OneToOneTask(op, state, state.inqueues[0].pop(0)) + self._active_tasks[task.execute()] = task + else: + raise NotImplementedError diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index ca9edc4d78d0..960cef8adb92 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -56,7 +56,7 @@ def test_basic_pipelined(): it = executor.execute(o3) output = ref_bundles_to_list(it) expected = [[x * -2] for x in range(10)] - assert output == expected, (output, expected) + assert sorted(output) == sorted(expected), (output, expected) if __name__ == "__main__": From ed1718d3734ad2db4c5d3e198e9d94ee7c47637f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 20:51:30 -0800 Subject: [PATCH 07/25] add progress bar Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 1 - .../data/_internal/execution/interfaces.py | 16 ++++++- .../ray/data/_internal/execution/operators.py | 10 ++-- .../_internal/execution/pipelined_executor.py | 48 ++++++++++++++++--- python/ray/data/tests/test_execution.py | 9 ++-- 5 files changed, 68 insertions(+), 16 deletions(-) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index 2c98bca5eec9..f70ac4eff04f 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -16,7 +16,6 @@ @ray.remote(num_returns=2) def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata): - print("Processing", block) import time time.sleep(0.5) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 
5007f38d40b6..405a56cbe173 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -80,11 +80,16 @@ class PhysicalOperator: BufferOperators transform the *structure* of one or more input streams. """ - def __init__(self, input_dependencies: List["PhysicalOperator"]): + def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]): + self._name = name self._input_dependencies = input_dependencies for x in input_dependencies: assert isinstance(x, PhysicalOperator), x + @property + def name(self) -> str: + return self._name + @property def input_dependencies(self) -> List["PhysicalOperator"]: """List of operators that provide inputs for this operator.""" @@ -93,6 +98,15 @@ def input_dependencies(self) -> List["PhysicalOperator"]: ), "PhysicalOperator.__init__() was not called." return self._input_dependencies + def num_outputs_total(self) -> Optional[int]: + """Returns the total number of output bundles of this operator, if known. + + This is useful for reporting progress. + """ + if len(self.input_dependencies) == 1: + return self.input_dependencies[0].num_outputs_total() + return None + class Executor: """Abstract class for executors, which implement physical operator execution. diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index 1478029bffa0..2058241ac3f0 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -1,4 +1,4 @@ -from typing import List, Iterator +from typing import List, Iterator, Optional from ray.data.block import Block from ray.data._internal.execution.interfaces import ( @@ -15,7 +15,8 @@ class InputDataBuffer(BufferOperator): def __init__(self, input_data: List[RefBundle]): self._input_data = input_data - super().__init__([]) + self._num_outputs = len(input_data) + super().__init__("Input", []) def has_next(self) -> bool: return len(self._input_data) > 0 @@ -23,13 +24,16 @@ def has_next(self) -> bool: def get_next(self) -> RefBundle: return self._input_data.pop(0) + def num_outputs_total(self) -> Optional[int]: + return self._num_outputs + class MapOperator(OneToOneOperator): """Defines a simple map operation over blocks.""" def __init__(self, block_transform: BlockTransform, input_op: PhysicalOperator): self._block_transform = block_transform - super().__init__([input_op]) + super().__init__("Map", [input_op]) def execute_one(self, block_bundle: Iterator[Block], _) -> Iterator[Block]: def apply_transform(fn, block_bundle): diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index 74067dcaafce..f348cffdd060 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -12,20 +12,47 @@ BufferOperator, ) from ray.data._internal.execution.bulk_executor import _transform_one +from ray.data._internal.execution.operators import InputDataBuffer from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats from ray.types import ObjectRef + class _OpState: """Execution state for a PhysicalOperator.""" - def __init__(self, num_inputs: int): - self.inqueues: List[List[RefBundle]] = [[] for _ in range(num_inputs)] + + def __init__(self, op: PhysicalOperator): + self.inqueues: List[List[RefBundle]] = [ + [] for _ in range(len(op.input_dependencies)) + ] self.outqueue: List[RefBundle] 
= [] + self.op = op + self.progress_bar = None + self.num_active_tasks = 0 + + def initialize_progress_bar(self, index: int) -> None: + self.progress_bar = ProgressBar( + self.op.name, self.op.num_outputs_total(), index + ) + + def add_output(self, ref: RefBundle) -> None: + self.outqueue.append(ref) + if self.progress_bar: + self.progress_bar.update(1) + self.refresh_progress_bar() + + def refresh_progress_bar(self) -> None: + if self.progress_bar: + queued = len(self.inqueues[0]) if self.inqueues else 0 + self.progress_bar.set_description( + f"{self.op.name}: {self.num_active_tasks} active, {queued} queued" + ) # TODO: reconcile with ComputeStrategy class _OneToOneTask: """Execution state for OneToOneOperator task.""" + def __init__(self, op: OneToOneOperator, state: _OpState, inputs: RefBundle): self._op: OneToOneOperator = op self._state: _OpState = state @@ -39,11 +66,14 @@ def execute(self) -> ObjectRef: self._block_ref, self._meta_ref = _transform_one.remote( self._op, self._inputs.blocks[0][0] ) + self._state.num_active_tasks += 1 + self._state.refresh_progress_bar() return self._meta_ref def completed(self): meta = ray.get(self._meta_ref) - self._state.outqueue.append(RefBundle([(self._block_ref, meta)])) + self._state.num_active_tasks -= 1 + self._state.add_output(RefBundle([(self._block_ref, meta)])) # TODO: optimize memory usage by deleting intermediate results. @@ -76,7 +106,7 @@ def get_stats() -> DatasetStats: def _scheduling_loop_step(self) -> None: """Run one step of the pipeline scheduling loop. - + This runs a few general phases: 1. Waiting for the next task completion using `ray.wait()`. 2. Pushing updates through operator inqueues / outqueues. @@ -103,7 +133,7 @@ def setup_state(node) -> _OpState: return self._operator_state[node] # Create state if it doesn't exist. - state = _OpState(len(node.input_dependencies)) + state = _OpState(node) self._operator_state[node] = state # Wire up the input outqueues to this node's inqueues. @@ -116,6 +146,12 @@ def setup_state(node) -> _OpState: setup_state(dag) self._output_node = dag + i = 0 + for state in list(self._operator_state.values())[::-1]: + if not isinstance(state.op, InputDataBuffer): + state.initialize_progress_bar(i) + i += 1 + def _process_completed_tasks(self) -> None: """Process any newly completed tasks and update operator state. 
@@ -135,7 +171,7 @@ def _process_completed_tasks(self) -> None: while inqueue: op.add_next(state.inqueue.pop(0), input_index=i) while op.has_next(): - state.outqueue.append(op.get_next()) + state.add_output(op.get_next()) elif isinstance(op, AllToAllOperator): pass elif isinstance(op, OneToOneOperator): diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index 960cef8adb92..f4903dedc31e 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -31,31 +31,30 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: for bundle in bundles: for block, _ in bundle.blocks: output.append(ray.get(block)) - print("Output", output[-1]) return output def test_basic_bulk(): executor = BulkExecutor(ExecutionOptions()) - inputs = make_ref_bundles([[x] for x in range(10)]) + inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator(lambda block: [b * -1 for b in block], o1) o3 = MapOperator(lambda block: [b * 2 for b in block], o2) it = executor.execute(o3) output = ref_bundles_to_list(it) - expected = [[x * -2] for x in range(10)] + expected = [[x * -2] for x in range(20)] assert output == expected, (output, expected) def test_basic_pipelined(): executor = PipelinedExecutor(ExecutionOptions()) - inputs = make_ref_bundles([[x] for x in range(10)]) + inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator(lambda block: [b * -1 for b in block], o1) o3 = MapOperator(lambda block: [b * 2 for b in block], o2) it = executor.execute(o3) output = ref_bundles_to_list(it) - expected = [[x * -2] for x in range(10)] + expected = [[x * -2] for x in range(20)] assert sorted(output) == sorted(expected), (output, expected) From d3b61fcfa5da59afe7553fed6c68e6dab2d100c5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 21:11:12 -0800 Subject: [PATCH 08/25] add basic priority policy Signed-off-by: Eric Liang --- .../_internal/execution/pipelined_executor.py | 17 +++++++++++------ python/ray/data/tests/test_execution.py | 8 +++++--- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index f348cffdd060..7f0e2c6406e9 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -35,15 +35,17 @@ def initialize_progress_bar(self, index: int) -> None: self.op.name, self.op.num_outputs_total(), index ) + def num_queued(self) -> int: + return sum(len(q) for q in self.inqueues) + def add_output(self, ref: RefBundle) -> None: self.outqueue.append(ref) if self.progress_bar: self.progress_bar.update(1) - self.refresh_progress_bar() def refresh_progress_bar(self) -> None: if self.progress_bar: - queued = len(self.inqueues[0]) if self.inqueues else 0 + queued = self.num_queued() self.progress_bar.set_description( f"{self.op.name}: {self.num_active_tasks} active, {queued} queued" ) @@ -67,7 +69,6 @@ def execute(self) -> ObjectRef: self._op, self._inputs.blocks[0][0] ) self._state.num_active_tasks += 1 - self._state.refresh_progress_bar() return self._meta_ref def completed(self): @@ -178,6 +179,7 @@ def _process_completed_tasks(self) -> None: pass else: assert False, "Unknown operator type: {}".format(op) + state.refresh_progress_bar() def _select_operator_to_run(self) -> Optional[PhysicalOperator]: """Select an operator to run, if 
possible. @@ -185,12 +187,15 @@ def _select_operator_to_run(self) -> Optional[PhysicalOperator]: The objective of this function is to maximize the throughput of the overall pipeline, subject to defined memory and parallelism limits. """ - PARALLELISM_LIMIT = 2 + PARALLELISM_LIMIT = 4 if len(self._active_tasks) >= PARALLELISM_LIMIT: return None - # TODO: pipeline scheduling and prioritization. - for op, state in self._operator_state.items(): + # TODO: improve the prioritization. + pairs = list(self._operator_state.items()) + pairs.sort(key=lambda p: p[1].num_active_tasks) + + for op, state in pairs: if isinstance(op, OneToOneOperator): assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input" if state.inqueues[0]: diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index f4903dedc31e..afcb0f6243f8 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -48,13 +48,15 @@ def test_basic_bulk(): def test_basic_pipelined(): executor = PipelinedExecutor(ExecutionOptions()) - inputs = make_ref_bundles([[x] for x in range(20)]) + inputs = make_ref_bundles([[x] for x in range(100)]) o1 = InputDataBuffer(inputs) o2 = MapOperator(lambda block: [b * -1 for b in block], o1) o3 = MapOperator(lambda block: [b * 2 for b in block], o2) - it = executor.execute(o3) + o4 = MapOperator(lambda block: [b * 1 for b in block], o3) + o5 = MapOperator(lambda block: [b * 1 for b in block], o4) + it = executor.execute(o5) output = ref_bundles_to_list(it) - expected = [[x * -2] for x in range(20)] + expected = [[x * -2] for x in range(100)] assert sorted(output) == sorted(expected), (output, expected) From c7f97562aaff1eb715dbbe216d764cfa46e587eb Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 21:47:11 -0800 Subject: [PATCH 09/25] remove sleep Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 3 --- .../_internal/execution/pipelined_executor.py | 6 ++++-- python/ray/data/tests/test_execution.py | 17 +++++++++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index f70ac4eff04f..fd57a9ae557c 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -16,9 +16,6 @@ @ray.remote(num_returns=2) def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata): - import time - - time.sleep(0.5) [out] = list(op.execute_one([block], {})) return out, BlockAccessor.for_block(out).get_metadata([], None) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index 7f0e2c6406e9..e2f07a5fa742 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -29,6 +29,7 @@ def __init__(self, op: PhysicalOperator): self.op = op self.progress_bar = None self.num_active_tasks = 0 + self.num_completed_tasks = 0 def initialize_progress_bar(self, index: int) -> None: self.progress_bar = ProgressBar( @@ -40,6 +41,7 @@ def num_queued(self) -> int: def add_output(self, ref: RefBundle) -> None: self.outqueue.append(ref) + self.num_completed_tasks += 1 if self.progress_bar: self.progress_bar.update(1) @@ -187,13 +189,13 @@ def _select_operator_to_run(self) -> Optional[PhysicalOperator]: The objective of this function is to maximize the throughput of the 
overall pipeline, subject to defined memory and parallelism limits. """ - PARALLELISM_LIMIT = 4 + PARALLELISM_LIMIT = 8 if len(self._active_tasks) >= PARALLELISM_LIMIT: return None # TODO: improve the prioritization. pairs = list(self._operator_state.items()) - pairs.sort(key=lambda p: p[1].num_active_tasks) + pairs.sort(key=lambda p: len(p[1].outqueue) + p[1].num_active_tasks) for op, state in pairs: if isinstance(op, OneToOneOperator): diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index afcb0f6243f8..17163f3f1c11 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -1,5 +1,6 @@ import pytest +import time from typing import List, Any import ray @@ -46,14 +47,22 @@ def test_basic_bulk(): assert output == expected, (output, expected) +def s(s, f): + def func(x): + time.sleep(s) + return f(x) + + return func + + def test_basic_pipelined(): executor = PipelinedExecutor(ExecutionOptions()) inputs = make_ref_bundles([[x] for x in range(100)]) o1 = InputDataBuffer(inputs) - o2 = MapOperator(lambda block: [b * -1 for b in block], o1) - o3 = MapOperator(lambda block: [b * 2 for b in block], o2) - o4 = MapOperator(lambda block: [b * 1 for b in block], o3) - o5 = MapOperator(lambda block: [b * 1 for b in block], o4) + o2 = MapOperator(s(0.5, lambda block: [b * -1 for b in block]), o1) + o3 = MapOperator(s(3, lambda block: [b * 2 for b in block]), o2) + o4 = MapOperator(s(0.5, lambda block: [b * 1 for b in block]), o3) + o5 = MapOperator(s(2, lambda block: [b * 1 for b in block]), o4) it = executor.execute(o5) output = ref_bundles_to_list(it) expected = [[x * -2] for x in range(100)] From 7b0422f9fa725f838365aea46d0ca469f780da5d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 11 Nov 2022 23:30:18 -0800 Subject: [PATCH 10/25] cache desc Signed-off-by: Eric Liang --- python/ray/data/_internal/progress_bar.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index 1e6a9d329ce4..7f7b8cbf1d5d 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -44,11 +44,12 @@ class ProgressBar: """Thin wrapper around tqdm to handle soft imports.""" def __init__(self, name: str, total: int, position: int = 0): + self._desc = name if not _enabled or threading.current_thread() is not threading.main_thread(): self._bar = None elif tqdm: self._bar = tqdm.tqdm(total=total, position=position) - self._bar.set_description(name) + self._bar.set_description(self._desc) else: global needs_warning if needs_warning: @@ -83,8 +84,9 @@ def fetch_until_complete(self, refs: List[ObjectRef]) -> List[Any]: return [ref_to_result[ref] for ref in refs] def set_description(self, name: str) -> None: - if self._bar: - self._bar.set_description(name) + if self._bar and name != self._desc: + self._desc = name + self._bar.set_description(self._desc) def update(self, i: int) -> None: if self._bar and i != 0: From 44f88ac5c3c132e675b5f93778e25b28b0729941 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 14 Nov 2022 13:48:22 -0800 Subject: [PATCH 11/25] rename buffer to exchange Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 8 +- .../data/_internal/execution/interfaces.py | 86 ++++++++++--------- .../ray/data/_internal/execution/operators.py | 4 +- .../_internal/execution/pipelined_executor.py | 12 +-- 4 files changed, 52 insertions(+), 58 deletions(-) diff 
--git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index fd57a9ae557c..fa0b14fc3221 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -7,8 +7,7 @@ RefBundle, PhysicalOperator, OneToOneOperator, - AllToAllOperator, - BufferOperator, + ExchangeOperator, ) from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats @@ -68,10 +67,7 @@ def execute_recursive(node: PhysicalOperator) -> List[RefBundle]: if isinstance(node, OneToOneOperator): assert len(inputs) == 1, "OneToOne takes exactly 1 input stream" output = _naive_task_execute(inputs[0], node) - elif isinstance(node, AllToAllOperator): - assert len(inputs) == 1, "AllToAll takes exactly 1 input stream" - output = node.execute_all(inputs[0]) - elif isinstance(node, BufferOperator): + elif isinstance(node, ExchangeOperator): for i, ref_bundles in enumerate(inputs): for r in ref_bundles: node.add_input(r, input_index=i) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 405a56cbe173..ef1931ba3ed8 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -72,12 +72,8 @@ class PhysicalOperator: must be aware of in operator DAGs. Subclasses: - OneToOneOperator: handles one-to-one operations (e.g., map) - AllToAllOperator: handles all-to-all operations (e.g., sort) - BufferOperator: handles stream manipulation operations (e.g., union) - - In summary, OneToOne and AllToAll transform the *data* of a single input stream. - BufferOperators transform the *structure* of one or more input streams. + OneToOneOperator: handles one-to-one operations (e.g., map, filter) + ExchangeOperator: for stream manipulation operations (e.g., shuffle, union) """ def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]): @@ -159,62 +155,70 @@ def execute_one( raise NotImplementedError -class AllToAllOperator(PhysicalOperator): - """Abstract class for operators defining their entire distributed execution. +class ExchangeOperator(PhysicalOperator): + """A streaming operator that does not map inputs 1:1 with outputs. - Used to implement all:all transformations. + For example, this can take two operators and combine their blocks + pairwise for zip, group adjacent blocks for repartition, etc. - This also defines a barrier between operators in the DAG. Operators - before and after an AllToAllOperator will not run concurrently. + Exchanges can be metadata-only (i.e., not transforming any block data), or can + also manipulate data (e.g., for sort / shuffle). Subclasses: - SortMap + AllToAllOperator """ - def __init__(self, preprocessor: Optional[OneToOneOperator] = None): - self._preprocessor = preprocessor + def add_input(self, refs: RefBundle, input_index: int) -> None: + """Called when an upstream result is available.""" + raise NotImplementedError - def execute_all(self, inputs: List[RefBundle]) -> List[RefBundle]: - """Execute distributedly from a driver process. + def inputs_done(self, input_index: int) -> None: + """Called when an upstream operator finishes.""" + raise NotImplementedError - This is a synchronous call that blocks until the computation is completed. + def has_next(self) -> bool: + """Returns when a downstream output is available.""" + raise NotImplementedError - Args: - inputs: List of ref bundles. 
- """ + def get_next(self) -> RefBundle: + """Get the next downstream output.""" raise NotImplementedError -class BufferOperator(PhysicalOperator): - """A streaming operator that buffers blocks for downstream operators. +class AllToAllOperator(ExchangeOperator): + """A type of ExchangeOperator that doesn't execute until all inputs are available. - For example, this can take two operators and combine their blocks - pairwise for zip, group adjacent blocks for repartition, etc. - - Buffers do not read or transform any block data; they only operate - on block metadata. + Used to implement all:all transformations such as sort / shuffle. - Examples: - Zip = ZipBuffer + DoZip - Union = UnionBuffer - Repartition(False) = SplitBuf + Splitter + CombineBuf + Combiner - Cache = CacheBuffer - Limit = LimitBuffer + MaybeTruncate - RandomizeBlockOrder = RandomizeBlockOrderBuffer + Subclasses: + SortMap """ + def __init__(self, preprocessor: Optional[OneToOneOperator] = None): + self._preprocessor = preprocessor + self._buffer = [] + self._outbox = None + def add_input(self, refs: RefBundle, input_index: int) -> None: - """Called when an upstream result is available.""" - raise NotImplementedError + assert input_index == 0, "AllToAll only supports one input." + self._buffer.append(refs) def inputs_done(self, input_index: int) -> None: - """Called when an upstream operator finishes.""" - raise NotImplementedError + # Note: blocking synchronous execution for now. + self._outbox = self.execute_all(self._buffer) def has_next(self) -> bool: - """Returns when a downstream output is available.""" - raise NotImplementedError + return bool(self._outbox) def get_next(self) -> RefBundle: - """Get the next downstream output.""" + return self._outbox.pop(0) + + def execute_all(self, inputs: List[RefBundle]) -> List[RefBundle]: + """Execute distributedly from a driver process. + + This is a synchronous call that blocks until the computation is completed. + + Args: + inputs: List of ref bundles. 
+ """ raise NotImplementedError diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index 2058241ac3f0..af5275e61587 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -4,13 +4,13 @@ from ray.data._internal.execution.interfaces import ( RefBundle, OneToOneOperator, - BufferOperator, + ExchangeOperator, PhysicalOperator, ) from ray.data._internal.compute import BlockTransform -class InputDataBuffer(BufferOperator): +class InputDataBuffer(ExchangeOperator): """Defines the input data for the operator DAG.""" def __init__(self, input_data: List[RefBundle]): diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index e2f07a5fa742..f3686f1c3860 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -8,8 +8,7 @@ RefBundle, PhysicalOperator, OneToOneOperator, - AllToAllOperator, - BufferOperator, + ExchangeOperator, ) from ray.data._internal.execution.bulk_executor import _transform_one from ray.data._internal.execution.operators import InputDataBuffer @@ -169,14 +168,12 @@ def _process_completed_tasks(self) -> None: task.completed() for op, state in self._operator_state.items(): - if isinstance(op, BufferOperator): + if isinstance(op, ExchangeOperator): for i, inqueue in enumerate(state.inqueues): while inqueue: op.add_next(state.inqueue.pop(0), input_index=i) while op.has_next(): state.add_output(op.get_next()) - elif isinstance(op, AllToAllOperator): - pass elif isinstance(op, OneToOneOperator): pass else: @@ -202,10 +199,7 @@ def _select_operator_to_run(self) -> Optional[PhysicalOperator]: assert len(state.inqueues) == 1, "OneToOne takes exactly 1 input" if state.inqueues[0]: return op - elif isinstance(op, AllToAllOperator): - assert len(state.inqueues) == 1, "AllToAll takes exactly 1 input" - raise NotImplementedError - elif isinstance(op, BufferOperator): + elif isinstance(op, ExchangeOperator): pass else: assert False, "Unknown operator type: {}".format(op) From d25c183f847ec6a63127560391f9cbc517cdc92a Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 14 Nov 2022 13:54:42 -0800 Subject: [PATCH 12/25] update docs Signed-off-by: Eric Liang --- .../data/_internal/execution/interfaces.py | 24 +++++++++---------- python/ray/data/tests/test_execution.py | 8 +++---- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index ef1931ba3ed8..6b674f107115 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -73,7 +73,7 @@ class PhysicalOperator: Subclasses: OneToOneOperator: handles one-to-one operations (e.g., map, filter) - ExchangeOperator: for stream manipulation operations (e.g., shuffle, union) + ExchangeOperator: handles other types of operations (e.g., shuffle, union) """ def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]): @@ -126,11 +126,11 @@ def get_stats() -> DatasetStats: class OneToOneOperator(PhysicalOperator): - """Abstract class for operators that run on a single process. + """A streaming operator that maps inputs 1:1 to outputs. - Used to implement 1:1 transformations. 
The executor will need to - wrap the operator in Ray tasks or actors for actual execution, - e.g., using TaskPoolStrategy or ActorPoolStrategy. + Subclasses need only define a single `execute_one` method that runs in a single + process, leaving the implementation of parallel and distributed execution to the + Executor implementation. Subclasses: Read @@ -143,7 +143,7 @@ class OneToOneOperator(PhysicalOperator): def execute_one( self, block_bundle: Iterator[Block], input_metadata: Dict[str, Any] ) -> Iterator[Block]: - """Execute locally on a worker process. + """Execute a block transformation locally on a worker process. Args: block_bundle: Iterator over input blocks of a RefBundle. Typically, this @@ -156,13 +156,11 @@ def execute_one( class ExchangeOperator(PhysicalOperator): - """A streaming operator that does not map inputs 1:1 with outputs. + """A streaming operator for more complex parallel transformations. - For example, this can take two operators and combine their blocks - pairwise for zip, group adjacent blocks for repartition, etc. - - Exchanges can be metadata-only (i.e., not transforming any block data), or can - also manipulate data (e.g., for sort / shuffle). + Subclasses have full control over how to buffer and transform input blocks, which + enables them to implement metadata-only stream transformations (e.g., union), + as well as all-to-all transformations (e.g., shuffle, zip). Subclasses: AllToAllOperator @@ -186,7 +184,7 @@ def get_next(self) -> RefBundle: class AllToAllOperator(ExchangeOperator): - """A type of ExchangeOperator that doesn't execute until all inputs are available. + """An ExchangeOperator that doesn't execute until all inputs are available. Used to implement all:all transformations such as sort / shuffle. diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index 17163f3f1c11..c1450d6442e6 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -59,10 +59,10 @@ def test_basic_pipelined(): executor = PipelinedExecutor(ExecutionOptions()) inputs = make_ref_bundles([[x] for x in range(100)]) o1 = InputDataBuffer(inputs) - o2 = MapOperator(s(0.5, lambda block: [b * -1 for b in block]), o1) - o3 = MapOperator(s(3, lambda block: [b * 2 for b in block]), o2) - o4 = MapOperator(s(0.5, lambda block: [b * 1 for b in block]), o3) - o5 = MapOperator(s(2, lambda block: [b * 1 for b in block]), o4) + o2 = MapOperator(s(0.05, lambda block: [b * -1 for b in block]), o1) + o3 = MapOperator(s(0.3, lambda block: [b * 2 for b in block]), o2) + o4 = MapOperator(s(0.05, lambda block: [b * 1 for b in block]), o3) + o5 = MapOperator(s(0.2, lambda block: [b * 1 for b in block]), o4) it = executor.execute(o5) output = ref_bundles_to_list(it) expected = [[x * -2] for x in range(100)] From a4a8951da62dacfced296ce2c0826cd902999eca Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 15:47:50 -0800 Subject: [PATCH 13/25] add execution option support for pipeline execution Signed-off-by: Eric Liang --- .../data/_internal/execution/interfaces.py | 3 ++ .../ray/data/_internal/execution/operators.py | 26 +++++++++++- .../_internal/execution/pipelined_executor.py | 25 +++++++++-- python/ray/data/_internal/execution/util.py | 21 ++++++++++ python/ray/data/tests/test_execution.py | 41 ++++++++++--------- 5 files changed, 90 insertions(+), 26 deletions(-) create mode 100644 python/ray/data/_internal/execution/util.py diff --git a/python/ray/data/_internal/execution/interfaces.py 
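As a concrete illustration of the 1:1 contract, a MapOperator can be exercised in-process with no Ray tasks involved; a small sketch, assuming the bundle helpers introduced in this series and an initialized Ray:

import ray
from ray.data._internal.execution.operators import InputDataBuffer, MapOperator
from ray.data._internal.execution.util import _make_ref_bundles

ray.init(ignore_reinit_error=True)
upstream = InputDataBuffer(_make_ref_bundles([[1, 2, 3]]))
op = MapOperator(lambda block: [b * 2 for b in block], upstream)
# execute_one consumes an iterator of input blocks and yields output blocks.
assert list(op.execute_one(iter([[1, 2, 3]]), {})) == [[2, 4, 6]]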
b/python/ray/data/_internal/execution/interfaces.py index 6b674f107115..76fdb40db234 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -51,6 +51,9 @@ def destroy(self) -> None: class ExecutionOptions: """Common options that should be supported by all Executor implementations.""" + # Max number of in flight tasks. + parallelism_limit: Optional[int] = None + # Example: set to 1GB and executor will try to limit object store # memory usage to 1GB. memory_limit_bytes: Optional[int] = None diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index af5275e61587..7bc8bf9b1db3 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -8,6 +8,7 @@ PhysicalOperator, ) from ray.data._internal.compute import BlockTransform +from ray.data._internal.execution.util import _make_ref_bundles class InputDataBuffer(ExchangeOperator): @@ -31,9 +32,14 @@ def num_outputs_total(self) -> Optional[int]: class MapOperator(OneToOneOperator): """Defines a simple map operation over blocks.""" - def __init__(self, block_transform: BlockTransform, input_op: PhysicalOperator): + def __init__( + self, + block_transform: BlockTransform, + input_op: PhysicalOperator, + name: str = "Map", + ): self._block_transform = block_transform - super().__init__("Map", [input_op]) + super().__init__(name, [input_op]) def execute_one(self, block_bundle: Iterator[Block], _) -> Iterator[Block]: def apply_transform(fn, block_bundle): @@ -41,3 +47,19 @@ def apply_transform(fn, block_bundle): yield fn(b) return apply_transform(self._block_transform, block_bundle) + + +# For testing only. +def _from_dataset_read_tasks(ds) -> PhysicalOperator: + read_tasks = ds._plan._snapshot_blocks._tasks + inputs = InputDataBuffer(_make_ref_bundles([[r] for r in read_tasks])) + + def do_read(block): + import time + + time.sleep(1) + for read_task in block: + for output_block in read_task(): + return output_block # TODO handle remaining blocks + + return MapOperator(do_read, inputs, name="DoRead") diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index f3686f1c3860..bb10ce101d53 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -14,6 +14,7 @@ from ray.data._internal.execution.operators import InputDataBuffer from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from ray.types import ObjectRef @@ -56,17 +57,33 @@ def refresh_progress_bar(self) -> None: class _OneToOneTask: """Execution state for OneToOneOperator task.""" - def __init__(self, op: OneToOneOperator, state: _OpState, inputs: RefBundle): + def __init__( + self, + op: OneToOneOperator, + state: _OpState, + inputs: RefBundle, + options: ExecutionOptions, + ): self._op: OneToOneOperator = op self._state: _OpState = state self._inputs: RefBundle = inputs self._block_ref: Optional[ObjectRef[Block]] = None self._meta_ref: Optional[ObjectRef[BlockMetadata]] = None + self._options = options def execute(self) -> ObjectRef: if len(self._inputs.blocks) != 1: raise NotImplementedError("TODO: multi-block inputs") - self._block_ref, self._meta_ref = _transform_one.remote( + transform_fn = _transform_one + if 
self._options.locality_with_output: + transform_fn = transform_fn.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + ray.get_runtime_context().get_node_id(), + soft=True, + ) + ) + print("AFFINITY") + self._block_ref, self._meta_ref = transform_fn.remote( self._op, self._inputs.blocks[0][0] ) self._state.num_active_tasks += 1 @@ -186,7 +203,7 @@ def _select_operator_to_run(self) -> Optional[PhysicalOperator]: The objective of this function is to maximize the throughput of the overall pipeline, subject to defined memory and parallelism limits. """ - PARALLELISM_LIMIT = 8 + PARALLELISM_LIMIT = self._options.parallelism_limit or 8 if len(self._active_tasks) >= PARALLELISM_LIMIT: return None @@ -214,7 +231,7 @@ def _dispatch_next_task(self, op: PhysicalOperator) -> None: """ if isinstance(op, OneToOneOperator): state = self._operator_state[op] - task = _OneToOneTask(op, state, state.inqueues[0].pop(0)) + task = _OneToOneTask(op, state, state.inqueues[0].pop(0), self._options) self._active_tasks[task.execute()] = task else: raise NotImplementedError diff --git a/python/ray/data/_internal/execution/util.py b/python/ray/data/_internal/execution/util.py new file mode 100644 index 000000000000..85d99e2a7c48 --- /dev/null +++ b/python/ray/data/_internal/execution/util.py @@ -0,0 +1,21 @@ +from typing import List, Any + +import ray +from ray.data.block import BlockAccessor +from ray.data._internal.execution.interfaces import RefBundle + + +def _make_ref_bundles(simple_data: List[List[Any]]) -> List[RefBundle]: + output = [] + for block in simple_data: + output.append( + RefBundle( + [ + ( + ray.put(block), + BlockAccessor.for_block(block).get_metadata([], None), + ) + ] + ) + ) + return output diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index c1450d6442e6..f11d48cd4844 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -4,27 +4,15 @@ from typing import List, Any import ray -from ray.data.block import BlockAccessor from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle from ray.data._internal.execution.bulk_executor import BulkExecutor from ray.data._internal.execution.pipelined_executor import PipelinedExecutor -from ray.data._internal.execution.operators import InputDataBuffer, MapOperator - - -def make_ref_bundles(simple_data: List[List[Any]]) -> List[RefBundle]: - output = [] - for block in simple_data: - output.append( - RefBundle( - [ - ( - ray.put(block), - BlockAccessor.for_block(block).get_metadata([], None), - ) - ] - ) - ) - return output +from ray.data._internal.execution.operators import ( + InputDataBuffer, + MapOperator, + _from_dataset_read_tasks, +) +from ray.data._internal.execution.util import _make_ref_bundles def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: @@ -37,7 +25,7 @@ def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: def test_basic_bulk(): executor = BulkExecutor(ExecutionOptions()) - inputs = make_ref_bundles([[x] for x in range(20)]) + inputs = _make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator(lambda block: [b * -1 for b in block], o1) o3 = MapOperator(lambda block: [b * 2 for b in block], o2) @@ -47,6 +35,19 @@ def test_basic_bulk(): assert output == expected, (output, expected) +def test_ds_adapter(): + executor = PipelinedExecutor( + ExecutionOptions(parallelism_limit=3, locality_with_output=True) + ) + o1 = 
_from_dataset_read_tasks(ray.data.range(20)) + o2 = MapOperator(lambda block: [b * -1 for b in block], o1, name="Negate") + o3 = MapOperator(s(0.3, lambda block: [b * 2 for b in block]), o2, name="Multiply") + it = executor.execute(o3) + output = ref_bundles_to_list(it) + expected = [[x * -2] for x in range(20)] + assert sorted(output) == sorted(expected), (output, expected) + + def s(s, f): def func(x): time.sleep(s) @@ -57,7 +58,7 @@ def func(x): def test_basic_pipelined(): executor = PipelinedExecutor(ExecutionOptions()) - inputs = make_ref_bundles([[x] for x in range(100)]) + inputs = _make_ref_bundles([[x] for x in range(100)]) o1 = InputDataBuffer(inputs) o2 = MapOperator(s(0.05, lambda block: [b * -1 for b in block]), o1) o3 = MapOperator(s(0.3, lambda block: [b * 2 for b in block]), o2) From 45653141a3a14a909d7a06e23379dce8bb3077c2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 16:02:59 -0800 Subject: [PATCH 14/25] remove affinity print Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/pipelined_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index bb10ce101d53..4e8a867e66a5 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -82,7 +82,6 @@ def execute(self) -> ObjectRef: soft=True, ) ) - print("AFFINITY") self._block_ref, self._meta_ref = transform_fn.remote( self._op, self._inputs.blocks[0][0] ) From 5993253dc29c9ce2134b393c7cf8f0721732b4ed Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 16:24:55 -0800 Subject: [PATCH 15/25] fetch local false Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/pipelined_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index 4e8a867e66a5..e391a399448d 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -178,7 +178,7 @@ def _process_completed_tasks(self) -> None: """ if self._active_tasks: [ref], _ = ray.wait( - list(self._active_tasks), num_returns=1, fetch_local=True + list(self._active_tasks), num_returns=1, fetch_local=False ) task = self._active_tasks.pop(ref) task.completed() From ee74977b1936e5935e373a88eca3740927e4d0e9 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 16:34:09 -0800 Subject: [PATCH 16/25] remove sleep Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/operators.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index 7bc8bf9b1db3..f3949dc84070 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -55,9 +55,6 @@ def _from_dataset_read_tasks(ds) -> PhysicalOperator: inputs = InputDataBuffer(_make_ref_bundles([[r] for r in read_tasks])) def do_read(block): - import time - - time.sleep(1) for read_task in block: for output_block in read_task(): return output_block # TODO handle remaining blocks From 9a39851bd87ccaf37635870d7bbd0c50b2dba19b Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 16:40:38 -0800 Subject: [PATCH 17/25] move reporting up Signed-off-by: Eric Liang --- 
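The behavior toggled by locality_with_output reduces to a choice of Ray scheduling strategy. A condensed sketch of the dispatch path, assuming a Ray remote function `fn`; the "SPREAD" fallback matches the default adopted later in this series:

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

def with_placement(fn, locality_with_output: bool):
    if locality_with_output:
        # Soft-pin tasks to the driver's node so outputs land locally.
        return fn.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                ray.get_runtime_context().get_node_id(), soft=True
            )
        )
    # Otherwise spread tasks across the cluster.
    return fn.options(scheduling_strategy="SPREAD")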
python/ray/data/_internal/execution/pipelined_executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index e391a399448d..701ac16c4833 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -176,6 +176,9 @@ def _process_completed_tasks(self) -> None: This does not dispatch any new tasks, but pushes RefBundles through the DAG topology (i.e., operator state inqueues/outqueues). """ + for state in self._operator_state.values(): + state.refresh_progress_bar() + if self._active_tasks: [ref], _ = ray.wait( list(self._active_tasks), num_returns=1, fetch_local=False @@ -194,7 +197,6 @@ def _process_completed_tasks(self) -> None: pass else: assert False, "Unknown operator type: {}".format(op) - state.refresh_progress_bar() def _select_operator_to_run(self) -> Optional[PhysicalOperator]: """Select an operator to run, if possible. From ca43867f5ccec19b63597895a397d9371a1f4809 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 17:58:53 -0800 Subject: [PATCH 18/25] add bundle sizes Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/interfaces.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 76fdb40db234..852d5efb8890 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -40,6 +40,16 @@ def __post_init__(self): assert len(b) == 2, b assert isinstance(b[0], ray.ObjectRef), b assert isinstance(b[1], BlockMetadata), b + assert b[1].num_rows is not None, b + assert b[1].size_bytes is not None, b + + def num_rows(self) -> int: + """Number of rows present in this bundle.""" + return sum(b[1].num_rows for b in self.blocks) + + def size_bytes(self) -> int: + """Size of the blocks of this bundle in bytes.""" + return sum(b[1].size_bytes for b in self.blocks) def destroy(self) -> None: """Clears the object store memory for these blocks.""" From 612f8cc9b7e1b4b8834e51c68589d007b6888fd5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 19:30:35 -0800 Subject: [PATCH 19/25] add actor pool Signed-off-by: Eric Liang --- .../data/_internal/execution/interfaces.py | 12 ++++ .../ray/data/_internal/execution/operators.py | 13 +++- .../_internal/execution/pipelined_executor.py | 60 ++++++++++++++++++- python/ray/data/tests/test_execution.py | 19 ++++++ 4 files changed, 101 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 852d5efb8890..311acadd438f 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -2,6 +2,7 @@ from typing import Any, Dict, List, Optional, Iterator, Tuple import ray +from ray.data._internal.compute import ComputeStrategy, TaskPoolStrategy from ray.data._internal.stats import DatasetStats from ray.data.block import Block, BlockMetadata from ray.types import ObjectRef @@ -167,6 +168,17 @@ def execute_one( """ raise NotImplementedError + def compute_strategy(self) -> ComputeStrategy: + """Return the compute strategy to use for executing these tasks. + + Supported strategies: {TaskPoolStrategy, ActorPoolStrategy}. 
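The new accessors make bundle accounting a one-liner at call sites; for example, assuming the _make_ref_bundles helper from this series and an initialized Ray:

import ray
from ray.data._internal.execution.util import _make_ref_bundles

ray.init(ignore_reinit_error=True)
[bundle] = _make_ref_bundles([[1, 2, 3]])
assert bundle.num_rows() == 3   # summed across all blocks in the bundle
assert bundle.size_bytes() > 0  # object store footprint from block metadata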
+ """ + return TaskPoolStrategy() + + def ray_remote_args(self) -> Dict[str, Any]: + """Return extra ray remote args to use for execution.""" + return {} + class ExchangeOperator(PhysicalOperator): """A streaming operator for more complex parallel transformations. diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index f3949dc84070..ac8e7a3c2490 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -1,6 +1,7 @@ -from typing import List, Iterator, Optional +from typing import List, Iterator, Optional, Any, Dict from ray.data.block import Block +from ray.data._internal.compute import ComputeStrategy, TaskPoolStrategy from ray.data._internal.execution.interfaces import ( RefBundle, OneToOneOperator, @@ -37,8 +38,12 @@ def __init__( block_transform: BlockTransform, input_op: PhysicalOperator, name: str = "Map", + compute_strategy: Optional[ComputeStrategy] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, ): self._block_transform = block_transform + self._strategy = compute_strategy or TaskPoolStrategy() + self._remote_args = (ray_remote_args or {}).copy() super().__init__(name, [input_op]) def execute_one(self, block_bundle: Iterator[Block], _) -> Iterator[Block]: @@ -48,6 +53,12 @@ def apply_transform(fn, block_bundle): return apply_transform(self._block_transform, block_bundle) + def compute_strategy(self): + return self._strategy + + def ray_remote_args(self): + return self._remote_args + # For testing only. def _from_dataset_read_tasks(ds) -> PhysicalOperator: diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index 701ac16c4833..caaa5a57610e 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -1,7 +1,8 @@ -from typing import Dict, List, Iterator, Optional +from typing import Dict, List, Iterator, Optional, Any import ray -from ray.data.block import Block, BlockMetadata +from ray.data.block import Block, BlockMetadata, BlockAccessor +from ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.execution.interfaces import ( Executor, ExecutionOptions, @@ -18,6 +19,35 @@ from ray.types import ObjectRef +@ray.remote +class _Actor: + @ray.method(num_returns=2) + def transform_one(self, op, block): + [out] = list(op.execute_one([block], {})) + return out, BlockAccessor.for_block(out).get_metadata([], None) + + +class _ActorPool: + def __init__( + self, + size: int, + ray_remote_args: Dict[str, Any] = None, + ): + cls = _Actor + if ray_remote_args: + cls = cls.options(**ray_remote_args) + self.actors = {cls.remote(): 0 for _ in range(size)} + + def pick_actor(self): + actors = sorted(list(self.actors.items()), key=lambda a: a[1]) + least_busy = actors[0][0] + self.actors[least_busy] += 1 + return least_busy + + def return_actor(self, actor): + self.actors[actor] -= 1 + + class _OpState: """Execution state for a PhysicalOperator.""" @@ -30,6 +60,14 @@ def __init__(self, op: PhysicalOperator): self.progress_bar = None self.num_active_tasks = 0 self.num_completed_tasks = 0 + if isinstance(op, OneToOneOperator): + self.compute_strategy = op.compute_strategy() + else: + self.compute_strategy = None + if isinstance(self.compute_strategy, ActorPoolStrategy): + self.actor_pool = _ActorPool(self.compute_strategy.max_size) + else: + self.actor_pool = None def 
initialize_progress_bar(self, index: int) -> None: self.progress_bar = ProgressBar( @@ -70,10 +108,17 @@ def __init__( self._block_ref: Optional[ObjectRef[Block]] = None self._meta_ref: Optional[ObjectRef[BlockMetadata]] = None self._options = options + self._actor = None def execute(self) -> ObjectRef: if len(self._inputs.blocks) != 1: raise NotImplementedError("TODO: multi-block inputs") + if self._state.actor_pool: + return self._execute_actor() + else: + return self._execute_task() + + def _execute_task(self): transform_fn = _transform_one if self._options.locality_with_output: transform_fn = transform_fn.options( @@ -88,10 +133,21 @@ def execute(self) -> ObjectRef: self._state.num_active_tasks += 1 return self._meta_ref + def _execute_actor(self): + actor = self._state.actor_pool.pick_actor() + self._actor = actor + self._block_ref, self._meta_ref = actor.transform_one.remote( + self._op, self._inputs.blocks[0][0] + ) + self._state.num_active_tasks += 1 + return self._meta_ref + def completed(self): meta = ray.get(self._meta_ref) self._state.num_active_tasks -= 1 self._state.add_output(RefBundle([(self._block_ref, meta)])) + if self._actor: + self._state.actor_pool.return_actor(self._actor) # TODO: optimize memory usage by deleting intermediate results. diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index f11d48cd4844..dd8a280e1cee 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -4,6 +4,7 @@ from typing import List, Any import ray +from ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle from ray.data._internal.execution.bulk_executor import BulkExecutor from ray.data._internal.execution.pipelined_executor import PipelinedExecutor @@ -35,6 +36,24 @@ def test_basic_bulk(): assert output == expected, (output, expected) +def test_actor_strategy(): + executor = PipelinedExecutor(ExecutionOptions()) + inputs = _make_ref_bundles([[x] for x in range(20)]) + o1 = InputDataBuffer(inputs) + o2 = MapOperator(lambda block: [b * -1 for b in block], o1) + o3 = MapOperator( + s(0.8, lambda block: [b * 2 for b in block]), + o2, + compute_strategy=ActorPoolStrategy(1, 2), + ray_remote_args={"num_cpus": 1}, + name="ActorMap", + ) + it = executor.execute(o3) + output = ref_bundles_to_list(it) + expected = [[x * -2] for x in range(20)] + assert sorted(output) == sorted(expected), (output, expected) + + def test_ds_adapter(): executor = PipelinedExecutor( ExecutionOptions(parallelism_limit=3, locality_with_output=True) From f23df9c2cc4a1e3a1361b855cabee514bbe32104 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 20:13:03 -0800 Subject: [PATCH 20/25] fix num gpus Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/pipelined_executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index caaa5a57610e..ed845e8fe13e 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -65,7 +65,9 @@ def __init__(self, op: PhysicalOperator): else: self.compute_strategy = None if isinstance(self.compute_strategy, ActorPoolStrategy): - self.actor_pool = _ActorPool(self.compute_strategy.max_size) + self.actor_pool = _ActorPool( + self.compute_strategy.max_size, self.op.ray_remote_args() + ) else: 
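Operationally, the pool hands out the least-busy actor and expects a matching release once the call completes; a usage sketch, with the remote call elided:

pool = _ActorPool(size=2, ray_remote_args={"num_cpus": 1})
actor = pool.pick_actor()     # the actor with the fewest in-flight calls wins
try:
    ...                       # e.g., actor.transform_one.remote(op, block_ref)
finally:
    pool.return_actor(actor)  # decrement its in-flight count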
self.actor_pool = None From b3b6b0757ca78040b953901a64156631653f4003 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 15 Nov 2022 20:27:09 -0800 Subject: [PATCH 21/25] add spread Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/pipelined_executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index ed845e8fe13e..ecd18f5a7c29 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -129,6 +129,8 @@ def _execute_task(self): soft=True, ) ) + else: + transform_fn = transform_fn.options(scheduling_strategy="SPREAD") self._block_ref, self._meta_ref = transform_fn.remote( self._op, self._inputs.blocks[0][0] ) From 076dd2fcec429d85899389a2b10c94da32c4dd07 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 2 Dec 2022 14:46:52 -0800 Subject: [PATCH 22/25] wip Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 109 ++++++++++-------- .../data/_internal/execution/interfaces.py | 45 +++++--- 2 files changed, 93 insertions(+), 61 deletions(-) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index fa0b14fc3221..32959eacb0a8 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -11,37 +11,37 @@ ) from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats - - -@ray.remote(num_returns=2) -def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata): - [out] = list(op.execute_one([block], {})) - return out, BlockAccessor.for_block(out).get_metadata([], None) - - -def _naive_task_execute( - inputs: List[RefBundle], op: OneToOneOperator -) -> List[RefBundle]: - """Naively execute a 1:1 operation using Ray tasks. - - TODO: This should be reconciled with ComputeStrategy. - """ - - input_blocks = [] - for bundle in inputs: - for block, _ in bundle.blocks: - input_blocks.append(block) - - out_blocks, out_meta = [], [] - for in_b in input_blocks: - out_b, out_m = _transform_one.remote(op, in_b) - out_blocks.append(out_b) - out_meta.append(out_m) - - bar = ProgressBar("OneToOne", total=len(out_meta)) - out_meta = bar.fetch_until_complete(out_meta) - - return [RefBundle([(b, m)]) for b, m in zip(out_blocks, out_meta)] +# +# +#@ray.remote(num_returns=2) +#def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata): +# [out] = list(op.execute_one([block], {})) +# return out, BlockAccessor.for_block(out).get_metadata([], None) +# +# +#def _naive_task_execute( +# inputs: List[RefBundle], op: OneToOneOperator +#) -> List[RefBundle]: +# """Naively execute a 1:1 operation using Ray tasks. +# +# TODO: This should be reconciled with ComputeStrategy. +# """ +# +# input_blocks = [] +# for bundle in inputs: +# for block, _ in bundle.blocks: +# input_blocks.append(block) +# +# out_blocks, out_meta = [], [] +# for in_b in input_blocks: +# out_b, out_m = _transform_one.remote(op, in_b) +# out_blocks.append(out_b) +# out_meta.append(out_m) +# +# bar = ProgressBar("OneToOne", total=len(out_meta)) +# out_meta = bar.fetch_until_complete(out_meta) +# +# return [RefBundle([(b, m)]) for b, m in zip(out_blocks, out_meta)] class BulkExecutor(Executor): @@ -62,20 +62,13 @@ def execute_recursive(node: PhysicalOperator) -> List[RefBundle]: # Compute dependencies. 
inputs = [execute_recursive(dep) for dep in node.input_dependencies] - # Execute this operator (3 cases). - output = [] - if isinstance(node, OneToOneOperator): - assert len(inputs) == 1, "OneToOne takes exactly 1 input stream" - output = _naive_task_execute(inputs[0], node) - elif isinstance(node, ExchangeOperator): - for i, ref_bundles in enumerate(inputs): - for r in ref_bundles: - node.add_input(r, input_index=i) - node.inputs_done(i) - while node.has_next(): - output.append(node.get_next()) - else: - assert False, "Unknown operator type: {}".format(node) + # Fully execute this operator. + for i, ref_bundles in enumerate(inputs): + for r in ref_bundles: + node.add_input(r, input_index=i) + node.inputs_done(i) + output = _naive_run_until_complete(node) + node.release_unused_resources() # Cache and return output. saved_outputs[node] = output @@ -85,3 +78,29 @@ def execute_recursive(node: PhysicalOperator) -> List[RefBundle]: def get_stats() -> DatasetStats: raise NotImplementedError + + +def _naive_run_until_complete(node: PhysicalOperator) -> List[RefBundle]: + """Run this operator until completion, assuming all inputs have been submitted. + + Args: + node: The operator to run. + + Returns: + The list of output ref bundles for the operator. + """ + output = [] + tasks = node.get_tasks() + if tasks: + bar = ProgressBar(node.name, total=node.num_outputs_total()) + while tasks: + [ready], remaining = ray.wait(tasks, num_returns=1, fetch_local=True) + node.notify_task_completed(ready) + tasks = node.get_tasks() + while node.has_next(): + bar.update(1) + output.append(node.get_next()) + bar.close() + while node.has_next(): + output.append(node.get_next()) + return output diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 311acadd438f..a7b211b8e80a 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -117,6 +117,34 @@ def num_outputs_total(self) -> Optional[int]: return self.input_dependencies[0].num_outputs_total() return None + def add_input(self, refs: RefBundle, input_index: int) -> None: + """Called when an upstream result is available.""" + raise NotImplementedError + + def inputs_done(self, input_index: int) -> None: + """Called when an upstream operator finishes.""" + raise NotImplementedError + + def has_next(self) -> bool: + """Returns when a downstream output is available.""" + raise NotImplementedError + + def get_next(self) -> RefBundle: + """Get the next downstream output.""" + raise NotImplementedError + + def get_tasks(self) -> List[ray.ObjectRef]: + """Get a list of object references the executor should wait on.""" + raise NotImplementedError + + def notify_task_completed(self, task: ray.ObjectRef) -> None: + """Executor calls this when the given task is completed and local.""" + raise NotImplementedError + + def release_unused_resources(self) -> None: + """Release any currently unused operator resources.""" + raise NotImplementedError + class Executor: """Abstract class for executors, which implement physical operator execution. 
@@ -190,22 +218,7 @@ class ExchangeOperator(PhysicalOperator): Subclasses: AllToAllOperator """ - - def add_input(self, refs: RefBundle, input_index: int) -> None: - """Called when an upstream result is available.""" - raise NotImplementedError - - def inputs_done(self, input_index: int) -> None: - """Called when an upstream operator finishes.""" - raise NotImplementedError - - def has_next(self) -> bool: - """Returns when a downstream output is available.""" - raise NotImplementedError - - def get_next(self) -> RefBundle: - """Get the next downstream output.""" - raise NotImplementedError + pass class AllToAllOperator(ExchangeOperator): From 3cb54400526b3cc6119997e7d1340969dfd26619 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 2 Dec 2022 15:25:56 -0800 Subject: [PATCH 23/25] wip Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 31 ------- .../data/_internal/execution/interfaces.py | 48 ++--------- .../_internal/execution/one_to_one_state.py | 44 ++++++++++ .../ray/data/_internal/execution/operators.py | 86 +++++++++++++++++-- 4 files changed, 129 insertions(+), 80 deletions(-) create mode 100644 python/ray/data/_internal/execution/one_to_one_state.py diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index 32959eacb0a8..dba8d9ef67cf 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -11,37 +11,6 @@ ) from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats -# -# -#@ray.remote(num_returns=2) -#def _transform_one(op: OneToOneOperator, block: Block) -> (Block, BlockMetadata): -# [out] = list(op.execute_one([block], {})) -# return out, BlockAccessor.for_block(out).get_metadata([], None) -# -# -#def _naive_task_execute( -# inputs: List[RefBundle], op: OneToOneOperator -#) -> List[RefBundle]: -# """Naively execute a 1:1 operation using Ray tasks. -# -# TODO: This should be reconciled with ComputeStrategy. -# """ -# -# input_blocks = [] -# for bundle in inputs: -# for block, _ in bundle.blocks: -# input_blocks.append(block) -# -# out_blocks, out_meta = [], [] -# for in_b in input_blocks: -# out_b, out_m = _transform_one.remote(op, in_b) -# out_blocks.append(out_b) -# out_meta.append(out_m) -# -# bar = ProgressBar("OneToOne", total=len(out_meta)) -# out_meta = bar.fetch_until_complete(out_meta) -# -# return [RefBundle([(b, m)]) for b, m in zip(out_blocks, out_meta)] class BulkExecutor(Executor): diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index a7b211b8e80a..6f22946a3538 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -108,6 +108,9 @@ def input_dependencies(self) -> List["PhysicalOperator"]: ), "PhysicalOperator.__init__() was not called." return self._input_dependencies + def __reduce__(self): + raise ValueError("PhysicalOperator is not serializable.") + def num_outputs_total(self) -> Optional[int]: """Returns the total number of output bundles of this operator, if known. 
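The __reduce__ guard means operator objects never cross process boundaries; only the transform closure (introduced just below) is shipped to workers. A quick check, assuming an operator `op` built as in the tests:

from ray import cloudpickle

fn = op.get_transform_fn()
cloudpickle.dumps(fn)       # closures are serializable and sent to workers
try:
    cloudpickle.dumps(op)   # the operator itself deliberately is not
except ValueError:
    pass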
@@ -135,7 +138,7 @@ def get_next(self) -> RefBundle: def get_tasks(self) -> List[ray.ObjectRef]: """Get a list of object references the executor should wait on.""" - raise NotImplementedError + raise [] def notify_task_completed(self, task: ray.ObjectRef) -> None: """Executor calls this when the given task is completed and local.""" @@ -143,7 +146,7 @@ def notify_task_completed(self, task: ray.ObjectRef) -> None: def release_unused_resources(self) -> None: """Release any currently unused operator resources.""" - raise NotImplementedError + pass class Executor: @@ -167,47 +170,6 @@ def get_stats() -> DatasetStats: raise NotImplementedError -class OneToOneOperator(PhysicalOperator): - """A streaming operator that maps inputs 1:1 to outputs. - - Subclasses need only define a single `execute_one` method that runs in a single - process, leaving the implementation of parallel and distributed execution to the - Executor implementation. - - Subclasses: - Read - Map - Write - SortReduce - WholeStage - """ - - def execute_one( - self, block_bundle: Iterator[Block], input_metadata: Dict[str, Any] - ) -> Iterator[Block]: - """Execute a block transformation locally on a worker process. - - Args: - block_bundle: Iterator over input blocks of a RefBundle. Typically, this - will yield only a single block, unless the transformation has multiple - inputs, e.g., in the SortReduce or ZipBlocks cases. It is an iterator - instead of a list for memory efficiency. - input_metadata: Extra metadata provided from the upstream operator. - """ - raise NotImplementedError - - def compute_strategy(self) -> ComputeStrategy: - """Return the compute strategy to use for executing these tasks. - - Supported strategies: {TaskPoolStrategy, ActorPoolStrategy}. - """ - return TaskPoolStrategy() - - def ray_remote_args(self) -> Dict[str, Any]: - """Return extra ray remote args to use for execution.""" - return {} - - class ExchangeOperator(PhysicalOperator): """A streaming operator for more complex parallel transformations. diff --git a/python/ray/data/_internal/execution/one_to_one_state.py b/python/ray/data/_internal/execution/one_to_one_state.py new file mode 100644 index 000000000000..10e2aca655bf --- /dev/null +++ b/python/ray/data/_internal/execution/one_to_one_state.py @@ -0,0 +1,44 @@ +from typing import Any, Dict, Callable + +from ray.data._internal.compute import ComputeStrategy +from ray.data._internal.execution.interfaces import ( + RefBundle, +) +from ray.types import ObjectRef + + +@ray.remote(num_returns=2) +def _transform_one(fn: Callable, block: Block) -> (Block, BlockMetadata): + [out] = list(fn([block], {})) + return out, BlockAccessor.for_block(out).get_metadata([], None) + + +# TODO: handle block splitting? 
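The num_returns=2 split is what lets executors wait on cheap metadata while block data stays at rest in the object store; schematically, assuming a transform function and an input block ref:

block_ref, meta_ref = _transform_one.remote(transform_fn, in_block_ref)
meta = ray.get(meta_ref)                 # small; safe to fetch eagerly
bundle = RefBundle([(block_ref, meta)])  # the block itself is never fetched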
+class _Task:
+    def __init__(self, block_ref: ray.ObjectRef):
+        self.block_ref = ray.ObjectRef
+
+
+class OneToOneOperatorState:
+    def __init__(self, op: OneToOneOperator):
+        self._transform = op.get_transform_fn()
+        self._compute_strategy = op.compute_strategy()
+        self._ray_remote_args = op.ray_remote_args()
+        self.outputs = []
+        self.tasks = {}
+
+    def add_input(self, bundle: RefBundle) -> None:
+        input_blocks = []
+        for block, _ in bundle.blocks:
+            input_blocks.append(block)
+        for in_b in input_blocks:
+            out_b, out_m = _transform_one.remote(self._transform_fn, in_b)
+            self.tasks[out_m] = _Task(out_b)
+
+    def task_completed(self, ref: ray.ObjectRef) -> None:
+        task = self.tasks.pop(ref)
+        block_meta = ray.get(ref)
+        self.outputs.append(RefBundle([(task.block_ref, block_meta)]))
+
+    def release_unused_resources(self) -> None:
+        pass
diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py
index ac8e7a3c2490..21ecfa06b598 100644
--- a/python/ray/data/_internal/execution/operators.py
+++ b/python/ray/data/_internal/execution/operators.py
@@ -4,12 +4,12 @@
 from ray.data._internal.compute import ComputeStrategy, TaskPoolStrategy
 from ray.data._internal.execution.interfaces import (
     RefBundle,
-    OneToOneOperator,
     ExchangeOperator,
     PhysicalOperator,
 )
 from ray.data._internal.compute import BlockTransform
 from ray.data._internal.execution.util import _make_ref_bundles
+from ray.data._internal.execution.one_to_one_state import OneToOneOperatorState
 
 
 class InputDataBuffer(ExchangeOperator):
@@ -30,6 +30,75 @@ def num_outputs_total(self) -> Optional[int]:
         return self._num_outputs
+
+class OneToOneOperator(PhysicalOperator):
+    """A streaming operator that maps inputs 1:1 to outputs.
+
+    Subclasses need only define a single `get_transform_fn` method, returning a
+    function that runs in a single process, leaving the implementation of parallel
+    and distributed execution to the Executor implementation.
+
+    Subclasses:
+        Read
+        Map
+        Write
+        SortReduce
+        WholeStage
+    """
+
+    def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]):
+        super().__init__(name, [input_op])
+        self._execution_state = OneToOneOperatorState(
+            self.compute_strategy(), self.ray_remote_args())
+
+    def get_transform_fn(self) -> Callable[[Iterator[Block], Dict[str, Any]], Iterator[Block]]:
+        """Return the block transformation to run on a worker process.
+
+        This callable must be serializable as it will be sent to remote processes.
+
+        Returns:
+            A callable taking the following inputs:
+                block_bundle: Iterator over input blocks of a RefBundle. Typically,
+                    this will yield only a single block, unless the transformation has
+                    multiple inputs, e.g., in the SortReduce or ZipBlocks cases. It is
+                    an iterator instead of a list for memory efficiency.
+                input_metadata: Extra metadata provided from the upstream operator.
+        """
+        raise NotImplementedError
+
+    def compute_strategy(self) -> ComputeStrategy:
+        """Return the compute strategy to use for executing these tasks.
+
+        Supported strategies: {TaskPoolStrategy, ActorPoolStrategy}.
+ """ + return TaskPoolStrategy() + + def ray_remote_args(self) -> Dict[str, Any]: + """Return extra ray remote args to use for execution.""" + return {} + + def add_input(self, refs: RefBundle, input_index: int) -> None: + assert input_index == 0, input_index + self._execution_state.add_input(refs) + + def inputs_done(self, input_index: int) -> None: + pass + + def has_next(self) -> bool: + return len(self._execution_state.outputs) > 0 + + def get_next(self) -> RefBundle: + return self._execution_state.outputs.pop(0) + + def get_tasks(self) -> List[ray.ObjectRef]: + return list(self._execution_state.tasks) + + def notify_task_completed(self, task: ray.ObjectRef) -> None: + self._execution_state.task_completed(task) + + def release_unused_resources(self) -> None: + self._execution_state.release_unused_resources() + + class MapOperator(OneToOneOperator): """Defines a simple map operation over blocks.""" @@ -46,12 +115,17 @@ def __init__( self._remote_args = (ray_remote_args or {}).copy() super().__init__(name, [input_op]) - def execute_one(self, block_bundle: Iterator[Block], _) -> Iterator[Block]: - def apply_transform(fn, block_bundle): - for b in block_bundle: - yield fn(b) + def get_transform_fn(self): + transform = self.block_transform + + def execute_one(block_bundle: Iterator[Block], _) -> Iterator[Block]: + def apply_transform(fn, block_bundle): + for b in block_bundle: + yield fn(b) + + return apply_transform(transform, block_bundle) - return apply_transform(self._block_transform, block_bundle) + return execute_one def compute_strategy(self): return self._strategy From 72dab9f032d4c6f64b01b7f53e0fe1f1d5ffb291 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 2 Dec 2022 15:35:14 -0800 Subject: [PATCH 24/25] wip Signed-off-by: Eric Liang --- .../data/_internal/execution/bulk_executor.py | 3 --- .../ray/data/_internal/execution/interfaces.py | 6 +++--- .../data/_internal/execution/one_to_one_state.py | 16 ++++++++++------ python/ray/data/_internal/execution/operators.py | 12 ++++++++---- .../_internal/execution/pipelined_executor.py | 3 +-- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index dba8d9ef67cf..43b7fa4f16e8 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -1,13 +1,10 @@ from typing import Dict, List, Iterator import ray -from ray.data.block import Block, BlockMetadata, BlockAccessor from ray.data._internal.execution.interfaces import ( Executor, RefBundle, PhysicalOperator, - OneToOneOperator, - ExchangeOperator, ) from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 6f22946a3538..3cc86a7dec7e 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -1,8 +1,7 @@ from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, Iterator, Tuple +from typing import Any, Dict, List, Optional, Iterator, Tuple, Callable import ray -from ray.data._internal.compute import ComputeStrategy, TaskPoolStrategy from ray.data._internal.stats import DatasetStats from ray.data.block import Block, BlockMetadata from ray.types import ObjectRef @@ -180,6 +179,7 @@ class ExchangeOperator(PhysicalOperator): Subclasses: AllToAllOperator 
""" + pass @@ -192,7 +192,7 @@ class AllToAllOperator(ExchangeOperator): SortMap """ - def __init__(self, preprocessor: Optional[OneToOneOperator] = None): + def __init__(self, preprocessor: Optional[Callable] = None): self._preprocessor = preprocessor self._buffer = [] self._outbox = None diff --git a/python/ray/data/_internal/execution/one_to_one_state.py b/python/ray/data/_internal/execution/one_to_one_state.py index 10e2aca655bf..a83fc6dbd7a5 100644 --- a/python/ray/data/_internal/execution/one_to_one_state.py +++ b/python/ray/data/_internal/execution/one_to_one_state.py @@ -1,11 +1,15 @@ -from typing import Any, Dict, Callable +from typing import Callable, TYPE_CHECKING -from ray.data._internal.compute import ComputeStrategy +import ray from ray.data._internal.execution.interfaces import ( RefBundle, ) +from ray.data.block import Block, BlockMetadata, BlockAccessor from ray.types import ObjectRef +if TYPE_CHECKING: + from ray.data._internal.execution.operators import OneToOneOperator + @ray.remote(num_returns=2) def _transform_one(fn: Callable, block: Block) -> (Block, BlockMetadata): @@ -15,12 +19,12 @@ def _transform_one(fn: Callable, block: Block) -> (Block, BlockMetadata): # TODO: handle block splitting? class _Task: - def __init__(self, block_ref: ray.ObjectRef): - self.block_ref = ray.ObjectRef + def __init__(self, block_ref: ObjectRef): + self.block_ref = block_ref class OneToOneOperatorState: - def __init__(self, op: OneToOneOperator): + def __init__(self, op: "OneToOneOperator"): self._transform = op.get_transform_fn() self._compute_strategy = op.compute_strategy() self._ray_remote_args = op.ray_remote_args() @@ -35,7 +39,7 @@ def add_input(self, bundle: RefBundle) -> None: out_b, out_m = _transform_one.remote(self._transform_fn, in_b) self.tasks[out_m] = _Task(out_b) - def task_completed(self, ref: ray.ObjectRef) -> None: + def task_completed(self, ref: ObjectRef) -> None: task = self.tasks.pop(ref) block_meta = ray.get(ref) self.outputs.append(RefBundle([(task.block_ref, block_meta)])) diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index 21ecfa06b598..f684619381fa 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -1,5 +1,6 @@ -from typing import List, Iterator, Optional, Any, Dict +from typing import List, Iterator, Optional, Any, Dict, Callable +import ray from ray.data.block import Block from ray.data._internal.compute import ComputeStrategy, TaskPoolStrategy from ray.data._internal.execution.interfaces import ( @@ -46,11 +47,14 @@ class OneToOneOperator(PhysicalOperator): """ def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]): - super().__init__(name, [input_op]) + super().__init__(name, input_dependencies) self._execution_state = OneToOneOperatorState( - self.compute_strategy(), self.ray_remote_args()) + self.compute_strategy(), self.ray_remote_args() + ) - def get_transform_fn(self) -> Callable[[Iterator[Block], Dict[str, Any]], Iterator[Block]]: + def get_transform_fn( + self, + ) -> Callable[[Iterator[Block], Dict[str, Any]], Iterator[Block]]: """Return the block transformation to run on a worker process. This callable must be serializable as it will be sent to remote processes. 
diff --git a/python/ray/data/_internal/execution/pipelined_executor.py b/python/ray/data/_internal/execution/pipelined_executor.py index ecd18f5a7c29..d6ad8da3717c 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -8,11 +8,10 @@ ExecutionOptions, RefBundle, PhysicalOperator, - OneToOneOperator, ExchangeOperator, ) from ray.data._internal.execution.bulk_executor import _transform_one -from ray.data._internal.execution.operators import InputDataBuffer +from ray.data._internal.execution.operators import InputDataBuffer, OneToOneOperator from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy From 87ba983df7c05eb400d191b4e5c8f6e9a0de19f8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 2 Dec 2022 15:39:30 -0800 Subject: [PATCH 25/25] passing basic tests Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/interfaces.py | 2 +- python/ray/data/_internal/execution/one_to_one_state.py | 2 +- python/ray/data/_internal/execution/operators.py | 6 ++---- python/ray/data/_internal/execution/pipelined_executor.py | 2 +- python/ray/data/tests/test_execution.py | 4 ++-- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 3cc86a7dec7e..6fa2081db294 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -137,7 +137,7 @@ def get_next(self) -> RefBundle: def get_tasks(self) -> List[ray.ObjectRef]: """Get a list of object references the executor should wait on.""" - raise [] + return [] def notify_task_completed(self, task: ray.ObjectRef) -> None: """Executor calls this when the given task is completed and local.""" diff --git a/python/ray/data/_internal/execution/one_to_one_state.py b/python/ray/data/_internal/execution/one_to_one_state.py index a83fc6dbd7a5..97c5f1710278 100644 --- a/python/ray/data/_internal/execution/one_to_one_state.py +++ b/python/ray/data/_internal/execution/one_to_one_state.py @@ -25,7 +25,7 @@ def __init__(self, block_ref: ObjectRef): class OneToOneOperatorState: def __init__(self, op: "OneToOneOperator"): - self._transform = op.get_transform_fn() + self._transform_fn = op.get_transform_fn() self._compute_strategy = op.compute_strategy() self._ray_remote_args = op.ray_remote_args() self.outputs = [] diff --git a/python/ray/data/_internal/execution/operators.py b/python/ray/data/_internal/execution/operators.py index f684619381fa..0f74766c1dd2 100644 --- a/python/ray/data/_internal/execution/operators.py +++ b/python/ray/data/_internal/execution/operators.py @@ -48,9 +48,7 @@ class OneToOneOperator(PhysicalOperator): def __init__(self, name: str, input_dependencies: List["PhysicalOperator"]): super().__init__(name, input_dependencies) - self._execution_state = OneToOneOperatorState( - self.compute_strategy(), self.ray_remote_args() - ) + self._execution_state = OneToOneOperatorState(self) def get_transform_fn( self, @@ -120,7 +118,7 @@ def __init__( super().__init__(name, [input_op]) def get_transform_fn(self): - transform = self.block_transform + transform = self._block_transform def execute_one(block_bundle: Iterator[Block], _) -> Iterator[Block]: def apply_transform(fn, block_bundle): diff --git a/python/ray/data/_internal/execution/pipelined_executor.py 
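Under this protocol, any new 1:1 stage follows the same recipe as MapOperator; for instance, a hypothetical FilterOperator (illustrative, not part of this series):

class FilterOperator(OneToOneOperator):
    def __init__(self, predicate, input_op):
        # Set fields before super().__init__(), which builds the execution
        # state and immediately calls get_transform_fn().
        self._predicate = predicate
        super().__init__("Filter", [input_op])

    def get_transform_fn(self):
        predicate = self._predicate  # capture; the operator isn't serializable

        def transform(block_bundle, _):
            for block in block_bundle:
                yield [row for row in block if predicate(row)]

        return transform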
b/python/ray/data/_internal/execution/pipelined_executor.py index d6ad8da3717c..6b931d191a4f 100644 --- a/python/ray/data/_internal/execution/pipelined_executor.py +++ b/python/ray/data/_internal/execution/pipelined_executor.py @@ -10,7 +10,7 @@ PhysicalOperator, ExchangeOperator, ) -from ray.data._internal.execution.bulk_executor import _transform_one +from ray.data._internal.execution.one_to_one_state import _transform_one from ray.data._internal.execution.operators import InputDataBuffer, OneToOneOperator from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats diff --git a/python/ray/data/tests/test_execution.py b/python/ray/data/tests/test_execution.py index dd8a280e1cee..acc8a077e565 100644 --- a/python/ray/data/tests/test_execution.py +++ b/python/ray/data/tests/test_execution.py @@ -31,8 +31,8 @@ def test_basic_bulk(): o2 = MapOperator(lambda block: [b * -1 for b in block], o1) o3 = MapOperator(lambda block: [b * 2 for b in block], o2) it = executor.execute(o3) - output = ref_bundles_to_list(it) - expected = [[x * -2] for x in range(20)] + output = sorted(ref_bundles_to_list(it)) # TODO: preserve order option + expected = sorted([[x * -2] for x in range(20)]) assert output == expected, (output, expected)
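Putting the final pieces together, the series leaves a small but complete surface: build a DAG of operators, hand the sink to an executor, and resolve the resulting bundles. A compact end-to-end sketch mirroring the tests (output order is not preserved yet):

import ray
from ray.data._internal.execution.bulk_executor import BulkExecutor
from ray.data._internal.execution.interfaces import ExecutionOptions
from ray.data._internal.execution.operators import InputDataBuffer, MapOperator
from ray.data._internal.execution.util import _make_ref_bundles

ray.init(ignore_reinit_error=True)
o1 = InputDataBuffer(_make_ref_bundles([[x] for x in range(8)]))
o2 = MapOperator(lambda block: [b + 1 for b in block], o1, name="AddOne")
bundles = BulkExecutor(ExecutionOptions()).execute(o2)
rows = sorted(ray.get(ref) for bundle in bundles for ref, _ in bundle.blocks)
assert rows == [[x + 1] for x in range(8)]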