Datafusion-imports #823

Merged · 3 commits · Sep 18, 2022
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

55 changes: 31 additions & 24 deletions rust/Cargo.toml
@@ -25,7 +25,7 @@ object_store = "0.5.0"
 once_cell = "1.12.0"
 parquet = { version = "22", features = ["async"], optional = true }
 parquet2 = { version = "0.16", optional = true }
-parquet-format = "~4.0.0"
+parquet-format = { version = "~4.0.0" }
 percent-encoding = "2"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
@@ -44,17 +44,44 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true }
 # Glue
 rusoto_glue = { version = "0.48", default-features = false, optional = true }
 
+# Datafusion
+datafusion = { version = "12", optional = true }
+datafusion-expr = { version = "12", optional = true }
+datafusion-common = { version = "12", optional = true }
+
 # NOTE dependencies only for integration tests
 fs_extra = { version = "1.2.0", optional = true }
 tempdir = { version = "0", optional = true }
 
-[dependencies.datafusion]
-version = "12"
+[dependencies.dynamodb_lock]
+path = "../dynamodb_lock"
+version = "0"
+default-features = false
+optional = true
+
+[dev-dependencies]
+criterion = "0"
+dotenv = "*"
+maplit = "1"
+pretty_assertions = "1.2.1"
+rand = "0.8"
+serial_test = "0"
+tempdir = "0"
+tempfile = "3"
+utime = "0.3"
+
+[build-dependencies]
+glibc_version = "0"
 
 [features]
 default = ["arrow", "parquet"]
-datafusion-ext = ["datafusion"]
+datafusion-ext = [
+    "datafusion",
+    "datafusion-expr",
+    "datafusion-common",
+    "arrow",
+    "parquet",
+]
 azure = ["object_store/azure"]
 gcs = ["object_store/gcp"]
 s3 = [
@@ -78,26 +105,6 @@ python = ["arrow/pyarrow"]
 # used only for integration testing
 integration_test = ["fs_extra", "tempdir"]
 
-[build-dependencies]
-glibc_version = "0"
-
-[dependencies.dynamodb_lock]
-path = "../dynamodb_lock"
-version = "0"
-default-features = false
-optional = true
-
-[dev-dependencies]
-criterion = "0"
-dotenv = "*"
-maplit = "1"
-pretty_assertions = "1.3.0"
-rand = "0.8"
-serial_test = "0"
-tempdir = "0"
-tempfile = "3"
-utime = "0.3"
-
 [[bench]]
 name = "read_checkpoint"
 harness = false
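With the feature table above, the whole DataFusion integration is switched on by the single datafusion-ext feature, which now also activates the split datafusion-expr and datafusion-common crates together with arrow and parquet. A minimal downstream manifest might look like this sketch (assuming the package is published as deltalake; the version number is illustrative, not taken from this PR):

    # downstream Cargo.toml (sketch, not part of this PR)
    [dependencies]
    deltalake = { version = "0.4", features = ["datafusion-ext"] }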
48 changes: 19 additions & 29 deletions rust/src/delta_datafusion.rs
@@ -25,29 +25,27 @@ use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
+use crate::action;
+use crate::schema;
+use crate::{DeltaTable, DeltaTableError};
+
 use arrow::array::ArrayRef;
 use arrow::compute::{cast_with_options, CastOptions};
 use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
 use async_trait::async_trait;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
-use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::{TableProvider, TableType};
-use datafusion::error::{DataFusionError, Result as DataFusionResult};
+use datafusion::datasource::{listing::PartitionedFile, TableProvider, TableType};
 use datafusion::execution::context::SessionState;
-use datafusion::logical_plan::{combine_filters, Column, Expr};
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
-use datafusion::scalar::ScalarValue;
+use datafusion_common::scalar::ScalarValue;
+use datafusion_common::{Column, DataFusionError, Result as DataFusionResult};
+use datafusion_expr::{combine_filters, Expr};
 use object_store::{path::Path, ObjectMeta};
 use url::Url;
 
-use crate::action;
-use crate::delta;
-use crate::schema;
-use crate::DeltaTableError;
-
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
         match err {
@@ -60,7 +58,7 @@ impl From<DeltaTableError> for DataFusionError {
     }
 }
 
-impl From<DataFusionError> for crate::DeltaTableError {
+impl From<DataFusionError> for DeltaTableError {
     fn from(err: DataFusionError) -> Self {
         match err {
             DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source },
@@ -72,7 +70,7 @@ impl From<DataFusionError> for crate::DeltaTableError {
     }
 }
 
-impl delta::DeltaTable {
+impl DeltaTable {
     /// Return statistics for Datafusion Table
     pub fn datafusion_table_statistics(&self) -> Statistics {
         let stats = self
@@ -222,7 +220,7 @@ impl delta::DeltaTable {
     }
 }
 
-impl PruningStatistics for delta::DeltaTable {
+impl PruningStatistics for DeltaTable {
     /// return the minimum values for the named column, if known.
     /// Note: the returned array must contain `num_containers()` rows
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
@@ -294,13 +292,11 @@ impl PruningStatistics for delta::DeltaTable {
 }
 
 #[async_trait]
-impl TableProvider for delta::DeltaTable {
+impl TableProvider for DeltaTable {
     fn schema(&self) -> Arc<ArrowSchema> {
         Arc::new(
-            <ArrowSchema as TryFrom<&schema::Schema>>::try_from(
-                delta::DeltaTable::schema(self).unwrap(),
-            )
-            .unwrap(),
+            <ArrowSchema as TryFrom<&schema::Schema>>::try_from(DeltaTable::schema(self).unwrap())
+                .unwrap(),
         )
     }
 
@@ -316,7 +312,7 @@ impl TableProvider for delta::DeltaTable {
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
         let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
-            delta::DeltaTable::schema(self).unwrap(),
+            DeltaTable::schema(self).unwrap(),
         )?);
 
         // each delta table must register a specific object store, since paths are internally
@@ -421,7 +417,7 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile {
     }
 }
 
-fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> {
+fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> {
     match stat_val {
         serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)),
         serde_json::Value::Number(num) => {
@@ -443,7 +439,7 @@ fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> {
 fn to_correct_scalar_value(
     stat_val: &serde_json::Value,
     field_dt: &ArrowDataType,
-) -> Option<datafusion::scalar::ScalarValue> {
+) -> Option<ScalarValue> {
     match stat_val {
         serde_json::Value::Array(_) => None,
         serde_json::Value::Object(_) => None,
@@ -485,10 +481,7 @@
     }
 }
 
-fn correct_scalar_value_type(
-    value: datafusion::scalar::ScalarValue,
-    field_dt: &ArrowDataType,
-) -> Option<datafusion::scalar::ScalarValue> {
+fn correct_scalar_value_type(value: ScalarValue, field_dt: &ArrowDataType) -> Option<ScalarValue> {
     match field_dt {
         ArrowDataType::Int64 => {
             let raw_value = i64::try_from(value).ok()?;
@@ -565,10 +558,7 @@
     }
 }
 
-fn left_larger_than_right(
-    left: datafusion::scalar::ScalarValue,
-    right: datafusion::scalar::ScalarValue,
-) -> Option<bool> {
+fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool> {
     match left {
         ScalarValue::Float64(Some(v)) => {
             let f_right = f64::try_from(right).ok()?;
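Taken together, the changes in this file swap delta::DeltaTable for the re-exported DeltaTable and source ScalarValue, Column, Expr, and the error/result aliases from the split datafusion-common and datafusion-expr crates; behavior is unchanged. Since DeltaTable keeps its TableProvider impl, downstream code can hand a loaded table straight to DataFusion. A rough usage sketch, not taken from this PR (assumes the datafusion-ext feature, a tokio runtime, and an existing table at the given path):

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;

    async fn query_delta_table() -> Result<(), Box<dyn std::error::Error>> {
        // DeltaTable implements TableProvider (see the impl above), so the
        // loaded table can be registered with a DataFusion session directly.
        let table = deltalake::open_table("./tests/data/delta-0.8.0").await?;
        let ctx = SessionContext::new();
        ctx.register_table("demo", Arc::new(table))?;

        // Run a query against the registered delta table and materialize it.
        let batches = ctx.sql("SELECT * FROM demo LIMIT 5").await?.collect().await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }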
8 changes: 5 additions & 3 deletions rust/src/operations/create.rs
@@ -1,5 +1,7 @@
 //! Command for creating a new delta table
 // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+use std::sync::Arc;
+
 use super::{
     to_datafusion_err,
     transaction::{serialize_actions, OPERATION_SCHEMA},
@@ -9,11 +11,11 @@ use crate::{
     action::{Action, DeltaOperation, MetaData, Protocol, SaveMode},
     DeltaTableBuilder, DeltaTableMetaData,
 };
+
+use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
 use core::any::Any;
 use datafusion::{
-    arrow::datatypes::SchemaRef,
-    error::{DataFusionError, Result as DataFusionResult},
     execution::context::TaskContext,
     physical_plan::{
         common::{compute_record_batch_statistics, SizedRecordBatchStream},
@@ -23,8 +25,8 @@ use datafusion::{
         Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
     },
 };
+use datafusion_common::{DataFusionError, Result as DataFusionResult};
 use futures::{TryFutureExt, TryStreamExt};
-use std::sync::Arc;
 
 /// Command for creating new delta table
 pub struct CreateCommand {
12 changes: 7 additions & 5 deletions rust/src/operations/mod.rs
@@ -1,6 +1,11 @@
 //! High level delta commands that can be executed against a delta table
 // TODO
 // - rename to delta operations
+use std::collections::HashMap;
+use std::convert::TryFrom;
+use std::fmt::Debug;
+use std::sync::Arc;
+
 use crate::{
     action::{DeltaOperation, Protocol, SaveMode},
     builder::DeltaTableBuilder,
@@ -9,16 +14,13 @@ use crate::{
     writer::{record_batch::divide_by_partition_values, utils::PartitionPath},
     DeltaTable, DeltaTableError, DeltaTableMetaData,
 };
+
 use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError, record_batch::RecordBatch};
 use datafusion::{
-    error::DataFusionError,
     physical_plan::{collect, memory::MemoryExec, ExecutionPlan},
     prelude::SessionContext,
 };
-use std::collections::HashMap;
-use std::convert::TryFrom;
-use std::fmt::Debug;
-use std::sync::Arc;
+use datafusion_common::DataFusionError;
 
 pub mod create;
 pub mod transaction;
20 changes: 10 additions & 10 deletions rust/src/operations/transaction.rs
@@ -1,27 +1,27 @@
 //! Wrapper Execution plan to handle distributed operations
+use core::any::Any;
+use std::sync::Arc;
+
 use super::*;
 use crate::action::Action;
 use crate::schema::DeltaDataTypeVersion;
-use core::any::Any;
+
+use arrow::array::StringArray;
+use arrow::datatypes::{
+    DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+};
+use arrow::record_batch::RecordBatch;
 use datafusion::{
-    arrow::{
-        array::StringArray,
-        datatypes::{
-            DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
-        },
-        record_batch::RecordBatch,
-    },
-    error::Result as DataFusionResult,
     execution::context::TaskContext,
     physical_plan::{
         coalesce_partitions::CoalescePartitionsExec, common::compute_record_batch_statistics,
         empty::EmptyExec, expressions::PhysicalSortExpr, stream::RecordBatchStreamAdapter,
         Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
     },
 };
+use datafusion_common::Result as DataFusionResult;
 use futures::{TryFutureExt, TryStreamExt};
 use lazy_static::lazy_static;
-use std::sync::Arc;
 
 lazy_static! {
     /// Schema expected for plans wrapped by transaction
22 changes: 11 additions & 11 deletions rust/src/operations/write.rs
@@ -16,20 +16,22 @@
 //! replace data that matches a predicate.
 
 // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
+use core::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
 use super::{
     create::CreateCommand,
     transaction::{serialize_actions, OPERATION_SCHEMA},
     *,
 };
-use crate::{
-    action::{Action, Add, Remove, SaveMode},
-    writer::{DeltaWriter, RecordBatchWriter},
-    Schema,
-};
-use core::any::Any;
+use crate::action::{Action, Add, Remove, SaveMode};
+use crate::writer::{DeltaWriter, RecordBatchWriter};
+use crate::Schema;
+
+use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use datafusion::{
-    arrow::datatypes::SchemaRef as ArrowSchemaRef,
-    error::Result as DataFusionResult,
     execution::context::TaskContext,
     physical_plan::{
         common::{
@@ -42,10 +44,8 @@ use datafusion::{
         SendableRecordBatchStream, Statistics,
     },
 };
+use datafusion_common::Result as DataFusionResult;
 use futures::{TryFutureExt, TryStreamExt};
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
 
 const MAX_SUPPORTED_WRITER_VERSION: i32 = 1;
 
9 changes: 9 additions & 0 deletions rust/src/storage/mod.rs
@@ -154,6 +154,15 @@ impl DeltaObjectStore {
         }
         Ok(())
     }
+
+    /// Check if the location is a delta table location
+    pub async fn is_delta_table_location(&self) -> ObjectStoreResult<bool> {
+        match self.head(self.log_path()).await {
+            Ok(_) => Ok(true),
+            Err(ObjectStoreError::NotFound { .. }) => Ok(false),
+            Err(err) => Err(err),
+        }
+    }
 }
 
 #[async_trait::async_trait]
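The new helper issues a head request against the store's _delta_log path and folds NotFound into Ok(false), so callers get a plain boolean instead of matching on ObjectStoreError themselves. A hypothetical call site (the ensure_table wrapper below is illustrative and not part of this PR):

    // Sketch: branch on whether a _delta_log already exists at this location.
    async fn ensure_table(store: &DeltaObjectStore) -> ObjectStoreResult<()> {
        if store.is_delta_table_location().await? {
            // A _delta_log was found: load the existing table.
        } else {
            // Nothing at the log path: the location is free for a new table.
        }
        Ok(())
    }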