refactor: support hdfs for backend #852

Merged (1 commit, Nov 19, 2024)
24 changes: 15 additions & 9 deletions dragonfly-client-backend/src/hdfs.rs
@@ -23,17 +23,26 @@ use tokio_util::io::StreamReader;
use tracing::{error, info, instrument};
use url::Url;

const NAMENODE_DEFAULT_WEB_PORT: u16 = 9870;
/// HDFS_SCHEME is the scheme of the HDFS.
pub const HDFS_SCHEME: &str = "hdfs";

/// DEFAULT_NAMENODE_PORT is the default port of the HDFS namenode.
const DEFAULT_NAMENODE_PORT: u16 = 9870;

/// Hdfs is a struct that implements the Backend trait.
pub struct Hdfs {}
pub struct Hdfs {
/// scheme is the scheme of the HDFS.
scheme: String,
}

/// Hdfs implements the Backend trait.
impl Hdfs {
/// new returns a new HDFS backend.
#[instrument(skip_all)]
pub fn new() -> Self {
Self {}
Self {
scheme: HDFS_SCHEME.to_string(),
}
}

/// operator initializes the operator with the parsed URL and HDFS config.
@@ -44,7 +53,7 @@ impl Hdfs {
.host_str()
.ok_or_else(|| ClientError::InvalidURI(url.to_string()))?
.to_string();
let port = url.port().unwrap_or(NAMENODE_DEFAULT_WEB_PORT);
let port = url.port().unwrap_or(DEFAULT_NAMENODE_PORT);

// Initialize the HDFS operator.
let mut builder = opendal::services::Webhdfs::default();
@@ -54,7 +63,7 @@ impl Hdfs {

// If HDFS config is not None, set the config for builder.
if let Some(config) = config {
if let Some(delegation_token) = config.delegation_token {
if let Some(delegation_token) = &config.delegation_token {
builder = builder.delegation(delegation_token.as_str());
}
}
@@ -69,7 +78,7 @@ impl super::Backend for Hdfs {
/// scheme returns the scheme of the HDFS backend.
#[instrument(skip_all)]
fn scheme(&self) -> String {
"hdfs".to_string()
self.scheme.clone()
}

/// head gets the header of the request.
@@ -239,9 +248,6 @@ mod tests {
let url: Url = Url::parse("hdfs://127.0.0.1:9870/file").unwrap();
let operator = Hdfs::new().operator(url, None);

// If HDFS is running on localhost, the following code can be used to check the operator.
// operator.unwrap().check().await.unwrap();

assert!(
operator.is_ok(),
"can not get hdfs operator, due to: {}",
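For reference, a minimal sketch of how the refactored HDFS backend reads after this change; the crate import paths and the `Backend` trait location are assumptions for illustration, not part of the diff:

```rust
use dragonfly_client_backend::{
    hdfs::{Hdfs, HDFS_SCHEME},
    Backend,
};

fn hdfs_scheme_sketch() {
    // The scheme is now carried on the struct and returned by scheme(),
    // instead of being hard-coded as a string literal in the trait impl.
    let backend = Hdfs::new();
    assert_eq!(backend.scheme(), HDFS_SCHEME);
}
```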
31 changes: 20 additions & 11 deletions dragonfly-client-backend/src/http.rs
@@ -22,6 +22,12 @@ use std::io::{Error as IOError, ErrorKind};
use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument};

// HTTP_SCHEME is the HTTP scheme.
pub const HTTP_SCHEME: &str = "http";

// HTTPS_SCHEME is the HTTPS scheme.
pub const HTTPS_SCHEME: &str = "https";

/// HTTP is the HTTP backend.
pub struct HTTP {
/// scheme is the scheme of the HTTP backend.
@@ -173,13 +179,16 @@ impl super::Backend for HTTP {
impl Default for HTTP {
/// default returns a new default HTTP.
fn default() -> Self {
Self::new("http")
Self::new(HTTP_SCHEME)
}
}

#[cfg(test)]
mod tests {
use crate::{http::HTTP, Backend, GetRequest, HeadRequest};
use crate::{
http::{HTTP, HTTPS_SCHEME, HTTP_SCHEME},
Backend, GetRequest, HeadRequest,
};
use dragonfly_client_util::tls::{load_certs_from_pem, load_key_from_pem};
use hyper_util::rt::{TokioExecutor, TokioIo};
use reqwest::{header::HeaderMap, StatusCode};
@@ -348,7 +357,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
.mount(&server)
.await;

let resp = HTTP::new("http")
let resp = HTTP::new(HTTP_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: format!("{}/head", server.uri()),
@@ -376,7 +385,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
.mount(&server)
.await;

let resp = HTTP::new("http")
let resp = HTTP::new(HTTP_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: format!("{}/head", server.uri()),
@@ -404,7 +413,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
.mount(&server)
.await;

let mut resp = HTTP::new("http")
let mut resp = HTTP::new(HTTP_SCHEME)
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
@@ -426,7 +435,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_head_response_with_self_signed_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: server_addr,
@@ -445,7 +454,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_return_error_response_when_head_with_wrong_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: server_addr,
@@ -463,7 +472,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_response_with_self_signed_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let mut resp = HTTP::new("https")
let mut resp = HTTP::new(HTTPS_SCHEME)
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
@@ -485,7 +494,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_return_error_response_when_get_with_wrong_cert() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
@@ -505,7 +514,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_head_response_with_no_verifier() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let resp = HTTP::new("https")
let resp = HTTP::new(HTTPS_SCHEME)
.head(HeadRequest {
task_id: "test".to_string(),
url: server_addr,
@@ -524,7 +533,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
#[tokio::test]
async fn should_get_response_with_no_verifier() {
let server_addr = start_https_server(SERVER_CERT, SERVER_KEY).await;
let http_backend = HTTP::new("https");
let http_backend = HTTP::new(HTTPS_SCHEME);
let mut resp = http_backend
.get(GetRequest {
task_id: "test".to_string(),
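Likewise, a short sketch of the intent behind the new constants: call sites construct the backend from `HTTP_SCHEME`/`HTTPS_SCHEME` rather than repeating string literals (import paths assumed for illustration):

```rust
use dragonfly_client_backend::http::{HTTP, HTTPS_SCHEME, HTTP_SCHEME};

fn build_http_backends() -> (HTTP, HTTP) {
    // One shared definition of each scheme string, reused by the plain HTTP
    // backend and the TLS-enabled HTTPS backend.
    (HTTP::new(HTTP_SCHEME), HTTP::new(HTTPS_SCHEME))
}
```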
19 changes: 15 additions & 4 deletions dragonfly-client-backend/src/lib.rs
@@ -23,6 +23,7 @@ use libloading::Library;
use reqwest::header::HeaderMap;
use rustls_pki_types::CertificateDer;
use std::path::Path;
use std::str::FromStr;
use std::{collections::HashMap, pin::Pin, time::Duration};
use std::{fmt::Debug, fs};
use tokio::io::{AsyncRead, AsyncReadExt};
@@ -227,6 +228,12 @@ impl BackendFactory {
Ok(backend_factory)
}

/// supported_download_directory returns whether the scheme supports directory download.
#[instrument(skip_all)]
pub fn supported_download_directory(scheme: &str) -> bool {
object_storage::Scheme::from_str(scheme).is_ok() || scheme == hdfs::HDFS_SCHEME
}

/// build returns the backend by the scheme of the url.
#[instrument(skip_all)]
pub fn build(&self, url: &str) -> Result<&(dyn Backend + Send + Sync)> {
@@ -241,12 +248,16 @@ impl BackendFactory {
/// load_builtin_backends loads the builtin backends.
#[instrument(skip_all)]
fn load_builtin_backends(&mut self) {
self.backends
.insert("http".to_string(), Box::new(http::HTTP::new("http")));
self.backends.insert(
"http".to_string(),
Box::new(http::HTTP::new(http::HTTP_SCHEME)),
);
info!("load [http] builtin backend");

self.backends
.insert("https".to_string(), Box::new(http::HTTP::new("https")));
self.backends.insert(
"https".to_string(),
Box::new(http::HTTP::new(http::HTTPS_SCHEME)),
);
info!("load [https] builtin backend");

self.backends.insert(
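A hedged sketch of how a caller might use the new `BackendFactory::supported_download_directory` helper to gate directory downloads; the wrapper function is illustrative only:

```rust
use dragonfly_client_backend::BackendFactory;
use url::Url;

fn can_download_directory(url: &Url) -> bool {
    // Directory download is expected only for object storage schemes
    // (s3, gs, oss, ...) and hdfs; plain http/https return false here.
    BackendFactory::supported_download_directory(url.scheme())
}
```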
27 changes: 13 additions & 14 deletions dragonfly-client/src/bin/dfget/main.rs
@@ -25,7 +25,7 @@ use dragonfly_client::metrics::{
collect_backend_request_started_metrics,
};
use dragonfly_client::tracing::init_tracing;
use dragonfly_client_backend::{object_storage, BackendFactory, DirEntry, HeadRequest};
use dragonfly_client_backend::{hdfs, object_storage, BackendFactory, DirEntry, HeadRequest};
use dragonfly_client_config::VersionValueParser;
use dragonfly_client_config::{self, dfdaemon, dfget};
use dragonfly_client_core::error::{BackendError, ErrorType, OrErr};
@@ -55,6 +55,9 @@ The full documentation is here: https://d7y.io/docs/next/reference/commands/clie
Examples:
# Download a file from HTTP server.
$ dfget https://<host>:<port>/<path> -O /tmp/file.txt

# Download a file from HDFS.
$ dfget hdfs://<host>:<port>/<path> -O /tmp/file.txt --hdfs-delegation-token=<delegation_token>

# Download a file from Amazon Simple Storage Service(S3).
$ dfget s3://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret>
@@ -204,7 +207,7 @@ struct Args {
long,
help = "Specify the delegation token for Hadoop Distributed File System(HDFS)"
)]
storage_delegation_token: Option<String>,
hdfs_delegation_token: Option<String>,

#[arg(
long,
@@ -574,24 +577,19 @@ async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) -
// then download all files in the directory. Otherwise, download the single file.
let scheme = args.url.scheme();
if args.url.path().ends_with('/') {
if !is_support_directory_download(scheme) {
if !BackendFactory::supported_download_directory(scheme) {
return Err(Error::Unsupported(format!("{} download directory", scheme)));
};

return download_dir(args, dfdaemon_download_client).await;
};

download(args, ProgressBar::new(0), dfdaemon_download_client).await
}

/// is_support_directory_download checks whether the scheme supports directory download.
fn is_support_directory_download(scheme: &str) -> bool {
// Only object storage protocol and hdfs protocol supports directory download.
object_storage::Scheme::from_str(scheme).is_ok() || scheme == "hdfs"
}

/// download_dir downloads all files in the directory.
async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> {
// Initialize the object storage and the hdfs.
// Initialize the object storage config and the hdfs config.
let object_storage = Some(ObjectStorage {
access_key_id: args.storage_access_key_id.clone(),
access_key_secret: args.storage_access_key_secret.clone(),
@@ -601,8 +599,9 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
credential_path: args.storage_credential_path.clone(),
predefined_acl: args.storage_predefined_acl.clone(),
});

let hdfs = Some(Hdfs {
delegation_token: args.storage_delegation_token.clone(),
delegation_token: args.hdfs_delegation_token.clone(),
});

// Get all entries in the directory. If the directory is empty, then return directly.
@@ -702,10 +701,10 @@ async fn download(
Err(_) => None,
};

// Only initialize hdfs when the scheme is HDFS protocol.
// Only initialize HDFS when the scheme is HDFS protocol.
let hdfs = match args.url.scheme() {
"hdfs" => Some(Hdfs {
delegation_token: args.storage_delegation_token.clone(),
hdfs::HDFS_SCHEME => Some(Hdfs {
delegation_token: args.hdfs_delegation_token.clone(),
}),
_ => None,
};
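Finally, a hedged sketch of the scheme-gated HDFS config construction that the dfget change implements; the `Hdfs` message type is assumed to come from the dragonfly-api common proto, as used elsewhere in this file:

```rust
use dragonfly_api::common::v2::Hdfs;
use dragonfly_client_backend::hdfs;
use url::Url;

fn hdfs_config_for(url: &Url, hdfs_delegation_token: Option<String>) -> Option<Hdfs> {
    match url.scheme() {
        // The "hdfs" literal is replaced by the shared hdfs::HDFS_SCHEME constant,
        // and the renamed --hdfs-delegation-token flag feeds the config.
        hdfs::HDFS_SCHEME => Some(Hdfs {
            delegation_token: hdfs_delegation_token,
        }),
        _ => None,
    }
}
```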