From 43025a02999aac686ade00380bd5817a3c04286b Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 14 Nov 2023 16:03:00 -0800 Subject: [PATCH 01/13] prototype of tqdm --- daft/execution/execution_step.py | 12 +++++- daft/execution/physical_plan.py | 65 +++++++++++++++++++++----------- daft/expressions/expressions.py | 3 ++ daft/runners/pyrunner.py | 27 ++++++++++--- 4 files changed, 76 insertions(+), 31 deletions(-) diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index 80a2fbbfcf..edfcd2b2ad 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -51,6 +51,7 @@ class PartitionTask(Generic[PartitionT]): instructions: list[Instruction] resource_request: ResourceRequest num_results: int + stage_id: int _id: int = field(default_factory=lambda: next(ID_GEN)) def id(self) -> str: @@ -110,7 +111,7 @@ def add_instruction( self.num_results = instruction.num_outputs() return self - def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[PartitionT]: + def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]: """Create a SingleOutputPartitionTask from this PartitionTaskBuilder. Returns a "frozen" version of this PartitionTask that cannot have instructions added. @@ -125,12 +126,13 @@ def finalize_partition_task_single_output(self) -> SingleOutputPartitionTask[Par return SingleOutputPartitionTask[PartitionT]( inputs=self.inputs, + stage_id=stage_id, instructions=self.instructions, num_results=1, resource_request=resource_request_final_cpu, ) - def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[PartitionT]: + def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]: """Create a MultiOutputPartitionTask from this PartitionTaskBuilder. Same as finalize_partition_task_single_output, except the output of this PartitionTask is a list of partitions. @@ -143,6 +145,7 @@ def finalize_partition_task_multi_output(self) -> MultiOutputPartitionTask[Parti ) return MultiOutputPartitionTask[PartitionT]( inputs=self.inputs, + stage_id=stage_id, instructions=self.instructions, num_results=self.num_results, resource_request=resource_request_final_cpu, @@ -566,6 +569,11 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) ] +@dataclass(frozen=True) +class GlobalLimit(LocalLimit): + pass + + @dataclass(frozen=True) class MapPartition(SingleOutputInstruction): map_op: MapPartitionOp diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index cd3fa8430c..79d7114a4c 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -53,6 +53,16 @@ MaterializedPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionT]] +def _stage_id_counter(): + counter = 0 + while True: + counter += 1 + yield counter + + +stage_id_counter = _stage_id_counter() + + def partition_read( partitions: Iterator[PartitionT], metadatas: Iterator[PartialPartitionMetadata] | None = None ) -> InProgressPhysicalPlan[PartitionT]: @@ -81,6 +91,7 @@ def file_read( Yield a plan to read those filenames. """ materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() + stage_id = next(stage_id_counter) output_partition_index = 0 while True: @@ -119,7 +130,7 @@ def file_read( try: child_step = next(child_plan) if isinstance(child_step, PartitionTaskBuilder): - child_step = child_step.finalize_partition_task_single_output() + child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id) materializations.append(child_step) yield child_step @@ -185,7 +196,7 @@ def join( # As the materializations complete, emit new steps to join each left and right partition. left_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque() right_requests: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) yield_left = True while True: @@ -237,7 +248,7 @@ def join( try: step = next(next_plan) if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output() + step = step.finalize_partition_task_single_output(stage_id=stage_id) next_requests.append(step) yield step @@ -302,7 +313,7 @@ def global_limit( remaining_partitions = num_partitions materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) # To dynamically schedule the global limit, we need to apply an appropriate limit to each child partition. # We don't know their exact sizes since they are pending execution, so we will have to iteratively execute them, # count their rows, and then apply and update the remaining limit. @@ -317,16 +328,17 @@ def global_limit( # Apply and deduct the rolling global limit. while len(materializations) > 0 and materializations[0].done(): done_task = materializations.popleft() - - limit = remaining_rows and min(remaining_rows, done_task.partition_metadata().num_rows) + done_task_metadata = done_task.partition_metadata() + limit = remaining_rows and min(remaining_rows, done_task_metadata.num_rows) global_limit_step = PartitionTaskBuilder[PartitionT]( inputs=[done_task.partition()], - partial_metadatas=[done_task.partition_metadata()], - resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes), + partial_metadatas=[done_task_metadata], + resource_request=ResourceRequest(memory_bytes=done_task_metadata.size_bytes), ).add_instruction( - instruction=execution_step.LocalLimit(limit), + instruction=execution_step.GlobalLimit(limit), ) + yield global_limit_step remaining_partitions -= 1 remaining_rows -= limit @@ -346,7 +358,7 @@ def global_limit( partial_metadatas=[done_task.partition_metadata()], resource_request=ResourceRequest(memory_bytes=done_task.partition_metadata().size_bytes), ).add_instruction( - instruction=execution_step.LocalLimit(0), + instruction=execution_step.GlobalLimit(0), ) for _ in range(remaining_partitions) ) @@ -376,10 +388,11 @@ def global_limit( if len(materializations) == 0 and remaining_rows > 0 and partial_meta.num_rows is not None: limit = min(remaining_rows, partial_meta.num_rows) child_step = child_step.add_instruction(instruction=execution_step.LocalLimit(limit)) + remaining_partitions -= 1 remaining_rows -= limit else: - child_step = child_step.finalize_partition_task_single_output() + child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id) materializations.append(child_step) yield child_step @@ -395,7 +408,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh """Wrap a plan that emits multi-output tasks to a plan that emits single-output tasks.""" materializations: deque[MultiOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) while True: while len(materializations) > 0 and materializations[0].done(): done_task = materializations.popleft() @@ -409,7 +422,7 @@ def flatten_plan(child_plan: InProgressPhysicalPlan[PartitionT]) -> InProgressPh try: step = next(child_plan) if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_multi_output() + step = step.finalize_partition_task_multi_output(stage_id=stage_id) materializations.append(step) yield step @@ -436,10 +449,10 @@ def split( # Splitting evenly is fairly important if this operation is to be used for parallelism. # (optimization TODO: don't materialize if num_rows is already available in physical plan metadata.) materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) for step in child_plan: if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output() + step = step.finalize_partition_task_single_output(stage_id=stage_id) materializations.append(step) yield step @@ -503,7 +516,7 @@ def coalesce( merges_per_result = deque([stop - start for start, stop in zip(starts, stops)]) materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) while True: # See if we can emit a coalesced partition. num_partitions_to_merge = merges_per_result[0] @@ -545,7 +558,7 @@ def coalesce( try: child_step = next(child_plan) if isinstance(child_step, PartitionTaskBuilder): - child_step = child_step.finalize_partition_task_single_output() + child_step = child_step.finalize_partition_task_single_output(stage_id) materializations.append(child_step) yield child_step @@ -570,11 +583,12 @@ def reduce( """ materializations = list() + stage_id = next(stage_id_counter) # Dispatch all fanouts. for step in fanout_plan: if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_multi_output() + step = step.finalize_partition_task_multi_output(stage_id=stage_id) materializations.append(step) yield step @@ -611,14 +625,17 @@ def sort( # First, materialize the child plan. source_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() + stage_id_children = next(stage_id_counter) for step in child_plan: if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output() + step = step.finalize_partition_task_single_output(stage_id=stage_id_children) source_materializations.append(step) yield step # Sample all partitions (to be used for calculating sort boundaries). sample_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() + stage_id_sampling = next(stage_id_counter) + for source in source_materializations: while not source.done(): logger.debug("sort blocked on completion of source: %s", source) @@ -632,7 +649,7 @@ def sort( .add_instruction( instruction=execution_step.Sample(sort_by=sort_by), ) - .finalize_partition_task_single_output() + .finalize_partition_task_single_output(stage_id=stage_id_sampling) ) sample_materializations.append(sample) @@ -643,6 +660,8 @@ def sort( logger.debug("sort blocked on completion of all samples: %s", sample_materializations) yield None + stage_id_reduce = next(stage_id_counter) + # Reduce the samples to get sort boundaries. boundaries = ( PartitionTaskBuilder[PartitionT]( @@ -656,7 +675,7 @@ def sort( descending=descending, ), ) - .finalize_partition_task_single_output() + .finalize_partition_task_single_output(stage_id=stage_id_reduce) ) yield boundaries @@ -714,7 +733,7 @@ def materialize( """ materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() - + stage_id = next(stage_id_counter) while True: # Check if any inputs finished executing. while len(materializations) > 0 and materializations[0].done(): @@ -725,7 +744,7 @@ def materialize( try: step = next(child_plan) if isinstance(step, PartitionTaskBuilder): - step = step.finalize_partition_task_single_output() + step = step.finalize_partition_task_single_output(stage_id=stage_id) materializations.append(step) assert isinstance(step, (PartitionTask, type(None))) diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 3769de2894..e5af9fa1f7 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -754,6 +754,9 @@ def resolve_schema(self, schema: Schema) -> Schema: fields = [e._to_field(schema) for e in self] return Schema._from_field_name_and_types([(f.name, f.dtype) for f in fields]) + def __repr__(self) -> str: + return f"{self._output_name_to_exprs.values()}" + class ExpressionImageNamespace(ExpressionNamespace): """Expression operations for image columns.""" diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index ea818b6755..4a56f52451 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -4,9 +4,10 @@ import multiprocessing from concurrent import futures from dataclasses import dataclass -from typing import TYPE_CHECKING, Iterable, Iterator +from typing import Iterable, Iterator import psutil +from tqdm.auto import tqdm from daft.daft import ( FileFormatConfig, @@ -32,10 +33,6 @@ from daft.runners.runner import Runner from daft.table import Table -if TYPE_CHECKING: - pass - - logger = logging.getLogger(__name__) @@ -149,7 +146,6 @@ def run_iter( } # Get executable tasks from planner. tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False) - with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"): partitions_gen = self._physical_plan_to_partitions(tasks) yield from partitions_gen @@ -162,6 +158,9 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP inflight_tasks_resources: dict[str, ResourceRequest] = dict() future_to_task: dict[futures.Future, str] = dict() + pbars = dict() + # tqdm.set_lock(TRLock()) + # initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),) with futures.ThreadPoolExecutor() as thread_pool: try: next_step = next(plan) @@ -206,12 +205,24 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP else: # Submit the task for execution. + logger.debug("Submitting task for execution: %s", next_step) + stage_id = next_step.stage_id + if stage_id not in pbars: + name = "-".join(i.__class__.__name__ for i in next_step.instructions) + pbars[stage_id] = tqdm(total=float("inf"), desc=name) + pb = pbars[stage_id] + if pb.total is None: + pb.total = 1 + else: + pb.total += 1 + pb.refresh() future = thread_pool.submit( self.build_partitions, next_step.instructions, *next_step.inputs ) # Register the inflight task and resources used. future_to_task[future] = next_step.id() + inflight_tasks[next_step.id()] = next_step inflight_tasks_resources[next_step.id()] = next_step.resource_request @@ -226,8 +237,10 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP done_id = future_to_task.pop(done_future) del inflight_tasks_resources[done_id] done_task = inflight_tasks.pop(done_id) + stage_id = done_task.stage_id partitions = done_future.result() + pbars[stage_id].update(1) logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions)) done_task.set_result([PyMaterializedResult(partition) for partition in partitions]) @@ -235,6 +248,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP next_step = next(plan) except StopIteration: + for p in pbars.values(): + p.close() return def _check_resource_requests(self, resource_request: ResourceRequest) -> None: From e689316095a7d3b121986c755563a7cc29ed6e72 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 14 Nov 2023 16:03:25 -0800 Subject: [PATCH 02/13] add tqdm to deps --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index fb4bcafb37..0a86af0852 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "pyarrow >= 6.0.1", "fsspec[http]", "psutil", + "tqdm", "typing-extensions >= 4.0.0; python_version < '3.10'", "pickle5 >= 0.0.12; python_version < '3.8'" ] From 15fdb670b8d1270d6c99437692d88dce4c2bc293 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 14 Nov 2023 16:07:50 -0800 Subject: [PATCH 03/13] remove dead code --- daft/runners/pyrunner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 4a56f52451..aab47bb7d9 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -159,8 +159,6 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP future_to_task: dict[futures.Future, str] = dict() pbars = dict() - # tqdm.set_lock(TRLock()) - # initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),) with futures.ThreadPoolExecutor() as thread_pool: try: next_step = next(plan) From b52f6ecf7617070e31694bb018c0c1c90367afe2 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 14 Nov 2023 16:35:13 -0800 Subject: [PATCH 04/13] empty From b9f2b1be83e86fcae84155027582d2644a5f5dc7 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 15 Nov 2023 10:14:37 -0800 Subject: [PATCH 05/13] add ray runner --- daft/runners/ray_runner.py | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index e14c76294b..b7bf3ba40a 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -381,7 +381,7 @@ def _ray_num_cpus_provider(ttl_seconds: int = 1) -> Generator[int, None, None]: class Scheduler: - def __init__(self, max_task_backlog: int | None) -> None: + def __init__(self, max_task_backlog: int | None, tqdm_builder) -> None: """ max_task_backlog: Max number of inflight tasks waiting for cores. """ @@ -401,6 +401,7 @@ def __init__(self, max_task_backlog: int | None) -> None: self.threads_by_df: dict[str, threading.Thread] = dict() self.results_by_df: dict[str, Queue] = {} self.active_by_df: dict[str, bool] = dict() + self.tqdm_builder = tqdm_builder def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: # Case: thread is terminated and no longer exists. @@ -464,7 +465,7 @@ def _run_plan( inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict() inflight_ref_to_task: dict[ray.ObjectRef, str] = dict() - + pbars = dict() num_cpus_provider = _ray_num_cpus_provider() start = datetime.now() @@ -523,6 +524,17 @@ def _run_plan( for result in results: inflight_ref_to_task[result] = task.id() + for task in tasks_to_dispatch: + stage_id = task.stage_id + if stage_id not in pbars: + name = "-".join(i.__class__.__name__ for i in task.instructions) + position = len(pbars) + pbars[stage_id] = self.tqdm_builder(total=1, desc=name, position=position) + else: + pb = pbars[stage_id] + pb.total += 1 + pb.refresh() + if dispatches_allowed == 0 or next_step is None: break @@ -555,6 +567,9 @@ def _run_plan( completed_task_ids.append(task_id) # Mark the entire task associated with the result as done. task = inflight_tasks[task_id] + stage_id = task.stage_id + pb = pbars[stage_id] + pb.update(1) if isinstance(task, SingleOutputPartitionTask): del inflight_ref_to_task[ready] elif isinstance(task, MultiOutputPartitionTask): @@ -576,8 +591,15 @@ def _run_plan( # Ensure that all Exceptions are correctly propagated to the consumer before reraising to kill thread except Exception as e: self.results_by_df[result_uuid].put(e) + for p in pbars.values(): + p.close() + del pbars raise + for p in pbars.values(): + p.close() + del pbars + @ray.remote(num_cpus=1) class SchedulerActor(Scheduler): @@ -625,14 +647,16 @@ def __init__( self.ray_context = ray.init(address=address, ignore_reinit_error=True) if isinstance(self.ray_context, ray.client_builder.ClientContext): + from ray.experimental import tqdm_ray + # Run scheduler remotely if the cluster is connected remotely. self.scheduler_actor = SchedulerActor.remote( # type: ignore - max_task_backlog=max_task_backlog, + max_task_backlog=max_task_backlog, tqdm_builder=tqdm_ray ) else: - self.scheduler = Scheduler( - max_task_backlog=max_task_backlog, - ) + from tqdm.auto import tqdm + + self.scheduler = Scheduler(max_task_backlog=max_task_backlog, tqdm_builder=tqdm) def active_plans(self) -> list[str]: if isinstance(self.ray_context, ray.client_builder.ClientContext): From cb18780934f53501f66abc17e3630bed8f62b8c6 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 15 Nov 2023 11:26:12 -0800 Subject: [PATCH 06/13] only enable when in interactive mode --- daft/execution/physical_plan.py | 6 ++-- daft/runners/pyrunner.py | 31 ++++++++++------- daft/runners/ray_runner.py | 61 ++++++++++++++++++++------------- daft/runners/tqdm.py | 27 +++++++++++++++ 4 files changed, 85 insertions(+), 40 deletions(-) create mode 100644 daft/runners/tqdm.py diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 79d7114a4c..8d527ac6ee 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -257,9 +257,9 @@ def join( # Are we still waiting for materializations to complete? (We will emit more joins from them). if len(left_requests) + len(right_requests) > 0: logger.debug( - "join blocked on completion of sources.\n" - f"Left sources: {left_requests}\n" - f"Right sources: {right_requests}", + "join blocked on completion of sources.\n Left sources: %s\nRight sources: %s", + left_requests, + right_requests, ) yield None diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index aab47bb7d9..e6e5b4f65e 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -114,6 +114,9 @@ def __init__(self, use_thread_pool: bool | None) -> None: self.num_cpus = multiprocessing.cpu_count() self.num_gpus = cuda_device_count() self.bytes_memory = psutil.virtual_memory().total + from .tqdm import IS_INTERACTIVE + + self.show_progress = IS_INTERACTIVE def runner_io(self) -> PyRunnerIO: return PyRunnerIO() @@ -158,7 +161,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP inflight_tasks_resources: dict[str, ResourceRequest] = dict() future_to_task: dict[futures.Future, str] = dict() - pbars = dict() + pbars: dict[int, tqdm] = dict() with futures.ThreadPoolExecutor() as thread_pool: try: next_step = next(plan) @@ -206,15 +209,17 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP logger.debug("Submitting task for execution: %s", next_step) stage_id = next_step.stage_id - if stage_id not in pbars: - name = "-".join(i.__class__.__name__ for i in next_step.instructions) - pbars[stage_id] = tqdm(total=float("inf"), desc=name) - pb = pbars[stage_id] - if pb.total is None: - pb.total = 1 - else: - pb.total += 1 - pb.refresh() + + if self.show_progress: + if stage_id not in pbars: + name = "-".join(i.__class__.__name__ for i in next_step.instructions) + position = len(pbars) + pbars[stage_id] = tqdm(total=1, desc=name, position=position) + else: + pb = pbars[stage_id] + pb.total += 1 + pb.refresh() + future = thread_pool.submit( self.build_partitions, next_step.instructions, *next_step.inputs ) @@ -235,10 +240,10 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP done_id = future_to_task.pop(done_future) del inflight_tasks_resources[done_id] done_task = inflight_tasks.pop(done_id) - stage_id = done_task.stage_id partitions = done_future.result() - - pbars[stage_id].update(1) + if self.show_progress: + stage_id = done_task.stage_id + pbars[stage_id].update(1) logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions)) done_task.set_result([PyMaterializedResult(partition) for partition in partitions]) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index b7bf3ba40a..4f7e80fd6b 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -59,6 +59,7 @@ import pandas as pd from ray.data.block import Block as RayDatasetBlock from ray.data.dataset import Dataset as RayDataset + from tqdm import tqdm _RAY_FROM_ARROW_REFS_AVAILABLE = True try: @@ -381,7 +382,7 @@ def _ray_num_cpus_provider(ttl_seconds: int = 1) -> Generator[int, None, None]: class Scheduler: - def __init__(self, max_task_backlog: int | None, tqdm_builder) -> None: + def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None: """ max_task_backlog: Max number of inflight tasks waiting for cores. """ @@ -401,7 +402,19 @@ def __init__(self, max_task_backlog: int | None, tqdm_builder) -> None: self.threads_by_df: dict[str, threading.Thread] = dict() self.results_by_df: dict[str, Queue] = {} self.active_by_df: dict[str, bool] = dict() - self.tqdm_builder = tqdm_builder + + from .tqdm import IS_INTERACTIVE + + if use_ray_tqdm: + from ray.experimental import tqdm_ray + + self.tqdm_builder = tqdm_ray + else: + from .tqdm import tqdm + + self.tqdm_builder = tqdm + + self.show_progress = IS_INTERACTIVE def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: # Case: thread is terminated and no longer exists. @@ -465,7 +478,7 @@ def _run_plan( inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict() inflight_ref_to_task: dict[ray.ObjectRef, str] = dict() - pbars = dict() + pbars: dict[int, tqdm] = dict() num_cpus_provider = _ray_num_cpus_provider() start = datetime.now() @@ -515,7 +528,9 @@ def _run_plan( # Dispatch the batch of tasks. logger.debug( - f"{(datetime.now() - start).total_seconds()}s: RayRunner dispatching {len(tasks_to_dispatch)} tasks:" + "%ss: RayRunner dispatching %s tasks", + (datetime.now() - start).total_seconds(), + len(tasks_to_dispatch), ) for task in tasks_to_dispatch: results = _build_partitions(task) @@ -524,16 +539,17 @@ def _run_plan( for result in results: inflight_ref_to_task[result] = task.id() - for task in tasks_to_dispatch: - stage_id = task.stage_id - if stage_id not in pbars: - name = "-".join(i.__class__.__name__ for i in task.instructions) - position = len(pbars) - pbars[stage_id] = self.tqdm_builder(total=1, desc=name, position=position) - else: - pb = pbars[stage_id] - pb.total += 1 - pb.refresh() + if self.show_progress: + for task in tasks_to_dispatch: + stage_id = task.stage_id + if stage_id not in pbars: + name = "-".join(i.__class__.__name__ for i in task.instructions) + position = len(pbars) + pbars[stage_id] = self.tqdm_builder(total=1, desc=name, position=position) + else: + pb = pbars[stage_id] + pb.total += 1 + pb.refresh() if dispatches_allowed == 0 or next_step is None: break @@ -567,19 +583,20 @@ def _run_plan( completed_task_ids.append(task_id) # Mark the entire task associated with the result as done. task = inflight_tasks[task_id] - stage_id = task.stage_id - pb = pbars[stage_id] - pb.update(1) if isinstance(task, SingleOutputPartitionTask): del inflight_ref_to_task[ready] elif isinstance(task, MultiOutputPartitionTask): for partition in task.partitions(): del inflight_ref_to_task[partition] + if self.show_progress: + stage_id = task.stage_id + pb = pbars[stage_id] + pb.update(1) del inflight_tasks[task_id] logger.debug( - f"+{(datetime.now() - dispatch).total_seconds()}s to await results from {completed_task_ids}" + "%ss to await results from %s", (datetime.now() - dispatch).total_seconds(), completed_task_ids ) if next_step is None: @@ -647,16 +664,12 @@ def __init__( self.ray_context = ray.init(address=address, ignore_reinit_error=True) if isinstance(self.ray_context, ray.client_builder.ClientContext): - from ray.experimental import tqdm_ray - # Run scheduler remotely if the cluster is connected remotely. self.scheduler_actor = SchedulerActor.remote( # type: ignore - max_task_backlog=max_task_backlog, tqdm_builder=tqdm_ray + max_task_backlog=max_task_backlog, use_ray_tqdm=True ) else: - from tqdm.auto import tqdm - - self.scheduler = Scheduler(max_task_backlog=max_task_backlog, tqdm_builder=tqdm) + self.scheduler = Scheduler(max_task_backlog=max_task_backlog, use_ray_tqdm=False) def active_plans(self) -> list[str]: if isinstance(self.ray_context, ray.client_builder.ClientContext): diff --git a/daft/runners/tqdm.py b/daft/runners/tqdm.py new file mode 100644 index 0000000000..fd13c9c338 --- /dev/null +++ b/daft/runners/tqdm.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import sys +from warnings import warn + +import tqdm + +IS_INTERACTIVE = False +try: + get_ipython = sys.modules["IPython"].get_ipython + if "IPKernelApp" not in get_ipython().config: # pragma: no cover + raise ImportError("console") + from tqdm.notebook import WARN_NOIPYW, IProgress + + if IProgress is None: + from tqdm.std import TqdmWarning + + warn(WARN_NOIPYW, TqdmWarning, stacklevel=2) + raise ImportError("ipywidgets") +except Exception: + from tqdm.std import tqdm, trange +else: # pragma: no cover + + from tqdm.notebook import tqdm, trange + + IS_INTERACTIVE = True +__all__ = ["tqdm", "trange"] From f261cd57b21004ac23931a34ba7a393e758f2740 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 15 Nov 2023 12:20:44 -0800 Subject: [PATCH 07/13] only run in ipython --- daft/runners/pyrunner.py | 4 ++-- daft/runners/ray_runner.py | 6 +++--- daft/runners/tqdm.py | 29 ++++++----------------------- 3 files changed, 11 insertions(+), 28 deletions(-) diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index e6e5b4f65e..002f8a8204 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -114,9 +114,9 @@ def __init__(self, use_thread_pool: bool | None) -> None: self.num_cpus = multiprocessing.cpu_count() self.num_gpus = cuda_device_count() self.bytes_memory = psutil.virtual_memory().total - from .tqdm import IS_INTERACTIVE + from .tqdm import is_running_from_ipython - self.show_progress = IS_INTERACTIVE + self.show_progress = is_running_from_ipython() def runner_io(self) -> PyRunnerIO: return PyRunnerIO() diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 4f7e80fd6b..67e4feae5c 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -403,18 +403,18 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None: self.results_by_df: dict[str, Queue] = {} self.active_by_df: dict[str, bool] = dict() - from .tqdm import IS_INTERACTIVE + from .tqdm import is_running_from_ipython if use_ray_tqdm: from ray.experimental import tqdm_ray self.tqdm_builder = tqdm_ray else: - from .tqdm import tqdm + from tqdm.auto import tqdm self.tqdm_builder = tqdm - self.show_progress = IS_INTERACTIVE + self.show_progress = is_running_from_ipython() def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: # Case: thread is terminated and no longer exists. diff --git a/daft/runners/tqdm.py b/daft/runners/tqdm.py index fd13c9c338..11232afcb1 100644 --- a/daft/runners/tqdm.py +++ b/daft/runners/tqdm.py @@ -1,27 +1,10 @@ from __future__ import annotations -import sys -from warnings import warn -import tqdm +def is_running_from_ipython(): + try: + from IPython import get_ipython -IS_INTERACTIVE = False -try: - get_ipython = sys.modules["IPython"].get_ipython - if "IPKernelApp" not in get_ipython().config: # pragma: no cover - raise ImportError("console") - from tqdm.notebook import WARN_NOIPYW, IProgress - - if IProgress is None: - from tqdm.std import TqdmWarning - - warn(WARN_NOIPYW, TqdmWarning, stacklevel=2) - raise ImportError("ipywidgets") -except Exception: - from tqdm.std import tqdm, trange -else: # pragma: no cover - - from tqdm.notebook import tqdm, trange - - IS_INTERACTIVE = True -__all__ = ["tqdm", "trange"] + return get_ipython() is not None + except Exception: + return False From 187dabf9dbd2a90aa5b7d0999186700a6cf0e495 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 15 Nov 2023 13:28:38 -0800 Subject: [PATCH 08/13] add tasks to top level --- daft/runners/pyrunner.py | 13 +++++++++++-- daft/runners/ray_runner.py | 9 +++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 002f8a8204..9fae67b9fb 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -211,14 +211,21 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP stage_id = next_step.stage_id if self.show_progress: + if len(pbars) == 0: + pbars[-1] = tqdm(total=1, desc="Tasks", position=0) + else: + task_pbar = pbars[-1] + task_pbar.total += 1 + # task_pbar.refresh() + if stage_id not in pbars: name = "-".join(i.__class__.__name__ for i in next_step.instructions) position = len(pbars) - pbars[stage_id] = tqdm(total=1, desc=name, position=position) + pbars[stage_id] = tqdm(total=1, desc=name, position=position, leave=False) else: pb = pbars[stage_id] pb.total += 1 - pb.refresh() + # pb.refresh() future = thread_pool.submit( self.build_partitions, next_step.instructions, *next_step.inputs @@ -244,6 +251,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP if self.show_progress: stage_id = done_task.stage_id pbars[stage_id].update(1) + pbars[-1].update(1) + logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions)) done_task.set_result([PyMaterializedResult(partition) for partition in partitions]) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 67e4feae5c..6047881e2a 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -541,6 +541,13 @@ def _run_plan( if self.show_progress: for task in tasks_to_dispatch: + if len(pbars) == 0: + pbars[-1] = self.tqdm_builder(total=1, desc="Tasks", position=0) + else: + task_pbar = pbars[-1] + task_pbar.total += 1 + task_pbar.refresh() + stage_id = task.stage_id if stage_id not in pbars: name = "-".join(i.__class__.__name__ for i in task.instructions) @@ -592,6 +599,8 @@ def _run_plan( stage_id = task.stage_id pb = pbars[stage_id] pb.update(1) + pb = pbars[-1] + pb.update(1) del inflight_tasks[task_id] From 7ae353844c5de326536ef4f652f9676813b6c5dc Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 16 Nov 2023 16:18:18 -0800 Subject: [PATCH 09/13] Update daft/runners/tqdm.py Co-authored-by: Jay Chia <17691182+jaychia@users.noreply.github.com> --- daft/runners/tqdm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daft/runners/tqdm.py b/daft/runners/tqdm.py index 11232afcb1..af764e9540 100644 --- a/daft/runners/tqdm.py +++ b/daft/runners/tqdm.py @@ -6,5 +6,5 @@ def is_running_from_ipython(): from IPython import get_ipython return get_ipython() is not None - except Exception: + except ImportError: return False From 8fcf00ffd300b67fc65a89c2b33b0004f2bf0e02 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 16 Nov 2023 17:23:20 -0800 Subject: [PATCH 10/13] refactor to pbar --- daft/runners/progress_bar.py | 63 ++++++++++++++++++++++++++++++++++++ daft/runners/pyrunner.py | 38 +++++----------------- daft/runners/ray_runner.py | 51 ++++------------------------- daft/runners/tqdm.py | 10 ------ 4 files changed, 78 insertions(+), 84 deletions(-) create mode 100644 daft/runners/progress_bar.py delete mode 100644 daft/runners/tqdm.py diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py new file mode 100644 index 0000000000..9673b57b93 --- /dev/null +++ b/daft/runners/progress_bar.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import os +from typing import Any + +from tqdm.auto import tqdm + +from daft.execution.execution_step import PartitionTask + + +class ProgressBar: + def __init__(self, use_ray_tqdm: bool, disable: bool = False) -> None: + print("use ray tqdm", use_ray_tqdm) + self.use_ray_tqdm = use_ray_tqdm + self.tqdm_mod = tqdm + self.pbars: dict[int, tqdm] = dict() + self.disable = ( + disable + or not bool(int(os.environ.get("RAY_TQDM", "1"))) + or not bool(int(os.environ.get("DAFT_PROGRESS_BAR", "1"))) + ) + + def _make_new_bar(self, stage_id: int, name: str): + if self.use_ray_tqdm: + self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars)) + else: + self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars), leave=False) + + def mark_task_start(self, step: PartitionTask[Any]) -> None: + if self.disable: + return + if len(self.pbars) == 0: + self._make_new_bar(-1, "Tasks") + else: + task_pbar = self.pbars[-1] + task_pbar.total += 1 + if self.use_ray_tqdm: + task_pbar.refresh() + + stage_id = step.stage_id + + if stage_id not in self.pbars: + name = "-".join(i.__class__.__name__ for i in step.instructions) + self._make_new_bar(stage_id, name) + + else: + pb = self.pbars[stage_id] + pb.total += 1 + if self.use_ray_tqdm: + pb.refresh() + + def mark_task_done(self, step: PartitionTask[Any]) -> None: + if self.disable: + return + + stage_id = step.stage_id + self.pbars[stage_id].update(1) + self.pbars[-1].update(1) + + def close(self) -> None: + for p in self.pbars.values(): + p.close() + del p diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 9fae67b9fb..1e950ea767 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -7,7 +7,6 @@ from typing import Iterable, Iterator import psutil -from tqdm.auto import tqdm from daft.daft import ( FileFormatConfig, @@ -30,6 +29,7 @@ PartitionSet, ) from daft.runners.profiler import profiler +from daft.runners.progress_bar import ProgressBar from daft.runners.runner import Runner from daft.table import Table @@ -114,9 +114,6 @@ def __init__(self, use_thread_pool: bool | None) -> None: self.num_cpus = multiprocessing.cpu_count() self.num_gpus = cuda_device_count() self.bytes_memory = psutil.virtual_memory().total - from .tqdm import is_running_from_ipython - - self.show_progress = is_running_from_ipython() def runner_io(self) -> PyRunnerIO: return PyRunnerIO() @@ -161,7 +158,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP inflight_tasks_resources: dict[str, ResourceRequest] = dict() future_to_task: dict[futures.Future, str] = dict() - pbars: dict[int, tqdm] = dict() + pbar = ProgressBar(use_ray_tqdm=False) with futures.ThreadPoolExecutor() as thread_pool: try: next_step = next(plan) @@ -206,26 +203,10 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP else: # Submit the task for execution. - logger.debug("Submitting task for execution: %s", next_step) - stage_id = next_step.stage_id - - if self.show_progress: - if len(pbars) == 0: - pbars[-1] = tqdm(total=1, desc="Tasks", position=0) - else: - task_pbar = pbars[-1] - task_pbar.total += 1 - # task_pbar.refresh() - - if stage_id not in pbars: - name = "-".join(i.__class__.__name__ for i in next_step.instructions) - position = len(pbars) - pbars[stage_id] = tqdm(total=1, desc=name, position=position, leave=False) - else: - pb = pbars[stage_id] - pb.total += 1 - # pb.refresh() + + # update progress bar + pbar.mark_task_start(next_step) future = thread_pool.submit( self.build_partitions, next_step.instructions, *next_step.inputs @@ -248,10 +229,8 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP del inflight_tasks_resources[done_id] done_task = inflight_tasks.pop(done_id) partitions = done_future.result() - if self.show_progress: - stage_id = done_task.stage_id - pbars[stage_id].update(1) - pbars[-1].update(1) + + pbar.mark_task_done(done_task) logger.debug("Task completed: %s -> <%s partitions>", done_id, len(partitions)) done_task.set_result([PyMaterializedResult(partition) for partition in partitions]) @@ -260,8 +239,7 @@ def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalP next_step = next(plan) except StopIteration: - for p in pbars.values(): - p.close() + pbar.close() return def _check_resource_requests(self, resource_request: ResourceRequest) -> None: diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 6047881e2a..7bb56bd1d5 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -13,6 +13,7 @@ from daft.logical.builder import LogicalPlanBuilder from daft.plan_scheduler import PhysicalPlanScheduler +from daft.runners.progress_bar import ProgressBar logger = logging.getLogger(__name__) @@ -59,7 +60,6 @@ import pandas as pd from ray.data.block import Block as RayDatasetBlock from ray.data.dataset import Dataset as RayDataset - from tqdm import tqdm _RAY_FROM_ARROW_REFS_AVAILABLE = True try: @@ -403,18 +403,7 @@ def __init__(self, max_task_backlog: int | None, use_ray_tqdm: bool) -> None: self.results_by_df: dict[str, Queue] = {} self.active_by_df: dict[str, bool] = dict() - from .tqdm import is_running_from_ipython - - if use_ray_tqdm: - from ray.experimental import tqdm_ray - - self.tqdm_builder = tqdm_ray - else: - from tqdm.auto import tqdm - - self.tqdm_builder = tqdm - - self.show_progress = is_running_from_ipython() + self.use_ray_tqdm = use_ray_tqdm def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: # Case: thread is terminated and no longer exists. @@ -478,7 +467,7 @@ def _run_plan( inflight_tasks: dict[str, PartitionTask[ray.ObjectRef]] = dict() inflight_ref_to_task: dict[ray.ObjectRef, str] = dict() - pbars: dict[int, tqdm] = dict() + pbar = ProgressBar(use_ray_tqdm=self.use_ray_tqdm) num_cpus_provider = _ray_num_cpus_provider() start = datetime.now() @@ -539,24 +528,7 @@ def _run_plan( for result in results: inflight_ref_to_task[result] = task.id() - if self.show_progress: - for task in tasks_to_dispatch: - if len(pbars) == 0: - pbars[-1] = self.tqdm_builder(total=1, desc="Tasks", position=0) - else: - task_pbar = pbars[-1] - task_pbar.total += 1 - task_pbar.refresh() - - stage_id = task.stage_id - if stage_id not in pbars: - name = "-".join(i.__class__.__name__ for i in task.instructions) - position = len(pbars) - pbars[stage_id] = self.tqdm_builder(total=1, desc=name, position=position) - else: - pb = pbars[stage_id] - pb.total += 1 - pb.refresh() + pbar.mark_task_start(task) if dispatches_allowed == 0 or next_step is None: break @@ -595,13 +567,8 @@ def _run_plan( elif isinstance(task, MultiOutputPartitionTask): for partition in task.partitions(): del inflight_ref_to_task[partition] - if self.show_progress: - stage_id = task.stage_id - pb = pbars[stage_id] - pb.update(1) - pb = pbars[-1] - pb.update(1) + pbar.mark_task_done(task) del inflight_tasks[task_id] logger.debug( @@ -617,14 +584,10 @@ def _run_plan( # Ensure that all Exceptions are correctly propagated to the consumer before reraising to kill thread except Exception as e: self.results_by_df[result_uuid].put(e) - for p in pbars.values(): - p.close() - del pbars + pbar.close() raise - for p in pbars.values(): - p.close() - del pbars + pbar.close() @ray.remote(num_cpus=1) diff --git a/daft/runners/tqdm.py b/daft/runners/tqdm.py deleted file mode 100644 index af764e9540..0000000000 --- a/daft/runners/tqdm.py +++ /dev/null @@ -1,10 +0,0 @@ -from __future__ import annotations - - -def is_running_from_ipython(): - try: - from IPython import get_ipython - - return get_ipython() is not None - except ImportError: - return False From c3c3493defc8afc105eec09722fb5f9b2899b4d6 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 16 Nov 2023 17:25:55 -0800 Subject: [PATCH 11/13] remove print statement --- daft/runners/progress_bar.py | 1 - 1 file changed, 1 deletion(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index 9673b57b93..5608dded12 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -10,7 +10,6 @@ class ProgressBar: def __init__(self, use_ray_tqdm: bool, disable: bool = False) -> None: - print("use ray tqdm", use_ray_tqdm) self.use_ray_tqdm = use_ray_tqdm self.tqdm_mod = tqdm self.pbars: dict[int, tqdm] = dict() From 44770e38afe02448e4621b1c1d1cb7ea596c8937 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 16 Nov 2023 17:33:41 -0800 Subject: [PATCH 12/13] disable global task bar by default --- daft/runners/progress_bar.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index 5608dded12..f748987fc5 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -9,8 +9,9 @@ class ProgressBar: - def __init__(self, use_ray_tqdm: bool, disable: bool = False) -> None: + def __init__(self, use_ray_tqdm: bool, show_tasks_bar: bool = False, disable: bool = False) -> None: self.use_ray_tqdm = use_ray_tqdm + self.show_tasks_bar = show_tasks_bar self.tqdm_mod = tqdm self.pbars: dict[int, tqdm] = dict() self.disable = ( @@ -28,20 +29,20 @@ def _make_new_bar(self, stage_id: int, name: str): def mark_task_start(self, step: PartitionTask[Any]) -> None: if self.disable: return - if len(self.pbars) == 0: - self._make_new_bar(-1, "Tasks") - else: - task_pbar = self.pbars[-1] - task_pbar.total += 1 - if self.use_ray_tqdm: - task_pbar.refresh() + if self.show_tasks_bar: + if len(self.pbars) == 0: + self._make_new_bar(-1, "Tasks") + else: + task_pbar = self.pbars[-1] + task_pbar.total += 1 + if self.use_ray_tqdm: + task_pbar.refresh() stage_id = step.stage_id if stage_id not in self.pbars: name = "-".join(i.__class__.__name__ for i in step.instructions) self._make_new_bar(stage_id, name) - else: pb = self.pbars[stage_id] pb.total += 1 @@ -54,7 +55,8 @@ def mark_task_done(self, step: PartitionTask[Any]) -> None: stage_id = step.stage_id self.pbars[stage_id].update(1) - self.pbars[-1].update(1) + if self.show_tasks_bar: + self.pbars[-1].update(1) def close(self) -> None: for p in self.pbars.values(): From 9ce28b96108d7e0a3f847169e2f6755aff8d740f Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 16 Nov 2023 19:25:11 -0800 Subject: [PATCH 13/13] dont refresh ray progress bar and set update freq to 1 second --- daft/runners/progress_bar.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index f748987fc5..3a6ff6210d 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -24,7 +24,9 @@ def _make_new_bar(self, stage_id: int, name: str): if self.use_ray_tqdm: self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars)) else: - self.pbars[stage_id] = self.tqdm_mod(total=1, desc=name, position=len(self.pbars), leave=False) + self.pbars[stage_id] = self.tqdm_mod( + total=1, desc=name, position=len(self.pbars), leave=False, mininterval=1.0 + ) def mark_task_start(self, step: PartitionTask[Any]) -> None: if self.disable: @@ -35,8 +37,6 @@ def mark_task_start(self, step: PartitionTask[Any]) -> None: else: task_pbar = self.pbars[-1] task_pbar.total += 1 - if self.use_ray_tqdm: - task_pbar.refresh() stage_id = step.stage_id @@ -46,8 +46,6 @@ def mark_task_start(self, step: PartitionTask[Any]) -> None: else: pb = self.pbars[stage_id] pb.total += 1 - if self.use_ray_tqdm: - pb.refresh() def mark_task_done(self, step: PartitionTask[Any]) -> None: if self.disable: