[data] Streaming executor fixes #2 (ray-project#32759)
Signed-off-by: Edward Oakes <[email protected]>
jianoaix authored and edoakes committed Mar 22, 2023
1 parent fa0f3db commit 23f66c4
Showing 3 changed files with 41 additions and 11 deletions.
4 changes: 3 additions & 1 deletion python/ray/data/_internal/execution/legacy_compat.py
@@ -266,11 +266,13 @@ def bulk_fn(

 def _bundles_to_block_list(bundles: Iterator[RefBundle]) -> BlockList:
     blocks, metadata = [], []
+    owns_blocks = True
     for ref_bundle in bundles:
+        if not ref_bundle.owns_blocks:
+            owns_blocks = False
         for block, meta in ref_bundle.blocks:
             blocks.append(block)
             metadata.append(meta)
-    owns_blocks = all(b.owns_blocks for b in bundles)
     return BlockList(blocks, metadata, owned_by_consumer=owns_blocks)
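The deleted line is a classic iterator pitfall: by the time `all(b.owns_blocks for b in bundles)` ran, the `for` loop above had already exhausted the `bundles` iterator, so `all()` saw an empty sequence and vacuously returned `True` even when no bundle owned its blocks. A minimal standalone sketch of the bug and the fix, using a hypothetical `FakeBundle` stand-in for `RefBundle`:

```python
from typing import Iterator


class FakeBundle:
    """Hypothetical stand-in for RefBundle; only the owns_blocks flag matters."""

    def __init__(self, owns_blocks: bool):
        self.owns_blocks = owns_blocks


def owns_blocks_buggy(bundles: Iterator[FakeBundle]) -> bool:
    for _ in bundles:  # consumes the iterator, as the real loop does
        pass
    # all() over an already-exhausted iterator is vacuously True.
    return all(b.owns_blocks for b in bundles)


def owns_blocks_fixed(bundles: Iterator[FakeBundle]) -> bool:
    owns = True
    for b in bundles:  # compute the flag during the single pass
        if not b.owns_blocks:
            owns = False
    return owns


print(owns_blocks_buggy(iter([FakeBundle(False)])))  # True (wrong)
print(owns_blocks_fixed(iter([FakeBundle(False)])))  # False (correct)
```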


2 changes: 1 addition & 1 deletion python/ray/data/dataset.py
@@ -2825,7 +2825,7 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]]
         for batch in self.iter_batches(
             batch_size=None, prefetch_blocks=prefetch_blocks, batch_format=batch_format
         ):
-            batch = BlockAccessor.for_block(batch)
+            batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
             for row in batch.iter_rows():
                 yield row
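For intuition on why the extra conversion is needed: under the streaming executor, `iter_batches()` can hand back batch formats (for example, a dict of columns) rather than block types, so each batch must be turned back into a block before `BlockAccessor.for_block()` can wrap it. A rough standalone sketch of that normalize-then-iterate pattern, using pandas DataFrames as the block type; the helper names here are illustrative, not Ray's internals:

```python
from typing import Any, Dict, Iterable, Iterator

import pandas as pd


def batch_to_block(batch: Any) -> pd.DataFrame:
    """Illustrative stand-in for BlockAccessor.batch_to_block: normalize a
    column-dict batch into a DataFrame block; pass blocks through as-is."""
    if isinstance(batch, dict):
        return pd.DataFrame(batch)
    return batch


def iter_rows(batches: Iterable[Any]) -> Iterator[Dict[str, Any]]:
    for batch in batches:
        block = batch_to_block(batch)  # the step the fix adds
        for _, row in block.iterrows():
            yield row.to_dict()


# Mixed batch formats, as a streaming iterator might produce:
batches = [{"x": [1, 2]}, pd.DataFrame({"x": [3]})]
print(list(iter_rows(batches)))  # [{'x': 1}, {'x': 2}, {'x': 3}]
```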

46 changes: 37 additions & 9 deletions python/ray/data/tests/test_dataset.py
@@ -1369,17 +1369,26 @@ def test_count_lazy(ray_start_regular_shared):

 def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
     ds = ray.data.range(100, parallelism=20)
-    assert ds._plan.execute()._num_computed() == 0
+
+    def check_num_computed(expected):
+        if ray.data.context.DatasetContext.get_current().use_streaming_executor:
+            # In the streaming executor, ds.take() will not invoke partial
+            # execution in LazyBlockList.
+            assert ds._plan.execute()._num_computed() == 0
+        else:
+            assert ds._plan.execute()._num_computed() == expected
+
+    check_num_computed(0)
     assert ds.take(10) == list(range(10))
-    assert ds._plan.execute()._num_computed() == 2
+    check_num_computed(2)
     assert ds.take(20) == list(range(20))
-    assert ds._plan.execute()._num_computed() == 4
+    check_num_computed(4)
     assert ds.take(30) == list(range(30))
-    assert ds._plan.execute()._num_computed() == 8
+    check_num_computed(8)
     assert ds.take(50) == list(range(50))
-    assert ds._plan.execute()._num_computed() == 16
+    check_num_computed(16)
     assert ds.take(100) == list(range(100))
-    assert ds._plan.execute()._num_computed() == 20
+    check_num_computed(20)


 def test_dataset_repr(ray_start_regular_shared):
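The new `check_num_computed` helper branches on the executor mode, which lives on the current `DatasetContext`. A sketch of how the streaming branch could be exercised by hand, assuming Ray ~2.3, where `use_streaming_executor` and the private `_plan` / `_num_computed()` internals referenced by the test exist:

```python
import ray
from ray.data.context import DatasetContext

ray.init(ignore_reinit_error=True)

ctx = DatasetContext.get_current()
ctx.use_streaming_executor = True  # flip before creating the dataset

ds = ray.data.range(100, parallelism=20)
assert ds.take(10) == list(range(10))

# With the streaming executor, take() does not partially execute the
# LazyBlockList, so the plan reports nothing computed.
print(ds._plan.execute()._num_computed())  # expected: 0
```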
@@ -1696,7 +1705,14 @@ def to_pylist(table):
     # Default ArrowRows.
     for row, t_row in zip(ds.iter_rows(), to_pylist(t)):
         assert isinstance(row, TableRow)
-        assert isinstance(row, ArrowRow)
+        # In streaming, we set batch_format to "default" because calling
+        # ds.dataset_format() will still invoke bulk execution and we want
+        # to avoid that. As a result, it's receiving PandasRow (the default
+        # batch format).
+        if ray.data.context.DatasetContext.get_current().use_streaming_executor:
+            assert isinstance(row, PandasRow)
+        else:
+            assert isinstance(row, ArrowRow)
         assert row == t_row

     # PandasRows after conversion.
@@ -1710,7 +1726,14 @@ def to_pylist(table):
     # Prefetch.
     for row, t_row in zip(ds.iter_rows(prefetch_blocks=1), to_pylist(t)):
         assert isinstance(row, TableRow)
-        assert isinstance(row, ArrowRow)
+        # In streaming, we set batch_format to "default" because calling
+        # ds.dataset_format() will still invoke bulk execution and we want
+        # to avoid that. As a result, it's receiving PandasRow (the default
+        # batch format).
+        if ray.data.context.DatasetContext.get_current().use_streaming_executor:
+            assert isinstance(row, PandasRow)
+        else:
+            assert isinstance(row, ArrowRow)
         assert row == t_row
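Both branches still pass the `TableRow` check because `ArrowRow` and `PandasRow` are subclasses of `TableRow`; only the concrete type changes with the executor. A small sketch of executor-agnostic row handling, with import paths and the `range_table` column name assumed from Ray ~2.3:

```python
import ray
from ray.data.row import TableRow

ds = ray.data.range_table(4)  # tabular data, so rows come back as TableRow
for row in ds.iter_rows():
    # Code that only needs mapping-style access can ignore whether the bulk
    # executor produced an ArrowRow or streaming produced a PandasRow.
    assert isinstance(row, TableRow)
    print(type(row).__name__, row["value"])
```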


@@ -2181,7 +2204,12 @@ def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared):
     ds = ray.data.range(32, parallelism=8)
     expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8]
     for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks):
-        assert ds._plan.execute()._num_computed() == expected
+        if ray.data.context.DatasetContext.get_current().use_streaming_executor:
+            # In streaming execution of ds.iter_batches(), there is no partial
+            # execution, so _num_computed() in LazyBlockList is 0.
+            assert ds._plan.execute()._num_computed() == 0
+        else:
+            assert ds._plan.execute()._num_computed() == expected


 def test_add_column(ray_start_regular_shared):
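The expected totals [1, 2, 4, 4, 8, 8, 8, 8] in the rampup test above follow a doubling policy: whenever an uncomputed block is requested, compute as many new blocks as are already computed (minimum one), capped at the total. A sketch that reproduces the schedule; the policy is inferred from the test's expectations, not taken from LazyBlockList's actual code:

```python
def rampup_totals(num_blocks: int) -> list:
    """Inferred exponential-rampup schedule: on a cache miss, double the
    number of computed blocks (minimum 1), capped at num_blocks."""
    computed, totals = 0, []
    for i in range(num_blocks):
        if i >= computed:  # block i not yet materialized
            computed = min(num_blocks, computed + max(1, computed))
        totals.append(computed)
    return totals


print(rampup_totals(8))  # [1, 2, 4, 4, 8, 8, 8, 8]
```

The same policy also reproduces the earlier take()-based test: with 20 blocks of 5 rows each, it yields totals 2, 4, 8, 16, 20 at the probe points.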