Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Enable Progress Bars for PyRunner and RayRunner #1609

Merged
merged 13 commits into from
Nov 17, 2023
12 changes: 10 additions & 2 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]):
instructions: list[Instruction]
resource_request: ResourceRequest
num_results: int
stage_id: int
_id: int = field(default_factory=lambda: next(ID_GEN))

def id(self) -> str:
Expand Down Expand Up @@ -110,7 +111,7 @@ def add_instruction(
self.num_results = instruction.num_outputs()
return self

def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:
def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
"""Create a SingleOutputPartitionTask from this PartitionTaskBuilder.

Returns a "frozen" version of this PartitionTask that cannot have instructions added.
Expand All @@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[Par

return SingleOutputPartitionTask[PartitionT](
inputs=self.inputs,
stage_id=stage_id,
instructions=self.instructions,
num_results=1,
resource_request=resource_request_final_cpu,
)

def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
"""Create a MultiOutputPartitionTask from this PartitionTaskBuilder.

Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions.
Expand All @@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[Parti
)
return MultiOutputPartitionTask[PartitionT](
inputs=self.inputs,
stage_id=stage_id,
instructions=self.instructions,
num_results=self.num_results,
resource_request=resource_request_final_cpu,
Expand Down Expand Up @@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]


@dataclass(frozen=True)
class GlobalLimit(LocalLimit):
    """A ``LocalLimit`` under a distinct class name.

    Adds no fields and overrides nothing; the only difference from
    ``LocalLimit`` is the type itself (and therefore ``__class__.__name__``).
    NOTE(review): presumably this exists so that limit steps emitted by the
    global-limit plan can be told apart from ordinary local limits by name,
    e.g. in progress-bar labels -- confirm against the callers.
    """


@dataclass(frozen=True)
class MapPartition(SingleOutputInstruction):
map_op: MapPartitionOp
Expand Down
71 changes: 45 additions & 26 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@
MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]


def _stage_id_counter() -> Iterator[int]:
    """Return a fresh, endless iterator of stage ids: 1, 2, 3, ...

    Built on ``itertools.count`` instead of a hand-written generator loop.
    Besides being the standard-library idiom, the counter is advanced by a
    single C-level call, so (on CPython) it cannot raise
    ``ValueError: generator already executing`` the way a Python generator
    does when two threads call ``next()`` on it at the same time.

    Returns:
        An iterator whose first ``next()`` yields 1 and which never ends.
    """
    import itertools  # function-scope import keeps this block self-contained

    return itertools.count(1)


# Shared, module-level source of stage ids: each plan function draws one with
# ``next(stage_id_counter)`` so that the tasks it finalizes share a stage id.
stage_id_counter = _stage_id_counter()


def partition_read(
partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None
) -> InProgressPhysicalPlan[PartitionT]:
Expand Down Expand Up @@ -81,6 +91,7 @@ def file_read(
Yield a plan to read those filenames.
"""
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
output_partition_index = 0

while True:
Expand Down Expand Up @@ -119,7 +130,7 @@ def file_read(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

Expand Down Expand Up @@ -185,7 +196,7 @@ def join(
# As the materializations complete, emit new steps to join each left and right partition.
left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
yield_left = True

while True:
Expand Down Expand Up @@ -237,7 +248,7 @@ def join(
try:
step = next(next_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
next_requests.append(step)
yield step

Expand All @@ -246,9 +257,9 @@ def join(
# Are we still waiting for materializations to complete? (We will emit more joins from them).
if len(left_requests) + len(right_requests) > 0:
logger.debug(
"join blocked on completion of sources.\n"
f"Left sources: {left_requests}\n"
f"Right sources: {right_requests}",
"join blocked on completion of sources.\n Left sources: %s\nRight sources: %s",
left_requests,
right_requests,
)
yield None

Expand Down Expand Up @@ -302,7 +313,7 @@ def global_limit(
remaining_partitions = num_partitions

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
# To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition.
# We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them,
# count their rows, and then apply and update the remaining limit.
Expand All @@ -317,16 +328,17 @@ def global_limit(
# Apply and deduct the rolling global limit.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()

limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows)
done_task_metadata = done_task.partition_metadata()
limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows)

global_limit_step = PartitionTaskBuilder[PartitionT](
inputs=[done_task.partition()],
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
partial_metadatas=[done_task_metadata],
resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes),
).add_instruction(
instruction=execution_step.LocalLimit(limit),
instruction=execution_step.GlobalLimit(limit),
)

yield global_limit_step
remaining_partitions -= 1
remaining_rows -= limit
Expand All @@ -346,7 +358,7 @@ def global_limit(
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
).add_instruction(
instruction=execution_step.LocalLimit(0),
instruction=execution_step.GlobalLimit(0),
)
for _ in range(remaining_partitions)
)
Expand Down Expand Up @@ -376,10 +388,11 @@ def global_limit(
if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None:
limit = min(remaining_rows, partial_meta.num_rows)
child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit))

remaining_partitions -= 1
remaining_rows -= limit
else:
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

Expand All @@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
"""Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks."""

materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
Expand All @@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_multi_output()
step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand All @@ -436,10 +449,10 @@ def split(
# Splitting evenly is fairly important if this operation is to be used for parallelism.
# (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand Down Expand Up @@ -503,7 +516,7 @@ def coalesce(
merges_per_result = deque([stop - start for start, stop in zip(starts, stops)])

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
# See if we can emit a coalesced partition.
num_partitions_to_merge = merges_per_result[0]
Expand Down Expand Up @@ -545,7 +558,7 @@ def coalesce(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id)
materializations.append(child_step)
yield child_step

Expand All @@ -570,11 +583,12 @@ def reduce(
"""

materializations = list()
stage_id = next(stage_id_counter)

# Dispatch all fanouts.
for step in fanout_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_multi_output()
step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand Down Expand Up @@ -611,14 +625,17 @@ def sort(

# First, materialize the child plan.
source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id_children = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id_children)
source_materializations.append(step)
yield step

# Sample all partitions (to be used for calculating sort boundaries).
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id_sampling = next(stage_id_counter)

for source in source_materializations:
while not source.done():
logger.debug("sort blocked on completion of source: %s", source)
Expand All @@ -632,7 +649,7 @@ def sort(
.add_instruction(
instruction=execution_step.Sample(sort_by=sort_by),
)
.finalize_partition_task_single_output()
.finalize_partition_task_single_output(stage_id=stage_id_sampling)
)

sample_materializations.append(sample)
Expand All @@ -643,6 +660,8 @@ def sort(
logger.debug("sort blocked on completion of all samples: %s", sample_materializations)
yield None

stage_id_reduce = next(stage_id_counter)

# Reduce the samples to get sort boundaries.
boundaries = (
PartitionTaskBuilder[PartitionT](
Expand All @@ -656,7 +675,7 @@ def sort(
descending=descending,
),
)
.finalize_partition_task_single_output()
.finalize_partition_task_single_output(stage_id=stage_id_reduce)
)
yield boundaries

Expand Down Expand Up @@ -714,7 +733,7 @@ def materialize(
"""

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
Expand All @@ -725,7 +744,7 @@ def materialize(
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
assert isinstance(step, (PartitionTask, type(None)))

Expand Down
3 changes: 3 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@
fields = [e._to_field(schema) for e in self]
return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields])

def __repr__(self) -> str:
    """Return the string form of the values view of ``_output_name_to_exprs``."""
    exprs = self._output_name_to_exprs.values()
    return str(exprs)

Check warning on line 758 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L758

Added line #L758 was not covered by tests


class ExpressionImageNamespace(ExpressionNamespace):
"""Expression operations for image columns."""
Expand Down
64 changes: 64 additions & 0 deletions daft/runners/progress_bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

import os
from typing import Any

from tqdm.auto import tqdm

from daft.execution.execution_step import PartitionTask


class ProgressBar:
    """Track task progress with one tqdm bar per execution stage.

    Bars live in ``self.pbars``, keyed by ``PartitionTask.stage_id``. When
    ``show_tasks_bar`` is true, one extra bar stored under the reserved key
    ``-1`` counts every task regardless of its stage.

    All public methods are no-ops when the bar is disabled, which happens if
    ``disable=True`` is passed or if either of the environment variables
    ``RAY_TQDM`` / ``DAFT_PROGRESS_BAR`` is set to ``0``. Both variables are
    parsed with ``int()``, so a non-integer value raises ``ValueError`` at
    construction time.

    Args:
        use_ray_tqdm: Selects the Ray-flavoured behaviour: bars are created
            without ``leave=False`` and are explicitly refreshed whenever
            their total grows.
        show_tasks_bar: Also maintain the aggregate "Tasks" bar.
        disable: Turn the progress bar off entirely.
    """

    # Reserved key in ``pbars`` for the aggregate "Tasks" bar, i.e. the
    # progress of all tasks. Real stage ids come from a counter starting at 1,
    # so they never collide with it.
    _ALL_TASKS_ID = -1

    def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bool = False) -> None:
        self.use_ray_tqdm = use_ray_tqdm
        self.show_tasks_bar = show_tasks_bar
        self.tqdm_mod = tqdm
        # Starts empty on purpose (raised in code review): bars are only
        # created once the first task arrives, because a bar created up front
        # would have a total of 0 and sit in a weird state.
        self.pbars: dict[int, tqdm] = {}
        self.disable = (
            disable
            or not bool(int(os.environ.get("RAY_TQDM", "1")))
            or not bool(int(os.environ.get("DAFT_PROGRESS_BAR", "1")))
        )

    def _make_new_bar(self, stage_id: int, name: str) -> None:
        """Create the bar for ``stage_id`` with a total of 1 and register it.

        The new bar is positioned below all bars created so far.
        """
        bar_kwargs: dict[str, Any] = {"total": 1, "desc": name, "position": len(self.pbars)}
        if not self.use_ray_tqdm:
            # ``leave`` is only passed on the non-Ray path.
            # NOTE(review): presumably the Ray tqdm variant does not accept
            # it -- confirm before unifying the two paths further.
            bar_kwargs["leave"] = False
        self.pbars[stage_id] = self.tqdm_mod(**bar_kwargs)

    def _grow_bar(self, bar: Any) -> None:
        """Raise ``bar``'s total by one task, refreshing it on the Ray path."""
        bar.total += 1
        if self.use_ray_tqdm:
            bar.refresh()

    def mark_task_start(self, step: PartitionTask[Any]) -> None:
        """Account for a newly started task.

        Creates the task's stage bar (and the aggregate bar, if enabled) on
        first use; otherwise increases the existing bar's total by one. A new
        stage bar is labelled with the class names of the step's
        instructions joined by ``-``.
        """
        if self.disable:
            return

        if self.show_tasks_bar:
            # Key on the aggregate bar itself rather than on ``pbars`` being
            # empty, so a missing aggregate bar is created instead of raising
            # KeyError when stage bars already exist.
            tasks_bar = self.pbars.get(self._ALL_TASKS_ID)
            if tasks_bar is None:
                self._make_new_bar(self._ALL_TASKS_ID, "Tasks")
            else:
                self._grow_bar(tasks_bar)

        stage_id = step.stage_id
        stage_bar = self.pbars.get(stage_id)
        if stage_bar is None:
            name = "-".join(i.__class__.__name__ for i in step.instructions)
            self._make_new_bar(stage_id, name)
        else:
            self._grow_bar(stage_bar)

    def mark_task_done(self, step: PartitionTask[Any]) -> None:
        """Advance the task's stage bar (and the aggregate bar) by one.

        Raises:
            KeyError: If no bar exists for the step's stage, i.e. the task was
                never passed to ``mark_task_start``.
        """
        if self.disable:
            return

        self.pbars[step.stage_id].update(1)
        if self.show_tasks_bar:
            self.pbars[self._ALL_TASKS_ID].update(1)

    def close(self) -> None:
        """Close every bar created so far; the bars stay registered in ``pbars``."""
        for bar in self.pbars.values():
            bar.close()
Loading
Loading