Skip to content

Commit

Permalink
Merge pull request #1338 from roeap/unity-catalog
Browse files Browse the repository at this point in the history
feat: integrate unity catalog with datafusion
  • Loading branch information
rtyler authored Sep 20, 2023
2 parents d2fc649 + 9b3a0e5 commit 6746dd4
Show file tree
Hide file tree
Showing 14 changed files with 2,886 additions and 104 deletions.
1 change: 0 additions & 1 deletion .github/workflows/dev_pr/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,3 @@ proofs:

tlaplus:
- tlaplus/**/*

6 changes: 4 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,17 @@ impl RawDeltaTable {
}

#[classmethod]
#[pyo3(signature = (data_catalog, database_name, table_name, data_catalog_id, catalog_options = None))]
fn get_table_uri_from_data_catalog(
_cls: &PyType,
data_catalog: &str,
database_name: &str,
table_name: &str,
data_catalog_id: Option<String>,
catalog_options: Option<HashMap<String, String>>,
) -> PyResult<String> {
let data_catalog =
deltalake::data_catalog::get_data_catalog(data_catalog).map_err(|_| {
let data_catalog = deltalake::data_catalog::get_data_catalog(data_catalog, catalog_options)
.map_err(|_| {
PyValueError::new_err(format!("Catalog '{}' not available.", data_catalog))
})?;
let table_uri = rt()?
Expand Down
22 changes: 13 additions & 9 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ tokio = { workspace = true, features = [

# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
"hdfs3",
"try_spawn_blocking",
], optional = true }
errno = "0.3"
hyper = { version = "0.14", optional = true }
itertools = "0.11"
lazy_static = "1"
log = "0"
Expand All @@ -78,6 +75,14 @@ once_cell = "1.16.0"
parking_lot = "0.12"
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
tracing = { version = "0.1", optional = true }
rand = "0.8"

# hdfs
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
"hdfs3",
"try_spawn_blocking",
], optional = true }

# S3 lock client
rusoto_core = { version = "0.47", default-features = false, optional = true }
Expand All @@ -93,8 +98,6 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
reqwest-middleware = { version = "0.2.1", optional = true }
reqwest-retry = { version = "0.2.2", optional = true }

# Datafusion
dashmap = { version = "5", optional = true }
Expand All @@ -117,6 +120,7 @@ tempdir = "0"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"
hyper = { version = "0.14", features = ["server"] }

[features]
azure = ["object_store/azure"]
Expand Down Expand Up @@ -145,8 +149,8 @@ datafusion = [
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
glue = ["s3", "rusoto_glue/rustls"]
glue-native-tls = ["s3-native-tls", "rusoto_glue"]
glue = ["s3", "rusoto_glue/rustls", "tracing", "hyper"]
glue-native-tls = ["s3-native-tls", "rusoto_glue", "tracing", "hyper"]
hdfs = ["datafusion-objectstore-hdfs"]
# used only for integration testing
integration_test = ["fs_extra", "tempdir"]
Expand All @@ -168,7 +172,7 @@ s3 = [
"dynamodb_lock/rustls",
"object_store/aws",
]
unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]
unity-experimental = ["reqwest", "tracing", "hyper"]

[[bench]]
name = "read_checkpoint"
Expand Down
135 changes: 135 additions & 0 deletions rust/src/data_catalog/client/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! Exponential backoff with jitter
use rand::prelude::*;
use std::time::Duration;

/// Tunable parameters for an exponential backoff schedule with jitter
///
/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
#[allow(missing_copy_implementations)]
#[derive(Clone, Debug)]
pub struct BackoffConfig {
    /// The initial backoff duration
    pub init_backoff: Duration,
    /// The maximum backoff duration
    pub max_backoff: Duration,
    /// The base of the exponential to use
    pub base: f64,
}

impl Default for BackoffConfig {
    /// Starts at 100 milliseconds, caps at 15 seconds and uses a base of 2
    fn default() -> Self {
        let init_backoff = Duration::from_millis(100);
        let max_backoff = Duration::from_secs(15);

        BackoffConfig {
            init_backoff,
            max_backoff,
            base: 2.0,
        }
    }
}

/// Stateful generator of backoff intervals, created from a [`BackoffConfig`]
///
/// Consecutive calls to [`Backoff::tick`] will return the next backoff interval
pub struct Backoff {
    /// Initial backoff, in fractional seconds
    init_backoff: f64,
    /// Interval the next call to `tick` will hand out, in fractional seconds
    next_backoff_secs: f64,
    /// Cap applied to every interval, in fractional seconds
    max_backoff_secs: f64,
    /// Base of the exponential
    base: f64,
    /// Optional caller-supplied source of randomness
    rng: Option<Box<dyn RngCore + Sync + Send>>,
}

impl std::fmt::Debug for Backoff {
    /// Prints the numeric state only; the `rng` field is left out of the output
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Backoff");
        out.field("init_backoff", &self.init_backoff);
        out.field("next_backoff_secs", &self.next_backoff_secs);
        out.field("max_backoff_secs", &self.max_backoff_secs);
        out.field("base", &self.base);
        out.finish()
    }
}

impl Backoff {
    /// Create a new [`Backoff`] from the provided [`BackoffConfig`]
    ///
    /// No explicit rng is installed, so jitter is drawn from [`rand::thread_rng()`].
    pub fn new(config: &BackoffConfig) -> Self {
        Self::new_with_rng(config, None)
    }

    /// Creates a new `Backoff` with the optional `rng`
    ///
    /// Uses [`rand::thread_rng()`] if no rng is provided
    pub fn new_with_rng(
        config: &BackoffConfig,
        rng: Option<Box<dyn RngCore + Sync + Send>>,
    ) -> Self {
        // All arithmetic is done on fractional seconds.
        let init_backoff = config.init_backoff.as_secs_f64();
        Self {
            init_backoff,
            next_backoff_secs: init_backoff,
            max_backoff_secs: config.max_backoff.as_secs_f64(),
            base: config.base,
            rng,
        }
    }

    /// Returns the next backoff duration to wait for
    ///
    /// The value returned is the interval computed by the previous call (or the
    /// initial backoff on the first call). The interval for the following call is
    /// drawn uniformly from `init_backoff..(returned * base)` and capped at the
    /// configured maximum.
    ///
    /// If that range is empty (for example a zero initial backoff, or a base
    /// that does not grow the interval) the lower bound is used directly
    /// instead of sampling.
    pub fn tick(&mut self) -> Duration {
        let upper = self.next_backoff_secs * self.base;

        // `gen_range` panics when handed an empty range, so only sample when the
        // upper bound is strictly greater than the lower bound. The comparison is
        // also false for a NaN upper bound, which takes the fallback as well.
        let rand_backoff = if self.init_backoff < upper {
            let range = self.init_backoff..upper;
            match self.rng.as_mut() {
                Some(rng) => rng.gen_range(range),
                None => thread_rng().gen_range(range),
            }
        } else {
            self.init_backoff
        };

        let next_backoff = self.max_backoff_secs.min(rand_backoff);
        // Hand out the previously computed interval and store the new one.
        Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use rand::rngs::mock::StepRng;

    /// Builds a [`Backoff`] backed by a static rng created with `StepRng::new(fixed, 0)`
    fn backoff_with_static_rng(config: &BackoffConfig, fixed: u64) -> Backoff {
        Backoff::new_with_rng(config, Some(Box::new(StepRng::new(fixed, 0))))
    }

    /// Asserts that two floats differ by less than 0.0001
    fn assert_fuzzy_eq(a: f64, b: f64) {
        assert!((b - a).abs() < 0.0001, "{a} != {b}");
    }

    #[test]
    fn test_backoff() {
        let init_backoff_secs: f64 = 1.;
        let max_backoff_secs: f64 = 500.;
        let base: f64 = 3.;

        let config = BackoffConfig {
            init_backoff: Duration::from_secs_f64(init_backoff_secs),
            max_backoff: Duration::from_secs_f64(max_backoff_secs),
            base,
        };

        // A static rng that takes the minimum of the range: every tick stays at
        // the initial backoff
        let mut at_minimum = backoff_with_static_rng(&config, 0);
        for _ in 0..20 {
            assert_eq!(at_minimum.tick().as_secs_f64(), init_backoff_secs);
        }

        // A static rng that takes the maximum of the range: pure exponential
        // growth until the cap is reached
        let mut at_maximum = backoff_with_static_rng(&config, u64::MAX);
        for i in 0..20 {
            let expected = (base.powi(i) * init_backoff_secs).min(max_backoff_secs);
            assert_fuzzy_eq(at_maximum.tick().as_secs_f64(), expected);
        }

        // A static rng that takes the mid point of the range
        let mut at_midpoint = backoff_with_static_rng(&config, u64::MAX / 2);
        let mut expected = init_backoff_secs;
        for _ in 0..20 {
            assert_fuzzy_eq(at_midpoint.tick().as_secs_f64(), expected);
            let midpoint = init_backoff_secs + (expected * base - init_backoff_secs) / 2.;
            expected = midpoint.min(max_backoff_secs);
        }
    }
}
94 changes: 94 additions & 0 deletions rust/src/data_catalog/client/mock_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use parking_lot::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

/// A boxed, call-once handler that receives an incoming request and produces the response to send back
pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;

/// A mock server
///
/// Holds a queue of response handlers together with the pieces needed to stop
/// the spawned server task again.
pub struct MockServer {
/// Queue of pending response handlers, shared with the request service
responses: Arc<Mutex<VecDeque<ResponseFn>>>,
/// Sending half of the channel whose receiver triggers graceful shutdown
shutdown: oneshot::Sender<()>,
/// Join handle of the spawned task that drives the server
handle: JoinHandle<()>,
/// Url of the server, formatted as `http://<local address>`
url: String,
}

impl Default for MockServer {
fn default() -> Self {
Self::new()
}
}

impl MockServer {
    /// Start a mock server bound to `127.0.0.1` on port 0 and serve it from a spawned task
    ///
    /// Each incoming request consumes the response handler at the front of the
    /// queue. When the queue is empty the server answers with a `Hello World`
    /// body.
    ///
    /// NOTE(review): this calls `tokio::spawn` and hyper's `Server::bind`, so it
    /// presumably has to run inside a Tokio runtime and may panic if the bind
    /// fails - confirm against the pinned tokio / hyper versions.
    pub fn new() -> Self {
        let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
            Arc::new(Mutex::new(VecDeque::with_capacity(10)));

        let r = Arc::clone(&responses);
        let make_service = make_service_fn(move |_conn| {
            let r = Arc::clone(&r);
            async move {
                Ok::<_, Infallible>(service_fn(move |req| {
                    let r = Arc::clone(&r);
                    async move {
                        // Pop in a statement of its own so the mutex guard is
                        // dropped before the handler runs. Locking inside the
                        // `match` scrutinee would keep the queue locked for the
                        // whole handler call, blocking other requests and
                        // deadlocking a handler that touches the queue.
                        let next = r.lock().pop_front();
                        Ok::<_, Infallible>(match next {
                            Some(respond) => respond(req),
                            None => Response::new(Body::from("Hello World")),
                        })
                    }
                }))
            }
        });

        let (shutdown, rx) = oneshot::channel::<()>();
        let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);

        // Port 0 was requested, so read the actual address back from the server.
        let url = format!("http://{}", server.local_addr());

        let handle = tokio::spawn(async move {
            server
                .with_graceful_shutdown(async {
                    // A dropped sender also resolves the receiver; the error is ignored.
                    rx.await.ok();
                })
                .await
                .unwrap()
        });

        Self {
            responses,
            shutdown,
            handle,
            url,
        }
    }

    /// The url of the mock server
    pub fn url(&self) -> &str {
        &self.url
    }

    /// Add a response
    ///
    /// The response is queued behind any previously added responses and is
    /// returned regardless of the request that consumes it.
    pub fn push(&self, response: Response<Body>) {
        self.push_fn(|_| response)
    }

    /// Add a response function
    ///
    /// The function is queued behind any previously added responses and is
    /// called at most once, with the request that consumes it.
    pub fn push_fn<F>(&self, f: F)
    where
        F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
    {
        self.responses.lock().push_back(Box::new(f))
    }

    /// Shutdown the mock server
    ///
    /// Sends the shutdown signal (a failed send is ignored) and then waits for
    /// the server task to finish.
    ///
    /// # Panics
    ///
    /// Panics if joining the server task returns an error.
    pub async fn shutdown(self) {
        let _ = self.shutdown.send(());
        self.handle.await.unwrap()
    }
}
Loading

0 comments on commit 6746dd4

Please sign in to comment.