From 934cfa6d69582ec0a161dee2e950ddd1b039c544 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 19 Nov 2024 14:31:32 +0800 Subject: [PATCH] refactor: support hdfs for backend Signed-off-by: Gaius --- dragonfly-client-backend/src/hdfs.rs | 24 ++++++++++++-------- dragonfly-client-backend/src/http.rs | 31 +++++++++++++++++--------- dragonfly-client-backend/src/lib.rs | 19 ++++++++++++---- dragonfly-client/src/bin/dfget/main.rs | 27 +++++++++++----------- 4 files changed, 63 insertions(+), 38 deletions(-) diff --git a/dragonfly-client-backend/src/hdfs.rs b/dragonfly-client-backend/src/hdfs.rs index bd14cfe4..d705e572 100644 --- a/dragonfly-client-backend/src/hdfs.rs +++ b/dragonfly-client-backend/src/hdfs.rs @@ -23,17 +23,26 @@ use tokio_util::io::StreamReader; use tracing::{error, info, instrument}; use url::Url; -const NAMENODE_DEFAULT_WEB_PORT: u16 = 9870; +/// HDFS_SCHEME is the scheme of the HDFS. +pub const HDFS_SCHEME: &str = "hdfs"; + +/// DEFAULT_NAMENODE_PORT is the default port of the HDFS namenode. +const DEFAULT_NAMENODE_PORT: u16 = 9870; /// Hdfs is a struct that implements the Backend trait. -pub struct Hdfs {} +pub struct Hdfs { + /// scheme is the scheme of the HDFS. + scheme: String, +} /// Hdfs implements the Backend trait. impl Hdfs { /// new returns a new HDFS backend. #[instrument(skip_all)] pub fn new() -> Self { - Self {} + Self { + scheme: HDFS_SCHEME.to_string(), + } } /// operator initializes the operator with the parsed URL and HDFS config. @@ -44,7 +53,7 @@ impl Hdfs { .host_str() .ok_or_else(|| ClientError::InvalidURI(url.to_string()))? .to_string(); - let port = url.port().unwrap_or(NAMENODE_DEFAULT_WEB_PORT); + let port = url.port().unwrap_or(DEFAULT_NAMENODE_PORT); // Initialize the HDFS operator. let mut builder = opendal::services::Webhdfs::default(); @@ -54,7 +63,7 @@ impl Hdfs { // If HDFS config is not None, set the config for builder. 
if let Some(config) = config { - if let Some(delegation_token) = config.delegation_token { + if let Some(delegation_token) = &config.delegation_token { builder = builder.delegation(delegation_token.as_str()); } } @@ -69,7 +78,7 @@ impl super::Backend for Hdfs { /// scheme returns the scheme of the HDFS backend. #[instrument(skip_all)] fn scheme(&self) -> String { - "hdfs".to_string() + self.scheme.clone() } /// head gets the header of the request. @@ -239,9 +248,6 @@ mod tests { let url: Url = Url::parse("hdfs://127.0.0.1:9870/file").unwrap(); let operator = Hdfs::new().operator(url, None); - // If HDFS is running on localhost, the following code can be used to check the operator. - // operator.unwrap().check().await.unwrap(); - assert!( operator.is_ok(), "can not get hdfs operator, due to: {}", diff --git a/dragonfly-client-backend/src/http.rs b/dragonfly-client-backend/src/http.rs index 51d8c123..eb217ced 100644 --- a/dragonfly-client-backend/src/http.rs +++ b/dragonfly-client-backend/src/http.rs @@ -22,6 +22,12 @@ use std::io::{Error as IOError, ErrorKind}; use tokio_util::io::StreamReader; use tracing::{debug, error, instrument}; +// HTTP_SCHEME is the HTTP scheme. +pub const HTTP_SCHEME: &str = "http"; + +// HTTPS_SCHEME is the HTTPS scheme. +pub const HTTPS_SCHEME: &str = "https"; + /// HTTP is the HTTP backend. pub struct HTTP { /// scheme is the scheme of the HTTP backend. @@ -173,13 +179,16 @@ impl super::Backend for HTTP { impl Default for HTTP { /// default returns a new default HTTP. 
fn default() -> Self { - Self::new("http") + Self::new(HTTP_SCHEME) } } #[cfg(test)] mod tests { - use crate::{http::HTTP, Backend, GetRequest, HeadRequest}; + use crate::{ + http::{HTTP, HTTPS_SCHEME, HTTP_SCHEME}, + Backend, GetRequest, HeadRequest, + }; use dragonfly_client_util::tls::{load_certs_from_pem, load_key_from_pem}; use hyper_util::rt::{TokioExecutor, TokioIo}; use reqwest::{header::HeaderMap, StatusCode}; @@ -348,7 +357,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= .mount(&server) .await; - let resp = HTTP::new("http") + let resp = HTTP::new(HTTP_SCHEME) .head(HeadRequest { task_id: "test".to_string(), url: format!("{}/head", server.uri()), @@ -376,7 +385,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= .mount(&server) .await; - let resp = HTTP::new("http") + let resp = HTTP::new(HTTP_SCHEME) .head(HeadRequest { task_id: "test".to_string(), url: format!("{}/head", server.uri()), @@ -404,7 +413,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= .mount(&server) .await; - let mut resp = HTTP::new("http") + let mut resp = HTTP::new(HTTP_SCHEME) .get(GetRequest { task_id: "test".to_string(), piece_id: "test".to_string(), @@ -426,7 +435,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= #[tokio::test] async fn should_get_head_response_with_self_signed_cert() { let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await; - let resp = HTTP::new("https") + let resp = HTTP::new(HTTPS_SCHEME) .head(HeadRequest { task_id: "test".to_string(), url: server_addr, @@ -445,7 +454,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= #[tokio::test] async fn should_return_error_response_when_head_with_wrong_cert() { let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await; - let resp = HTTP::new("https") + let resp = HTTP::new(HTTPS_SCHEME) .head(HeadRequest { task_id: "test".to_string(), url: server_addr, @@ -463,7 +472,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= #[tokio::test] async fn should_get_response_with_self_signed_cert() { let server_addr = 
start_https_server(SERVER_CERT, SERVER_KEY).await; - let mut resp = HTTP::new("https") + let mut resp = HTTP::new(HTTPS_SCHEME) .get(GetRequest { task_id: "test".to_string(), piece_id: "test".to_string(), @@ -485,7 +494,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= #[tokio::test] async fn should_return_error_response_when_get_with_wrong_cert() { let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await; - let resp = HTTP::new("https") + let resp = HTTP::new(HTTPS_SCHEME) .get(GetRequest { task_id: "test".to_string(), piece_id: "test".to_string(), @@ -505,7 +514,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= #[tokio::test] async fn should_get_head_response_with_no_verifier() { let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await; - let resp = HTTP::new("https") + let resp = HTTP::new(HTTPS_SCHEME) .head(HeadRequest { task_id: "test".to_string(), url: server_addr, @@ -524,7 +533,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k= #[tokio::test] async fn should_get_response_with_no_verifier() { let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await; - let http_backend = HTTP::new("https"); + let http_backend = HTTP::new(HTTPS_SCHEME); let mut resp = http_backend .get(GetRequest { task_id: "test".to_string(), diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index 72627454..63317c18 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -23,6 +23,7 @@ use libloading::Library; use reqwest::header::HeaderMap; use rustls_pki_types::CertificateDer; use std::path::Path; +use std::str::FromStr; use std::{collections::HashMap, pin::Pin, time::Duration}; use std::{fmt::Debug, fs}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -227,6 +228,12 @@ impl BackendFactory { Ok(backend_factory) } + /// supported_download_directory returns whether the scheme supports directory download. 
+ #[instrument(skip_all)] + pub fn supported_download_directory(scheme: &str) -> bool { + object_storage::Scheme::from_str(scheme).is_ok() || scheme == hdfs::HDFS_SCHEME + } + /// build returns the backend by the scheme of the url. #[instrument(skip_all)] pub fn build(&self, url: &str) -> Result<&(dyn Backend + Send + Sync)> { @@ -241,12 +248,16 @@ impl BackendFactory { /// load_builtin_backends loads the builtin backends. #[instrument(skip_all)] fn load_builtin_backends(&mut self) { - self.backends - .insert("http".to_string(), Box::new(http::HTTP::new("http"))); + self.backends.insert( + "http".to_string(), + Box::new(http::HTTP::new(http::HTTP_SCHEME)), + ); info!("load [http] builtin backend"); - self.backends - .insert("https".to_string(), Box::new(http::HTTP::new("https"))); + self.backends.insert( + "https".to_string(), + Box::new(http::HTTP::new(http::HTTPS_SCHEME)), + ); info!("load [https] builtin backend"); self.backends.insert( diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 005fc24f..d11fb0c3 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -25,7 +25,7 @@ use dragonfly_client::metrics::{ collect_backend_request_started_metrics, }; use dragonfly_client::tracing::init_tracing; -use dragonfly_client_backend::{object_storage, BackendFactory, DirEntry, HeadRequest}; +use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry, HeadRequest}; use dragonfly_client_config::VersionValueParser; use dragonfly_client_config::{self, dfdaemon, dfget}; use dragonfly_client_core::error::{BackendError, ErrorType, OrErr}; @@ -55,6 +55,9 @@ The full documentation is here: https://d7y.io/docs/next/reference/commands/clie Examples: # Download a file from HTTP server. $ dfget https://<host>:<port>/<path> -O /tmp/file.txt + + # Download a file from HDFS. + $ dfget hdfs://<host>:<port>/<path> -O /tmp/file.txt --hdfs-delegation-token=<delegation_token> # Download a file from Amazon Simple Storage Service(S3).
$ dfget s3://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> @@ -204,7 +207,7 @@ struct Args { long, help = "Specify the delegation token for Hadoop Distributed File System(HDFS)" )] - storage_delegation_token: Option<String>, + hdfs_delegation_token: Option<String>, #[arg( long, @@ -574,24 +577,19 @@ async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) - // then download all files in the directory. Otherwise, download the single file. let scheme = args.url.scheme(); if args.url.path().ends_with('/') { - if !is_support_directory_download(scheme) { + if !BackendFactory::supported_download_directory(scheme) { return Err(Error::Unsupported(format!("{} download directory", scheme))); }; + return download_dir(args, dfdaemon_download_client).await; }; download(args, ProgressBar::new(0), dfdaemon_download_client).await } -/// is_support_directory_download checks whether the scheme supports directory download. -fn is_support_directory_download(scheme: &str) -> bool { - // Only object storage protocol and hdfs protocol supports directory download. - object_storage::Scheme::from_str(scheme).is_ok() || scheme == "hdfs" -} /// download_dir downloads all files in the directory. async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> { - // Initialize the object storage and the hdfs. + // Initialize the object storage config and the hdfs config. let object_storage = Some(ObjectStorage { access_key_id: args.storage_access_key_id.clone(), access_key_secret: args.storage_access_key_secret.clone(), @@ -601,8 +599,9 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re credential_path: args.storage_credential_path.clone(), predefined_acl: args.storage_predefined_acl.clone(), }); + let hdfs = Some(Hdfs { - delegation_token: args.storage_delegation_token.clone(), + delegation_token: args.hdfs_delegation_token.clone(), }); // Get all entries in the directory.
If the directory is empty, then return directly. @@ -702,10 +701,10 @@ async fn download( Err(_) => None, }; - // Only initialize hdfs when the scheme is HDFS protocol. + // Only initialize HDFS when the scheme is HDFS protocol. let hdfs = match args.url.scheme() { - "hdfs" => Some(Hdfs { - delegation_token: args.storage_delegation_token.clone(), + hdfs::HDFS_SCHEME => Some(Hdfs { + delegation_token: args.hdfs_delegation_token.clone(), }), _ => None, };