Skip to content

Commit

Permalink
refactor: fix queue lock blocking when multiple threads are running
Browse files Browse the repository at this point in the history
This would cause the program to process every image sequentailly, as the
next thread would wait for the lock while the image processed or saved.
We fix that by dropping the locks before beginning processing
  • Loading branch information
Daru-san committed Dec 17, 2024
1 parent 0150320 commit 02ff23b
Showing 1 changed file with 68 additions and 42 deletions.
110 changes: 68 additions & 42 deletions src/app/run/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,66 +198,91 @@ impl BatchRunner {
self.tasks_pool.scope(|s| -> Result<()> {
loop {
let tasks_queue = Arc::clone(&self.tasks_queue);

let progress = Arc::clone(&self.progress);

s.spawn(move |_| {
let mut task_lock = tasks_queue.lock().unwrap();
let mut progress_lock = progress.lock().unwrap();

let task_id = if !task_lock.decoded_tasks().is_empty() {
task_lock.decoded_tasks()[0].id
} else {
return;
};

let mut current_task = {
match task_lock.task_by_id_mut(task_id) {
Some(task) => std::mem::take(task),
_ => {
task_lock.fail_task(task_id, TaskError::NoSuchTask.to_string());
return;
let mut task_lock = tasks_queue.lock().unwrap();

let task_id = if !task_lock.decoded_tasks().is_empty() {
task_lock.decoded_tasks()[0].id
} else {
return;
};

let task = {
match task_lock.task_by_id_mut(task_id) {
Some(task) => match task.state {
TaskState::Decoded => take(task),
TaskState::Failed(_) => return,
_ => return,
},
_ => {
task_lock.fail_task(task_id, TaskError::NoSuchTask.to_string());
return;
}
}
}
};

task
};

progress_lock.start_task(&format!(
"Processing image {:?}",
current_task.out_path.file_name().as_slice()
));
{
let progress_lock = progress.lock().unwrap();
progress_lock.start_task(
&command_msg(
command,
current_task.out_path.file_name().as_slice()[0]
.to_string_lossy()
.as_ref(),
)
.unwrap(),
);
}

let result = run_command(command, &mut current_task.image);

let mut task_lock = tasks_queue.lock().unwrap();

let mut progress_lock = progress.lock().unwrap();

match run_command(command, &mut current_task.image) {
match result {
Ok(mut image) => task_lock.processed_task(&mut image, current_task.id),
Err(error) => progress_lock.error_sub_task(&format!("Error: {}", error)),
};
});

let tasks_queue = Arc::clone(&self.tasks_queue);

let progress = Arc::clone(&self.progress);

s.spawn(move |_| {
let mut task_lock = tasks_queue.lock().unwrap();

let task_id = if !task_lock.processed_tasks().is_empty() {
task_lock.processed_tasks()[0].id
} else {
return;
};

let current_task = {
match task_lock.task_by_id_mut(task_id) {
Some(task) => std::mem::take(task),
_ => {
task_lock.fail_task(task_id, TaskError::NoSuchTask.to_string());
return;
}
}
};
let mut task_lock = tasks_queue.lock().unwrap();

if let Some(task) = task_lock.task_by_id(task_id) {
if let TaskState::Failed(_) = task.state {
let task_id = if !task_lock.processed_tasks().is_empty() {
task_lock.processed_tasks()[0].id
} else {
return;
}
}
};

let task = {
match task_lock.task_by_id_mut(task_id) {
Some(task) => match task.state {
TaskState::Processed => take(task),
TaskState::Failed(_) => return,
_ => return,
},
_ => {
task_lock.fail_task(task_id, TaskError::NoSuchTask.to_string());
return;
}
}
};

task
};

let image_result = save_image_format(
&current_task.image,
Expand All @@ -266,17 +291,18 @@ impl BatchRunner {
);

let mut progress_lock = progress.lock().unwrap();
let mut task_lock = tasks_queue.lock().unwrap();

match image_result {
Ok(()) => {
task_lock.completed_task(task_id);
task_lock.completed_task(current_task.id);
progress_lock.finish_sub_task(&format!(
"Image saved successfully: {:?}",
&current_task.out_path.file_name().as_slice()
));
}
Err(save_error) => {
task_lock.fail_task(task_id, save_error.to_string());
task_lock.fail_task(current_task.id, save_error.to_string());
progress_lock.error_sub_task(&format!(
"Image processing failed with error: {:?}",
&current_task.out_path.file_name().as_slice(),
Expand Down

0 comments on commit 02ff23b

Please sign in to comment.