From a42f22e78c2a18040ca9238d903c1da5600f4ae2 Mon Sep 17 00:00:00 2001 From: dongbin86 Date: Sat, 12 Oct 2024 10:42:07 +0800 Subject: [PATCH] ensure graceful shutdown after task completion --- Cargo.lock | 2 +- Cargo.toml | 4 +-- src/core/periodic.rs | 69 ++++++++++++++++++++++---------------------- src/core/worker.rs | 19 +++++------- 4 files changed, 46 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 325be82..7000a29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -588,7 +588,7 @@ checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "polly-scheduler" -version = "0.1.2" +version = "0.1.3" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index c34279d..6098b06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "polly-scheduler" -version = "0.1.2" +version = "0.1.3" author = "dongbin86" edition = "2021" description = "A robust task scheduling system leveraging Tokio, with built-in fault tolerance, persistence, and recovery capabilities to ensure seamless task management and execution." license-file = "LICENSE" -documentation = "https://docs.rs/polly-scheduler/0.1.0/" +documentation = "https://docs.rs/polly-scheduler/0.1.3/" repository = "https://github.com/dongbin86/polly-scheduler" [features] diff --git a/src/core/periodic.rs b/src/core/periodic.rs index b655417..9bf0bc3 100644 --- a/src/core/periodic.rs +++ b/src/core/periodic.rs @@ -1,7 +1,8 @@ use crate::core::error::BoxError; use std::{future::Future, sync::Arc, time::Duration}; use tokio::signal; -use tokio::{sync::Notify, time::sleep}; +use tokio::sync::RwLock; +use tokio::time::sleep; use tracing::{error, info, warn}; #[derive(Default)] @@ -9,7 +10,7 @@ pub struct PeriodicTask { // Name of the periodic task. name: String, // A notification mechanism for shutdown signaling. - shutdown: Arc, + shutdown: Arc>, } impl PeriodicTask { @@ -25,15 +26,16 @@ impl PeriodicTask { pub fn new(name: &str) -> Self { Self { name: name.to_owned(), - shutdown: Arc::new(Notify::new()), + shutdown: Arc::new(RwLock::new(false)), } } /// Sends a shutdown signal to the task. /// /// This method notifies the task to stop executing. - pub fn shutdown(self: Arc) { - self.shutdown.notify_one(); + pub async fn shutdown(self: Arc) { + let mut triggered = self.shutdown.write().await; // Acquire write lock to set shutdown state + *triggered = true; // Set the shutdown state to true } /// Starts the periodic task and sets up a signal handler for shutdown. @@ -75,7 +77,7 @@ impl PeriodicTask { &self.name ); // Notify the task to shut down. - signal_clone.shutdown(); + signal_clone.shutdown().await; } Err(err) => { error!( @@ -107,37 +109,36 @@ impl PeriodicTask { { info!("Periodic task '{}' started", &self.name); loop { - tokio::select! { - // Wait for a shutdown notification. - _ = self.shutdown.notified() => { - info!("Received shutdown signal, stopping periodic task '{}'.", &self.name); - break; // Exit the loop to stop the task. - } - // Wait for the specified interval to elapse. - _ = sleep(interval) => { - // Clone the task to execute it. - let task_clone = Arc::clone(&task); - let task_future = tokio::spawn(async move { - task_clone().await // Execute the task. - }); + // Check if shutdown is triggered + let triggered = self.shutdown.read().await; + if *triggered { + break; // Exit loop if shutdown is triggered + } - // Handle the result of the task execution. - match task_future.await { - Ok(Ok(_)) => { - info!("Periodic task '{}' completed successfully.", &self.name); - } - Ok(Err(e)) => { - warn!("Periodic task '{}' failed: {:?}", &self.name, e); - } - Err(e) if e.is_panic() => { - error!("Fatal: Periodic task '{}' encountered a panic.", &self.name); - } - Err(e) => { - error!("Periodic task '{}' failed unexpectedly: {:?}", &self.name, e); - } - } + let task_clone = Arc::clone(&task); + let task_future = tokio::spawn(async move { + task_clone().await // Execute the task. + }); + + // Handle the result of the task execution. + match task_future.await { + Ok(Ok(_)) => { + info!("Periodic task '{}' completed successfully.", &self.name); + } + Ok(Err(e)) => { + warn!("Periodic task '{}' failed: {:?}", &self.name, e); + } + Err(e) if e.is_panic() => { + error!("Fatal: Periodic task '{}' encountered a panic.", &self.name); + } + Err(e) => { + error!( + "Periodic task '{}' failed unexpectedly: {:?}", + &self.name, e + ); } } + sleep(interval).await; } info!("Periodic task '{}' stopped", &self.name); } diff --git a/src/core/worker.rs b/src/core/worker.rs index 04a8bcd..c3e6eba 100644 --- a/src/core/worker.rs +++ b/src/core/worker.rs @@ -20,6 +20,12 @@ pub(crate) async fn process_task_worker( // Generate a unique identifier for the worker let worker_id = generate_token!(); loop { + // Check if shutdown is triggered + let triggered = shutdown.read().await; + if *triggered { + break; // Exit loop if shutdown is triggered + } + // Fetch and execute a task from the task store let fetch = fetch_and_execute_task(queue_name, &worker_id, handlers.clone(), task_store.clone()) @@ -49,12 +55,6 @@ pub(crate) async fn process_task_worker( // Sleep for 1 second } } - - // Check if shutdown is triggered - let shutdown_triggered = shutdown.read().await; - if *shutdown_triggered { - break; // Exit loop if shutdown is triggered - } } tracing::warn!("Task worker='{worker_id}' is closed"); // Log that the worker has closed } @@ -106,7 +106,7 @@ async fn calculate_next_run(task: &TaskMeta) -> Option { next_run(cron_schedule, cron_timezone, task.next_run) } else { None // Return None if schedule or timezone is not defined - } + } } _ => None, // No next run time for one-time tasks } @@ -137,10 +137,7 @@ where Ok(()) // Return Ok if successful } -async fn handle_task_result( - result: TaskResult, - task: &TaskMeta, -) -> (Option, Option) { +async fn handle_task_result(result: TaskResult, task: &TaskMeta) -> (Option, Option) { // Handle the result of task execution to determine next run time and error status match result { TaskResult { result: Ok(()), .. } => {