Skip to content

Commit

Permalink
ensure graceful shutdown after task completion
Browse files Browse the repository at this point in the history
  • Loading branch information
dongbin86 committed Oct 12, 2024
1 parent 652cebd commit a42f22e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[package]
name = "polly-scheduler"
version = "0.1.2"
version = "0.1.3"
author = "dongbin86"
edition = "2021"
description = "A robust task scheduling system leveraging Tokio, with built-in fault tolerance, persistence, and recovery capabilities to ensure seamless task management and execution."
license-file = "LICENSE"
documentation = "https://docs.rs/polly-scheduler/0.1.0/"
documentation = "https://docs.rs/polly-scheduler/0.1.3/"
repository = "https://github.com/dongbin86/polly-scheduler"

[features]
Expand Down
69 changes: 35 additions & 34 deletions src/core/periodic.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use crate::core::error::BoxError;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::signal;
use tokio::{sync::Notify, time::sleep};
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, info, warn};

#[derive(Default)]
pub struct PeriodicTask {
// Name of the periodic task.
name: String,
// A notification mechanism for shutdown signaling.
shutdown: Arc<Notify>,
shutdown: Arc<RwLock<bool>>,
}

impl PeriodicTask {
Expand All @@ -25,15 +26,16 @@ impl PeriodicTask {
pub fn new(name: &str) -> Self {
Self {
name: name.to_owned(),
shutdown: Arc::new(Notify::new()),
shutdown: Arc::new(RwLock::new(false)),
}
}

/// Sends a shutdown signal to the task.
///
/// This method notifies the task to stop executing.
pub fn shutdown(self: Arc<Self>) {
self.shutdown.notify_one();
pub async fn shutdown(self: Arc<Self>) {
let mut triggered = self.shutdown.write().await; // Acquire write lock to set shutdown state
*triggered = true; // Set the shutdown state to true
}

/// Starts the periodic task and sets up a signal handler for shutdown.
Expand Down Expand Up @@ -75,7 +77,7 @@ impl PeriodicTask {
&self.name
);
// Notify the task to shut down.
signal_clone.shutdown();
signal_clone.shutdown().await;
}
Err(err) => {
error!(
Expand Down Expand Up @@ -107,37 +109,36 @@ impl PeriodicTask {
{
info!("Periodic task '{}' started", &self.name);
loop {
tokio::select! {
// Wait for a shutdown notification.
_ = self.shutdown.notified() => {
info!("Received shutdown signal, stopping periodic task '{}'.", &self.name);
break; // Exit the loop to stop the task.
}
// Wait for the specified interval to elapse.
_ = sleep(interval) => {
// Clone the task to execute it.
let task_clone = Arc::clone(&task);
let task_future = tokio::spawn(async move {
task_clone().await // Execute the task.
});
// Check if shutdown is triggered
let triggered = self.shutdown.read().await;
if *triggered {
break; // Exit loop if shutdown is triggered
}

// Handle the result of the task execution.
match task_future.await {
Ok(Ok(_)) => {
info!("Periodic task '{}' completed successfully.", &self.name);
}
Ok(Err(e)) => {
warn!("Periodic task '{}' failed: {:?}", &self.name, e);
}
Err(e) if e.is_panic() => {
error!("Fatal: Periodic task '{}' encountered a panic.", &self.name);
}
Err(e) => {
error!("Periodic task '{}' failed unexpectedly: {:?}", &self.name, e);
}
}
let task_clone = Arc::clone(&task);
let task_future = tokio::spawn(async move {
task_clone().await // Execute the task.
});

// Handle the result of the task execution.
match task_future.await {
Ok(Ok(_)) => {
info!("Periodic task '{}' completed successfully.", &self.name);
}
Ok(Err(e)) => {
warn!("Periodic task '{}' failed: {:?}", &self.name, e);
}
Err(e) if e.is_panic() => {
error!("Fatal: Periodic task '{}' encountered a panic.", &self.name);
}
Err(e) => {
error!(
"Periodic task '{}' failed unexpectedly: {:?}",
&self.name, e
);
}
}
sleep(interval).await;
}
info!("Periodic task '{}' stopped", &self.name);
}
Expand Down
19 changes: 8 additions & 11 deletions src/core/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub(crate) async fn process_task_worker<T>(
// Generate a unique identifier for the worker
let worker_id = generate_token!();
loop {
// Check if shutdown is triggered
let triggered = shutdown.read().await;
if *triggered {
break; // Exit loop if shutdown is triggered
}

// Fetch and execute a task from the task store
let fetch =
fetch_and_execute_task(queue_name, &worker_id, handlers.clone(), task_store.clone())
Expand Down Expand Up @@ -49,12 +55,6 @@ pub(crate) async fn process_task_worker<T>(
// Sleep for 1 second
}
}

// Check if shutdown is triggered
let shutdown_triggered = shutdown.read().await;
if *shutdown_triggered {
break; // Exit loop if shutdown is triggered
}
}
tracing::warn!("Task worker='{worker_id}' is closed"); // Log that the worker has closed
}
Expand Down Expand Up @@ -106,7 +106,7 @@ async fn calculate_next_run(task: &TaskMeta) -> Option<i64> {
next_run(cron_schedule, cron_timezone, task.next_run)
} else {
None // Return None if schedule or timezone is not defined
}
}
}
_ => None, // No next run time for one-time tasks
}
Expand Down Expand Up @@ -137,10 +137,7 @@ where
Ok(()) // Return Ok if successful
}

async fn handle_task_result(
result: TaskResult,
task: &TaskMeta,
) -> (Option<String>, Option<i64>) {
async fn handle_task_result(result: TaskResult, task: &TaskMeta) -> (Option<String>, Option<i64>) {
// Handle the result of task execution to determine next run time and error status
match result {
TaskResult { result: Ok(()), .. } => {
Expand Down

0 comments on commit a42f22e

Please sign in to comment.