chore: update datafusion
roeap authored and rtyler committed Jan 4, 2024
1 parent 2317602 commit 22e6fea
Showing 22 changed files with 138 additions and 111 deletions.
34 changes: 17 additions & 17 deletions Cargo.toml
@@ -19,25 +19,25 @@ debug = "line-tables-only"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "48.0.1" }
-arrow-arith = { version = "48.0.1" }
-arrow-array = { version = "48.0.1" }
-arrow-buffer = { version = "48.0.1" }
-arrow-cast = { version = "48.0.1" }
-arrow-ord = { version = "48.0.1" }
-arrow-row = { version = "48.0.1" }
-arrow-schema = { version = "48.0.1" }
-arrow-select = { version = "48.0.1" }
-object_store = { version = "0.7.1" }
-parquet = { version = "48.0.1" }
+arrow = { version = "49" }
+arrow-arith = { version = "49" }
+arrow-array = { version = "49" }
+arrow-buffer = { version = "49" }
+arrow-cast = { version = "49" }
+arrow-ord = { version = "49" }
+arrow-row = { version = "49" }
+arrow-schema = { version = "49" }
+arrow-select = { version = "49" }
+object_store = { version = "0.8" }
+parquet = { version = "49" }
 
 # datafusion
-datafusion = { version = "33.0.0" }
-datafusion-expr = { version = "33.0.0" }
-datafusion-common = { version = "33.0.0" }
-datafusion-proto = { version = "33.0.0" }
-datafusion-sql = { version = "33.0.0" }
-datafusion-physical-expr = { version = "33.0.0" }
+datafusion = { version = "34" }
+datafusion-expr = { version = "34" }
+datafusion-common = { version = "34" }
+datafusion-proto = { version = "34" }
+datafusion-sql = { version = "34" }
+datafusion-physical-expr = { version = "34" }
 
 
 # serde
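Note: these bumps travel together; datafusion 34 builds against arrow 49 and object_store 0.8, so upgrading any one of the three independently would pull duplicate, incompatible versions of the others into the dependency graph.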
26 changes: 16 additions & 10 deletions crates/deltalake-aws/src/storage.rs
@@ -3,7 +3,7 @@
 use bytes::Bytes;
 use deltalake_core::storage::object_store::{
     aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, MultipartId,
-    ObjectMeta, ObjectStore, Result as ObjectStoreResult,
+    ObjectMeta, ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult,
 };
 use deltalake_core::storage::{str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions};
 use deltalake_core::{DeltaResult, ObjectStoreError, Path};
@@ -220,10 +220,19 @@ impl std::fmt::Debug for S3StorageBackend {
 
 #[async_trait::async_trait]
 impl ObjectStore for S3StorageBackend {
-    async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<PutResult> {
         self.inner.put(location, bytes).await
     }
 
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+        options: PutOptions,
+    ) -> ObjectStoreResult<PutResult> {
+        self.inner.put_opts(location, bytes, options).await
+    }
+
     async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
         self.inner.get(location).await
     }
@@ -244,19 +253,16 @@ impl ObjectStore for S3StorageBackend {
         self.inner.delete(location).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
-        self.inner.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list(prefix)
     }
 
-    async fn list_with_offset(
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
-        self.inner.list_with_offset(prefix, offset).await
+    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list_with_offset(prefix, offset)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
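For reviewers unfamiliar with the object_store 0.8 surface, the trait changes driving this file are: `put` now returns a `PutResult` (e-tag/version metadata) instead of `()`, `put_opts` joins the trait, and `list`/`list_with_offset` become synchronous methods that return the stream directly. A minimal sketch of the call-site impact, using the crate's in-memory store (paths and names here are illustrative, not from this repo):

```rust
use bytes::Bytes;
use futures::TryStreamExt;
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();

    // 0.8: `put` returns a `PutResult` carrying the new object's
    // e-tag/version instead of `()`.
    let result = store
        .put(&Path::from("table/_delta_log/0.json"), Bytes::from_static(b"{}"))
        .await?;
    println!("e_tag = {:?}", result.e_tag);

    // 0.8: `list` is no longer async and no longer fallible up front; it
    // hands back the stream directly, so the leading `.await?` disappears.
    let entries: Vec<_> = store.list(None).try_collect().await?;
    assert_eq!(entries.len(), 1);
    Ok(())
}
```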
24 changes: 19 additions & 5 deletions crates/deltalake-aws/tests/repair_s3_rename_test.rs
@@ -4,7 +4,7 @@ use bytes::Bytes;
 use deltalake_aws::storage::S3StorageBackend;
 use deltalake_core::storage::object_store::{
     DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
-    ObjectMeta, Result as ObjectStoreResult,
+    ObjectMeta, PutOptions, PutResult, Result as ObjectStoreResult,
 };
 use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
 use deltalake_test::utils::IntegrationContext;
@@ -166,10 +166,19 @@ impl ObjectStore for DelayedObjectStore {
         self.delete(from).await
     }
 
-    async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<PutResult> {
         self.inner.put(location, bytes).await
     }
 
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+        options: PutOptions,
+    ) -> ObjectStoreResult<PutResult> {
+        self.inner.put_opts(location, bytes, options).await
+    }
+
     async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
         self.inner.get(location).await
     }
@@ -190,11 +199,16 @@ impl ObjectStore for DelayedObjectStore {
         self.inner.delete(location).await
     }
 
-    async fn list(
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    fn list_with_offset(
         &self,
         prefix: Option<&Path>,
-    ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
-        self.inner.list(prefix).await
+        offset: &Path,
+    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list_with_offset(prefix, offset)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
6 changes: 3 additions & 3 deletions crates/deltalake-azure/tests/integration.rs
@@ -12,7 +12,7 @@ use context::*;
 
 static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];
 /// TEST_PREFIXES as they should appear in object stores.
-static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];
+static TEST_PREFIXES_ENCODED: &[&str] = &["my table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"];
 
 #[tokio::test]
 #[serial]
@@ -21,8 +21,8 @@ async fn test_read_tables_azure() -> TestResult {
 
     test_read_tables(&context).await?;
 
-    for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) {
-        read_table_paths(&context, prefix, prefix_encoded).await?;
+    for (prefix, _prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) {
+        read_table_paths(&context, prefix, prefix).await?;
     }
 
     Ok(())
4 changes: 2 additions & 2 deletions crates/deltalake-core/Cargo.toml
@@ -78,7 +78,7 @@ lazy_static = "1"
 libc = ">=0.2.90, <1"
 num-bigint = "0.4"
 num-traits = "0.2.15"
-object_store = "0.7"
+object_store = { workspace = true }
 once_cell = "1.16.0"
 parking_lot = "0.12"
 percent-encoding = "2"
@@ -93,7 +93,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
     "rustls-tls",
     "json",
 ], optional = true }
-sqlparser = { version = "0.39", optional = true }
+sqlparser = { version = "0.40", optional = true }
 
 [dev-dependencies]
 criterion = "0.5"
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/data_catalog/storage/mod.rs
@@ -60,7 +60,7 @@ impl ListingSchemaProvider {
 
     /// Reload table information from ObjectStore
     pub async fn refresh(&self) -> datafusion_common::Result<()> {
-        let entries: Vec<_> = self.store.list(None).await?.try_collect().await?;
+        let entries: Vec<_> = self.store.list(None).try_collect().await?;
         let mut tables = HashSet::new();
         for file in entries.iter() {
             let mut parent = Path::new(file.location.as_ref());
6 changes: 2 additions & 4 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
@@ -31,8 +31,7 @@ use datafusion::execution::context::SessionState;
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::{
-    expr::{InList, ScalarUDF},
-    AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
+    expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
 };
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use sqlparser::ast::escape_quoted_string;
@@ -186,8 +185,7 @@ impl<'a> Display for SqlFormat<'a> {
             Expr::IsNotFalse(expr) => write!(f, "{} IS NOT FALSE", SqlFormat { expr }),
             Expr::IsNotUnknown(expr) => write!(f, "{} IS NOT UNKNOWN", SqlFormat { expr }),
             Expr::BinaryExpr(expr) => write!(f, "{}", BinaryExprFormat { expr }),
-            Expr::ScalarFunction(func) => fmt_function(f, &func.fun.to_string(), false, &func.args),
-            Expr::ScalarUDF(ScalarUDF { fun, args }) => fmt_function(f, &fun.name, false, args),
+            Expr::ScalarFunction(func) => fmt_function(f, func.func_def.name(), false, &func.args),
             Expr::Cast(Cast { expr, data_type }) => {
                 write!(f, "arrow_cast({}, '{}')", SqlFormat { expr }, data_type)
             }
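Background for this file and the next: datafusion 34 folds the separate `Expr::ScalarUDF` variant into `Expr::ScalarFunction`, whose `func_def` field (a `ScalarFunctionDefinition`) covers built-ins and UDFs alike. A sketch of the consolidated shape (the helper is hypothetical, for illustration only):

```rust
use datafusion_expr::{expr::ScalarFunction, Expr};

// One match arm now covers what previously took separate `ScalarFunction`
// and `ScalarUDF` arms; `func_def.name()` resolves either kind.
fn scalar_function_name(expr: &Expr) -> Option<&str> {
    match expr {
        Expr::ScalarFunction(ScalarFunction { func_def, .. }) => Some(func_def.name()),
        _ => None,
    }
}
```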
34 changes: 17 additions & 17 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -47,7 +47,6 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider,
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
-use datafusion::optimizer::utils::conjunction;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion::physical_plan::filter::FilterExec;
@@ -60,8 +59,9 @@ use datafusion_common::scalar::ScalarValue;
 use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
-use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
+use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::logical_plan::CreateExternalTable;
+use datafusion_expr::utils::conjunction;
 use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
@@ -1329,22 +1329,21 @@ impl TreeNodeVisitor for FindFilesExprProperties {
             | Expr::Case(_)
             | Expr::Cast(_)
             | Expr::TryCast(_) => (),
-            Expr::ScalarFunction(ScalarFunction { fun, .. }) => {
-                let v = fun.volatility();
-                if v > Volatility::Immutable {
-                    self.result = Err(DeltaTableError::Generic(format!(
-                        "Find files predicate contains nondeterministic function {}",
-                        fun
-                    )));
-                    return Ok(VisitRecursion::Stop);
-                }
-            }
-            Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
-                let v = fun.signature.volatility;
+            Expr::ScalarFunction(ScalarFunction { func_def, .. }) => {
+                let v = match func_def {
+                    datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(),
+                    datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility,
+                    datafusion_expr::ScalarFunctionDefinition::Name(n) => {
+                        self.result = Err(DeltaTableError::Generic(format!(
+                            "Cannot determine volatility of find files predicate function {n}",
+                        )));
+                        return Ok(VisitRecursion::Stop);
+                    }
+                };
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
                         "Find files predicate contains nondeterministic function {}",
-                        fun.name
+                        func_def.name()
                     )));
                     return Ok(VisitRecursion::Stop);
                 }
@@ -1804,7 +1803,8 @@ mod tests {
                 location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
                 last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
                 size: 10644,
-                e_tag: None
+                e_tag: None,
+                version: None,
             },
             partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
             range: None,
@@ -1894,7 +1894,7 @@ mod tests {
         ]));
         let exec_plan = Arc::from(DeltaScan {
             table_uri: "s3://my_bucket/this/is/some/path".to_string(),
-            parquet_scan: Arc::from(EmptyExec::new(false, schema.clone())),
+            parquet_scan: Arc::from(EmptyExec::new(schema.clone())),
             config: DeltaScanConfig::default(),
             logical_schema: schema.clone(),
         });
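Two smaller pieces of upgrade fallout appear above: object_store 0.8's `ObjectMeta` gained a `version` field, and datafusion 34's `EmptyExec::new` dropped its leading `produce_one_row: bool` argument (the one-row case moved to a dedicated `PlaceholderRowExec`, if I'm reading the 34 release right). A minimal sketch of the new constructor:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::empty::EmptyExec;

// datafusion 34: `EmptyExec::new` takes only the schema.
fn empty_scan() -> EmptyExec {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    EmptyExec::new(schema)
}
```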
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/logstore/mod.rs
@@ -204,7 +204,7 @@ pub trait LogStore: Sync + Send {
     async fn is_delta_table_location(&self) -> DeltaResult<bool> {
         // TODO We should really be using HEAD here, but this fails in windows tests
         let object_store = self.object_store();
-        let mut stream = object_store.list(Some(self.log_path())).await?;
+        let mut stream = object_store.list(Some(self.log_path()));
         if let Some(res) = stream.next().await {
             match res {
                 Ok(_) => Ok(true),
@@ -386,7 +386,7 @@ pub async fn get_latest_version(
     let prefix = Some(log_store.log_path());
     let offset_path = commit_uri_from_version(max_version);
     let object_store = log_store.object_store();
-    let mut files = object_store.list_with_offset(prefix, &offset_path).await?;
+    let mut files = object_store.list_with_offset(prefix, &offset_path);
 
     while let Some(obj_meta) = files.next().await {
         let obj_meta = obj_meta?;
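Worth noting for both call sites above: with the synchronous listing methods, nothing can fail until the stream is polled, so errors now surface per item inside the loop rather than when the listing is created. A sketch of the pattern `get_latest_version` relies on (the helper name is made up):

```rust
use futures::StreamExt;
use object_store::{path::Path, ObjectMeta, ObjectStore, Result};

// Collect everything lexically after `offset`, checking each element as the
// stream yields it; each item is a `Result<ObjectMeta>`.
async fn entries_after(
    store: &dyn ObjectStore,
    prefix: &Path,
    offset: &Path,
) -> Result<Vec<ObjectMeta>> {
    let mut stream = store.list_with_offset(Some(prefix), offset);
    let mut out = Vec::new();
    while let Some(item) = stream.next().await {
        out.push(item?);
    }
    Ok(out)
}
```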
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
@@ -255,7 +255,6 @@ impl ConvertToDeltaBuilder {
         let mut files = Vec::new();
         object_store
             .list(None)
-            .await?
             .try_for_each_concurrent(10, |meta| {
                 if Some("parquet") == meta.location.extension() {
                     debug!("Found parquet file {:#?}", meta.location);
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/filesystem_check.rs
@@ -103,7 +103,7 @@ impl FileSystemCheckBuilder {
         }
 
         let object_store = log_store.object_store();
-        let mut files = object_store.list(None).await?;
+        let mut files = object_store.list(None);
         while let Some(result) = files.next().await {
             let file = result?;
             files_relative.remove(file.location.as_ref());
13 changes: 3 additions & 10 deletions crates/deltalake-core/src/operations/merge/mod.rs
@@ -723,13 +723,6 @@ fn generalize_filter(
                 None
             }
         }
-        Expr::ScalarUDF(udf) => {
-            if udf.args.len() == 1 {
-                references_table(&udf.args[0], table)
-            } else {
-                None
-            }
-        }
         _ => None,
     }
 }
@@ -863,16 +856,16 @@ async fn try_construct_early_filter(
     } else {
         // if we have some recognised partitions, then discover the distinct set of partitions in the source data and
        // make a new filter, which expands out the placeholders for each distinct partition (and then OR these together)
-        let distinct_partitions = LogicalPlan::Distinct(Distinct {
-            input: LogicalPlan::Projection(Projection::try_new(
+        let distinct_partitions = LogicalPlan::Distinct(Distinct::All(
+            LogicalPlan::Projection(Projection::try_new(
                 placeholders
                     .into_iter()
                     .map(|(alias, expr)| expr.alias(alias))
                     .collect_vec(),
                 source.clone().into(),
             )?)
             .into(),
-        });
+        ));
 
         let execution_plan = session_state
             .create_physical_plan(&distinct_partitions)
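The last hunk tracks datafusion 34 turning `Distinct` from a struct into an enum: `Distinct::All` for plain `SELECT DISTINCT`, plus a new `Distinct::On` for Postgres-style `DISTINCT ON`. A minimal sketch of wrapping an input plan under that assumption:

```rust
use std::sync::Arc;

use datafusion_expr::logical_plan::{Distinct, LogicalPlan};

// datafusion 34: plain DISTINCT is the `All` variant, which wraps the input
// plan directly instead of a struct with an `input` field.
fn distinct_all(input: LogicalPlan) -> LogicalPlan {
    LogicalPlan::Distinct(Distinct::All(Arc::new(input)))
}
```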