Skip to content

Commit

Permalink
feat: proxy reuses the dfdaemon_download_client (#719)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Sep 5, 2024
1 parent 589fa1a commit 232ee8a
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 46 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.106"
version = "0.1.107"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.106" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.106" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.106" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.106" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.106" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.106" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.106" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.107" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.107" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.107" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.107" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.107" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.107" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.107" }
thiserror = "1.0"
dragonfly-api = "=2.0.154"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
8 changes: 4 additions & 4 deletions dragonfly-client/src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,6 @@ async fn main() -> Result<(), anyhow::Error> {
info!("stats server exited");
},

_ = tokio::spawn(async move { proxy.run().await }) => {
info!("proxy server exited");
},

_ = tokio::spawn(async move { manager_announcer.run().await.unwrap_or_else(|err| error!("announcer manager failed: {}", err))}) => {
info!("announcer manager exited");
},
Expand All @@ -318,6 +314,10 @@ async fn main() -> Result<(), anyhow::Error> {
info!("dfdaemon download grpc unix server exited");
},

_ = tokio::spawn(async move { proxy.run().await }) => {
info!("proxy server exited");
},

_ = tokio::spawn(async move { gc.run().await }) => {
info!("garbage collector exited");
},
Expand Down
1 change: 0 additions & 1 deletion dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl DfdaemonDownloadServer {
.max_frame_size(super::MAX_FRAME_SIZE)
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
.initial_stream_window_size(super::INITIAL_WINDOW_SIZE)
.concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
.add_service(reflection.clone())
.add_service(health_service)
.add_service(self.service.clone())
Expand Down
1 change: 0 additions & 1 deletion dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl DfdaemonUploadServer {
.max_frame_size(super::MAX_FRAME_SIZE)
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
.initial_stream_window_size(super::INITIAL_WINDOW_SIZE)
.concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION)
.add_service(reflection.clone())
.add_service(health_service)
.add_service(self.service.clone())
Expand Down
3 changes: 0 additions & 3 deletions dragonfly-client/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
// REQUEST_TIMEOUT is the timeout for GRPC requests.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

// CONCURRENCY_LIMIT_PER_CONNECTION is the limit of concurrency for each connection.
pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 8192;

// TCP_KEEPALIVE is the keepalive duration for TCP connection.
pub const TCP_KEEPALIVE: Duration = Duration::from_secs(3600);

Expand Down
95 changes: 74 additions & 21 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ impl Proxy {

let config = self.config.clone();
let task = self.task.clone();
let dfdaemon_download_client = DfdaemonDownloadClient::new_unix(
config.download.server.socket_path.clone(),
).await?;

let registry_certs = self.registry_certs.clone();
let server_ca_cert = self.server_ca_cert.clone();
tokio::task::spawn(async move {
Expand All @@ -183,7 +187,7 @@ impl Proxy {
.title_case_headers(true)
.serve_connection(
io,
service_fn(move |request| handler(config.clone(), task.clone(), request, registry_certs.clone(), server_ca_cert.clone())),
service_fn(move |request| handler(config.clone(), task.clone(), request, dfdaemon_download_client.clone(), registry_certs.clone(), server_ca_cert.clone())),
)
.with_upgrades()
.await
Expand All @@ -209,6 +213,7 @@ pub async fn handler(
config: Arc<Config>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
server_ca_cert: Arc<Option<Certificate>>,
) -> ClientResult<Response> {
Expand All @@ -224,13 +229,21 @@ pub async fn handler(
config,
task,
request,
dfdaemon_download_client,
registry_certs,
server_ca_cert,
)
.await;
}

return registry_mirror_http_handler(config, task, request, registry_certs).await;
return registry_mirror_http_handler(
config,
task,
request,
dfdaemon_download_client,
registry_certs,
)
.await;
}

// Span record the uri and method.
Expand All @@ -239,10 +252,25 @@ pub async fn handler(

// Handle CONNECT request.
if Method::CONNECT == request.method() {
return https_handler(config, task, request, registry_certs, server_ca_cert).await;
return https_handler(
config,
task,
request,
dfdaemon_download_client,
registry_certs,
server_ca_cert,
)
.await;
}

http_handler(config, task, request, registry_certs).await
http_handler(
config,
task,
request,
dfdaemon_download_client,
registry_certs,
)
.await
}

// registry_mirror_http_handler handles the http request for the registry mirror by client.
Expand All @@ -251,10 +279,18 @@ pub async fn registry_mirror_http_handler(
config: Arc<Config>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
) -> ClientResult<Response> {
let request = make_registry_mirror_request(config.clone(), request)?;
return http_handler(config, task, request, registry_certs).await;
return http_handler(
config,
task,
request,
dfdaemon_download_client,
registry_certs,
)
.await;
}

// registry_mirror_https_handler handles the https request for the registry mirror by client.
Expand All @@ -263,11 +299,20 @@ pub async fn registry_mirror_https_handler(
config: Arc<Config>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
server_ca_cert: Arc<Option<Certificate>>,
) -> ClientResult<Response> {
let request = make_registry_mirror_request(config.clone(), request)?;
return https_handler(config, task, request, registry_certs, server_ca_cert).await;
return https_handler(
config,
task,
request,
dfdaemon_download_client,
registry_certs,
server_ca_cert,
)
.await;
}

// http_handler handles the http request by client.
Expand All @@ -276,6 +321,7 @@ pub async fn http_handler(
config: Arc<Config>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
) -> ClientResult<Response> {
info!("handle HTTP request: {:?}", request);
Expand All @@ -290,7 +336,14 @@ pub async fn http_handler(
request.method(),
request_uri
);
return proxy_by_dfdaemon(config, task, rule.clone(), request).await;
return proxy_by_dfdaemon(
config,
task,
rule.clone(),
request,
dfdaemon_download_client,
)
.await;
}

if request.uri().scheme().cloned() == Some(http::uri::Scheme::HTTPS) {
Expand All @@ -316,6 +369,7 @@ pub async fn https_handler(
config: Arc<Config>,
task: Arc<Task>,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
server_ca_cert: Arc<Option<Certificate>>,
) -> ClientResult<Response> {
Expand All @@ -332,6 +386,7 @@ pub async fn https_handler(
task,
upgraded,
host,
dfdaemon_download_client,
registry_certs,
server_ca_cert,
)
Expand Down Expand Up @@ -359,6 +414,7 @@ async fn upgraded_tunnel(
task: Arc<Task>,
upgraded: Upgraded,
host: String,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
server_ca_cert: Arc<Option<Certificate>>,
) -> ClientResult<()> {
Expand Down Expand Up @@ -399,6 +455,7 @@ async fn upgraded_tunnel(
task.clone(),
host.clone(),
request,
dfdaemon_download_client.clone(),
registry_certs.clone(),
)
}),
Expand All @@ -419,6 +476,7 @@ pub async fn upgraded_handler(
task: Arc<Task>,
host: String,
mut request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
registry_certs: Arc<Option<Vec<CertificateDer<'static>>>>,
) -> ClientResult<Response> {
// Span record the uri and method.
Expand All @@ -442,7 +500,14 @@ pub async fn upgraded_handler(
request.method(),
request_uri
);
return proxy_by_dfdaemon(config, task, rule.clone(), request).await;
return proxy_by_dfdaemon(
config,
task,
rule.clone(),
request,
dfdaemon_download_client,
)
.await;
}

if request.uri().scheme().cloned() == Some(http::uri::Scheme::HTTPS) {
Expand All @@ -469,20 +534,8 @@ async fn proxy_by_dfdaemon(
task: Arc<Task>,
rule: Rule,
request: Request<hyper::body::Incoming>,
dfdaemon_download_client: DfdaemonDownloadClient,
) -> ClientResult<Response> {
// Initialize the dfdaemon download client.
let dfdaemon_download_client =
match DfdaemonDownloadClient::new_unix(config.download.server.socket_path.clone()).await {
Ok(client) => client,
Err(err) => {
error!("create dfdaemon download client failed: {}", err);
return Ok(make_error_response(
http::StatusCode::INTERNAL_SERVER_ERROR,
None,
));
}
};

// Make the download task request.
let download_task_request = match make_download_task_request(config.clone(), rule, request) {
Ok(download_task_request) => download_task_request,
Expand Down

0 comments on commit 232ee8a

Please sign in to comment.