Some fixes on job progress reporting and save
fogodev committed May 16, 2024
1 parent 72809f5 commit caa23a4
Showing 7 changed files with 178 additions and 71 deletions.
8 changes: 8 additions & 0 deletions core/crates/heavy-lifting/src/file_identifier/job.rs
@@ -271,6 +271,14 @@ impl FileIdentifier {

self.metadata.seeking_orphans_time = start.elapsed();
} else {
ctx.progress(vec![
ProgressUpdate::TaskCount(self.metadata.total_found_orphans),
ProgressUpdate::Message(format!(
"{} files to be identified",
self.metadata.total_found_orphans
)),
])
.await;
pending_running_tasks.extend(mem::take(&mut self.pending_tasks_on_resume));
}

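This hunk is the whole file-identifier fix: on resume, the job previously re-queued its saved tasks without reporting anything, leaving the progress bar at zero until new work arrived. A minimal, self-contained sketch of that resume branch, with the context, task handles, and ProgressUpdate replaced by hypothetical stand-ins (the real crate's types differ), and the futures crate assumed for block_on:

use std::mem;

// Hypothetical stand-ins for the crate's real types.
#[derive(Debug)]
enum ProgressUpdate {
    TaskCount(u64),
    Message(String),
}

struct Ctx;

impl Ctx {
    // The real `ctx.progress` forwards updates to the job report; this
    // sketch just prints them.
    async fn progress(&self, updates: Vec<ProgressUpdate>) {
        for update in updates {
            println!("{update:?}");
        }
    }
}

struct FileIdentifier {
    total_found_orphans: u64,
    pending_tasks_on_resume: Vec<&'static str>, // stands in for task handles
}

impl FileIdentifier {
    async fn resume(&mut self, ctx: &Ctx, pending_running_tasks: &mut Vec<&'static str>) {
        // Report totals *before* re-queuing, so a resumed job shows
        // progress immediately instead of sitting at zero.
        ctx.progress(vec![
            ProgressUpdate::TaskCount(self.total_found_orphans),
            ProgressUpdate::Message(format!(
                "{} files to be identified",
                self.total_found_orphans
            )),
        ])
        .await;
        pending_running_tasks.extend(mem::take(&mut self.pending_tasks_on_resume));
    }
}

fn main() {
    let ctx = Ctx;
    let mut job = FileIdentifier {
        total_found_orphans: 42,
        pending_tasks_on_resume: vec!["task_a", "task_b"],
    };
    let mut running = Vec::new();
    futures::executor::block_on(job.resume(&ctx, &mut running));
    assert_eq!(running.len(), 2);
}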
22 changes: 19 additions & 3 deletions core/crates/heavy-lifting/src/indexer/job.rs
@@ -450,7 +450,7 @@ impl Indexer {
self.metadata.total_tasks += handles.len() as u64;

ctx.progress(vec![
ProgressUpdate::TaskCount(handles.len() as u64),
ProgressUpdate::TaskCount(self.metadata.total_tasks),
ProgressUpdate::message(format!(
"Found {to_create_count} new files and {to_update_count} to update"
)),
@@ -551,7 +551,7 @@ impl Indexer {
dispatcher: &JobTaskDispatcher,
) -> Result<(), indexer::Error> {
// if we don't have any pending task, then this is a fresh job
if self.pending_tasks_on_resume.is_empty() {
let updates = if self.pending_tasks_on_resume.is_empty() {
let walker_root_path = Arc::new(
get_full_path_from_sub_path(
self.location.id,
@@ -578,10 +578,26 @@
.await,
);

self.metadata.total_tasks = 1;

let updates = vec![
ProgressUpdate::TaskCount(self.metadata.total_tasks),
ProgressUpdate::Message(format!("Indexing {}", walker_root_path.display())),
];

self.walker_root_path = Some(walker_root_path);

updates
} else {
pending_running_tasks.extend(mem::take(&mut self.pending_tasks_on_resume));
}

vec![
ProgressUpdate::TaskCount(self.metadata.total_tasks),
ProgressUpdate::Message("Resuming tasks".to_string()),
]
};

ctx.progress(updates).await;

Ok(())
}
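Besides switching the reported TaskCount from handles.len() to the running total_tasks, the indexer change restructures the entry path so that both the fresh-job branch and the resume branch produce a Vec<ProgressUpdate>, and a single ctx.progress(updates).await runs after the if/else; previously only the fresh branch reported anything. A condensed sketch of that both-branches-yield-updates shape, with names as simplified stand-ins rather than the crate's API:

// Simplified stand-ins for the indexer job's real types.
enum ProgressUpdate {
    TaskCount(u64),
    Message(String),
}

struct Indexer {
    total_tasks: u64,
    pending_tasks_on_resume: Vec<u32>,
}

impl Indexer {
    // Both branches evaluate to the updates to emit, so the single
    // progress call after the `if`/`else` can't be skipped on resume.
    fn entry_updates(&mut self) -> Vec<ProgressUpdate> {
        if self.pending_tasks_on_resume.is_empty() {
            // Fresh job: exactly one walker task exists at this point.
            self.total_tasks = 1;
            vec![
                ProgressUpdate::TaskCount(self.total_tasks),
                ProgressUpdate::Message("Indexing /library/root".to_string()),
            ]
        } else {
            // Resumed job: report the total restored from the saved state.
            vec![
                ProgressUpdate::TaskCount(self.total_tasks),
                ProgressUpdate::Message("Resuming tasks".to_string()),
            ]
        }
    }
}

fn main() {
    let mut fresh = Indexer { total_tasks: 0, pending_tasks_on_resume: Vec::new() };
    // A fresh job reports a task count of 1 plus an "Indexing …" message.
    assert_eq!(fresh.entry_updates().len(), 2);
}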
114 changes: 65 additions & 49 deletions core/crates/heavy-lifting/src/job_system/runner.rs
@@ -79,6 +79,7 @@ struct JobsWorktables {
}

pub(super) struct JobSystemRunner<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> {
on_shutdown_mode: bool,
base_dispatcher: BaseTaskDispatcher<Error>,
handles: HashMap<JobId, JobHandle<OuterCtx, JobCtx>>,
worktables: JobsWorktables,
@@ -93,6 +94,7 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
job_outputs_tx: chan::Sender<(JobId, Result<JobOutput, JobSystemError>)>,
) -> Self {
Self {
on_shutdown_mode: false,
base_dispatcher,
handles: HashMap::with_capacity(JOBS_INITIAL_CAPACITY),
worktables: JobsWorktables {
@@ -253,6 +255,7 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
status: Result<ReturnStatus, Error>,
) -> Result<(), JobSystemError> {
let Self {
on_shutdown_mode,
handles,
worktables,
job_outputs_tx,
@@ -272,8 +275,8 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
.expect("a JobName and location_id must've been inserted in the map with the job id");

assert!(worktables.running_jobs_set.remove(&(job_name, location_id)));

assert!(worktables.job_hashes.remove(&job_hash).is_some());

let mut handle = handles.remove(&job_id).expect("it must be here");

let res = match status {
@@ -291,44 +294,58 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
handle.complete_job(job_return).await
}

Ok(ReturnStatus::Shutdown(Ok(Some(serialized_job)))) => {
let name = handle.ctx.report().await.name;

let Some(next_jobs) =
serialize_next_jobs_to_shutdown(job_id, job_name, handle.next_jobs).await
else {
return Ok(());
};

worktables
.jobs_to_store_by_ctx_id
.entry(handle.ctx.id())
.or_default()
.push(StoredJobEntry {
location_id,
root_job: StoredJob {
id: job_id,
name,
serialized_job,
},
next_jobs,
});
Ok(ReturnStatus::Shutdown(res)) => {
match res {
Ok(Some(serialized_job)) => {
let name = {
let db = handle.ctx.db();
let mut report = handle.ctx.report_mut().await;
if let Err(e) = report.update(db).await {
error!("failed to update report on job shutdown: {e:#?}");
}
report.name
};

worktables
.jobs_to_store_by_ctx_id
.entry(handle.ctx.id())
.or_default()
.push(StoredJobEntry {
location_id,
root_job: StoredJob {
id: job_id,
name,
serialized_job,
},
next_jobs: serialize_next_jobs_to_shutdown(
job_id,
job_name,
handle.next_jobs,
)
.await
.unwrap_or_default(),
});

debug!("Job was shutdown and serialized: <id='{job_id}', name='{name}'>");
}

debug!("Job was shutdown and serialized: <id='{job_id}', name='{name}'>");
Ok(None) => {
debug!(
"Job was shut down but didn't return any serialized data; \
it probably isn't a resumable job: <id='{job_id}'>"
);
}

return Ok(());
}
Err(e) => {
error!("Failed to serialize job: {e:#?}");
}
}

Ok(ReturnStatus::Shutdown(Ok(None))) => {
debug!(
"Job was shutdown but didn't returned any serialized data, \
probably it isn't resumable job: <id='{job_id}'>"
);
return Ok(());
}
if *on_shutdown_mode && handles.is_empty() {
// Job system is empty and in shutdown mode, so we close this channel to finish the shutdown process
job_return_status_tx.close();
}

Ok(ReturnStatus::Shutdown(Err(e))) => {
error!("Failed to serialize job: {e:#?}");
return Ok(());
}
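Merging the three Shutdown arms into one match means every finished job now exits through a single code path, which is where the new on_shutdown_mode flag pays off: when the flag is set and the last handle is removed, the runner closes the return-status channel itself, letting the shutdown loop in the next hunk terminate. A toy model of that close-on-empty step, assuming the async-channel crate (the real code uses the crate's own chan alias, and the handle type is simplified here):

use std::collections::HashMap;

// Toy model of the runner's close-on-empty behavior. Assumes the
// `async-channel` crate; the real runner's types are more involved.
struct Runner {
    on_shutdown_mode: bool,
    handles: HashMap<u64, &'static str>, // job id -> stand-in for JobHandle
    job_return_status_tx: async_channel::Sender<u64>,
}

impl Runner {
    fn process_return_status(&mut self, job_id: u64) {
        self.handles.remove(&job_id);
        if self.on_shutdown_mode && self.handles.is_empty() {
            // Last job is done while shutting down: close the channel so
            // the receiver-side drain loop finishes instead of waiting forever.
            self.job_return_status_tx.close();
        }
    }
}

fn main() {
    let (tx, _rx) = async_channel::unbounded::<u64>();
    let mut runner = Runner {
        on_shutdown_mode: true,
        handles: HashMap::from([(7u64, "indexer")]),
        job_return_status_tx: tx,
    };
    runner.process_return_status(7);
    assert!(runner.job_return_status_tx.is_closed());
}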

@@ -594,27 +611,26 @@ pub(super) async fn run<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>>(
}

StreamMessage::RunnerMessage(RunnerMessage::Shutdown) => {
runner.on_shutdown_mode = true;
// Consuming all pending return status messages
loop {
while let Ok((job_id, status)) = job_return_status_rx_to_shutdown.try_recv() {
if let Err(e) = runner.process_return_status(job_id, status).await {
error!("Failed to process return status before shutting down: {e:#?}");
}
}

if runner.is_empty() {
break;
}
if !runner.is_empty() {
let mut job_return_status_stream = pin!(job_return_status_rx_to_shutdown);

debug!(
"Waiting for {} jobs to shutdown before shutting down the job system...",
runner.total_jobs()
);
}

// Now the runner can shutdown
if let Err(e) = runner.save_jobs(store_jobs_file).await {
error!("Failed to save jobs before shutting down: {e:#?}");
while let Some((job_id, status)) = job_return_status_stream.next().await {
if let Err(e) = runner.process_return_status(job_id, status).await {
error!("Failed to process return status before shutting down: {e:#?}");
}
}

// Now the runner can shutdown
if let Err(e) = runner.save_jobs(store_jobs_file).await {
error!("Failed to save jobs before shutting down: {e:#?}");
}
}

return;
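The shutdown handler is the other half of the fix. The old version spun in a loop, repeatedly draining try_recv and checking is_empty, and could save jobs while some had not yet reported back. The new version drains whatever is already queued, then, if jobs remain, awaits the status stream until the close() shown above ends it, and only then saves jobs. A runnable sketch of that drain-then-await sequence, again assuming the async-channel and futures crates:

use futures::StreamExt;
use std::pin::pin;

// Sketch of the shutdown sequence: drain buffered messages without
// blocking, then await the stream until the sender side closes it.
async fn shutdown(rx: async_channel::Receiver<u64>, mut remaining: usize) {
    // First pass: consume everything that is already buffered.
    while let Ok(job_id) = rx.try_recv() {
        println!("job {job_id} finished");
        remaining -= 1;
    }

    if remaining > 0 {
        println!("Waiting for {remaining} jobs to shutdown...");
        let mut stream = pin!(rx);
        // Ends only when the last job closes the channel (close-on-empty).
        while let Some(job_id) = stream.next().await {
            println!("job {job_id} finished");
        }
    }

    // Only now is it safe to persist job state to disk.
    println!("saving jobs");
}

fn main() {
    let (tx, rx) = async_channel::unbounded::<u64>();
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    tx.close(); // simulate the last job closing the channel
    futures::executor::block_on(shutdown(rx, 2));
}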
68 changes: 58 additions & 10 deletions core/crates/heavy-lifting/src/media_processor/job.rs
@@ -38,7 +38,7 @@ use itertools::Itertools;
use prisma_client_rust::{raw, PrismaValue};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{debug, error, warn};
use tracing::{debug, warn};

use super::{
helpers,
@@ -85,6 +85,7 @@ pub struct MediaProcessor {
sub_path: Option<PathBuf>,
regenerate_thumbnails: bool,

total_media_data_extraction_files: u64,
total_media_data_extraction_tasks: u64,
total_thumbnailer_tasks: u64,
total_thumbnailer_files: u64,
@@ -209,6 +210,7 @@ impl MediaProcessor {
location: Arc::new(location),
sub_path,
regenerate_thumbnails,
total_media_data_extraction_files: 0,
total_media_data_extraction_tasks: 0,
total_thumbnailer_tasks: 0,
total_thumbnailer_files: 0,
@@ -260,6 +262,7 @@ impl MediaProcessor {
dispatcher,
)
.await?;
self.total_media_data_extraction_files = total_media_data_extraction_files;
self.total_media_data_extraction_tasks = task_handles.len() as u64;

pending_running_tasks.extend(task_handles);
@@ -276,18 +279,50 @@ impl MediaProcessor {
.await;

// Now we dispatch thumbnailer tasks
let (total_thumbnailer_tasks, task_handles) = dispatch_thumbnailer_tasks(
let (total_thumbnailer_files, task_handles) = dispatch_thumbnailer_tasks(
&iso_file_path,
self.regenerate_thumbnails,
&self.location_path,
dispatcher,
job_ctx,
)
.await?;
pending_running_tasks.extend(task_handles);

self.total_thumbnailer_tasks = total_thumbnailer_tasks;
self.total_thumbnailer_tasks = task_handles.len() as u64;
self.total_thumbnailer_files = total_thumbnailer_files;

pending_running_tasks.extend(task_handles);
} else {
let updates = match self.phase {
Phase::MediaDataExtraction => vec![
ProgressUpdate::TaskCount(self.total_media_data_extraction_files),
ProgressUpdate::CompletedTaskCount(
self.metadata.media_data_metrics.extracted
+ self.metadata.media_data_metrics.skipped,
),
ProgressUpdate::Phase(self.phase.to_string()),
ProgressUpdate::Message(format!(
"Preparing to process {} files in {} chunks",
self.total_media_data_extraction_files,
self.total_media_data_extraction_tasks
)),
],
Phase::ThumbnailGeneration => vec![
ProgressUpdate::TaskCount(self.total_thumbnailer_files),
ProgressUpdate::CompletedTaskCount(
self.metadata.thumbnailer_metrics_acc.generated
+ self.metadata.thumbnailer_metrics_acc.skipped,
),
ProgressUpdate::Phase(self.phase.to_string()),
ProgressUpdate::Message(format!(
"Preparing to process {} files in {} chunks",
self.total_thumbnailer_files, self.total_thumbnailer_tasks
)),
],
};

job_ctx.progress(updates).await;

pending_running_tasks.extend(mem::take(&mut self.pending_tasks_on_resume));
}

@@ -412,12 +447,20 @@ impl MediaProcessor {

self.errors.extend(errors);

job_ctx
.progress(vec![ProgressUpdate::CompletedTaskCount(
self.metadata.thumbnailer_metrics_acc.generated
+ self.metadata.thumbnailer_metrics_acc.skipped,
)])
.await;
debug!(
"Processed {}/{} thumbnailer tasks",
self.metadata.thumbnailer_metrics_acc.total_successful_tasks,
self.total_thumbnailer_tasks
);

if matches!(self.phase, Phase::ThumbnailGeneration) {
job_ctx
.progress(vec![ProgressUpdate::CompletedTaskCount(
self.metadata.thumbnailer_metrics_acc.generated
+ self.metadata.thumbnailer_metrics_acc.skipped,
)])
.await;
}

// if self.total_thumbnailer_tasks
// == self.metadata.thumbnailer_metrics_acc.total_successful_tasks
@@ -550,6 +593,7 @@ struct SaveState {
sub_path: Option<PathBuf>,
regenerate_thumbnails: bool,

total_media_data_extraction_files: u64,
total_media_data_extraction_tasks: u64,
total_thumbnailer_tasks: u64,
total_thumbnailer_files: u64,
@@ -570,6 +614,7 @@ impl<OuterCtx: OuterContext> SerializableJob<OuterCtx> for MediaProcessor {
location_path,
sub_path,
regenerate_thumbnails,
total_media_data_extraction_files,
total_media_data_extraction_tasks,
total_thumbnailer_tasks,
total_thumbnailer_files,
@@ -585,6 +630,7 @@ impl<OuterCtx: OuterContext> SerializableJob<OuterCtx> for MediaProcessor {
location_path,
sub_path,
regenerate_thumbnails,
total_media_data_extraction_files,
total_media_data_extraction_tasks,
total_thumbnailer_tasks,
total_thumbnailer_files,
@@ -628,6 +674,7 @@ impl<OuterCtx: OuterContext> SerializableJob<OuterCtx> for MediaProcessor {
location_path,
sub_path,
regenerate_thumbnails,
total_media_data_extraction_files,
total_media_data_extraction_tasks,
total_thumbnailer_tasks,
total_thumbnailer_files,
@@ -643,6 +690,7 @@ impl<OuterCtx: OuterContext> SerializableJob<OuterCtx> for MediaProcessor {
location_path,
sub_path,
regenerate_thumbnails,
total_media_data_extraction_files,
total_media_data_extraction_tasks,
total_thumbnailer_tasks,
total_thumbnailer_files,
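The media processor gets the same resume-time reporting, with two wrinkles the diff also addresses: it now tracks file counts separately from task counts (the new total_media_data_extraction_files field, and the thumbnailer's TaskCount/file-count swap), and because the job runs in two phases, a resume must report the totals of whichever phase it stopped in. A condensed sketch of the per-phase selection, with fields and messages as simplified stand-ins:

// Simplified stand-ins for the media processor's real types.
enum Phase {
    MediaDataExtraction,
    ThumbnailGeneration,
}

enum ProgressUpdate {
    TaskCount(u64),
    CompletedTaskCount(u64),
    Message(String),
}

struct MediaProcessor {
    phase: Phase,
    total_extraction_files: u64,
    extraction_done: u64,
    total_thumbnailer_files: u64,
    thumbnails_done: u64,
}

impl MediaProcessor {
    // On resume, report file totals (not task totals) for the phase the
    // job was interrupted in, so the progress bar picks up where it left off.
    fn resume_updates(&self) -> Vec<ProgressUpdate> {
        match self.phase {
            Phase::MediaDataExtraction => vec![
                ProgressUpdate::TaskCount(self.total_extraction_files),
                ProgressUpdate::CompletedTaskCount(self.extraction_done),
                ProgressUpdate::Message("Resuming media data extraction".into()),
            ],
            Phase::ThumbnailGeneration => vec![
                ProgressUpdate::TaskCount(self.total_thumbnailer_files),
                ProgressUpdate::CompletedTaskCount(self.thumbnails_done),
                ProgressUpdate::Message("Resuming thumbnail generation".into()),
            ],
        }
    }
}

fn main() {
    let job = MediaProcessor {
        phase: Phase::ThumbnailGeneration,
        total_extraction_files: 120,
        extraction_done: 120,
        total_thumbnailer_files: 80,
        thumbnails_done: 25,
    };
    assert_eq!(job.resume_updates().len(), 3);
}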
(The remaining three changed files in this commit are not shown here.)
