Skip to content

Commit

Permalink
prototype of tqdm
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 15, 2023
1 parent 56cd1d6 commit 43025a0
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 31 deletions.
12 changes: 10 additions & 2 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]):
instructions: list[Instruction]
resource_request: ResourceRequest
num_results: int
stage_id: int
_id: int = field(default_factory=lambda: next(ID_GEN))

def id(self) -> str:
Expand Down Expand Up @@ -110,7 +111,7 @@ def add_instruction(
self.num_results = instruction.num_outputs()
return self

def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]:
def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
"""Create a SingleOutputPartitionTask from this PartitionTaskBuilder.
Returns a "frozen" version of this PartitionTask that cannot have instructions added.
Expand All @@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[Par

return SingleOutputPartitionTask[PartitionT](
inputs=self.inputs,
stage_id=stage_id,
instructions=self.instructions,
num_results=1,
resource_request=resource_request_final_cpu,
)

def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]:
def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
"""Create a MultiOutputPartitionTask from this PartitionTaskBuilder.
Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions.
Expand All @@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[Parti
)
return MultiOutputPartitionTask[PartitionT](
inputs=self.inputs,
stage_id=stage_id,
instructions=self.instructions,
num_results=self.num_results,
resource_request=resource_request_final_cpu,
Expand Down Expand Up @@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]


@dataclass(frozen=True)
class GlobalLimit(LocalLimit):
    """A ``LocalLimit`` under a different class name.

    Declares no fields and overrides nothing, so everything it does is
    inherited from ``LocalLimit``; only the type (and its ``__name__``)
    differs.
    NOTE(review): presumably exists so limit steps produced by the
    global-limit stage can be told apart by class name — confirm with caller.
    """


@dataclass(frozen=True)
class MapPartition(SingleOutputInstruction):
map_op: MapPartitionOp
Expand Down
65 changes: 42 additions & 23 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@
MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]]


def _stage_id_counter() -> Iterator[int]:
    """Return a fresh, endless iterator of stage ids: 1, 2, 3, ...

    Uses ``itertools.count`` instead of a hand-written ``while True``
    generator. Besides being the standard-library idiom, it avoids the
    ``ValueError: generator already executing`` that a Python generator
    raises when two threads call ``next()`` on it at the same moment.

    Returns:
        An iterator whose first ``next()`` yields 1 and which increases by
        one on every subsequent call; it never raises ``StopIteration``.
    """
    # Function-scope import keeps this block self-contained without
    # touching the module's import list.
    import itertools

    return itertools.count(1)


# Single module-level source of stage ids. Callers draw from it with
# ``next(stage_id_counter)``; because the instance is shared and never reset,
# ids keep increasing for the lifetime of the process.
stage_id_counter = _stage_id_counter()


def partition_read(
partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None
) -> InProgressPhysicalPlan[PartitionT]:
Expand Down Expand Up @@ -81,6 +91,7 @@ def file_read(
Yield a plan to read those filenames.
"""
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
output_partition_index = 0

while True:
Expand Down Expand Up @@ -119,7 +130,7 @@ def file_read(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

Expand Down Expand Up @@ -185,7 +196,7 @@ def join(
# As the materializations complete, emit new steps to join each left and right partition.
left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()
right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
yield_left = True

while True:
Expand Down Expand Up @@ -237,7 +248,7 @@ def join(
try:
step = next(next_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
next_requests.append(step)
yield step

Expand Down Expand Up @@ -302,7 +313,7 @@ def global_limit(
remaining_partitions = num_partitions

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
# To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition.
# We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them,
# count their rows, and then apply and update the remaining limit.
Expand All @@ -317,16 +328,17 @@ def global_limit(
# Apply and deduct the rolling global limit.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()

limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows)
done_task_metadata = done_task.partition_metadata()
limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows)

global_limit_step = PartitionTaskBuilder[PartitionT](
inputs=[done_task.partition()],
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
partial_metadatas=[done_task_metadata],
resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes),
).add_instruction(
instruction=execution_step.LocalLimit(limit),
instruction=execution_step.GlobalLimit(limit),
)

yield global_limit_step
remaining_partitions -= 1
remaining_rows -= limit
Expand All @@ -346,7 +358,7 @@ def global_limit(
partial_metadatas=[done_task.partition_metadata()],
resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes),
).add_instruction(
instruction=execution_step.LocalLimit(0),
instruction=execution_step.GlobalLimit(0),
)
for _ in range(remaining_partitions)
)
Expand Down Expand Up @@ -376,10 +388,11 @@ def global_limit(
if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None:
limit = min(remaining_rows, partial_meta.num_rows)
child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit))

remaining_partitions -= 1
remaining_rows -= limit
else:
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

Expand All @@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
"""Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks."""

materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()
Expand All @@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_multi_output()
step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand All @@ -436,10 +449,10 @@ def split(
# Splitting evenly is fairly important if this operation is to be used for parallelism.
# (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.)
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand Down Expand Up @@ -503,7 +516,7 @@ def coalesce(
merges_per_result = deque([stop - start for start, stop in zip(starts, stops)])

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
# See if we can emit a coalesced partition.
num_partitions_to_merge = merges_per_result[0]
Expand Down Expand Up @@ -545,7 +558,7 @@ def coalesce(
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output()
child_step = child_step.finalize_partition_task_single_output(stage_id)
materializations.append(child_step)
yield child_step

Expand All @@ -570,11 +583,12 @@ def reduce(
"""

materializations = list()
stage_id = next(stage_id_counter)

# Dispatch all fanouts.
for step in fanout_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_multi_output()
step = step.finalize_partition_task_multi_output(stage_id=stage_id)
materializations.append(step)
yield step

Expand Down Expand Up @@ -611,14 +625,17 @@ def sort(

# First, materialize the child plan.
source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id_children = next(stage_id_counter)
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id_children)
source_materializations.append(step)
yield step

# Sample all partitions (to be used for calculating sort boundaries).
sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id_sampling = next(stage_id_counter)

for source in source_materializations:
while not source.done():
logger.debug("sort blocked on completion of source: %s", source)
Expand All @@ -632,7 +649,7 @@ def sort(
.add_instruction(
instruction=execution_step.Sample(sort_by=sort_by),
)
.finalize_partition_task_single_output()
.finalize_partition_task_single_output(stage_id=stage_id_sampling)
)

sample_materializations.append(sample)
Expand All @@ -643,6 +660,8 @@ def sort(
logger.debug("sort blocked on completion of all samples: %s", sample_materializations)
yield None

stage_id_reduce = next(stage_id_counter)

# Reduce the samples to get sort boundaries.
boundaries = (
PartitionTaskBuilder[PartitionT](
Expand All @@ -656,7 +675,7 @@ def sort(
descending=descending,
),
)
.finalize_partition_task_single_output()
.finalize_partition_task_single_output(stage_id=stage_id_reduce)
)
yield boundaries

Expand Down Expand Up @@ -714,7 +733,7 @@ def materialize(
"""

materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

stage_id = next(stage_id_counter)
while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
Expand All @@ -725,7 +744,7 @@ def materialize(
try:
step = next(child_plan)
if isinstance(step, PartitionTaskBuilder):
step = step.finalize_partition_task_single_output()
step = step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(step)
assert isinstance(step, (PartitionTask, type(None)))

Expand Down
3 changes: 3 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@ def resolve_schema(self, schema: Schema) -> Schema:
fields = [e._to_field(schema) for e in self]
return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields])

def __repr__(self) -> str:
    """Return the values of ``_output_name_to_exprs`` rendered as a list.

    The values view is copied into a list before formatting so the result
    reads ``[...]`` rather than exposing the ``dict_values([...])`` wrapper
    that formatting the view directly would produce.
    """
    return repr(list(self._output_name_to_exprs.values()))


class ExpressionImageNamespace(ExpressionNamespace):
"""Expression operations for image columns."""
Expand Down
27 changes: 21 additions & 6 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import multiprocessing
from concurrent import futures
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, Iterator
from typing import Iterable, Iterator

import psutil
from tqdm.auto import tqdm

from daft.daft import (
FileFormatConfig,
Expand All @@ -32,10 +33,6 @@
from daft.runners.runner import Runner
from daft.table import Table

if TYPE_CHECKING:
pass


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -149,7 +146,6 @@ def run_iter(
}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False)

with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
partitions_gen = self._physical_plan_to_partitions(tasks)
yield from partitions_gen
Expand All @@ -162,6 +158,9 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
inflight_tasks_resources: dict[str, ResourceRequest] = dict()
future_to_task: dict[futures.Future, str] = dict()

pbars = dict()
# tqdm.set_lock(TRLock())
# initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)
with futures.ThreadPoolExecutor() as thread_pool:
try:
next_step = next(plan)
Expand Down Expand Up @@ -206,12 +205,24 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP

else:
# Submit the task for execution.

logger.debug("Submitting task for execution: %s", next_step)
stage_id = next_step.stage_id
if stage_id not in pbars:
name = "-".join(i.__class__.__name__ for i in next_step.instructions)
pbars[stage_id] = tqdm(total=float("inf"), desc=name)
pb = pbars[stage_id]
if pb.total is None:
pb.total = 1
else:
pb.total += 1
pb.refresh()
future = thread_pool.submit(
self.build_partitions, next_step.instructions, *next_step.inputs
)
# Register the inflight task and resources used.
future_to_task[future] = next_step.id()

inflight_tasks[next_step.id()] = next_step
inflight_tasks_resources[next_step.id()] = next_step.resource_request

Expand All @@ -226,15 +237,19 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP
done_id = future_to_task.pop(done_future)
del inflight_tasks_resources[done_id]
done_task = inflight_tasks.pop(done_id)
stage_id = done_task.stage_id
partitions = done_future.result()

pbars[stage_id].update(1)
logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions))
done_task.set_result([PyMaterializedResult(partition) for partition in partitions])

if next_step is None:
next_step = next(plan)

except StopIteration:
for p in pbars.values():
p.close()
return

def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
Expand Down

0 comments on commit 43025a0

Please sign in to comment.