Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bump hive_metastore to use pure rust thrift impl volo #174

Merged
merged 4 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ typed-builder = "^0.18"
url = "2"
urlencoding = "2"
uuid = "1.6.1"

volo-thrift = "0.9.2"
hive_metastore = "0.0.2"
tera = "1"
10 changes: 3 additions & 7 deletions crates/catalog/hms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@ license = { workspace = true }
keywords = ["iceberg", "hive", "catalog"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
hive_metastore = "0.0.1"
hive_metastore = { workspace = true }
iceberg = { workspace = true }
# the thrift upstream suffered from no regular rust release.
#
# [test-rs](https://github.com/tent-rs) is an organization that helps resolves this
# issue. And [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift
# crate, built from the thrift upstream with only version bumped.
thrift = { package = "tent-thrift", version = "0.18.1" }
typed-builder = { workspace = true }
volo-thrift = { workspace = true }
76 changes: 38 additions & 38 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

use super::utils::*;
use async_trait::async_trait;
use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient};
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
use thrift::transport::{
ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf,
};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;

/// Hive metastore Catalog configuration.
Expand All @@ -35,24 +35,7 @@ pub struct HmsCatalogConfig {
address: String,
}

/// TODO: We only support binary protocol for now.
type HmsClientType = ThriftHiveMetastoreSyncClient<
TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<thrift::transport::TTcpChannel>>>,
TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<thrift::transport::TTcpChannel>>>,
>;

/// # TODO
///
/// we are using the same connection everytime, we should support connection
/// pool in the future.
struct HmsClient(Arc<Mutex<HmsClientType>>);

impl HmsClient {
fn call<T>(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result<T>) -> Result<T> {
let mut client = self.0.lock().unwrap();
f(&mut client).map_err(from_thrift_error)
}
}
struct HmsClient(ThriftHiveMetastoreClient);

/// Hive metastore Catalog.
pub struct HmsCatalog {
Expand All @@ -71,19 +54,29 @@ impl Debug for HmsCatalog {
impl HmsCatalog {
/// Create a new hms catalog.
pub fn new(config: HmsCatalogConfig) -> Result<Self> {
let mut channel = thrift::transport::TTcpChannel::new();
channel
.open(config.address.as_str())
.map_err(from_thrift_error)?;
let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?;
let i_chan = TBufferedReadTransport::new(i_chan);
let o_chan = TBufferedWriteTransport::new(o_chan);
let i_proto = TBinaryInputProtocol::new(i_chan, true);
let o_proto = TBinaryOutputProtocol::new(o_chan, true);
let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto);
let address = config
.address
.as_str()
.to_socket_addrs()
.map_err(from_io_error)?
.next()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!("invalid address: {}", config.address),
)
})?;

let client = ThriftHiveMetastoreClientBuilder::new("hms")
.address(address)
// Framed thrift rpc is not enabled by default in HMS, we use
// buffered instead.
.make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's better to make this a config?

Copy link
Member Author

@Xuanwo Xuanwo Feb 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should make this setting available to users, though I believe it's a lower priority. I've created an issue at #188 to keep track of this.

.build();

Ok(Self {
config,
client: HmsClient(Arc::new(Mutex::new(client))),
client: HmsClient(client),
})
}
}
Expand All @@ -103,10 +96,17 @@ impl Catalog for HmsCatalog {
let dbs = if parent.is_some() {
return Ok(vec![]);
} else {
self.client.call(|client| client.get_all_databases())?
self.client
.0
.get_all_databases()
.await
.map_err(from_thrift_error)?
};

Ok(dbs.into_iter().map(NamespaceIdent::new).collect())
Ok(dbs
.into_iter()
.map(|v| NamespaceIdent::new(v.into()))
.collect())
}

async fn create_namespace(
Expand Down
17 changes: 16 additions & 1 deletion crates/catalog/hms/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,28 @@
// specific language governing permissions and limitations
// under the License.

use anyhow::anyhow;
use iceberg::{Error, ErrorKind};
use std::fmt::Debug;
use std::io;

/// Format a thrift error into iceberg error.
pub fn from_thrift_error(error: thrift::Error) -> Error {
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
where
T: Debug,
{
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting thrift error".to_string(),
)
.with_source(anyhow!("thrift error: {:?}", error))
}

/// Format an io error into iceberg error.
pub fn from_io_error(error: io::Error) -> Error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io::Error is a common error, how about moving it to core crate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if moving to core and making it a public API is a good idea. What if we address this once we have the same from_io_error in core?

Error::new(
ErrorKind::Unexpected,
"operation failed for hitting io error".to_string(),
)
.with_source(error)
}
Loading