Skip to content

Commit

Permalink
fix graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieugouel committed Nov 5, 2024
1 parent 73ed00a commit c831ba2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
20 changes: 14 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ use chrono::Local;
use clap::Parser;
use config::Config;
use env_logger::Builder;
use log::debug;
use log::{debug, info};
use std::error::Error;
use std::io::Write;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::try_join;
use tokio_graceful::Shutdown;

use crate::db::DB;
Expand Down Expand Up @@ -60,7 +59,7 @@ async fn api_handler(settings: Config, db: DB) {
let port = settings.get_int("api.port").unwrap();
let host = settings::host(address, port, false);

debug!("binding API listener to {}", host);
debug!("api - binding listener to {}", host);

let api_listener = TcpListener::bind(host).await.unwrap();
let app = api::app(db.clone());
Expand All @@ -73,7 +72,7 @@ async fn bmp_handler(settings: Config, db: DB) {
let port = settings.get_int("bmp.port").unwrap();
let host = settings::host(address, port, false);

debug!("binding BMP listener to {}", host);
debug!("bmp - binding listener to {}", host);

let bmp_listener = TcpListener::bind(host).await.unwrap();
loop {
Expand Down Expand Up @@ -101,8 +100,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let api_handler = shutdown.spawn_task(api_handler(settings.clone(), db.clone()));
let bmp_handler = shutdown.spawn_task(bmp_handler(settings.clone(), db.clone()));

shutdown.shutdown_with_limit(Duration::from_secs(1)).await?;
try_join!(api_handler, bmp_handler)?;
tokio::select! {
_ = shutdown.shutdown_with_limit(Duration::from_secs(1)) => {
info!("shutdown - gracefully after shutdown signal received");
},
_ = api_handler => {
info!("api - handler shutdown");
}
_ = bmp_handler => {
info!("bmp - handler shutdown");
}
}

Ok(())
}
2 changes: 1 addition & 1 deletion src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn send_to_kafka(settings: &Config, data: &[u8]) {
let (broker, topic) = get_kafka_details(settings);

if let Err(e) = produce_message(data, &topic, vec![broker.to_owned()]) {
error!("failed producing messages: {}", e);
error!("kafka - failed producing messages: {}", e);
}
}

Expand Down

0 comments on commit c831ba2

Please sign in to comment.