From 69328435630b4ee0b23614ab8d1c475f9d80ca8d Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Thu, 15 Aug 2024 15:28:20 +0530 Subject: [PATCH 01/21] feat: watch changes in path Signed-off-by: Sahil Yeole --- Cargo.lock | 82 ++++++++++++++++++++++++ Cargo.toml | 1 + src/cli/command.rs | 3 + src/cli/server/http_server.rs | 23 +++++-- src/cli/tc/run.rs | 7 +- src/cli/tc/start.rs | 117 ++++++++++++++++++++++++++++++++-- 6 files changed, 219 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58b729b6d7..c7e1539c72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1635,6 +1635,18 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "filetime" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1666,6 +1678,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.30" @@ -2543,6 +2564,26 @@ dependencies = [ "serde", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.3" @@ -2673,6 +2714,26 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -2859,6 +2920,7 @@ checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ "bitflags 2.5.0", "libc", + "redox_syscall 0.5.2", ] [[package]] @@ -3239,6 +3301,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.5.0", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -5485,6 +5566,7 @@ dependencies = [ "mime", "moka", "nom", + "notify", "num", "num_cpus", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index efb97afcfe..b6a710028c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ indenter = "0.3.3" derive_more = { workspace = true } enum_dispatch = "0.3.13" strum = "0.26.2" +notify = "6.1.1" [dev-dependencies] tailcall-prettier = { path = "tailcall-prettier" } diff --git a/src/cli/command.rs b/src/cli/command.rs index 36f79cb0d4..4ad847483a 100644 --- a/src/cli/command.rs +++ b/src/cli/command.rs @@ -26,6 +26,9 @@ pub enum Command { /// separated by spaces if more than one #[arg(required = true)] file_paths: Vec, + /// Watch for changes + #[arg(short, long)] + watch: bool, }, /// Validate a composition spec diff --git a/src/cli/server/http_server.rs b/src/cli/server/http_server.rs index 62928c492f..daf81e1c02 100644 --- a/src/cli/server/http_server.rs +++ b/src/cli/server/http_server.rs @@ -2,6 +2,7 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Result; +use tokio::sync::broadcast; use tokio::sync::oneshot::{self}; use super::http_1::start_http_1; @@ -46,16 +47,28 @@ impl Server { } } - /// Starts the server in its own multithreaded Runtime - pub async fn fork_start(self) -> Result<()> { + pub async fn fork_start(self, rec: Option<&mut broadcast::Receiver<()>>) -> Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(self.config_module.deref().server.get_workers()) .enable_all() .build()?; - let result = runtime.spawn(async { self.start().await }).await?; - runtime.shutdown_background(); + let handle = runtime.spawn(async { self.start().await }); - result + if let Some(receiver) = rec { + tokio::select! { + _ = receiver.recv() => { + tracing::info!("Server shutdown signal received"); + runtime.shutdown_background(); + tracing::info!("Server shutdown complete"); + } + _ = handle => { + tracing::info!("Server completed without shutdown signal"); + } + } + } else { + handle.await?; + } + Ok(()) } } diff --git a/src/cli/tc/run.rs b/src/cli/tc/run.rs index c180142dfc..cc2a93b5af 100644 --- a/src/cli/tc/run.rs +++ b/src/cli/tc/run.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use clap::Parser; use convert_case::{Case, Casing}; @@ -35,8 +37,9 @@ pub async fn run() -> Result<()> { async fn run_command(cli: Cli, config_reader: ConfigReader, runtime: TargetRuntime) -> Result<()> { match cli.command { - Command::Start { file_paths } => { - start::start_command(file_paths, &config_reader).await?; + Command::Start { file_paths, watch } => { + let arc_config_reader = Arc::new(config_reader); + start::start_command(file_paths, watch, arc_config_reader).await?; } Command::Check { file_paths, n_plus_one_queries, schema, format } => { check::check_command( diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index b154ed0ccb..924869e0b1 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,4 +1,8 @@ -use anyhow::Result; +use anyhow::{Context, Result}; +use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use std::sync::Arc; +use tokio::sync::{broadcast, Mutex}; +use tokio::task; use super::helpers::log_endpoint_set; use crate::cli::fmt::Fmt; @@ -7,12 +11,111 @@ use crate::core::config::reader::ConfigReader; pub(super) async fn start_command( file_paths: Vec, - config_reader: &ConfigReader, + watch: bool, + config_reader: Arc, ) -> Result<()> { - let config_module = config_reader.read_all(&file_paths).await?; - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - let server = Server::new(config_module); - server.fork_start().await?; + if watch { + start_watch_server(file_paths, config_reader).await?; + } else { + let config_module = config_reader + .read_all(&file_paths) + .await + .context("Failed to read config files")?; + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + let server = Server::new(config_module); + server + .fork_start(None) + .await + .context("Failed to start server")?; + } + Ok(()) +} + +async fn start_watch_server( + file_paths: Vec, + config_reader: Arc, +) -> Result<()> { + let (tx, mut rx) = broadcast::channel(16); + let file_paths_clone = file_paths.clone(); + + tokio::spawn(async move { + let watch_handler = task::spawn(async move { + let (watch_tx, watch_rx) = std::sync::mpsc::channel(); + + let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { + Ok(watcher) => watcher, + Err(err) => { + tracing::error!("Failed to create watcher: {}", err); + return; + } + }; + + for path in &file_paths_clone { + if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::Recursive) { + tracing::error!("Failed to watch path {:?}: {}", path, err); + } + } + + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + match watch_rx.recv() { + Ok(event) => { + if let Ok(event) = event { + if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = + event.kind + { + tracing::info!("File change detected"); + if let Err(err) = tx.send(()) { + tracing::error!("Failed to send the signal: {}", err); + } + } + } + } + Err(e) => tracing::error!("Watch error: {:?}", e), + } + } + }); + + let server_handler = task::spawn({ + let config_reader = Arc::clone(&config_reader); + let file_paths = file_paths.clone(); + async move { + let mut rec = Some(&mut rx); + let shown_warning = Arc::new(Mutex::new(false)); + loop { + match config_reader.read_all(&file_paths).await { + Ok(config_module) => { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + let server = Server::new(config_module.clone()); + if let Err(err) = server.fork_start(rec.as_deref_mut()).await { + tracing::error!("Failed to start server: {}", err); + } + *shown_warning.lock().await = false; + tracing::info!("Restarting server"); + } + Err(err) => { + if !*shown_warning.lock().await { + tracing::error!("Failed to read config files: {}", err); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + *shown_warning.lock().await = true; + } + } + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + }); + + if let Err(err) = watch_handler.await { + tracing::debug!("Error in watch handler: {}", err); + } + if let Err(err) = server_handler.await { + tracing::debug!("Error in server handler: {}", err); + } + }) + .await + .context("Failed to spawn watch server task")?; Ok(()) } From 0abc74b133c4016c2d61a37511d98be17862d034 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Thu, 15 Aug 2024 15:31:11 +0530 Subject: [PATCH 02/21] revert comment Signed-off-by: Sahil Yeole --- src/cli/server/http_server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cli/server/http_server.rs b/src/cli/server/http_server.rs index daf81e1c02..3c2fcf59be 100644 --- a/src/cli/server/http_server.rs +++ b/src/cli/server/http_server.rs @@ -47,6 +47,7 @@ impl Server { } } + /// Starts the server in its own multithreaded Runtime pub async fn fork_start(self, rec: Option<&mut broadcast::Receiver<()>>) -> Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(self.config_module.deref().server.get_workers()) From 9645c3822bfcfd02fbad6b05e64284122793ef78 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Thu, 15 Aug 2024 15:37:11 +0530 Subject: [PATCH 03/21] remove outer task Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 120 +++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 62 deletions(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 924869e0b1..a2e74fb13d 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -39,83 +39,79 @@ async fn start_watch_server( let (tx, mut rx) = broadcast::channel(16); let file_paths_clone = file_paths.clone(); - tokio::spawn(async move { - let watch_handler = task::spawn(async move { - let (watch_tx, watch_rx) = std::sync::mpsc::channel(); + let watch_handler = task::spawn(async move { + let (watch_tx, watch_rx) = std::sync::mpsc::channel(); - let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { - Ok(watcher) => watcher, - Err(err) => { - tracing::error!("Failed to create watcher: {}", err); - return; - } - }; + let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { + Ok(watcher) => watcher, + Err(err) => { + tracing::error!("Failed to create watcher: {}", err); + return; + } + }; - for path in &file_paths_clone { - if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::Recursive) { - tracing::error!("Failed to watch path {:?}: {}", path, err); - } + for path in &file_paths_clone { + if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::Recursive) { + tracing::error!("Failed to watch path {:?}: {}", path, err); } + } - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - match watch_rx.recv() { - Ok(event) => { - if let Ok(event) = event { - if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = - event.kind - { - tracing::info!("File change detected"); - if let Err(err) = tx.send(()) { - tracing::error!("Failed to send the signal: {}", err); - } + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + match watch_rx.recv() { + Ok(event) => { + if let Ok(event) = event { + if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = + event.kind + { + tracing::info!("File change detected"); + if let Err(err) = tx.send(()) { + tracing::error!("Failed to send the signal: {}", err); } } } - Err(e) => tracing::error!("Watch error: {:?}", e), } + Err(e) => tracing::error!("Watch error: {:?}", e), } - }); + } + }); - let server_handler = task::spawn({ - let config_reader = Arc::clone(&config_reader); - let file_paths = file_paths.clone(); - async move { - let mut rec = Some(&mut rx); - let shown_warning = Arc::new(Mutex::new(false)); - loop { - match config_reader.read_all(&file_paths).await { - Ok(config_module) => { - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - let server = Server::new(config_module.clone()); - if let Err(err) = server.fork_start(rec.as_deref_mut()).await { - tracing::error!("Failed to start server: {}", err); - } - *shown_warning.lock().await = false; - tracing::info!("Restarting server"); + let server_handler = task::spawn({ + let config_reader = Arc::clone(&config_reader); + let file_paths = file_paths.clone(); + async move { + let mut rec = Some(&mut rx); + let shown_warning = Arc::new(Mutex::new(false)); + loop { + match config_reader.read_all(&file_paths).await { + Ok(config_module) => { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + let server = Server::new(config_module.clone()); + if let Err(err) = server.fork_start(rec.as_deref_mut()).await { + tracing::error!("Failed to start server: {}", err); } - Err(err) => { - if !*shown_warning.lock().await { - tracing::error!("Failed to read config files: {}", err); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - *shown_warning.lock().await = true; - } + *shown_warning.lock().await = false; + tracing::info!("Restarting server"); + } + Err(err) => { + if !*shown_warning.lock().await { + tracing::error!("Failed to read config files: {}", err); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + *shown_warning.lock().await = true; } } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } - }); - - if let Err(err) = watch_handler.await { - tracing::debug!("Error in watch handler: {}", err); - } - if let Err(err) = server_handler.await { - tracing::debug!("Error in server handler: {}", err); } - }) - .await - .context("Failed to spawn watch server task")?; + }); + + if let Err(err) = watch_handler.await { + tracing::debug!("Error in watch handler: {}", err); + } + if let Err(err) = server_handler.await { + tracing::debug!("Error in server handler: {}", err); + } Ok(()) } From a5323481a56aa10a319467aa4f25999b2b1d306d Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Thu, 15 Aug 2024 19:59:00 +0530 Subject: [PATCH 04/21] use config_error flag and content event for notify Signed-off-by: Sahil Yeole --- src/cli/server/http_server.rs | 6 ++---- src/cli/tc/start.rs | 20 +++++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/cli/server/http_server.rs b/src/cli/server/http_server.rs index 3c2fcf59be..98d7320162 100644 --- a/src/cli/server/http_server.rs +++ b/src/cli/server/http_server.rs @@ -59,16 +59,14 @@ impl Server { if let Some(receiver) = rec { tokio::select! { _ = receiver.recv() => { - tracing::info!("Server shutdown signal received"); runtime.shutdown_background(); - tracing::info!("Server shutdown complete"); } _ = handle => { - tracing::info!("Server completed without shutdown signal"); + tracing::debug!("Server completed without shutdown signal"); } } } else { - handle.await?; + let _ = handle.await?; } Ok(()) } diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index a2e74fb13d..134940b15f 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,7 +1,8 @@ +use std::sync::Arc; + use anyhow::{Context, Result}; use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; -use std::sync::Arc; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::broadcast; use tokio::task; use super::helpers::log_endpoint_set; @@ -57,12 +58,13 @@ async fn start_watch_server( } loop { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; match watch_rx.recv() { Ok(event) => { if let Ok(event) = event { - if let notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) = - event.kind + if let notify::EventKind::Modify(notify::event::ModifyKind::Data( + notify::event::DataChange::Content, + )) = event.kind { tracing::info!("File change detected"); if let Err(err) = tx.send(()) { @@ -81,7 +83,7 @@ async fn start_watch_server( let file_paths = file_paths.clone(); async move { let mut rec = Some(&mut rx); - let shown_warning = Arc::new(Mutex::new(false)); + let mut config_error = false; loop { match config_reader.read_all(&file_paths).await { Ok(config_module) => { @@ -91,14 +93,14 @@ async fn start_watch_server( if let Err(err) = server.fork_start(rec.as_deref_mut()).await { tracing::error!("Failed to start server: {}", err); } - *shown_warning.lock().await = false; + config_error = false; tracing::info!("Restarting server"); } Err(err) => { - if !*shown_warning.lock().await { + if !config_error { tracing::error!("Failed to read config files: {}", err); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - *shown_warning.lock().await = true; + config_error = true; } } } From 63fe1c2e02c3f67afbc551844440321e6fd2b60c Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Thu, 15 Aug 2024 20:12:26 +0530 Subject: [PATCH 05/21] refact: start_watch_server Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 137 +++++++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 59 deletions(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 134940b15f..5cb0793637 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -38,73 +38,21 @@ async fn start_watch_server( config_reader: Arc, ) -> Result<()> { let (tx, mut rx) = broadcast::channel(16); - let file_paths_clone = file_paths.clone(); - let watch_handler = task::spawn(async move { - let (watch_tx, watch_rx) = std::sync::mpsc::channel(); - - let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { - Ok(watcher) => watcher, - Err(err) => { - tracing::error!("Failed to create watcher: {}", err); - return; - } - }; - - for path in &file_paths_clone { - if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::Recursive) { - tracing::error!("Failed to watch path {:?}: {}", path, err); - } - } - - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - match watch_rx.recv() { - Ok(event) => { - if let Ok(event) = event { - if let notify::EventKind::Modify(notify::event::ModifyKind::Data( - notify::event::DataChange::Content, - )) = event.kind - { - tracing::info!("File change detected"); - if let Err(err) = tx.send(()) { - tracing::error!("Failed to send the signal: {}", err); - } - } - } - } - Err(e) => tracing::error!("Watch error: {:?}", e), + let watch_handler = task::spawn({ + let file_paths = file_paths.clone(); + async move { + if let Err(err) = watch_files(&file_paths, tx).await { + tracing::error!("Watch handler encountered an error: {}", err); } } }); let server_handler = task::spawn({ let config_reader = Arc::clone(&config_reader); - let file_paths = file_paths.clone(); async move { - let mut rec = Some(&mut rx); - let mut config_error = false; - loop { - match config_reader.read_all(&file_paths).await { - Ok(config_module) => { - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - let server = Server::new(config_module.clone()); - if let Err(err) = server.fork_start(rec.as_deref_mut()).await { - tracing::error!("Failed to start server: {}", err); - } - config_error = false; - tracing::info!("Restarting server"); - } - Err(err) => { - if !config_error { - tracing::error!("Failed to read config files: {}", err); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - config_error = true; - } - } - } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + if let Err(err) = handle_server(&mut rx, &file_paths, config_reader).await { + tracing::error!("Server handler encountered an error: {}", err); } } }); @@ -115,5 +63,76 @@ async fn start_watch_server( if let Err(err) = server_handler.await { tracing::debug!("Error in server handler: {}", err); } + Ok(()) } + +async fn watch_files(file_paths: &[String], tx: broadcast::Sender<()>) -> Result<()> { + let (watch_tx, watch_rx) = std::sync::mpsc::channel(); + let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { + Ok(watcher) => watcher, + Err(err) => { + tracing::error!("Failed to create watcher: {}", err); + return Ok(()); + } + }; + + for path in file_paths { + if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::Recursive) { + tracing::error!("Failed to watch path {:?}: {}", path, err); + } + } + + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + match watch_rx.recv() { + Ok(event) => { + if let Ok(event) = event { + if let notify::EventKind::Modify(notify::event::ModifyKind::Data( + notify::event::DataChange::Content, + )) = event.kind + { + tracing::info!("File change detected"); + if let Err(err) = tx.send(()) { + tracing::error!("Failed to send the signal: {}", err); + } + } + } + } + Err(e) => tracing::error!("Watch error: {:?}", e), + } + } +} + +async fn handle_server( + rx: &mut broadcast::Receiver<()>, + file_paths: &[String], + config_reader: Arc, +) -> Result<()> { + let mut config_error = false; + + loop { + match config_reader.read_all(file_paths).await { + Ok(config_module) => { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + + let server = Server::new(config_module.clone()); + if let Err(err) = server.fork_start(Some(rx)).await { + tracing::error!("Failed to start server: {}", err); + } + + config_error = false; + tracing::info!("Server restarted"); + } + Err(err) => { + if !config_error { + tracing::error!("Failed to read config files: {}", err); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + config_error = true; + } + } + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } +} From 15c40b9c58a6a9d29ece66e07c1ff44b9e638577 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 13:35:11 +0530 Subject: [PATCH 06/21] optimize server in watch mode Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 75 +++++++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 5cb0793637..2015a48b83 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -33,42 +33,46 @@ pub(super) async fn start_command( Ok(()) } +/// Starts the server in watch mode async fn start_watch_server( file_paths: Vec, config_reader: Arc, ) -> Result<()> { - let (tx, mut rx) = broadcast::channel(16); + let (tx, rx) = broadcast::channel(16); let watch_handler = task::spawn({ - let file_paths = file_paths.clone(); - async move { - if let Err(err) = watch_files(&file_paths, tx).await { - tracing::error!("Watch handler encountered an error: {}", err); - } - } - }); - - let server_handler = task::spawn({ let config_reader = Arc::clone(&config_reader); async move { - if let Err(err) = handle_server(&mut rx, &file_paths, config_reader).await { - tracing::error!("Server handler encountered an error: {}", err); + if let Err(err) = watch_files(&file_paths, tx, rx, config_reader).await { + tracing::error!("Watch handler encountered an error: {}", err); } } }); if let Err(err) = watch_handler.await { - tracing::debug!("Error in watch handler: {}", err); - } - if let Err(err) = server_handler.await { - tracing::debug!("Error in server handler: {}", err); + tracing::error!("Error in watch handler: {}", err); } Ok(()) } -async fn watch_files(file_paths: &[String], tx: broadcast::Sender<()>) -> Result<()> { +/// Watches the file paths for changes +async fn watch_files( + file_paths: &[String], + tx: broadcast::Sender<()>, + mut rx: broadcast::Receiver<()>, + config_reader: Arc, +) -> Result<()> { let (watch_tx, watch_rx) = std::sync::mpsc::channel(); + // fake event to trigger the first server start + watch_tx + .clone() + .send(Ok(notify::event::Event::new( + notify::event::EventKind::Modify(notify::event::ModifyKind::Data( + notify::event::DataChange::Content, + )), + ))) + .unwrap(); let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { Ok(watcher) => watcher, Err(err) => { @@ -92,10 +96,10 @@ async fn watch_files(file_paths: &[String], tx: broadcast::Sender<()>) -> Result notify::event::DataChange::Content, )) = event.kind { - tracing::info!("File change detected"); if let Err(err) = tx.send(()) { tracing::error!("Failed to send the signal: {}", err); } + let _ = handle_server(&mut rx, file_paths, config_reader.clone()).await; } } } @@ -104,35 +108,26 @@ async fn watch_files(file_paths: &[String], tx: broadcast::Sender<()>) -> Result } } +/// Handles the server (in watch mode) +/// Prevents server crashes if config reader fails async fn handle_server( rx: &mut broadcast::Receiver<()>, file_paths: &[String], config_reader: Arc, ) -> Result<()> { - let mut config_error = false; - - loop { - match config_reader.read_all(file_paths).await { - Ok(config_module) => { - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - - let server = Server::new(config_module.clone()); - if let Err(err) = server.fork_start(Some(rx)).await { - tracing::error!("Failed to start server: {}", err); - } + match config_reader.read_all(file_paths).await { + Ok(config_module) => { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); - config_error = false; - tracing::info!("Server restarted"); - } - Err(err) => { - if !config_error { - tracing::error!("Failed to read config files: {}", err); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - config_error = true; - } + let server = Server::new(config_module.clone()); + if let Err(err) = server.fork_start(Some(rx)).await { + tracing::error!("Failed to start server: {}", err); } } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + Err(err) => { + tracing::error!("Failed to read config files: {}", err); + } } + Ok(()) } From 1879460f2a1f14e09befffea7525ebab1f2ac960 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 13:40:18 +0530 Subject: [PATCH 07/21] remove separate watch task Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 2015a48b83..8a7dc78a8f 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -40,18 +40,7 @@ async fn start_watch_server( ) -> Result<()> { let (tx, rx) = broadcast::channel(16); - let watch_handler = task::spawn({ - let config_reader = Arc::clone(&config_reader); - async move { - if let Err(err) = watch_files(&file_paths, tx, rx, config_reader).await { - tracing::error!("Watch handler encountered an error: {}", err); - } - } - }); - - if let Err(err) = watch_handler.await { - tracing::error!("Error in watch handler: {}", err); - } + watch_files(&file_paths, tx, rx, config_reader).await?; Ok(()) } @@ -88,7 +77,6 @@ async fn watch_files( } loop { - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; match watch_rx.recv() { Ok(event) => { if let Ok(event) = event { From 02f0b23f7d7fd3e397e1c78a2552cbc94214e858 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 14:43:47 +0530 Subject: [PATCH 08/21] watch for linked files also and add debounce delay Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 46 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 8a7dc78a8f..32903bc203 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,9 +1,10 @@ use std::sync::Arc; +use std::time::Duration; use anyhow::{Context, Result}; -use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{Config, EventKind, FsEventWatcher, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::sync::broadcast; -use tokio::task; +use tokio::time::Instant; use super::helpers::log_endpoint_set; use crate::cli::fmt::Fmt; @@ -71,23 +72,42 @@ async fn watch_files( }; for path in file_paths { - if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::Recursive) { + if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::NonRecursive) { tracing::error!("Failed to watch path {:?}: {}", path, err); } } + // Debounce delay to prevent multiple server restarts on a single file change + // https://users.rust-lang.org/t/problem-with-notify-crate-v6-1/99877 + let debounce_duration = Duration::from_secs(1); + let mut last_event_time = Instant::now() - debounce_duration; loop { match watch_rx.recv() { Ok(event) => { if let Ok(event) = event { - if let notify::EventKind::Modify(notify::event::ModifyKind::Data( + if let EventKind::Modify(notify::event::ModifyKind::Data( notify::event::DataChange::Content, )) = event.kind { - if let Err(err) = tx.send(()) { - tracing::error!("Failed to send the signal: {}", err); + let now = Instant::now(); + if now.duration_since(last_event_time) >= debounce_duration { + last_event_time = now; + + if let Err(err) = tx.send(()) { + tracing::error!("Failed to send the signal: {}", err); + } + + if let Err(e) = handle_server( + &mut rx, + file_paths, + config_reader.clone(), + &mut watcher, + ) + .await + { + tracing::error!("Failed to handle server: {}", e); + } } - let _ = handle_server(&mut rx, file_paths, config_reader.clone()).await; } } } @@ -102,9 +122,21 @@ async fn handle_server( rx: &mut broadcast::Receiver<()>, file_paths: &[String], config_reader: Arc, + watcher: &mut FsEventWatcher, ) -> Result<()> { match config_reader.read_all(file_paths).await { Ok(config_module) => { + let links = config_module + .clone() + .links + .iter() + .map(|link| link.src.clone()) + .collect::>(); + for link in links { + if let Err(err) = watcher.watch(link.as_ref(), RecursiveMode::NonRecursive) { + tracing::error!("Failed to watch path {:?}: {}", link, err); + } + } log_endpoint_set(&config_module.extensions().endpoint_set); Fmt::log_n_plus_one(false, config_module.config()); From 1233dea2038c26467ffb9f5dcad61d1920007083 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 14:54:21 +0530 Subject: [PATCH 09/21] fix: first server run sometimes fails Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 32903bc203..bc3f1216bd 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -80,7 +80,8 @@ async fn watch_files( // Debounce delay to prevent multiple server restarts on a single file change // https://users.rust-lang.org/t/problem-with-notify-crate-v6-1/99877 let debounce_duration = Duration::from_secs(1); - let mut last_event_time = Instant::now() - debounce_duration; + // ensures the first server start is not blocked + let mut last_event_time = Instant::now() - (debounce_duration * 2); loop { match watch_rx.recv() { Ok(event) => { From 4b9dd5fe8c47bbee7faa9d235d16ea0fc727c028 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 15:02:12 +0530 Subject: [PATCH 10/21] fix: add delay to let server stop Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index bc3f1216bd..c25100c8b6 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -94,9 +94,12 @@ async fn watch_files( if now.duration_since(last_event_time) >= debounce_duration { last_event_time = now; + // send the signal to the currently running server runtime to shutdown if let Err(err) = tx.send(()) { - tracing::error!("Failed to send the signal: {}", err); + tracing::error!("Failed to stop the server: {}", err); } + // wait for the server to shutdown + tokio::time::sleep(Duration::from_millis(500)).await; if let Err(e) = handle_server( &mut rx, From 75cd209800dff14ffa1d65fe9b7558ef38ebb097 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 15:04:08 +0530 Subject: [PATCH 11/21] update command description Signed-off-by: Sahil Yeole --- src/cli/command.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli/command.rs b/src/cli/command.rs index 4ad847483a..b5c289724e 100644 --- a/src/cli/command.rs +++ b/src/cli/command.rs @@ -26,7 +26,7 @@ pub enum Command { /// separated by spaces if more than one #[arg(required = true)] file_paths: Vec, - /// Watch for changes + /// Watch for file changes #[arg(short, long)] watch: bool, }, From 19f53f095c54ebf22a4e719265fbb67321ff474b Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 15:06:46 +0530 Subject: [PATCH 12/21] increase init last_event_time Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index c25100c8b6..f30b4e3047 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -81,7 +81,7 @@ async fn watch_files( // https://users.rust-lang.org/t/problem-with-notify-crate-v6-1/99877 let debounce_duration = Duration::from_secs(1); // ensures the first server start is not blocked - let mut last_event_time = Instant::now() - (debounce_duration * 2); + let mut last_event_time = Instant::now() - (debounce_duration * 4); loop { match watch_rx.recv() { Ok(event) => { From a27bbda8b778102a8ab282ac077d294c8fca9ceb Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 17 Aug 2024 15:42:44 +0530 Subject: [PATCH 13/21] fix: build Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index f30b4e3047..1663b1c206 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; -use notify::{Config, EventKind, FsEventWatcher, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::fsevent::FsEventWatcher; +use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::sync::broadcast; use tokio::time::Instant; From fe7c562f1ea9d742fa2a640804d2190e42a747d1 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Fri, 23 Aug 2024 11:08:52 +0530 Subject: [PATCH 14/21] fix server and handle relative link paths Signed-off-by: Sahil Yeole --- src/cli/server/http_server.rs | 34 +++++---- src/cli/tc/start.rs | 140 ++++++++++++++++++---------------- src/core/config/reader.rs | 2 +- 3 files changed, 95 insertions(+), 81 deletions(-) diff --git a/src/cli/server/http_server.rs b/src/cli/server/http_server.rs index 98d7320162..e9a2cef16d 100644 --- a/src/cli/server/http_server.rs +++ b/src/cli/server/http_server.rs @@ -1,8 +1,7 @@ +use lazy_static::lazy_static; use std::ops::Deref; -use std::sync::Arc; use anyhow::Result; -use tokio::sync::broadcast; use tokio::sync::oneshot::{self}; use super::http_1::start_http_1; @@ -13,6 +12,13 @@ use crate::cli::CLIError; use crate::core::blueprint::{Blueprint, Http}; use crate::core::config::ConfigModule; +use std::sync::{Arc, Mutex}; +use tokio::runtime::Runtime; + +lazy_static! { + pub static ref RUNTIME: Arc>> = Arc::new(Mutex::new(None)); +} + pub struct Server { config_module: ConfigModule, server_up_sender: Option>, @@ -48,25 +54,21 @@ impl Server { } /// Starts the server in its own multithreaded Runtime - pub async fn fork_start(self, rec: Option<&mut broadcast::Receiver<()>>) -> Result<()> { + pub async fn fork_start(self, watch: bool) -> Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(self.config_module.deref().server.get_workers()) .enable_all() .build()?; - - let handle = runtime.spawn(async { self.start().await }); - - if let Some(receiver) = rec { - tokio::select! { - _ = receiver.recv() => { - runtime.shutdown_background(); - } - _ = handle => { - tracing::debug!("Server completed without shutdown signal"); - } - } + if watch { + RUNTIME + .lock() + .unwrap() + .get_or_insert_with(|| runtime) + .spawn(async { self.start().await }); } else { - let _ = handle.await?; + let result = runtime.spawn(async { self.start().await }).await?; + runtime.shutdown_background(); + return result; } Ok(()) } diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 1663b1c206..b9b5e22a32 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,16 +1,18 @@ +use std::path::Path; use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; use notify::fsevent::FsEventWatcher; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; -use tokio::sync::broadcast; use tokio::time::Instant; use super::helpers::log_endpoint_set; use crate::cli::fmt::Fmt; +use crate::cli::server::http_server::RUNTIME; use crate::cli::server::Server; use crate::core::config::reader::ConfigReader; +use crate::core::config::ConfigModule; pub(super) async fn start_command( file_paths: Vec, @@ -18,7 +20,7 @@ pub(super) async fn start_command( config_reader: Arc, ) -> Result<()> { if watch { - start_watch_server(file_paths, config_reader).await?; + start_watch_server(&file_paths, config_reader).await?; } else { let config_module = config_reader .read_all(&file_paths) @@ -28,7 +30,7 @@ pub(super) async fn start_command( Fmt::log_n_plus_one(false, config_module.config()); let server = Server::new(config_module); server - .fork_start(None) + .fork_start(false) .await .context("Failed to start server")?; } @@ -36,24 +38,7 @@ pub(super) async fn start_command( } /// Starts the server in watch mode -async fn start_watch_server( - file_paths: Vec, - config_reader: Arc, -) -> Result<()> { - let (tx, rx) = broadcast::channel(16); - - watch_files(&file_paths, tx, rx, config_reader).await?; - - Ok(()) -} - -/// Watches the file paths for changes -async fn watch_files( - file_paths: &[String], - tx: broadcast::Sender<()>, - mut rx: broadcast::Receiver<()>, - config_reader: Arc, -) -> Result<()> { +async fn start_watch_server(file_paths: &[String], config_reader: Arc) -> Result<()> { let (watch_tx, watch_rx) = std::sync::mpsc::channel(); // fake event to trigger the first server start watch_tx @@ -64,13 +49,8 @@ async fn watch_files( )), ))) .unwrap(); - let mut watcher = match RecommendedWatcher::new(watch_tx, Config::default()) { - Ok(watcher) => watcher, - Err(err) => { - tracing::error!("Failed to create watcher: {}", err); - return Ok(()); - } - }; + let mut watcher = + RecommendedWatcher::new(watch_tx, Config::default()).context("Failed to create watcher")?; for path in file_paths { if let Err(err) = watcher.watch(path.as_ref(), RecursiveMode::NonRecursive) { @@ -95,23 +75,14 @@ async fn watch_files( if now.duration_since(last_event_time) >= debounce_duration { last_event_time = now; - // send the signal to the currently running server runtime to shutdown - if let Err(err) = tx.send(()) { - tracing::error!("Failed to stop the server: {}", err); + tracing::info!("Restarting server"); + if let Some(runtime) = RUNTIME.lock().unwrap().take() { + runtime.shutdown_background(); } - // wait for the server to shutdown - tokio::time::sleep(Duration::from_millis(500)).await; - if let Err(e) = handle_server( - &mut rx, - file_paths, - config_reader.clone(), - &mut watcher, - ) - .await - { - tracing::error!("Failed to handle server: {}", e); - } + handle_watch_server(file_paths, config_reader.clone(), &mut watcher) + .await + .context("Failed to handle watch server")?; } } } @@ -123,36 +94,77 @@ async fn watch_files( /// Handles the server (in watch mode) /// Prevents server crashes if config reader fails -async fn handle_server( - rx: &mut broadcast::Receiver<()>, +async fn handle_watch_server( file_paths: &[String], config_reader: Arc, watcher: &mut FsEventWatcher, ) -> Result<()> { - match config_reader.read_all(file_paths).await { - Ok(config_module) => { - let links = config_module - .clone() - .links - .iter() - .map(|link| link.src.clone()) - .collect::>(); - for link in links { - if let Err(err) = watcher.watch(link.as_ref(), RecursiveMode::NonRecursive) { - tracing::error!("Failed to watch path {:?}: {}", link, err); + if file_paths.len() == 1 { + match config_reader.read_all(file_paths).await { + Ok(config_module) => { + watch_linked_files(file_paths[0].as_str(), config_module.clone(), watcher).await; + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + + let server = Server::new(config_module); + if let Err(err) = server.fork_start(true).await { + tracing::error!("Failed to start server: {}", err); } } - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - - let server = Server::new(config_module.clone()); - if let Err(err) = server.fork_start(Some(rx)).await { - tracing::error!("Failed to start server: {}", err); + Err(err) => { + tracing::error!("{}", err); } } - Err(err) => { - tracing::error!("Failed to read config files: {}", err); + } else { + // ensure to watch for correct linked files wrt the config file + for file in file_paths { + match config_reader.read(file.as_str()).await { + Ok(config_module) => { + watch_linked_files(file, config_module, watcher).await; + } + Err(err) => { + tracing::error!("Failed to read config files: {}", err); + } + } + } + match config_reader.read_all(file_paths).await { + Ok(config_module) => { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + + let server = Server::new(config_module); + if let Err(err) = server.fork_start(true).await { + tracing::error!("Failed to start server: {}", err); + } + } + Err(err) => { + tracing::error!("Failed to read config files: {}", err); + } } } Ok(()) } + +async fn watch_linked_files( + file_path: &str, + config_module: ConfigModule, + watcher: &mut FsEventWatcher, +) { + let links = config_module + .links + .iter() + .map(|link| link.src.clone()) + .collect::>(); + for link in links { + let mut link_path = link.clone(); + if let Some(pos) = file_path.rfind('/') { + let root_dir = Path::new(&file_path[..pos]); + link_path = ConfigReader::resolve_path(&link, Some(root_dir)); + } else if let Ok(current_dir) = std::env::current_dir() { + link_path = ConfigReader::resolve_path(&link, Some(current_dir.as_path())); + } + if let Err(err) = watcher.watch(link_path.as_ref(), RecursiveMode::NonRecursive) { + tracing::error!("Failed to watch path {:?}: {}", link, err); + } + } +} diff --git a/src/core/config/reader.rs b/src/core/config/reader.rs index 46eee6107f..453ff4677b 100644 --- a/src/core/config/reader.rs +++ b/src/core/config/reader.rs @@ -228,7 +228,7 @@ impl ConfigReader { /// Checks if path is a URL or absolute path, returns directly if so. /// Otherwise, it joins file path with relative dir path. - fn resolve_path(src: &str, root_dir: Option<&Path>) -> String { + pub fn resolve_path(src: &str, root_dir: Option<&Path>) -> String { if let Ok(url) = Url::parse(src) { url.to_string() } else if Path::new(&src).is_absolute() { From c53458cc9283aced5fc24886bde1cf8dd2831f7a Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Fri, 23 Aug 2024 11:44:18 +0530 Subject: [PATCH 15/21] fix: lint Signed-off-by: Sahil Yeole --- src/cli/server/http_server.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/cli/server/http_server.rs b/src/cli/server/http_server.rs index e9a2cef16d..bd80c1e953 100644 --- a/src/cli/server/http_server.rs +++ b/src/cli/server/http_server.rs @@ -1,7 +1,9 @@ -use lazy_static::lazy_static; use std::ops::Deref; +use std::sync::{Arc, Mutex}; use anyhow::Result; +use lazy_static::lazy_static; +use tokio::runtime::Runtime; use tokio::sync::oneshot::{self}; use super::http_1::start_http_1; @@ -12,9 +14,6 @@ use crate::cli::CLIError; use crate::core::blueprint::{Blueprint, Http}; use crate::core::config::ConfigModule; -use std::sync::{Arc, Mutex}; -use tokio::runtime::Runtime; - lazy_static! { pub static ref RUNTIME: Arc>> = Arc::new(Mutex::new(None)); } From c1409b3816805f5afeb02c5f35f1a223d9c61435 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Fri, 23 Aug 2024 11:54:37 +0530 Subject: [PATCH 16/21] use arc_config_reader in start Signed-off-by: Sahil Yeole --- src/cli/tc/run.rs | 5 +---- src/cli/tc/start.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/cli/tc/run.rs b/src/cli/tc/run.rs index cc2a93b5af..2fed378484 100644 --- a/src/cli/tc/run.rs +++ b/src/cli/tc/run.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use anyhow::Result; use clap::Parser; use convert_case::{Case, Casing}; @@ -38,8 +36,7 @@ pub async fn run() -> Result<()> { async fn run_command(cli: Cli, config_reader: ConfigReader, runtime: TargetRuntime) -> Result<()> { match cli.command { Command::Start { file_paths, watch } => { - let arc_config_reader = Arc::new(config_reader); - start::start_command(file_paths, watch, arc_config_reader).await?; + start::start_command(file_paths, watch, config_reader).await?; } Command::Check { file_paths, n_plus_one_queries, schema, format } => { check::check_command( diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index b9b5e22a32..96564a77a0 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -17,7 +17,7 @@ use crate::core::config::ConfigModule; pub(super) async fn start_command( file_paths: Vec, watch: bool, - config_reader: Arc, + config_reader: ConfigReader, ) -> Result<()> { if watch { start_watch_server(&file_paths, config_reader).await?; @@ -38,7 +38,7 @@ pub(super) async fn start_command( } /// Starts the server in watch mode -async fn start_watch_server(file_paths: &[String], config_reader: Arc) -> Result<()> { +async fn start_watch_server(file_paths: &[String], config_reader: ConfigReader) -> Result<()> { let (watch_tx, watch_rx) = std::sync::mpsc::channel(); // fake event to trigger the first server start watch_tx @@ -63,6 +63,7 @@ async fn start_watch_server(file_paths: &[String], config_reader: Arc { @@ -80,9 +81,13 @@ async fn start_watch_server(file_paths: &[String], config_reader: Arc Date: Fri, 23 Aug 2024 20:43:06 +0530 Subject: [PATCH 17/21] fix: build Signed-off-by: Sahil Yeole --- src/cli/tc/start.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 96564a77a0..823341e9a5 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; -use notify::fsevent::FsEventWatcher; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::time::Instant; @@ -102,7 +101,7 @@ async fn start_watch_server(file_paths: &[String], config_reader: ConfigReader) async fn handle_watch_server( file_paths: &[String], config_reader: Arc, - watcher: &mut FsEventWatcher, + watcher: &mut RecommendedWatcher, ) -> Result<()> { if file_paths.len() == 1 { match config_reader.read_all(file_paths).await { @@ -153,7 +152,7 @@ async fn handle_watch_server( async fn watch_linked_files( file_path: &str, config_module: ConfigModule, - watcher: &mut FsEventWatcher, + watcher: &mut RecommendedWatcher, ) { let links = config_module .links From 1db8662c7b4dc0dc023febd4f08bbf2c6cd3e628 Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Mon, 26 Aug 2024 22:21:00 +0530 Subject: [PATCH 18/21] prevent logs on restart in watch mode Signed-off-by: Sahil Yeole --- src/cli/runtime/file.rs | 6 +++++- src/cli/server/http_1.rs | 6 +++++- src/cli/server/http_2.rs | 6 +++++- src/cli/tc/mod.rs | 2 +- src/cli/tc/start.rs | 28 ++++++++++++++++++++++------ 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/cli/runtime/file.rs b/src/cli/runtime/file.rs index 3d9be7ed77..155cb8ae3d 100644 --- a/src/cli/runtime/file.rs +++ b/src/cli/runtime/file.rs @@ -1,5 +1,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use crate::cli::tc::start::PREVENT_LOGS; use crate::cli::CLIError; use crate::core::FileIO; @@ -41,7 +42,10 @@ impl FileIO for NativeFileIO { CLIError::new(format!("Failed to read file: {}", path).as_str()) .description(err.to_string()) })?; - tracing::info!("File read: {} ... ok", path); + let prevent_logs = *PREVENT_LOGS.lock().unwrap(); + if !prevent_logs { + tracing::info!("File read: {} ... ok", path); + } Ok(content) } } diff --git a/src/cli/server/http_1.rs b/src/cli/server/http_1.rs index 22d7d97c96..c0d1e2f8da 100644 --- a/src/cli/server/http_1.rs +++ b/src/cli/server/http_1.rs @@ -4,6 +4,7 @@ use hyper::service::{make_service_fn, service_fn}; use tokio::sync::oneshot; use super::server_config::ServerConfig; +use crate::cli::tc::start::PREVENT_LOGS; use crate::cli::CLIError; use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest}; use crate::core::http::handle_request; @@ -33,7 +34,10 @@ pub async fn start_http_1( let builder = hyper::Server::try_bind(&addr) .map_err(CLIError::from)? .http1_pipeline_flush(sc.app_ctx.blueprint.server.pipeline_flush); - super::log_launch(sc.as_ref()); + let prevent_logs = *PREVENT_LOGS.lock().unwrap(); + if !prevent_logs { + super::log_launch(sc.as_ref()); + } if let Some(sender) = server_up_sender { sender diff --git a/src/cli/server/http_2.rs b/src/cli/server/http_2.rs index 30ee21b5d1..1c58e1430d 100644 --- a/src/cli/server/http_2.rs +++ b/src/cli/server/http_2.rs @@ -9,6 +9,7 @@ use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use tokio::sync::oneshot; use super::server_config::ServerConfig; +use crate::cli::tc::start::PREVENT_LOGS; use crate::cli::CLIError; use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest}; use crate::core::http::handle_request; @@ -45,7 +46,10 @@ pub async fn start_http_2( let builder = Server::builder(acceptor).http2_only(true); - super::log_launch(sc.as_ref()); + let prevent_logs = *PREVENT_LOGS.lock().unwrap(); + if !prevent_logs { + super::log_launch(sc.as_ref()); + } if let Some(sender) = server_up_sender { sender diff --git a/src/cli/tc/mod.rs b/src/cli/tc/mod.rs index 952904acce..c3d8c89e8a 100644 --- a/src/cli/tc/mod.rs +++ b/src/cli/tc/mod.rs @@ -3,4 +3,4 @@ mod gen; mod helpers; mod init; pub mod run; -mod start; +pub mod start; diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 823341e9a5..854c24d760 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,5 +1,6 @@ +use lazy_static::lazy_static; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Context, Result}; @@ -13,6 +14,10 @@ use crate::cli::server::Server; use crate::core::config::reader::ConfigReader; use crate::core::config::ConfigModule; +lazy_static! { + pub static ref PREVENT_LOGS: Arc> = Arc::new(Mutex::new(false)); +} + pub(super) async fn start_command( file_paths: Vec, watch: bool, @@ -75,7 +80,13 @@ async fn start_watch_server(file_paths: &[String], config_reader: ConfigReader) if now.duration_since(last_event_time) >= debounce_duration { last_event_time = now; - tracing::info!("Restarting server"); + if !event.paths.is_empty() { + { + let mut prevent_logs = PREVENT_LOGS.lock().unwrap(); + *prevent_logs = true; + } + tracing::info!("Reloaded configuration {:?}", event.paths[0]); + } if let Some(runtime) = RUNTIME.lock().unwrap().take() { runtime.shutdown_background(); } @@ -103,12 +114,15 @@ async fn handle_watch_server( config_reader: Arc, watcher: &mut RecommendedWatcher, ) -> Result<()> { + let prevent_logs = *PREVENT_LOGS.lock().unwrap(); if file_paths.len() == 1 { match config_reader.read_all(file_paths).await { Ok(config_module) => { watch_linked_files(file_paths[0].as_str(), config_module.clone(), watcher).await; - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); + if !prevent_logs { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + } let server = Server::new(config_module); if let Err(err) = server.fork_start(true).await { @@ -133,8 +147,10 @@ async fn handle_watch_server( } match config_reader.read_all(file_paths).await { Ok(config_module) => { - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); + if !prevent_logs { + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); + } let server = Server::new(config_module); if let Err(err) = server.fork_start(true).await { From 70ad866cde4ab882dcde812894250ef5ff207ebf Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 31 Aug 2024 09:21:53 +0530 Subject: [PATCH 19/21] fix: errors not showing on first run Signed-off-by: Sahil Yeole --- src/cli/server/http_server.rs | 11 +++---- src/cli/tc/start.rs | 57 ++++++++++++++++++++++++----------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/cli/server/http_server.rs b/src/cli/server/http_server.rs index bd80c1e953..b3dd71209a 100644 --- a/src/cli/server/http_server.rs +++ b/src/cli/server/http_server.rs @@ -59,16 +59,13 @@ impl Server { .enable_all() .build()?; if watch { - RUNTIME - .lock() - .unwrap() - .get_or_insert_with(|| runtime) - .spawn(async { self.start().await }); + let handle = runtime.spawn(async { self.start().await }); + RUNTIME.lock().unwrap().get_or_insert(runtime); + handle.await? } else { let result = runtime.spawn(async { self.start().await }).await?; runtime.shutdown_background(); - return result; + result } - Ok(()) } } diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 854c24d760..9ebd63a7d7 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,10 +1,11 @@ -use lazy_static::lazy_static; use std::path::Path; +use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{Context, Result}; -use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use lazy_static::lazy_static; +use notify::{Config, Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::time::Instant; use super::helpers::log_endpoint_set; @@ -61,13 +62,41 @@ async fn start_watch_server(file_paths: &[String], config_reader: ConfigReader) tracing::error!("Failed to watch path {:?}: {}", path, err); } } + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let watch = tokio::spawn(async move { + handle_watch_event(watch_rx, tx).await; + }); + let arc_config_reader = Arc::new(config_reader); + let arc_file_paths = Arc::new(file_paths.to_vec()); + let server = tokio::spawn(async move { + loop { + tokio::select! { + _ = rx.recv() => { + let _ = handle_watch_server( + &arc_file_paths.clone(), + arc_config_reader.clone(), + &mut watcher, + ) + .await; + }}; + } + }); + watch.await?; + server.await?; + Ok(()) +} +/// Handles file change events +/// Stops the server and notifies server task to restart +async fn handle_watch_event( + watch_rx: Receiver>, + tx: tokio::sync::mpsc::Sender<()>, +) { // Debounce delay to prevent multiple server restarts on a single file change // https://users.rust-lang.org/t/problem-with-notify-crate-v6-1/99877 let debounce_duration = Duration::from_secs(1); // ensures the first server start is not blocked let mut last_event_time = Instant::now() - (debounce_duration * 4); - let arc_config_reader = Arc::new(config_reader); loop { match watch_rx.recv() { Ok(event) => { @@ -81,29 +110,21 @@ async fn start_watch_server(file_paths: &[String], config_reader: ConfigReader) last_event_time = now; if !event.paths.is_empty() { - { - let mut prevent_logs = PREVENT_LOGS.lock().unwrap(); - *prevent_logs = true; - } + let mut prevent_logs = PREVENT_LOGS.lock().unwrap(); + *prevent_logs = true; tracing::info!("Reloaded configuration {:?}", event.paths[0]); } if let Some(runtime) = RUNTIME.lock().unwrap().take() { runtime.shutdown_background(); } - - handle_watch_server( - file_paths, - arc_config_reader.clone(), - &mut watcher, - ) - .await - .context("Failed to handle watch server")?; + let _ = tx.send(()).await; } } } } - Err(e) => tracing::error!("Watch error: {:?}", e), + Err(e) => tracing::error!("Error while watching: {:?}", e), } + tokio::time::sleep(Duration::from_millis(100)).await; } } @@ -126,7 +147,9 @@ async fn handle_watch_server( let server = Server::new(config_module); if let Err(err) = server.fork_start(true).await { - tracing::error!("Failed to start server: {}", err); + if !err.to_string().contains("task") { + tracing::error!("Failed to start server: {}", err); + } } } Err(err) => { From cd40becf365495d5f26b7dcaf415856bbc12c8fe Mon Sep 17 00:00:00 2001 From: Sahil Yeole Date: Sat, 31 Aug 2024 09:30:39 +0530 Subject: [PATCH 20/21] fix: lint Signed-off-by: Sahil Yeole --- src/cli/server/http_1.rs | 1 - src/cli/server/http_2.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/cli/server/http_1.rs b/src/cli/server/http_1.rs index 6b44998bbb..1bcf9be112 100644 --- a/src/cli/server/http_1.rs +++ b/src/cli/server/http_1.rs @@ -4,7 +4,6 @@ use hyper::service::{make_service_fn, service_fn}; use tokio::sync::oneshot; use super::server_config::ServerConfig; - use crate::cli::tc::start::PREVENT_LOGS; use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest}; use crate::core::http::handle_request; diff --git a/src/cli/server/http_2.rs b/src/cli/server/http_2.rs index 5bd924e6a6..a70080df59 100644 --- a/src/cli/server/http_2.rs +++ b/src/cli/server/http_2.rs @@ -9,7 +9,6 @@ use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use tokio::sync::oneshot; use super::server_config::ServerConfig; - use crate::cli::tc::start::PREVENT_LOGS; use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest}; use crate::core::http::handle_request; From 354d2b7a53d7d55af7afb297ad9a1da700545838 Mon Sep 17 00:00:00 2001 From: Tushar Mathur Date: Sun, 1 Sep 2024 19:53:04 +0530 Subject: [PATCH 21/21] drop prevent logs --- src/cli/runtime/file.rs | 6 +----- src/cli/server/http_1.rs | 6 +----- src/cli/server/http_2.rs | 6 +----- src/cli/tc/start.rs | 24 ++++++------------------ 4 files changed, 9 insertions(+), 33 deletions(-) diff --git a/src/cli/runtime/file.rs b/src/cli/runtime/file.rs index 78af09de05..72a55160c6 100644 --- a/src/cli/runtime/file.rs +++ b/src/cli/runtime/file.rs @@ -1,6 +1,5 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use crate::cli::tc::start::PREVENT_LOGS; use crate::core::{Errata, FileIO}; #[derive(Clone)] @@ -41,10 +40,7 @@ impl FileIO for NativeFileIO { Errata::new(format!("Failed to read file: {}", path).as_str()) .description(err.to_string()) })?; - let prevent_logs = *PREVENT_LOGS.lock().unwrap(); - if !prevent_logs { - tracing::info!("File read: {} ... ok", path); - } + tracing::info!("File read: {} ... ok", path); Ok(content) } } diff --git a/src/cli/server/http_1.rs b/src/cli/server/http_1.rs index 1bcf9be112..76360e860a 100644 --- a/src/cli/server/http_1.rs +++ b/src/cli/server/http_1.rs @@ -4,7 +4,6 @@ use hyper::service::{make_service_fn, service_fn}; use tokio::sync::oneshot; use super::server_config::ServerConfig; -use crate::cli::tc::start::PREVENT_LOGS; use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest}; use crate::core::http::handle_request; use crate::core::Errata; @@ -34,10 +33,7 @@ pub async fn start_http_1( let builder = hyper::Server::try_bind(&addr) .map_err(Errata::from)? .http1_pipeline_flush(sc.app_ctx.blueprint.server.pipeline_flush); - let prevent_logs = *PREVENT_LOGS.lock().unwrap(); - if !prevent_logs { - super::log_launch(sc.as_ref()); - } + super::log_launch(sc.as_ref()); if let Some(sender) = server_up_sender { sender diff --git a/src/cli/server/http_2.rs b/src/cli/server/http_2.rs index a70080df59..1895789603 100644 --- a/src/cli/server/http_2.rs +++ b/src/cli/server/http_2.rs @@ -9,7 +9,6 @@ use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use tokio::sync::oneshot; use super::server_config::ServerConfig; -use crate::cli::tc::start::PREVENT_LOGS; use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest}; use crate::core::http::handle_request; use crate::core::Errata; @@ -46,10 +45,7 @@ pub async fn start_http_2( let builder = Server::builder(acceptor).http2_only(true); - let prevent_logs = *PREVENT_LOGS.lock().unwrap(); - if !prevent_logs { - super::log_launch(sc.as_ref()); - } + super::log_launch(sc.as_ref()); if let Some(sender) = server_up_sender { sender diff --git a/src/cli/tc/start.rs b/src/cli/tc/start.rs index 9ebd63a7d7..0027124951 100644 --- a/src/cli/tc/start.rs +++ b/src/cli/tc/start.rs @@ -1,10 +1,9 @@ use std::path::Path; use std::sync::mpsc::Receiver; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; -use lazy_static::lazy_static; use notify::{Config, Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::time::Instant; @@ -15,10 +14,6 @@ use crate::cli::server::Server; use crate::core::config::reader::ConfigReader; use crate::core::config::ConfigModule; -lazy_static! { - pub static ref PREVENT_LOGS: Arc> = Arc::new(Mutex::new(false)); -} - pub(super) async fn start_command( file_paths: Vec, watch: bool, @@ -110,8 +105,6 @@ async fn handle_watch_event( last_event_time = now; if !event.paths.is_empty() { - let mut prevent_logs = PREVENT_LOGS.lock().unwrap(); - *prevent_logs = true; tracing::info!("Reloaded configuration {:?}", event.paths[0]); } if let Some(runtime) = RUNTIME.lock().unwrap().take() { @@ -135,15 +128,13 @@ async fn handle_watch_server( config_reader: Arc, watcher: &mut RecommendedWatcher, ) -> Result<()> { - let prevent_logs = *PREVENT_LOGS.lock().unwrap(); if file_paths.len() == 1 { match config_reader.read_all(file_paths).await { Ok(config_module) => { watch_linked_files(file_paths[0].as_str(), config_module.clone(), watcher).await; - if !prevent_logs { - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - } + + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); let server = Server::new(config_module); if let Err(err) = server.fork_start(true).await { @@ -170,11 +161,8 @@ async fn handle_watch_server( } match config_reader.read_all(file_paths).await { Ok(config_module) => { - if !prevent_logs { - log_endpoint_set(&config_module.extensions().endpoint_set); - Fmt::log_n_plus_one(false, config_module.config()); - } - + log_endpoint_set(&config_module.extensions().endpoint_set); + Fmt::log_n_plus_one(false, config_module.config()); let server = Server::new(config_module); if let Err(err) = server.fork_start(true).await { tracing::error!("Failed to start server: {}", err);