dfget supports HDFS #838

Merged · 2 commits · Nov 19, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -78,6 +78,7 @@ opendal = { version = "0.48.0", features = [
    "services-oss",
    "services-obs",
    "services-cos",
    "services-webhdfs",
] }
clap = { version = "4.5.20", features = ["derive"] }
anyhow = "1.0.93"
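The new services-webhdfs feature compiles OpenDAL's WebHDFS support into the client; everything in hdfs.rs below builds on it. As a minimal sketch mirroring the builder calls used in this PR (the endpoint host is a placeholder), an operator can be constructed like this:

use opendal::{services::Webhdfs, Operator};

fn build_operator() -> opendal::Result<Operator> {
    // Point the builder at a namenode's WebHDFS endpoint (port 9870 by default).
    let mut builder = Webhdfs::default();
    builder = builder
        .root("/")
        .endpoint("http://namenode.example.com:9870");
    Ok(Operator::new(builder)?.finish())
}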
260 changes: 260 additions & 0 deletions dragonfly-client-backend/src/hdfs.rs
@@ -0,0 +1,260 @@
/*
 * Copyright 2024 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use dragonfly_api::common;
use dragonfly_client_core::error::BackendError;
use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
use opendal::{Metakey, Operator};
use percent_encoding::percent_decode_str;
use tokio_util::io::StreamReader;
use tracing::{error, info, instrument};
use url::Url;

const NAMENODE_DEFAULT_WEB_PORT: u16 = 9870;

/// Hdfs is a struct that implements the Backend trait.
pub struct Hdfs {}

/// Hdfs implements the Backend trait.
impl Hdfs {
    /// new returns a new HDFS backend.
    #[instrument(skip_all)]
    pub fn new() -> Self {
        Self {}
    }

    /// operator initializes the operator with the parsed URL and HDFS config.
    #[instrument(skip_all)]
    pub fn operator(&self, url: Url, config: Option<common::v2::Hdfs>) -> ClientResult<Operator> {
        // Get the host and port from the URL.
        let host = url
            .host_str()
            .ok_or_else(|| ClientError::InvalidURI(url.to_string()))?
            .to_string();
        let port = url.port().unwrap_or(NAMENODE_DEFAULT_WEB_PORT);

        // Initialize the HDFS operator.
        let mut builder = opendal::services::Webhdfs::default();
        builder = builder
            .root("/")
            .endpoint(format!("http://{}:{}", host, port).as_str());

        // If an HDFS config is provided, apply it to the builder.
        if let Some(config) = config {
            if let Some(delegation_token) = config.delegation_token {
                builder = builder.delegation(delegation_token.as_str());
            }
        }

        Ok(Operator::new(builder)?.finish())
    }
}
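
// A minimal usage sketch for the constructor above (host and path are
// placeholders; in practice the URL comes from the download request):
//
//     let url = Url::parse("hdfs://namenode.example.com:9870/path/to/file")?;
//     let operator = Hdfs::new().operator(url, None)?;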

/// Implement the Backend trait for Hdfs.
#[tonic::async_trait]
impl super::Backend for Hdfs {
    /// scheme returns the scheme of the HDFS backend.
    #[instrument(skip_all)]
    fn scheme(&self) -> String {
        "hdfs".to_string()
    }

    /// head gets the header of the request. If the URL ends with '/', the
    /// path is treated as a directory and its entries are listed recursively.
    #[instrument(skip_all)]
    async fn head(&self, request: super::HeadRequest) -> ClientResult<super::HeadResponse> {
        info!(
            "head request {} {}: {:?}",
            request.task_id, request.url, request.http_header
        );

        // Parse the URL.
        let url = Url::parse(request.url.as_ref())
            .map_err(|_| ClientError::InvalidURI(request.url.clone()))?;
        let decoded_path = percent_decode_str(url.path())
            .decode_utf8_lossy()
            .to_string();

        // Initialize the operator with the parsed URL and HDFS config.
        let operator = self.operator(url.clone(), request.hdfs)?;

        // Get the entries if the URL points to a directory.
        let entries = if url.path().ends_with('/') {
            operator
                .list_with(decoded_path.as_str())
                .recursive(true)
                .metakey(Metakey::ContentLength | Metakey::Mode)
                .await // Do the list op here.
                .map_err(|err| {
                    error!(
                        "list request failed {} {}: {}",
                        request.task_id, request.url, err
                    );
                    ClientError::BackendError(BackendError {
                        message: err.to_string(),
                        status_code: None,
                        header: None,
                    })
                })?
                .into_iter()
                .map(|entry| {
                    let metadata = entry.metadata();
                    let mut url = url.clone();
                    url.set_path(entry.path());
                    super::DirEntry {
                        url: url.to_string(),
                        content_length: metadata.content_length() as usize,
                        is_dir: metadata.is_dir(),
                    }
                })
                .collect()
        } else {
            Vec::new()
        };

        // Stat the path to get the response from the HDFS operator.
        let response = operator
            .stat_with(decoded_path.as_str())
            .await
            .map_err(|err| {
                error!(
                    "stat request failed {} {}: {}",
                    request.task_id, request.url, err
                );
                ClientError::BackendError(BackendError {
                    message: err.to_string(),
                    status_code: None,
                    header: None,
                })
            })?;

        info!(
            "head response {} {}: {}",
            request.task_id,
            request.url,
            response.content_length()
        );

        Ok(super::HeadResponse {
            success: true,
            content_length: Some(response.content_length()),
            http_header: None,
            http_status_code: None,
            error_message: None,
            entries,
        })
    }

    /// get returns the content of the requested file.
    #[instrument(skip_all)]
    async fn get(
        &self,
        request: super::GetRequest,
    ) -> ClientResult<super::GetResponse<super::Body>> {
        info!(
            "get request {} {}: {:?}",
            request.piece_id, request.url, request.http_header
        );

        // Parse the URL.
        let url = Url::parse(request.url.as_ref())
            .map_err(|_| ClientError::InvalidURI(request.url.clone()))?;
        let decoded_path = percent_decode_str(url.path())
            .decode_utf8_lossy()
            .to_string();

        // Initialize the operator with the parsed URL and HDFS config.
        let operator_reader = self
            .operator(url.clone(), request.hdfs)?
            .reader(decoded_path.as_ref())
            .await
            .map_err(|err| {
                error!(
                    "get request failed {} {}: {}",
                    request.piece_id, request.url, err
                );
                ClientError::BackendError(BackendError {
                    message: err.to_string(),
                    status_code: None,
                    header: None,
                })
            })?;

        // Stream the whole file, or only the requested byte range if one is set.
        let stream = match request.range {
            Some(range) => operator_reader
                .into_bytes_stream(range.start..range.start + range.length)
                .await
                .map_err(|err| {
                    error!(
                        "get request failed {} {}: {}",
                        request.piece_id, request.url, err
                    );
                    ClientError::BackendError(BackendError {
                        message: err.to_string(),
                        status_code: None,
                        header: None,
                    })
                })?,
            None => operator_reader.into_bytes_stream(..).await.map_err(|err| {
                error!(
                    "get request failed {} {}: {}",
                    request.piece_id, request.url, err
                );
                ClientError::BackendError(BackendError {
                    message: err.to_string(),
                    status_code: None,
                    header: None,
                })
            })?,
        };

        Ok(crate::GetResponse {
            success: true,
            http_header: None,
            http_status_code: Some(reqwest::StatusCode::OK),
            reader: Box::new(StreamReader::new(stream)),
            error_message: None,
        })
    }
}
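
// Range semantics: a request range maps onto a half-open byte range, so a
// range of { start: 1024, length: 512 } reads bytes 1024..1536 of the file.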

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn should_get_operator() {
        let url: Url = Url::parse("hdfs://127.0.0.1:9870/file").unwrap();
        let operator = Hdfs::new().operator(url, None);

        // If HDFS is running on localhost, the following code can be used to check the operator.
        // operator.unwrap().check().await.unwrap();

        assert!(
            operator.is_ok(),
            "can not get hdfs operator, due to: {}",
            operator.unwrap_err()
        );
    }

    #[test]
    fn should_return_error_when_url_not_valid() {
        let url: Url = Url::parse("hdfs:/127.0.0.1:9870/file").unwrap();
        let result = Hdfs::new().operator(url, None);

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ClientError::InvalidURI(..)));
    }
}
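
The tests above only construct the operator; the connectivity check stays commented out so the suite does not depend on a live cluster. With a local WebHDFS endpoint on 127.0.0.1:9870, the backend tests can be run with something like:

cargo test -p dragonfly-client-backend hdfs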
9 changes: 9 additions & 0 deletions dragonfly-client-backend/src/http.rs
@@ -356,6 +356,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: std::time::Duration::from_secs(5),
                client_cert: None,
                object_storage: None,
                hdfs: None,
            })
            .await
            .unwrap();
@@ -383,6 +384,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: std::time::Duration::from_secs(5),
                client_cert: None,
                object_storage: None,
                hdfs: None,
            })
            .await;

@@ -412,6 +414,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: std::time::Duration::from_secs(5),
                client_cert: None,
                object_storage: None,
                hdfs: None,
            })
            .await
            .unwrap();
@@ -431,6 +434,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: Duration::from_secs(5),
                client_cert: Some(load_certs_from_pem(CA_CERT).unwrap()),
                object_storage: None,
                hdfs: None,
            })
            .await
            .unwrap();
@@ -449,6 +453,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: Duration::from_secs(5),
                client_cert: Some(load_certs_from_pem(WRONG_CA_CERT).unwrap()),
                object_storage: None,
                hdfs: None,
            })
            .await;

@@ -468,6 +473,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: std::time::Duration::from_secs(5),
                client_cert: Some(load_certs_from_pem(CA_CERT).unwrap()),
                object_storage: None,
                hdfs: None,
            })
            .await
            .unwrap();
@@ -489,6 +495,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: std::time::Duration::from_secs(5),
                client_cert: Some(load_certs_from_pem(WRONG_CA_CERT).unwrap()),
                object_storage: None,
                hdfs: None,
            })
            .await;

@@ -506,6 +513,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: Duration::from_secs(5),
                client_cert: None,
                object_storage: None,
                hdfs: None,
            })
            .await
            .unwrap();
@@ -527,6 +535,7 @@ TrIVG3cErZoBC6zqBs/Ibe9q3gdHGqS3QLAKy/k=
                timeout: std::time::Duration::from_secs(5),
                client_cert: None,
                object_storage: None,
                hdfs: None,
            })
            .await
            .unwrap();
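These test-only edits follow from the backend request types gaining an hdfs field alongside object_storage; requests that do not target HDFS simply leave it unset. A sketch of the pattern (surrounding fields elided):

HeadRequest {
    // ...other request fields...
    object_storage: None,
    hdfs: None, // no HDFS-specific config for plain HTTP requests
}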
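
With this change, dfget can fetch files directly from HDFS through the namenode's WebHDFS endpoint. Assuming dfget's usual URL-plus-output invocation (host and paths below are placeholders), a download might look like:

dfget hdfs://namenode.example.com:9870/path/to/file --output /tmp/file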