feat(Core): gracefully shutdown router/scheduler if Redis is unavailable #891

Merged · 19 commits · Apr 24, 2023
Changes from all commits (19 commits)
3d7f610
Gracefull shutdown for Server
Apr 16, 2023
f74358f
Gracefull shutdown for Server
Apr 16, 2023
6e7be29
Merge branch 'main' of https://github.com/juspay/hyperswitch
Apr 16, 2023
d7ae6d5
Gracefull shutdown for Server
Apr 16, 2023
7a92e0c
Gracefull shutdown for Server
Apr 16, 2023
4062498
Gracefull shutdown for Server: resolved changes
prajjwalkumar17 Apr 17, 2023
47f5e8a
Merge branch 'main' of https://github.com/juspay/hyperswitch
prajjwalkumar17 Apr 17, 2023
439d62b
Gracefull shutdown for Server: resolved changes all
prajjwalkumar17 Apr 17, 2023
530f487
Merge branch 'main' into feat/gracefullShutdown
prajjwalkumar17 Apr 17, 2023
467978a
gracefull shutdown resolved tokio::sync::mpsc::channel(1) to mpsc::ch…
prajjwalkumar17 Apr 17, 2023
b52694f
Refactored the comment sequence to make it more meaningful
prajjwalkumar17 Apr 18, 2023
b19f6c9
Merge branch 'main' into feat/gracefullShutdown
prajjwalkumar17 Apr 19, 2023
a1345ce
Merge branch 'main' into feat/gracefullShutdown
prajjwalkumar17 Apr 19, 2023
30496df
Updated serde and thiserror to latest version
prajjwalkumar17 Apr 19, 2023
fca393c
Updated some import changes
prajjwalkumar17 Apr 20, 2023
62db3a1
added crate::config on top itself
prajjwalkumar17 Apr 20, 2023
96c2df7
resolved and added the crate::configs at top
prajjwalkumar17 Apr 20, 2023
99401b0
Merge branch 'main' into feat/gracefullShutdown
jarnura Apr 20, 2023
f72b2e6
Merge branch 'main' of https://github.com/juspay/hyperswitch into fea…
prajjwalkumar17 Apr 21, 2023
2 changes: 1 addition & 1 deletion crates/common_utils/src/pii.rs
@@ -152,7 +152,7 @@ mod pii_masking_strategy_tests {
#[test]
fn test_invalid_card_number_masking() {
let secret: Secret<String, CardNumber> = Secret::new("1234567890".to_string());
assert_eq!("123456****", format!("{secret:?}"));
assert_eq!("*** alloc::string::String ***", format!("{secret:?}",));
}

/*
7 changes: 4 additions & 3 deletions crates/common_utils/src/signals.rs
@@ -2,20 +2,21 @@

use futures::StreamExt;
use router_env::logger;
pub use tokio::sync::oneshot;
use tokio::sync::mpsc;

///
/// This function is meant to run in parallel to the application.
/// It will send a signal to the receiver when a SIGTERM or SIGINT is received.
///
pub async fn signal_handler(mut sig: signal_hook_tokio::Signals, sender: oneshot::Sender<()>) {
pub async fn signal_handler(mut sig: signal_hook_tokio::Signals, sender: mpsc::Sender<()>) {
if let Some(signal) = sig.next().await {
logger::info!(
"Received signal: {:?}",
signal_hook::low_level::signal_name(signal)
);
match signal {
signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.send(()) {
signal_hook::consts::SIGTERM | signal_hook::consts::SIGINT => match sender.try_send(())
{
Ok(_) => {
logger::info!("Request for force shutdown received")
}
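For context, a minimal standalone sketch (not part of the diff) of the bounded-channel pattern this change adopts: `try_send` on a `tokio::sync::mpsc` channel of capacity 1 returns immediately instead of awaiting, and a full buffer simply means a shutdown signal is already queued, so the signal handler can fire and forget.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 1: a single pending shutdown notification is enough.
    let (tx, mut rx) = mpsc::channel::<()>(1);

    // `try_send` never blocks; a full buffer just means a signal is
    // already pending, which is acceptable for a shutdown request.
    match tx.try_send(()) {
        Ok(()) => println!("shutdown signal queued"),
        Err(mpsc::error::TrySendError::Full(())) => println!("signal already pending"),
        Err(mpsc::error::TrySendError::Closed(())) => println!("receiver dropped"),
    }

    // The receiving side sees the queued signal.
    assert_eq!(rx.recv().await, Some(()));
}
```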
12 changes: 7 additions & 5 deletions crates/drainer/src/lib.rs
@@ -7,10 +7,11 @@ pub mod settings;
mod utils;
use std::sync::{atomic, Arc};

use common_utils::signals::{get_allowed_signals, oneshot};
use common_utils::signals::get_allowed_signals;
pub use env as logger;
use error_stack::{IntoReport, ResultExt};
use storage_models::kv;
use tokio::sync::mpsc;

use crate::{connection::pg_connection, services::Store};

@@ -35,14 +36,15 @@ pub async fn start_drainer(
.change_context(errors::DrainerError::SignalError(
"Failed while getting allowed signals".to_string(),
))?;
let (tx, mut rx) = oneshot::channel();

let (tx, mut rx) = mpsc::channel(1);
let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));

let active_tasks = Arc::new(atomic::AtomicU64::new(0));
'event: loop {
match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => {
Err(mpsc::error::TryRecvError::Empty) => {
if utils::is_stream_available(stream_index, store.clone()).await {
tokio::spawn(drainer_handler(
store.clone(),
@@ -59,17 +61,17 @@
)
.await;
}
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
logger::info!("Awaiting shutdown!");
metrics::SHUTDOWN_SIGNAL_RECEIVED.add(&metrics::CONTEXT, 1, &[]);
let shutdown_started = tokio::time::Instant::now();
rx.close();
loop {
if active_tasks.load(atomic::Ordering::Acquire) == 0 {
logger::info!("Terminating drainer");
metrics::SUCCESSFUL_SHUTDOWN.add(&metrics::CONTEXT, 1, &[]);
let shutdown_ended = shutdown_started.elapsed().as_secs_f64() * 1000f64;
metrics::CLEANUP_TIME.record(&metrics::CONTEXT, shutdown_ended, &[]);

break 'event;
}
shutdown_interval.tick().await;
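The drainer's event loop above polls the channel with `try_recv` between units of work rather than awaiting it. A standalone sketch of that polling pattern (helper names are illustrative, not drainer code):

```rust
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

// Keep working while the channel is empty; fall into a drain phase once
// a signal arrives or every sender has been dropped.
async fn event_loop(mut rx: mpsc::Receiver<()>) {
    let mut tick = interval(Duration::from_millis(100));
    loop {
        match rx.try_recv() {
            Err(mpsc::error::TryRecvError::Empty) => {
                // No shutdown requested yet: do one unit of work.
                tick.tick().await;
            }
            Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
                rx.close(); // refuse any further signals
                // ...await in-flight tasks here, as the loop above does...
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<()>(1);
    let _ = tx.try_send(()); // request shutdown immediately
    event_loop(rx).await;
}
```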
1 change: 1 addition & 0 deletions crates/redis_interface/Cargo.toml
@@ -14,6 +14,7 @@ fred = { version = "6.0.0", features = ["metrics", "partial-tracing"] }
futures = "0.3"
serde = { version = "1.0.160", features = ["derive"] }
thiserror = "1.0.40"
tokio = "1.26.0"

# First party crates
common_utils = { version = "0.1.0", path = "../common_utils", features = ["async_ext"] }
8 changes: 7 additions & 1 deletion crates/redis_interface/src/lib.rs
@@ -138,12 +138,18 @@ impl RedisConnectionPool {
};
}
}
pub async fn on_error(&self) {

pub async fn on_error(&self, tx: tokio::sync::oneshot::Sender<()>) {
while let Ok(redis_error) = self.pool.on_error().recv().await {
logger::error!(?redis_error, "Redis protocol or connection error");
logger::error!("current state: {:#?}", self.pool.state());
if self.pool.state() == fred::types::ClientState::Disconnected {
if tx.send(()).is_err() {
logger::error!("The redis shutdown signal sender failed to signal");
}
self.is_redis_available
.store(false, atomic::Ordering::SeqCst);
break;
}
}
}
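A rough standalone sketch of the shape `on_error` now has: a background task owns the `oneshot::Sender` and fires it once when it observes a disconnect. Here `check_disconnected` is a hypothetical stand-in for `self.pool.state() == fred::types::ClientState::Disconnected`.

```rust
use tokio::sync::oneshot;

async fn watch_connection(tx: oneshot::Sender<()>) {
    loop {
        // Hypothetical probe standing in for the fred pool-state check.
        if check_disconnected().await {
            // `send` consumes the sender, so this can only fire once.
            if tx.send(()).is_err() {
                eprintln!("shutdown receiver already dropped");
            }
            break;
        }
    }
}

async fn check_disconnected() -> bool {
    true // simulate an observed disconnect
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    tokio::spawn(watch_connection(tx));
    // Downstream, e.g. `receiver_for_error`, reacts to this event.
    assert!(rx.await.is_ok());
}
```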
1 change: 0 additions & 1 deletion crates/router/src/bin/router.rs
@@ -42,7 +42,6 @@ async fn main() -> ApplicationResult<()> {
let (server, mut state) = router::start_server(conf)
.await
.expect("Failed to create the server");

let _ = server.await;

state.store.close().await;
17 changes: 13 additions & 4 deletions crates/router/src/bin/scheduler.rs
@@ -6,6 +6,7 @@ use router::{
core::errors::{self, CustomResult},
logger, routes, scheduler,
};
use tokio::sync::{mpsc, oneshot};

const SCHEDULER_FLOW: &str = "SCHEDULER_FLOW";

@@ -18,14 +19,21 @@ async fn main() -> CustomResult<(), errors::ProcessTrackerError> {
#[allow(clippy::expect_used)]
let conf = Settings::with_config_path(cmd_line.config_path)
.expect("Unable to construct application configuration");

let mut state = routes::AppState::new(conf).await;
// channel for listening to redis disconnect events
let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel();
let mut state = routes::AppState::new(conf, redis_shutdown_signal_tx).await;
// channel to shutdown scheduler gracefully
let (tx, rx) = mpsc::channel(1);
tokio::spawn(router::receiver_for_error(
redis_shutdown_signal_rx,
tx.clone(),
));
let _guard =
logger::setup(&state.conf.log).map_err(|_| errors::ProcessTrackerError::UnexpectedFlow)?;

logger::debug!(startup_config=?state.conf);

start_scheduler(&state).await?;
start_scheduler(&state, (tx, rx)).await?;

state.store.close().await;

@@ -35,6 +43,7 @@

async fn start_scheduler(
state: &routes::AppState,
channel: (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> {
use std::str::FromStr;

@@ -49,5 +58,5 @@
.scheduler
.clone()
.ok_or(errors::ProcessTrackerError::ConfigurationError)?;
scheduler::start_process_tracker(state, flow, Arc::new(scheduler_settings)).await
scheduler::start_process_tracker(state, flow, Arc::new(scheduler_settings), channel).await
}
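To make the wiring above concrete, a hedged sketch (names are illustrative) of the two-channel design: a `oneshot` carries the one-time Redis-failure event, and a bounded `mpsc` channel, whose sender can be cloned so the OS signal handler holds its own copy, carries the scheduler's shutdown request.

```rust
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    let (redis_shutdown_signal_tx, redis_shutdown_signal_rx) = oneshot::channel::<()>();
    let (tx, mut rx) = mpsc::channel::<()>(1);

    // Bridge task: a Redis disconnect becomes a scheduler shutdown request.
    let bridge_tx = tx.clone();
    tokio::spawn(async move {
        if redis_shutdown_signal_rx.await.is_ok() {
            let _ = bridge_tx.send(()).await;
        }
    });

    // Simulate the Redis pool reporting a disconnect...
    let _ = redis_shutdown_signal_tx.send(());

    // ...and the scheduler loop observing the shutdown request.
    assert_eq!(rx.recv().await, Some(()));
}
```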
38 changes: 35 additions & 3 deletions crates/router/src/lib.rs
@@ -25,11 +25,12 @@ pub mod utils;

use actix_web::{
body::MessageBody,
dev::{Server, ServiceFactory, ServiceRequest},
dev::{Server, ServerHandle, ServiceFactory, ServiceRequest},
middleware::ErrorHandlers,
};
use http::StatusCode;
use routes::AppState;
use tokio::sync::{mpsc, oneshot};

pub use self::env::logger;
use crate::{
@@ -140,7 +141,8 @@ pub fn mk_app(
pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server, AppState)> {
logger::debug!(startup_config=?conf);
let server = conf.server.clone();
let state = routes::AppState::new(conf).await;
let (tx, rx) = oneshot::channel();
let state = routes::AppState::new(conf, tx).await;
// Cloning to close connections before shutdown
let app_state = state.clone();
let request_body_limit = server.request_body_limit;
@@ -149,10 +151,40 @@ pub async fn start_server(conf: settings::Settings) -> ApplicationResult<(Server
.workers(server.workers)
.shutdown_timeout(server.shutdown_timeout)
.run();

tokio::spawn(receiver_for_error(rx, server.handle()));
Ok((server, app_state))
}

pub async fn receiver_for_error(rx: oneshot::Receiver<()>, mut server: impl Stop) {
match rx.await {
Ok(_) => {
logger::error!("The redis server failed ");
server.stop_server().await;
}
Err(err) => {
logger::error!("Channel receiver error{err}");
}
}
}

#[async_trait::async_trait]
pub trait Stop {
async fn stop_server(&mut self);
}

#[async_trait::async_trait]
impl Stop for ServerHandle {
async fn stop_server(&mut self) {
let _ = self.stop(true).await;
}
}
#[async_trait::async_trait]
impl Stop for mpsc::Sender<()> {
async fn stop_server(&mut self) {
let _ = self.send(()).await.map_err(|err| logger::error!("{err}"));
}
}

pub fn get_application_builder(
request_body_limit: usize,
) -> actix_web::App<
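A standalone usage sketch of the `Stop` abstraction (re-declared locally here, mirroring the trait above): `receiver_for_error` only needs "something that can be told to stop", so an actix `ServerHandle`, an `mpsc::Sender`, or a test stub all work interchangeably.

```rust
use tokio::sync::oneshot;

#[async_trait::async_trait]
trait Stop {
    async fn stop_server(&mut self);
}

// A stub standing in for actix's `ServerHandle` or an `mpsc::Sender`.
struct StubServer;

#[async_trait::async_trait]
impl Stop for StubServer {
    async fn stop_server(&mut self) {
        println!("stop_server called: shutting down");
    }
}

// Same shape as the `receiver_for_error` added in this file.
async fn receiver_for_error(rx: oneshot::Receiver<()>, mut server: impl Stop) {
    match rx.await {
        Ok(_) => server.stop_server().await,
        Err(err) => eprintln!("channel receiver error: {err}"),
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let _ = tx.send(()); // simulate the Redis-failure event
    receiver_for_error(rx, StubServer).await;
}
```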
13 changes: 9 additions & 4 deletions crates/router/src/routes/app.rs
@@ -1,4 +1,5 @@
use actix_web::{web, Scope};
use tokio::sync::oneshot;

use super::health::*;
#[cfg(feature = "olap")]
@@ -40,11 +41,15 @@ impl AppStateInfo for AppState {
}

impl AppState {
pub async fn with_storage(conf: Settings, storage_impl: StorageImpl) -> Self {
pub async fn with_storage(
conf: Settings,
storage_impl: StorageImpl,
shut_down_signal: oneshot::Sender<()>,
) -> Self {
let testable = storage_impl == StorageImpl::PostgresqlTest;
let store: Box<dyn StorageInterface> = match storage_impl {
StorageImpl::Postgresql | StorageImpl::PostgresqlTest => {
Box::new(Store::new(&conf, testable).await)
Box::new(Store::new(&conf, testable, shut_down_signal).await)
}
StorageImpl::Mock => Box::new(MockDb::new(&conf).await),
};
@@ -56,8 +61,8 @@ impl AppState {
}
}

pub async fn new(conf: Settings) -> Self {
Self::with_storage(conf, StorageImpl::Postgresql).await
pub async fn new(conf: Settings, shut_down_signal: oneshot::Sender<()>) -> Self {
Self::with_storage(conf, StorageImpl::Postgresql, shut_down_signal).await
}
}

15 changes: 13 additions & 2 deletions crates/router/src/scheduler.rs
@@ -9,6 +9,8 @@ pub mod workflows;

use std::sync::Arc;

use tokio::sync::mpsc;

pub use self::types::*;
use crate::{
configs::settings::SchedulerSettings,
@@ -21,11 +23,20 @@ pub async fn start_process_tracker(
state: &AppState,
scheduler_flow: SchedulerFlow,
scheduler_settings: Arc<SchedulerSettings>,
channel: (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> {
match scheduler_flow {
SchedulerFlow::Producer => producer::start_producer(state, scheduler_settings).await?,
SchedulerFlow::Producer => {
producer::start_producer(state, scheduler_settings, channel).await?
}
SchedulerFlow::Consumer => {
consumer::start_consumer(state, scheduler_settings, workflows::runner_from_task).await?
consumer::start_consumer(
state,
scheduler_settings,
workflows::runner_from_task,
channel,
)
.await?
}
SchedulerFlow::Cleaner => {
error!("This flow has not been implemented yet!");
13 changes: 7 additions & 6 deletions crates/router/src/scheduler/consumer.rs
@@ -5,12 +5,13 @@ use std::{
sync::{self, atomic},
};

use common_utils::signals::{get_allowed_signals, oneshot};
use common_utils::signals::get_allowed_signals;
use error_stack::{IntoReport, ResultExt};
use futures::future;
use redis_interface::{RedisConnectionPool, RedisEntryId};
use router_env::{instrument, tracing};
use time::PrimitiveDateTime;
use tokio::sync::mpsc;
use uuid::Uuid;

use super::{
@@ -37,6 +38,7 @@ pub async fn start_consumer(
state: &AppState,
settings: sync::Arc<settings::SchedulerSettings>,
workflow_selector: workflows::WorkflowSelectorFn,
(tx, mut rx): (mpsc::Sender<()>, mpsc::Receiver<()>),
) -> CustomResult<(), errors::ProcessTrackerError> {
use std::time::Duration;

@@ -58,13 +60,12 @@
})
.into_report()
.attach_printable("Failed while creating a signals handler")?;
let (sx, mut rx) = oneshot::channel();
let handle = signal.handle();
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, sx));
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));

loop {
match rx.try_recv() {
Err(oneshot::error::TryRecvError::Empty) => {
Err(mpsc::error::TryRecvError::Empty) => {
interval.tick().await;

// A guard from env to disable the consumer
@@ -82,11 +83,11 @@
workflow_selector,
));
}
Ok(()) | Err(oneshot::error::TryRecvError::Closed) => {
Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => {
logger::debug!("Awaiting shutdown!");
rx.close();
shutdown_interval.tick().await;
let active_tasks = consumer_operation_counter.load(atomic::Ordering::Acquire);

match active_tasks {
0 => {
logger::info!("Terminating consumer");
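Finally, both the drainer and the consumer share the same drain discipline after a signal: an atomic in-flight counter is polled on an interval until it reaches zero. A minimal standalone sketch of that phase:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use tokio::time::{interval, sleep, Duration};

// Poll an atomic in-flight counter on an interval and terminate once it
// reaches zero, as both shutdown loops above do.
async fn drain(active_tasks: Arc<AtomicU64>) {
    let mut shutdown_interval = interval(Duration::from_millis(50));
    loop {
        if active_tasks.load(Ordering::Acquire) == 0 {
            println!("all in-flight tasks finished; terminating");
            break;
        }
        shutdown_interval.tick().await;
    }
}

#[tokio::main]
async fn main() {
    let active_tasks = Arc::new(AtomicU64::new(1));

    // Simulate one in-flight task finishing shortly after shutdown begins.
    let counter = Arc::clone(&active_tasks);
    tokio::spawn(async move {
        sleep(Duration::from_millis(120)).await;
        counter.fetch_sub(1, Ordering::Release);
    });

    drain(active_tasks).await;
}
```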