diff --git a/src/main.rs b/src/main.rs index 1d05ddf..f2b6c39 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,12 +10,11 @@ use chrono::Local; use clap::Parser; use config::Config; use env_logger::Builder; -use log::debug; +use log::{debug, info}; use std::error::Error; use std::io::Write; use std::time::Duration; use tokio::net::TcpListener; -use tokio::try_join; use tokio_graceful::Shutdown; use crate::db::DB; @@ -60,7 +59,7 @@ async fn api_handler(settings: Config, db: DB) { let port = settings.get_int("api.port").unwrap(); let host = settings::host(address, port, false); - debug!("binding API listener to {}", host); + debug!("api - binding listener to {}", host); let api_listener = TcpListener::bind(host).await.unwrap(); let app = api::app(db.clone()); @@ -73,7 +72,7 @@ async fn bmp_handler(settings: Config, db: DB) { let port = settings.get_int("bmp.port").unwrap(); let host = settings::host(address, port, false); - debug!("binding BMP listener to {}", host); + debug!("bmp - binding listener to {}", host); let bmp_listener = TcpListener::bind(host).await.unwrap(); loop { @@ -101,8 +100,17 @@ async fn main() -> Result<(), Box> { let api_handler = shutdown.spawn_task(api_handler(settings.clone(), db.clone())); let bmp_handler = shutdown.spawn_task(bmp_handler(settings.clone(), db.clone())); - shutdown.shutdown_with_limit(Duration::from_secs(1)).await?; - try_join!(api_handler, bmp_handler)?; + tokio::select! { + _ = shutdown.shutdown_with_limit(Duration::from_secs(1)) => { + info!("shutdown - gracefully after shutdown signal received"); + }, + _ = api_handler => { + info!("api - handler shutdown"); + } + _ = bmp_handler => { + info!("bmp - handler shutdown"); + } + } Ok(()) } diff --git a/src/pipeline.rs b/src/pipeline.rs index ba96ef2..0f8b7cb 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -15,7 +15,7 @@ pub fn send_to_kafka(settings: &Config, data: &[u8]) { let (broker, topic) = get_kafka_details(settings); if let Err(e) = produce_message(data, &topic, vec![broker.to_owned()]) { - error!("failed producing messages: {}", e); + error!("kafka - failed producing messages: {}", e); } }