Skip to content

Commit

Permalink
fix: barrier blocks thread (#858)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 22, 2024
1 parent 84f0b30 commit 574f376
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 35 deletions.
41 changes: 21 additions & 20 deletions dragonfly-client/src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ use dragonfly_client_storage::Storage;
use dragonfly_client_util::id_generator::IDGenerator;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Barrier};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tracing::{error, info, Level};

#[cfg(not(target_env = "msvc"))]
Expand Down Expand Up @@ -297,24 +298,6 @@ async fn main() -> Result<(), anyhow::Error> {

// Wait for servers to exit or shutdown signal.
tokio::select! {
_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err));
})
} => {
info!("dfdaemon upload grpc server exited");
},

_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err));
})
} => {
info!("dfdaemon download grpc unix server exited");
},

_ = tokio::spawn(async move { dynconfig.run().await }) => {
info!("dynconfig manager exited");
},
Expand Down Expand Up @@ -343,11 +326,29 @@ async fn main() -> Result<(), anyhow::Error> {
info!("garbage collector exited");
},

_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_upload_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon upload grpc server failed: {}", err));
})
} => {
info!("dfdaemon upload grpc server exited");
},

_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
dfdaemon_download_grpc.run(barrier).await.unwrap_or_else(|err| error!("dfdaemon download grpc server failed: {}", err));
})
} => {
info!("dfdaemon download grpc unix server exited");
},

_ = {
let barrier = grpc_server_started_barrier.clone();
tokio::spawn(async move {
// Wait for grpc server started.
barrier.wait();
barrier.wait().await;
proxy.run().await.unwrap_or_else(|err| error!("proxy server failed: {}", err));
})
} => {
Expand Down
18 changes: 8 additions & 10 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ use dragonfly_client_util::{
};
use hyper_util::rt::TokioIo;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Barrier};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream};
use tonic::{
transport::{Channel, Endpoint, Server, Uri},
Expand Down Expand Up @@ -123,18 +124,12 @@ impl DfdaemonDownloadServer {
.await;

// Start download grpc server with unix domain socket.
info!(
"download server listening on {}",
self.socket_path.display()
);
fs::create_dir_all(self.socket_path.parent().unwrap()).await?;
let uds = UnixListener::bind(&self.socket_path)?;
let uds_stream = UnixListenerStream::new(uds);

let server = Server::builder()
.tcp_nodelay(true)
.max_frame_size(super::MAX_FRAME_SIZE)
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
.initial_stream_window_size(super::INITIAL_WINDOW_SIZE)
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
Expand All @@ -148,9 +143,13 @@ impl DfdaemonDownloadServer {
});

// Notify the grpc server is started.
grpc_server_started_barrier.wait();
grpc_server_started_barrier.wait().await;

// Wait for the download grpc server to shutdown.
info!(
"download server listening on {}",
self.socket_path.display()
);
server.await?;

// Remove the unix domain socket file.
Expand Down Expand Up @@ -994,7 +993,6 @@ impl DfdaemonDownloadClient {
// Ignore the uri because it is not used.
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.tcp_nodelay(true)
.buffer_size(super::BUFFER_SIZE)
.connect_timeout(super::CONNECT_TIMEOUT)
.timeout(super::REQUEST_TIMEOUT)
Expand Down
8 changes: 5 additions & 3 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ use dragonfly_client_core::{
use dragonfly_client_util::http::{get_range, hashmap_to_headermap, headermap_to_hashmap};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Barrier};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::Barrier;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
transport::{Channel, Server},
Expand Down Expand Up @@ -121,13 +122,13 @@ impl DfdaemonUploadServer {
.await;

// Start upload grpc server.
info!("upload server listening on {}", self.addr);
let mut server_builder = Server::builder();
if let Ok(Some(server_tls_config)) =
self.config.upload.server.load_server_tls_config().await
{
server_builder = server_builder.tls_config(server_tls_config)?;
}

let server = server_builder
.max_frame_size(super::MAX_FRAME_SIZE)
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
Expand All @@ -142,9 +143,10 @@ impl DfdaemonUploadServer {
});

// Notify the grpc server is started.
grpc_server_started_barrier.wait();
grpc_server_started_barrier.wait().await;

// Wait for the upload grpc server to shutdown.
info!("upload server listening on {}", self.addr);
Ok(server.await?)
}
}
Expand Down
6 changes: 4 additions & 2 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ use dragonfly_client_util::{
};
use reqwest::header::HeaderMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant};
use tokio::sync::{
mpsc::{self, Sender},
Expand Down

0 comments on commit 574f376

Please sign in to comment.