feat: add timeout for hdfs backend
Signed-off-by: Gaius <[email protected]>
gaius-qi committed Nov 20, 2024
1 parent a19cc89 commit 59301cd
Showing 1 changed file with 15 additions and 7 deletions.
dragonfly-client-backend/src/hdfs.rs (22 changes: 15 additions & 7 deletions)
@@ -17,8 +17,9 @@
 use dragonfly_api::common;
 use dragonfly_client_core::error::BackendError;
 use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
-use opendal::{Metakey, Operator};
+use opendal::{layers::TimeoutLayer, Metakey, Operator};
 use percent_encoding::percent_decode_str;
+use std::time::Duration;
 use tokio_util::io::StreamReader;
 use tracing::{error, info, instrument};
 use url::Url;
@@ -47,7 +48,12 @@ impl Hdfs {

     /// operator initializes the operator with the parsed URL and HDFS config.
     #[instrument(skip_all)]
-    pub fn operator(&self, url: Url, config: Option<common::v2::Hdfs>) -> ClientResult<Operator> {
+    pub fn operator(
+        &self,
+        url: Url,
+        config: Option<common::v2::Hdfs>,
+        timeout: Duration,
+    ) -> ClientResult<Operator> {
         // Get the host and port from the URL.
         let host = url
             .host_str()
@@ -68,7 +74,9 @@
             }
         }

-        Ok(Operator::new(builder)?.finish())
+        Ok(Operator::new(builder)?
+            .finish()
+            .layer(TimeoutLayer::new().with_timeout(timeout)))
     }
 }

@@ -97,7 +105,7 @@ impl super::Backend for Hdfs {
             .to_string();

         // Initialize the operator with the parsed URL and HDFS config.
-        let operator = self.operator(url.clone(), request.hdfs)?;
+        let operator = self.operator(url.clone(), request.hdfs, request.timeout)?;

         // Get the entries if url point to a directory.
         let entries = if url.path().ends_with('/') {
@@ -186,7 +194,7 @@

         // Initialize the operator with the parsed URL and HDFS config.
         let operator_reader = self
-            .operator(url.clone(), request.hdfs)?
+            .operator(url.clone(), request.hdfs, request.timeout)?
             .reader(decoded_path.as_ref())
             .await
             .map_err(|err| {
@@ -246,7 +254,7 @@ mod tests {
     #[tokio::test]
     async fn should_get_operator() {
         let url: Url = Url::parse("hdfs://127.0.0.1:9870/file").unwrap();
-        let operator = Hdfs::new().operator(url, None);
+        let operator = Hdfs::new().operator(url, None, Duration::from_secs(10));

         assert!(
             operator.is_ok(),
@@ -258,7 +266,7 @@
     #[test]
     fn should_return_error_when_url_not_valid() {
         let url: Url = Url::parse("hdfs:/127.0.0.1:9870/file").unwrap();
-        let result = Hdfs::new().operator(url, None);
+        let result = Hdfs::new().operator(url, None, Duration::from_secs(10));

         assert!(result.is_err());
         assert!(matches!(result.unwrap_err(), ClientError::InvalidURI(..)));
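Note: the timeout now flows from the backend request (`request.timeout`, visible in the hunks above) into opendal's `TimeoutLayer`, so HDFS calls return a timeout error instead of hanging on an unresponsive NameNode or DataNode. Below is a minimal sketch of the same layering pattern; it substitutes opendal's in-memory service for HDFS purely so the snippet is runnable without a cluster (an assumption, not part of this commit), but the `finish().layer(...)` composition matches the patched `Hdfs::operator`.

// Sketch only: uses the Memory service as a stand-in for HDFS, since the
// real HDFS service requires a running cluster and the HDFS feature flag.
use std::time::Duration;

use opendal::layers::TimeoutLayer;
use opendal::services::Memory;
use opendal::{Operator, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Build the operator, then wrap it the same way the patched
    // `Hdfs::operator` does: calls that exceed the deadline fail with a
    // timeout error rather than blocking the caller indefinitely.
    let op: Operator = Operator::new(Memory::default())?
        .finish()
        .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(10)));

    // Each of these operations is now bounded by the 10-second deadline.
    op.write("file", "hello").await?;
    let content = op.read("file").await?;
    assert_eq!(content.to_vec(), b"hello".to_vec());

    Ok(())
}

Because the layer is applied to the fully built operator, the deadline covers every operation issued through it, including the `reader` used in the download path above.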