diff --git a/Cargo.lock b/Cargo.lock index cd56abe92a4..f0476a220a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2777,6 +2777,34 @@ dependencies = [ "byteorder", ] +[[package]] +name = "g2gen" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2c7625b2fc250dd90b63f7887a6bb0f7ec1d714c8278415bea2669ef20820e" +dependencies = [ + "g2poly", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "g2p" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc36d9bdc3d2da057775a9f4fa7d7b09edab3e0eda7a92cc353358fa63b8519e" +dependencies = [ + "g2gen", + "g2poly", +] + +[[package]] +name = "g2poly" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af6a86e750338603ea2c14b1c0bfe58cd61f87ca67a0021d9334996024608e12" + [[package]] name = "generator" version = "0.7.5" @@ -2939,6 +2967,31 @@ dependencies = [ "hashbrown 0.14.2", ] +[[package]] +name = "hdfs-native" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "270a4d5e17b0a3e252ecf3c85bd62d7ad89cb423c1dde8e58cfe50458e9c6066" +dependencies = [ + "base64 0.21.5", + "bytes", + "crc", + "futures", + "g2p", + "libc", + "log", + "num-traits", + "prost 0.11.9", + "prost-types 0.11.9", + "roxmltree", + "socket2 0.5.5", + "thiserror", + "tokio", + "url", + "users", + "uuid", +] + [[package]] name = "hdfs-sys" version = "0.3.0" @@ -4547,6 +4600,7 @@ dependencies = [ "futures", "getrandom 0.2.11", "governor", + "hdfs-native", "hdrs", "hmac", "hrana-client-proto", @@ -5447,7 +5501,7 @@ dependencies = [ "petgraph", "prettyplease", "prost 0.12.3", - "prost-types", + "prost-types 0.12.3", "regex", "syn 2.0.39", "tempfile", @@ -5480,6 +5534,15 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost 0.11.9", +] + [[package]] name = "prost-types" version = "0.12.3" @@ -6083,6 +6146,15 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "roxmltree" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862340e351ce1b271a378ec53f304a5558f7db87f3769dc655a8f6ecbb68b302" +dependencies = [ + "xmlparser", +] + [[package]] name = "rsa" version = "0.9.4" @@ -7743,6 +7815,15 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "users" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24cc0f6d6f267b73e5a2cadf007ba8f9bc39c6a6f9666f8cf25ea809a153b032" +dependencies = [ + "libc", +] + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 87c3b32d2f6..d04dce44fbd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -200,6 +200,7 @@ services-wasabi = [] services-webdav = [] services-webhdfs = [] services-yandex-disk = [] +services-hdfs-native = ["hdfs-native"] [lib] bench = false @@ -332,6 +333,8 @@ suppaftp = { version = "5.2", default-features = false, features = [ ], optional = true } # for services-tikv tikv-client = { version = "0.3.0", optional = true, default-features = false } +# for services-hdfs-native +hdfs-native = { version = "0.6.0", optional = true} # Layers # for layers-async-backtrace diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs new file mode 100644 index 00000000000..5c5412b3d87 --- /dev/null +++ b/core/src/services/hdfs_native/backend.rs @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use async_trait::async_trait; +use hdfs_native::WriteOptions; +use log::debug; +use serde::Deserialize; +// use uuid::Uuid; + +use super::error::parse_hdfs_error; +use super::lister::HdfsNativeLister; +use super::reader::HdfsNativeReader; +use super::writer::HdfsNativeWriter; +use crate::raw::*; +use crate::*; + +/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support. +/// Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native). + +/// Config for HdfsNative services support. 
+#[derive(Default, Deserialize, Clone)] +#[serde(default)] +#[non_exhaustive] +pub struct HdfsNativeConfig { + /// work dir of this backend + pub root: Option<String>, + /// url of this backend + pub url: Option<String>, + /// enable the append capacity + pub enable_append: bool, +} + +impl Debug for HdfsNativeConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HdfsNativeConfig") + .field("root", &self.root) + .field("url", &self.url) + .field("enable_append", &self.enable_append) + .finish_non_exhaustive() + } +} + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct HdfsNativeBuilder { + config: HdfsNativeConfig, +} + +impl Debug for HdfsNativeBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HdfsNativeBuilder") + .field("config", &self.config) + .finish() + } +} + +impl HdfsNativeBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// Set url of this backend. + /// + /// Valid format including: + /// + /// - `default`: using the default setting based on hadoop config. + /// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster. + pub fn url(&mut self, url: &str) -> &mut Self { + if !url.is_empty() { + // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/` + self.config.url = Some(url.trim_end_matches('/').to_string()) + } + + self + } + + /// Enable append capacity of this backend. + /// + /// This should be disabled when HDFS runs in non-distributed mode.
+ pub fn enable_append(&mut self, enable_append: bool) -> &mut Self { + self.config.enable_append = enable_append; + self + } +} + +impl Builder for HdfsNativeBuilder { + const SCHEME: Scheme = Scheme::HdfsNative; + type Accessor = HdfsNativeBackend; + + fn from_map(map: HashMap<String, String>) -> Self { + // Deserialize the configuration from the HashMap. + let config = HdfsNativeConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an HdfsNativeBuilder instance with the deserialized config. + HdfsNativeBuilder { config } + } + + fn build(&mut self) -> Result<Self::Accessor> { + debug!("backend build started: {:?}", &self); + + let url = match &self.config.url { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty") + .with_context("service", Scheme::HdfsNative)) + } + }; + + let root = normalize_root(&self.config.root.take().unwrap_or_default()); + debug!("backend use root {}", root); + + let client = hdfs_native::Client::new(url).map_err(parse_hdfs_error)?; + + // need to check if root dir exists, create if not + + debug!("backend build finished: {:?}", &self); + Ok(HdfsNativeBackend { + root, + client: Arc::new(client), + _enable_append: self.config.enable_append, + }) + } +} + +// #[inline] +// fn tmp_file_of(path: &str) -> String { +// let name = get_basename(path); +// let uuid = Uuid::new_v4().to_string(); +// +// format!("{name}.{uuid}") +// } + +/// Backend for hdfs-native services. +#[derive(Debug, Clone)] +pub struct HdfsNativeBackend { + root: String, + client: Arc<hdfs_native::Client>, + _enable_append: bool, +} + +/// hdfs_native::Client is thread-safe.
+unsafe impl Send for HdfsNativeBackend {} +unsafe impl Sync for HdfsNativeBackend {} + +#[async_trait] +impl Accessor for HdfsNativeBackend { + type Reader = HdfsNativeReader; + type BlockingReader = (); + type Writer = HdfsNativeWriter; + type BlockingWriter = (); + type Lister = Option<HdfsNativeLister>; + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + todo!() + } + + async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> { + let p = build_rooted_abs_path(&self.root, path); + + self.client + .mkdirs(&p, 0o777, true) + .await + .map_err(parse_hdfs_error)?; + Ok(RpCreateDir::default()) + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_rooted_abs_path(&self.root, path); + + let f = self.client.read(&p).await.map_err(parse_hdfs_error)?; + + let r = HdfsNativeReader::new(f); + + Ok((RpRead::new(), r)) + } + + async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_rooted_abs_path(&self.root, path); + + let f = self + .client + .create(&p, WriteOptions::default()) + .await + .map_err(parse_hdfs_error)?; + + let w = HdfsNativeWriter::new(f); + + Ok((RpWrite::new(), w)) + } + + async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) -> Result<RpCopy> { + todo!() + } + + async fn rename(&self, _from: &str, _to: &str, _args: OpRename) -> Result<RpRename> { + todo!() + } + + async fn stat(&self, _path: &str, _args: OpStat) -> Result<RpStat> { + todo!() + } + + async fn delete(&self, _path: &str, _args: OpDelete) -> Result<RpDelete> { + todo!() + } + + async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { + let p = build_rooted_abs_path(&self.root, path); + let l = HdfsNativeLister::new(p, self.client.clone()); + Ok((RpList::default(), Some(l))) + } +} diff --git a/core/src/services/hdfs_native/docs.md b/core/src/services/hdfs_native/docs.md new file mode 100644 index 00000000000..7d720d49865 --- /dev/null +++ b/core/src/services/hdfs_native/docs.md @@ -0,0
+1 @@ +A distributed file system that provides high-throughput access to application data. diff --git a/core/src/services/hdfs_native/error.rs b/core/src/services/hdfs_native/error.rs new file mode 100644 index 00000000000..4432716535c --- /dev/null +++ b/core/src/services/hdfs_native/error.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::*; +use hdfs_native::HdfsError; + +/// Parse hdfs-native error into opendal::Error. 
+pub fn parse_hdfs_error(hdfs_error: HdfsError) -> Error { + let (kind, retryable, msg) = match &hdfs_error { + HdfsError::IOError(err) => (ErrorKind::Unexpected, false, err.to_string()), + HdfsError::DataTransferError(msg) => (ErrorKind::Unexpected, false, msg.clone()), + HdfsError::ChecksumError => ( + ErrorKind::Unexpected, + false, + "checksums didn't match".to_string(), + ), + HdfsError::InvalidPath(msg) => (ErrorKind::InvalidInput, false, msg.clone()), + HdfsError::InvalidArgument(msg) => (ErrorKind::InvalidInput, false, msg.clone()), + HdfsError::UrlParseError(err) => (ErrorKind::Unexpected, false, err.to_string()), + HdfsError::AlreadyExists(msg) => (ErrorKind::AlreadyExists, false, msg.clone()), + HdfsError::OperationFailed(msg) => (ErrorKind::Unexpected, false, msg.clone()), + HdfsError::FileNotFound(msg) => (ErrorKind::NotFound, false, msg.clone()), + HdfsError::BlocksNotFound(msg) => (ErrorKind::NotFound, false, msg.clone()), + HdfsError::IsADirectoryError(msg) => (ErrorKind::IsADirectory, false, msg.clone()), + _ => ( + ErrorKind::Unexpected, + false, + "unexpected error from hdfs".to_string(), + ), + }; + + let mut err = Error::new(kind, &msg).set_source(hdfs_error); + + if retryable { + err = err.set_temporary(); + } + + err +} diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs new file mode 100644 index 00000000000..deb4336446b --- /dev/null +++ b/core/src/services/hdfs_native/lister.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::oio; +use crate::raw::oio::Entry; +use crate::*; +use std::sync::Arc; +use std::task::{Context, Poll}; + +pub struct HdfsNativeLister { + _path: String, + _client: Arc<hdfs_native::Client>, +} + +impl HdfsNativeLister { + pub fn new(path: String, client: Arc<hdfs_native::Client>) -> Self { + HdfsNativeLister { + _path: path, + _client: client, + } + } +} + +impl oio::List for HdfsNativeLister { + fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> { + todo!() + } +} diff --git a/core/src/services/hdfs_native/mod.rs b/core/src/services/hdfs_native/mod.rs new file mode 100644 index 00000000000..a65cacb0a67 --- /dev/null +++ b/core/src/services/hdfs_native/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +mod backend; +pub use backend::HdfsNativeBuilder as HdfsNative; +pub use backend::HdfsNativeConfig; + +mod error; +mod lister; +mod reader; +mod writer; diff --git a/core/src/services/hdfs_native/reader.rs b/core/src/services/hdfs_native/reader.rs new file mode 100644 index 00000000000..babbbf32b14 --- /dev/null +++ b/core/src/services/hdfs_native/reader.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::raw::oio::Read; +use crate::*; +use bytes::Bytes; +use hdfs_native::file::FileReader; +use std::io::SeekFrom; +use std::task::{Context, Poll}; + +pub struct HdfsNativeReader { + _f: FileReader, +} + +impl HdfsNativeReader { + pub fn new(f: FileReader) -> Self { + HdfsNativeReader { _f: f } + } +} + +impl Read for HdfsNativeReader { + fn poll_read(&mut self, _cx: &mut Context<'_>, _buf: &mut [u8]) -> Poll<Result<usize>> { + todo!() + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + let (_, _) = (cx, pos); + + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "HdfsNativeReader doesn't support seeking", + ))) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + let _ = cx; + + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "HdfsNativeReader doesn't support iterating", + )))) + } +} diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs new file mode 100644 index 00000000000..de0349eacf3 --- /dev/null +++ b/core/src/services/hdfs_native/writer.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use crate::raw::oio; +use crate::raw::oio::WriteBuf; +use crate::*; +use hdfs_native::file::FileWriter; +use std::task::{Context, Poll}; + +pub struct HdfsNativeWriter { + _f: FileWriter, +} + +impl HdfsNativeWriter { + pub fn new(f: FileWriter) -> Self { + HdfsNativeWriter { _f: f } + } +} + +impl oio::Write for HdfsNativeWriter { + fn poll_write(&mut self, _cx: &mut Context<'_>, _bs: &dyn WriteBuf) -> Poll<Result<usize>> { + todo!() + } + + fn poll_close(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> { + todo!() + } + + fn poll_abort(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> { + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "HdfsNativeWriter doesn't support abort", + ))) + } +} diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 146efb962b5..c09a61e9139 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -353,6 +353,13 @@ pub use pcloud::Pcloud; #[cfg(feature = "services-pcloud")] pub use pcloud::PcloudConfig; +#[cfg(feature = "services-hdfs-native")] +mod hdfs_native; +#[cfg(feature = "services-hdfs-native")] +pub use hdfs_native::HdfsNative; +#[cfg(feature = "services-hdfs-native")] +pub use hdfs_native::HdfsNativeConfig; + +#[cfg(feature = "services-yandex-disk")] mod yandex_disk; #[cfg(feature = "services-yandex-disk")] diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 30dcd251eda..ab63e222389 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -261,6 +261,8 @@ impl Operator { Scheme::Redb => Self::from_map::<services::Redb>(map)?.finish(), #[cfg(feature = "services-mongodb")] Scheme::Mongodb => Self::from_map::<services::Mongodb>(map)?.finish(), + #[cfg(feature = "services-hdfs-native")] + Scheme::HdfsNative => Self::from_map::<services::HdfsNative>(map)?.finish(), v => { return Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index cdb4cf2f67b..289fb9ca365 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -145,6 +145,8 @@
pub enum Scheme { Mongodb, /// [gridfs](crate::services::gridfs): MongoDB Gridfs Services Gridfs, + /// [Native HDFS](crate::services::hdfs_native): Hdfs Native service, using rust hdfs-native client for hdfs + HdfsNative, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -279,6 +281,8 @@ impl Scheme { Scheme::Redb, #[cfg(feature = "services-mongodb")] Scheme::Mongodb, + #[cfg(feature = "services-hdfs-native")] + Scheme::HdfsNative, ]) } } @@ -360,6 +364,7 @@ impl FromStr for Scheme { "tikv" => Ok(Scheme::Tikv), "azfile" => Ok(Scheme::Azfile), "mongodb" => Ok(Scheme::Mongodb), + "hdfs_native" => Ok(Scheme::HdfsNative), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -424,6 +429,7 @@ impl From for &'static str { Scheme::Upyun => "upyun", Scheme::YandexDisk => "yandex_disk", Scheme::Pcloud => "pcloud", + Scheme::HdfsNative => "hdfs_native", Scheme::Custom(v) => v, } }