Skip to content

Commit

Permalink
chore: refactor into the deltalake meta crate and deltalake-core crates
Browse files Browse the repository at this point in the history
This puts the groundwork in place for starting to partition into smaller crates
in a simpler and more manageable fashion.

See #1713
  • Loading branch information
rtyler committed Oct 31, 2023
1 parent 8f0b2d7 commit d5ad4fb
Show file tree
Hide file tree
Showing 490 changed files with 317 additions and 236 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
[workspace]
members = ["delta-inspect", "rust", "python"]
members = [
"crates/*",
"delta-inspect",
"python",
]
exclude = ["proofs"]
resolver = "2"

Expand Down
3 changes: 3 additions & 0 deletions crates/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Delta Lake Rust crates

This directory contains all of the crates published by the [delta-rs](https://github.com/delta-io/delta-rs) project. These crates were originally split based on the proposal in [#1713](https://github.com/delta-io/delta-rs/discussions/1713).
File renamed without changes.
File renamed without changes.
14 changes: 1 addition & 13 deletions rust/Cargo.toml → crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "deltalake"
name = "deltalake-core"
version = "0.17.0"
rust-version = "1.64"
authors = ["Qingping Hou <[email protected]>"]
Expand Down Expand Up @@ -182,15 +182,3 @@ unity-experimental = ["reqwest", "tracing", "hyper"]
[[bench]]
name = "read_checkpoint"
harness = false

[[example]]
name = "basic_operations"
required-features = ["datafusion"]

[[example]]
name = "load_table"
required-features = ["datafusion"]

[[example]]
name = "recordbatch-writer"
required-features = ["arrow"]
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//!
//! async {
//! let mut ctx = SessionContext::new();
//! let table = deltalake::open_table("./tests/data/simple_table")
//! let table = deltalake_core::open_table("./tests/data/simple_table")
//! .await
//! .unwrap();
//! ctx.register_table("demo", Arc::new(table)).unwrap();
Expand Down
File renamed without changes.
12 changes: 6 additions & 6 deletions rust/src/lib.rs → crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! ```rust
//! async {
//! let table = deltalake::open_table("./tests/data/simple_table").await.unwrap();
//! let table = deltalake_core::open_table("./tests/data/simple_table").await.unwrap();
//! let files = table.get_files();
//! };
//! ```
Expand All @@ -15,10 +15,10 @@
//!
//! ```rust
//! async {
//! let table = deltalake::open_table_with_version("./tests/data/simple_table", 0).await.unwrap();
//! let files = table.get_files_by_partitions(&[deltalake::PartitionFilter {
//! let table = deltalake_core::open_table_with_version("./tests/data/simple_table", 0).await.unwrap();
//! let files = table.get_files_by_partitions(&[deltalake_core::PartitionFilter {
//! key: "month".to_string(),
//! value: deltalake::PartitionValue::Equal("12".to_string()),
//! value: deltalake_core::PartitionValue::Equal("12".to_string()),
//! }]);
//! };
//! ```
Expand All @@ -27,7 +27,7 @@
//!
//! ```rust
//! async {
//! let table = deltalake::open_table_with_ds(
//! let table = deltalake_core::open_table_with_ds(
//! "./tests/data/simple_table",
//! "2020-05-02T23:47:31-07:00",
//! ).await.unwrap();
Expand Down Expand Up @@ -56,7 +56,7 @@
//!
//! async {
//! let mut ctx = SessionContext::new();
//! let table = deltalake::open_table("./tests/data/simple_table")
//! let table = deltalake_core::open_table("./tests/data/simple_table")
//! .await
//! .unwrap();
//! ctx.register_table("demo", Arc::new(table)).unwrap();
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl DeltaOps {
/// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given uri.
///
/// ```
/// use deltalake::DeltaOps;
/// use deltalake_core::DeltaOps;
///
/// async {
/// let ops = DeltaOps::try_from_uri("memory://").await.unwrap();
Expand All @@ -82,7 +82,7 @@ impl DeltaOps {
/// The main purpose of in-memory tables is for use in testing.
///
/// ```
/// use deltalake::DeltaOps;
/// use deltalake_core::DeltaOps;
///
/// let ops = DeltaOps::new_in_memory();
/// ```
Expand All @@ -97,7 +97,7 @@ impl DeltaOps {
/// Create a new Delta table
///
/// ```
/// use deltalake::DeltaOps;
/// use deltalake_core::DeltaOps;
///
/// async {
/// let ops = DeltaOps::try_from_uri("memory://").await.unwrap();
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub struct DeltaTablePartition {
/// A HivePartition string is represented by a "key=value" format.
///
/// ```rust
/// use deltalake::DeltaTablePartition;
/// use deltalake_core::DeltaTablePartition;
///
/// let hive_part = "ds=2023-01-01";
/// let partition = DeltaTablePartition::try_from(hive_part).unwrap();
Expand Down Expand Up @@ -227,7 +227,7 @@ impl DeltaTablePartition {
/// Try to create a DeltaTable partition from a partition value kv pair.
///
/// ```rust
/// use deltalake::DeltaTablePartition;
/// use deltalake_core::DeltaTablePartition;
///
/// let value = ("ds", &Some("2023-01-01".to_string()));
/// let null_default = "1979-01-01";
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ impl S3StorageBackend {
///
/// ```rust
/// use object_store::aws::AmazonS3Builder;
/// use deltalake::storage::s3::{S3StorageBackend, S3StorageOptions};
/// use deltalake_core::storage::s3::{S3StorageBackend, S3StorageOptions};
/// use std::sync::Arc;
///
/// let inner = AmazonS3Builder::new()
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#[cfg(all(feature = "arrow", feature = "parquet"))]
mod fs_common;
use deltalake::protocol::DeltaOperation;
use deltalake_core::protocol::DeltaOperation;

// NOTE: The below is a useful external command for inspecting the written checkpoint schema visually:
// parquet-tools inspect tests/data/checkpoints/_delta_log/00000000000000000005.checkpoint.parquet

#[cfg(all(feature = "arrow", feature = "parquet"))]
mod simple_checkpoint {
use deltalake::*;
use deltalake_core::*;
use pretty_assertions::assert_eq;
use std::fs;
use std::path::{Path, PathBuf};
Expand All @@ -22,7 +22,7 @@ mod simple_checkpoint {
cleanup_checkpoint_files(log_path.as_path());

// Load the delta table at version 5
let mut table = deltalake::open_table_with_version(table_location, 5)
let mut table = deltalake_core::open_table_with_version(table_location, 5)
.await
.unwrap();

Expand All @@ -49,7 +49,7 @@ mod simple_checkpoint {
assert_eq!(10, version);

// delta table should load just fine with the checkpoint in place
let table_result = deltalake::open_table(table_location).await.unwrap();
let table_result = deltalake_core::open_table(table_location).await.unwrap();
let table = table_result;
let files = table.get_files();
assert_eq!(12, files.len());
Expand Down Expand Up @@ -90,8 +90,8 @@ mod delete_expired_delta_log_in_checkpoint {

use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::table::config::DeltaConfigKey;
use deltalake::*;
use deltalake_core::table::config::DeltaConfigKey;
use deltalake_core::*;
use maplit::hashmap;

#[tokio::test]
Expand Down Expand Up @@ -211,9 +211,9 @@ mod checkpoints_with_tombstones {
use super::*;
use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::protocol::*;
use deltalake::table::config::DeltaConfigKey;
use deltalake::*;
use deltalake_core::protocol::*;
use deltalake_core::table::config::DeltaConfigKey;
use deltalake_core::*;
use maplit::hashmap;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::types::Type;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#![cfg(feature = "integration_test")]

use deltalake::test_utils::{
use deltalake_core::test_utils::{
set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables,
};
use deltalake::Path;
use deltalake::{errors::DeltaTableError, DeltaOps};
use deltalake_core::Path;
use deltalake_core::{errors::DeltaTableError, DeltaOps};
use serial_test::serial;

mod common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ use std::{collections::HashMap, error::Error, sync::Arc};
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use arrow_select::concat::concat_batches;
use deltalake::errors::DeltaTableError;
use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics, OptimizeType};
use deltalake::operations::transaction::commit;
use deltalake::operations::DeltaOps;
use deltalake::protocol::{Action, DeltaOperation, Remove};
use deltalake::storage::ObjectStoreRef;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::{DeltaTable, PartitionFilter, Path, SchemaDataType, SchemaField};
use deltalake_core::errors::DeltaTableError;
use deltalake_core::operations::optimize::{
create_merge_plan, MetricDetails, Metrics, OptimizeType,
};
use deltalake_core::operations::transaction::commit;
use deltalake_core::operations::DeltaOps;
use deltalake_core::protocol::{Action, DeltaOperation, Remove};
use deltalake_core::storage::ObjectStoreRef;
use deltalake_core::writer::{DeltaWriter, RecordBatchWriter};
use deltalake_core::{DeltaTable, PartitionFilter, Path, SchemaDataType, SchemaField};
use futures::TryStreamExt;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetObjectReader;
Expand Down Expand Up @@ -276,7 +278,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
let other_dt = deltalake::open_table(uri).await?;
let other_dt = deltalake_core::open_table(uri).await?;
let add = &other_dt.get_state().files()[0];
let remove = Remove {
path: add.path.clone(),
Expand Down Expand Up @@ -346,7 +348,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
let mut other_dt = deltalake::open_table(uri).await?;
let mut other_dt = deltalake_core::open_table(uri).await?;
let mut writer = RecordBatchWriter::for_table(&other_dt)?;
write(
&mut writer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use arrow::datatypes::Schema as ArrowSchema;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use deltalake::protocol::SaveMode;
use deltalake::{DeltaOps, DeltaTable, SchemaDataType, SchemaField};
use deltalake_core::protocol::SaveMode;
use deltalake_core::{DeltaOps, DeltaTable, SchemaDataType, SchemaField};
use rand::Rng;
use std::collections::HashMap;
use std::error::Error;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use chrono::Duration;
use common::clock::TestClock;
use common::TestContext;
use deltalake::operations::vacuum::Clock;
use deltalake::operations::DeltaOps;
use deltalake::Schema;
use deltalake_core::operations::vacuum::Clock;
use deltalake_core::operations::DeltaOps;
use deltalake_core::Schema;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use serde_json::json;
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(dead_code)]
mod fs_common;

use deltalake::operations::transaction::commit;
use deltalake::protocol::{Action, DeltaOperation, SaveMode};
use deltalake_core::operations::transaction::commit;
use deltalake_core::protocol::{Action, DeltaOperation, SaveMode};
use serde_json::json;
use std::error::Error;
use tempdir::TempDir;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{Duration, Utc};
use deltalake::operations::vacuum::Clock;
use deltalake_core::operations::vacuum::Clock;
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionConfig;
use deltalake::delta_datafusion::DeltaTableFactory;
use deltalake_core::delta_datafusion::DeltaTableFactory;
use std::sync::Arc;

pub fn context_with_delta_table_factory() -> SessionContext {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#![allow(dead_code, unused_variables)]

use bytes::Bytes;
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::commit;
use deltalake::protocol::{self, Add, DeltaOperation, Remove, SaveMode};
use deltalake::storage::DeltaObjectStore;
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaTable, Schema};
use deltalake_core::operations::create::CreateBuilder;
use deltalake_core::operations::transaction::commit;
use deltalake_core::protocol::{self, Add, DeltaOperation, Remove, SaveMode};
use deltalake_core::storage::DeltaObjectStore;
use deltalake_core::DeltaTableBuilder;
use deltalake_core::{DeltaTable, Schema};
use object_store::{path::Path, ObjectStore};
use std::any::Any;
use std::collections::HashMap;
Expand Down
File renamed without changes.
Loading

0 comments on commit d5ad4fb

Please sign in to comment.