feat(services): add hdfs native layout (#3933)
* initial commit for native-hdfs service

* revert async-tls change

* review comments on layout

* remove doc

* fix import issues

* fix import errors

* change service name

* parse hdfs-native error into opendal error

* cargo fmt

* reader and writer implementations

* revert implementation details

* cargo fmt and clippy

* review comments

* rename to error.rs

* fix clippy unused issue for now

* commit cargo.lock file

* Fix cargo lock changes

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
shbhmrzd and Xuanwo authored Jan 17, 2024
1 parent 553cb35 commit d99cac4
Showing 12 changed files with 568 additions and 1 deletion.
83 changes: 82 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions core/Cargo.toml
@@ -200,6 +200,7 @@ services-wasabi = []
services-webdav = []
services-webhdfs = []
services-yandex-disk = []
services-hdfs-native = ["hdfs-native"]

[lib]
bench = false
@@ -332,6 +333,8 @@ suppaftp = { version = "5.2", default-features = false, features = [
], optional = true }
# for services-tikv
tikv-client = { version = "0.3.0", optional = true, default-features = false }
# for services-hdfs-native
hdfs-native = { version = "0.6.0", optional = true }

# Layers
# for layers-async-backtrace
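For downstream users, the new service is gated behind a Cargo feature. A minimal sketch of a consumer's `Cargo.toml` (the `opendal` version number here is illustrative):

```toml
[dependencies]
# `services-hdfs-native` turns on the optional hdfs-native dependency
# added above; the version number is illustrative.
opendal = { version = "0.44", features = ["services-hdfs-native"] }
```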
241 changes: 241 additions & 0 deletions core/src/services/hdfs_native/backend.rs
@@ -0,0 +1,241 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use async_trait::async_trait;
use hdfs_native::WriteOptions;
use log::debug;
use serde::Deserialize;
// use uuid::Uuid;

use super::error::parse_hdfs_error;
use super::lister::HdfsNativeLister;
use super::reader::HdfsNativeReader;
use super::writer::HdfsNativeWriter;
use crate::raw::*;
use crate::*;

/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support,
/// backed by the [native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).

/// Config for HdfsNative services support.
#[derive(Default, Deserialize, Clone)]
#[serde(default)]
#[non_exhaustive]
pub struct HdfsNativeConfig {
/// Working directory of this backend.
pub root: Option<String>,
/// URL of this backend.
pub url: Option<String>,
/// Enable the append capability.
pub enable_append: bool,
}

impl Debug for HdfsNativeConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HdfsNativeConfig")
.field("root", &self.root)
.field("url", &self.url)
.field("enable_append", &self.enable_append)
.finish_non_exhaustive()
}
}

#[doc = include_str!("docs.md")]
#[derive(Default)]
pub struct HdfsNativeBuilder {
config: HdfsNativeConfig,
}

impl Debug for HdfsNativeBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HdfsNativeBuilder")
.field("config", &self.config)
.finish()
}
}

impl HdfsNativeBuilder {
/// Set root of this backend.
///
/// All operations will happen under this root.
pub fn root(&mut self, root: &str) -> &mut Self {
self.config.root = if root.is_empty() {
None
} else {
Some(root.to_string())
};

self
}

/// Set url of this backend.
///
/// Valid formats include:
///
/// - `default`: using the default setting based on hadoop config.
/// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster.
pub fn url(&mut self, url: &str) -> &mut Self {
if !url.is_empty() {
// Trim trailing `/` so that we can accept `hdfs://127.0.0.1:9000/`
self.config.url = Some(url.trim_end_matches('/').to_string())
}

self
}

/// Enable the append capability of this backend.
///
/// This should be disabled when HDFS runs in non-distributed mode.
pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
self.config.enable_append = enable_append;
self
}
}

impl Builder for HdfsNativeBuilder {
const SCHEME: Scheme = Scheme::HdfsNative;
type Accessor = HdfsNativeBackend;

fn from_map(map: HashMap<String, String>) -> Self {
// Deserialize the configuration from the HashMap.
let config = HdfsNativeConfig::deserialize(ConfigDeserializer::new(map))
.expect("config deserialize must succeed");

// Create an HdfsNativeBuilder instance with the deserialized config.
HdfsNativeBuilder { config }
}

fn build(&mut self) -> Result<Self::Accessor> {
debug!("backend build started: {:?}", &self);

let url = match &self.config.url {
Some(v) => v,
None => {
return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty")
.with_context("service", Scheme::HdfsNative))
}
};

let root = normalize_root(&self.config.root.take().unwrap_or_default());
debug!("backend use root {}", root);

let client = hdfs_native::Client::new(url).map_err(parse_hdfs_error)?;

// TODO: check whether the root dir exists and create it if missing.

debug!("backend build finished: {:?}", &self);
Ok(HdfsNativeBackend {
root,
client: Arc::new(client),
_enable_append: self.config.enable_append,
})
}
}

// #[inline]
// fn tmp_file_of(path: &str) -> String {
// let name = get_basename(path);
// let uuid = Uuid::new_v4().to_string();
//
// format!("{name}.{uuid}")
// }

/// Backend for hdfs-native services.
#[derive(Debug, Clone)]
pub struct HdfsNativeBackend {
root: String,
client: Arc<hdfs_native::Client>,
_enable_append: bool,
}

/// hdfs_native::Client is thread-safe.
unsafe impl Send for HdfsNativeBackend {}
unsafe impl Sync for HdfsNativeBackend {}

#[async_trait]
impl Accessor for HdfsNativeBackend {
type Reader = HdfsNativeReader;
type BlockingReader = ();
type Writer = HdfsNativeWriter;
type BlockingWriter = ();
type Lister = Option<HdfsNativeLister>;
type BlockingLister = ();

fn info(&self) -> AccessorInfo {
todo!()
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);

self.client
.mkdirs(&p, 0o777, true)
.await
.map_err(parse_hdfs_error)?;
Ok(RpCreateDir::default())
}

async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
let p = build_rooted_abs_path(&self.root, path);

let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;

let r = HdfsNativeReader::new(f);

Ok((RpRead::new(), r))
}

async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_rooted_abs_path(&self.root, path);

let f = self
.client
.create(&p, WriteOptions::default())
.await
.map_err(parse_hdfs_error)?;

let w = HdfsNativeWriter::new(f);

Ok((RpWrite::new(), w))
}

async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) -> Result<RpCopy> {
todo!()
}

async fn rename(&self, _from: &str, _to: &str, _args: OpRename) -> Result<RpRename> {
todo!()
}

async fn stat(&self, _path: &str, _args: OpStat) -> Result<RpStat> {
todo!()
}

async fn delete(&self, _path: &str, _args: OpDelete) -> Result<RpDelete> {
todo!()
}

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
let p = build_rooted_abs_path(&self.root, path);
let l = HdfsNativeLister::new(p, self.client.clone());
Ok((RpList::default(), Some(l)))
}
}
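The hunk above imports `parse_hdfs_error` from `super::error`, one of the 12 changed files that is not rendered on this page. A minimal sketch of what such a mapping could look like, assuming `hdfs_native::HdfsError` implements `std::error::Error` and carries `FileNotFound` and `AlreadyExists` variants (the exact variant set is an assumption):

```rust
use hdfs_native::HdfsError;

use crate::Error;
use crate::ErrorKind;

/// Map an hdfs-native error onto an opendal `Error` (sketch).
///
/// Only variants with an obvious opendal counterpart are matched;
/// everything else falls through to `Unexpected`.
pub(super) fn parse_hdfs_error(hdfs_error: HdfsError) -> Error {
    let (kind, msg) = match &hdfs_error {
        HdfsError::FileNotFound(path) => (ErrorKind::NotFound, format!("file not found: {path}")),
        HdfsError::AlreadyExists(path) => (ErrorKind::AlreadyExists, format!("path already exists: {path}")),
        _ => (ErrorKind::Unexpected, "unexpected hdfs-native error".to_string()),
    };

    Error::new(kind, &msg).set_source(hdfs_error)
}
```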
1 change: 1 addition & 0 deletions core/src/services/hdfs_native/docs.md
@@ -0,0 +1 @@
A distributed file system that provides high-throughput access to application data.
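Since this commit only lays out the skeleton (`info()` and several operations are still `todo!()`), end-to-end usage is aspirational for now. A hedged sketch of the intended call pattern once the stubs are filled in; the `services::HdfsNative` re-export name is an assumption modeled on how other opendal services are exposed:

```rust
use opendal::services::HdfsNative;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Mirrors the builder methods defined in backend.rs above.
    let mut builder = HdfsNative::default();
    builder.url("hdfs://127.0.0.1:9000");
    builder.root("/tmp/opendal");

    let op = Operator::new(builder)?.finish();

    // Only create_dir, read, write and list are wired up in this
    // commit; stat, delete, copy and rename still `todo!()`.
    op.create_dir("test/").await?;
    op.write("test/hello.txt", "Hello, HDFS!").await?;
    let bs = op.read("test/hello.txt").await?;
    println!("{}", String::from_utf8_lossy(&bs));

    Ok(())
}
```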
