Skip to content

Commit

Permalink
chore: Build progress bar only on first update (#3626)
Browse files Browse the repository at this point in the history
There are 2 progress bars for swordfish: a Rust-based one that's for
terminals, and a Python one that's for Jupyter notebooks (uses tqdm).

This PR makes it so that the tqdm bar is only rendered on first update
(the Rust-based one is already lazily rendered), and increases the refresh
interval of both progress bars from 100ms to 500ms.

native


https://github.com/user-attachments/assets/c55e51a4-4919-4246-9add-2fe13d305435

py


https://github.com/user-attachments/assets/8be184c5-5b48-4fee-9579-deb485f37e07

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2024
1 parent f9a89a7 commit 1c0f780
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 36 deletions.
33 changes: 21 additions & 12 deletions daft/runners/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,34 @@ def __init__(self) -> None:
self._maxinterval = 5.0
self.tqdm_mod = get_tqdm(False)
self.pbars: dict[int, Any] = dict()
self.bar_configs: dict[int, str] = dict()
self.next_id = 0

def make_new_bar(self, bar_format: str, initial_message: str) -> int:
pbar_id = len(self.pbars)
self.pbars[pbar_id] = self.tqdm_mod(
bar_format=bar_format,
desc=initial_message,
position=pbar_id,
leave=False,
mininterval=1.0,
maxinterval=self._maxinterval,
)
def make_new_bar(self, bar_format: str) -> int:
pbar_id = self.next_id
self.next_id += 1
self.bar_configs[pbar_id] = bar_format
return pbar_id

def update_bar(self, pbar_id: int, message: str) -> None:
if pbar_id not in self.pbars:
if pbar_id not in self.bar_configs:
raise ValueError(f"No bar configuration found for id {pbar_id}")
bar_format = self.bar_configs[pbar_id]
self.pbars[pbar_id] = self.tqdm_mod(
bar_format=bar_format,
position=pbar_id,
leave=False,
mininterval=1.0,
maxinterval=self._maxinterval,
)
del self.bar_configs[pbar_id]
self.pbars[pbar_id].set_description_str(message)

def close_bar(self, pbar_id: int) -> None:
self.pbars[pbar_id].close()
del self.pbars[pbar_id]
if pbar_id in self.pbars:
self.pbars[pbar_id].close()
del self.pbars[pbar_id]

def close(self) -> None:
for p in self.pbars.values():
Expand Down
4 changes: 1 addition & 3 deletions src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ impl ExecutionRuntimeContext {
runtime_stats: Arc<RuntimeStatsContext>,
) -> Option<Arc<OperatorProgressBar>> {
if let Some(ref pb_manager) = self.progress_bar_manager {
let pb = pb_manager
.make_new_bar(color, prefix, show_received)
.unwrap();
let pb = pb_manager.make_new_bar(color, prefix).unwrap();
Some(Arc::new(OperatorProgressBar::new(
pb,
runtime_stats,
Expand Down
25 changes: 4 additions & 21 deletions src/daft-local-execution/src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub trait ProgressBarManager {
&self,
color: ProgressBarColor,
prefix: &str,
show_received: bool,
) -> DaftResult<Box<dyn ProgressBar>>;

fn close_all(&self) -> DaftResult<()>;
Expand Down Expand Up @@ -52,8 +51,8 @@ pub struct OperatorProgressBar {
}

impl OperatorProgressBar {
// 100ms = 100_000_000ns
const UPDATE_INTERVAL: u64 = 100_000_000;
// 500ms = 500_000_000ns
const UPDATE_INTERVAL: u64 = 500_000_000;

pub fn new(
progress_bar: Box<dyn ProgressBar>,
Expand Down Expand Up @@ -146,27 +145,19 @@ impl ProgressBarManager for IndicatifProgressBarManager {
&self,
color: ProgressBarColor,
prefix: &str,
show_received: bool,
) -> DaftResult<Box<dyn ProgressBar>> {
let template_str = format!(
"🗡️ 🐟 {{spinner:.green}} {{prefix:.{color}/bold}} | [{{elapsed_precise}}] {{msg}}",
color = color.to_str(),
);

let initial_message = if show_received {
"0 rows received, 0 rows emitted".to_string()
} else {
"0 rows emitted".to_string()
};

let pb = indicatif::ProgressBar::new_spinner()
.with_style(
ProgressStyle::default_spinner()
.template(template_str.as_str())
.unwrap(),
)
.with_prefix(prefix.to_string())
.with_message(initial_message);
.with_prefix(prefix.to_string());

self.multi_progress.add(pb.clone());
DaftResult::Ok(Box::new(IndicatifProgressBar(pb)))
Expand Down Expand Up @@ -263,18 +254,10 @@ mod python {
&self,
_color: ProgressBarColor,
prefix: &str,
show_received: bool,
) -> DaftResult<Box<dyn ProgressBar>> {
let bar_format = format!("🗡️ 🐟 {prefix}: {{elapsed}} {{desc}}", prefix = prefix);
let initial_message = if show_received {
"0 rows received, 0 rows emitted".to_string()
} else {
"0 rows emitted".to_string()
};
let pb_id = Python::with_gil(|py| {
let pb_id =
self.inner
.call_method1(py, "make_new_bar", (bar_format, initial_message))?;
let pb_id = self.inner.call_method1(py, "make_new_bar", (bar_format,))?;
let pb_id = pb_id.extract::<usize>(py)?;
DaftResult::Ok(pb_id)
})?;
Expand Down

0 comments on commit 1c0f780

Please sign in to comment.