diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index c1539240..a1556951 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -36,8 +36,9 @@ use dragonfly_client_storage::Storage; use dragonfly_client_util::id_generator::IDGenerator; use std::net::SocketAddr; use std::path::PathBuf; -use std::sync::{Arc, Barrier}; +use std::sync::Arc; use tokio::sync::mpsc; +use tokio::sync::Barrier; use tracing::{error, info, Level}; #[cfg(not(target_env = "msvc"))] @@ -297,24 +298,6 @@ async fn main() -> Result<(), anyhow::Error> { // Wait for servers to exit or shutdown signal. tokio::select! { - _ = { - let barrier = grpc_server_started_barrier.clone(); - tokio::spawn(async move { - dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err)); - }) - } => { - info!("dfdaemon upload grpc server exited"); - }, - - _ = { - let barrier = grpc_server_started_barrier.clone(); - tokio::spawn(async move { - dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err)); - }) - } => { - info!("dfdaemon download grpc unix server exited"); - }, - _ = tokio::spawn(async move { dynconfig.run().await }) => { info!("dynconfig manager exited"); }, @@ -343,11 +326,29 @@ async fn main() -> Result<(), anyhow::Error> { info!("garbage collector exited"); }, + _ = { + let barrier = grpc_server_started_barrier.clone(); + tokio::spawn(async move { + dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err)); + }) + } => { + info!("dfdaemon upload grpc server exited"); + }, + + _ = { + let barrier = grpc_server_started_barrier.clone(); + tokio::spawn(async move { + dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err)); + }) + } => { + info!("dfdaemon download grpc unix server exited"); + }, + _ = { let barrier = grpc_server_started_barrier.clone(); tokio::spawn(async move { // Wait for grpc server started. - barrier.wait(); + barrier.wait().await; proxy.run().await.unwrap_or_else(|err| error!("proxy server failed: {}", err)); }) } => { diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 09086bb2..34b76f4e 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -47,11 +47,12 @@ use dragonfly_client_util::{ }; use hyper_util::rt::TokioIo; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Barrier}; +use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::fs; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::mpsc; +use tokio::sync::Barrier; use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream}; use tonic::{ transport::{Channel, Endpoint, Server, Uri}, @@ -123,18 +124,12 @@ impl DfdaemonDownloadServer { .await; // Start download grpc server with unix domain socket. - info!( - "download server listening on {}", - self.socket_path.display() - ); fs::create_dir_all(self.socket_path.parent().unwrap()).await?; let uds = UnixListener::bind(&self.socket_path)?; let uds_stream = UnixListenerStream::new(uds); + let server = Server::builder() - .tcp_nodelay(true) .max_frame_size(super::MAX_FRAME_SIZE) - .initial_connection_window_size(super::INITIAL_WINDOW_SIZE) - .initial_stream_window_size(super::INITIAL_WINDOW_SIZE) .tcp_keepalive(Some(super::TCP_KEEPALIVE)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) @@ -148,9 +143,13 @@ impl DfdaemonDownloadServer { }); // Notify the grpc server is started. - grpc_server_started_barrier.wait(); + grpc_server_started_barrier.wait().await; // Wait for the download grpc server to shutdown. + info!( + "download server listening on {}", + self.socket_path.display() + ); server.await?; // Remove the unix domain socket file. @@ -994,7 +993,6 @@ impl DfdaemonDownloadClient { // Ignore the uri because it is not used. let channel = Endpoint::try_from("http://[::]:50051") .unwrap() - .tcp_nodelay(true) .buffer_size(super::BUFFER_SIZE) .connect_timeout(super::CONNECT_TIMEOUT) .timeout(super::REQUEST_TIMEOUT) diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index 787b0b5f..e792e6f3 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -41,10 +41,11 @@ use dragonfly_client_core::{ use dragonfly_client_util::http::{get_range, hashmap_to_headermap, headermap_to_hashmap}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Barrier}; +use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::AsyncReadExt; use tokio::sync::mpsc; +use tokio::sync::Barrier; use tokio_stream::wrappers::ReceiverStream; use tonic::{ transport::{Channel, Server}, @@ -121,13 +122,13 @@ impl DfdaemonUploadServer { .await; // Start upload grpc server. - info!("upload server listening on {}", self.addr); let mut server_builder = Server::builder(); if let Ok(Some(server_tls_config)) = self.config.upload.server.load_server_tls_config().await { server_builder = server_builder.tls_config(server_tls_config)?; } + let server = server_builder .max_frame_size(super::MAX_FRAME_SIZE) .initial_connection_window_size(super::INITIAL_WINDOW_SIZE) @@ -142,9 +143,10 @@ impl DfdaemonUploadServer { }); // Notify the grpc server is started. - grpc_server_started_barrier.wait(); + grpc_server_started_barrier.wait().await; // Wait for the upload grpc server to shutdown. + info!("upload server listening on {}", self.addr); Ok(server.await?) } } diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 52d7abab..6b982ce1 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -49,8 +49,10 @@ use dragonfly_client_util::{ }; use reqwest::header::HeaderMap; use std::path::Path; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; use std::time::{Duration, Instant}; use tokio::sync::{ mpsc::{self, Sender},