Commit 34fddd7

Merge branch 'main' into fix-num-rows-stat
rtyler authored Nov 21, 2024
2 parents 2311974 + 25ce389 commit 34fddd7
Showing 7 changed files with 16 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -26,7 +26,7 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "0.3.1" }
+delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
 # delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }
 
 # arrow
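
Note: this version bump drives most of the changes below. The sync-engine feature is new here; as a rough, hedged sketch of what it provides (assuming it exposes delta-kernel-rs's simple in-process engine, with SyncEngine, Table::try_from_uri, and Table::snapshot as in that crate's examples; the exact 0.4 API may differ in detail):

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::{DeltaResult, Table};

// Resolve the latest table version with the synchronous engine.
fn latest_version(table_uri: &str) -> DeltaResult<u64> {
    let engine = SyncEngine::new();
    let table = Table::try_from_uri(table_uri)?;
    let snapshot = table.snapshot(&engine, None)?;
    Ok(snapshot.version())
}
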
1 change: 0 additions & 1 deletion crates/core/src/delta_datafusion/expr.rs
@@ -28,7 +28,6 @@ use arrow_schema::{DataType, Field};
 use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
 use datafusion::execution::session_state::SessionStateBuilder;
-use datafusion::execution::FunctionRegistry;
 use datafusion::functions_array::make_array::MakeArray;
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
6 changes: 2 additions & 4 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -2,9 +2,7 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use arrow_array::{
-    Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray, UInt64Array,
-};
+use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
 use chrono::{DateTime, Utc};
 use delta_kernel::expressions::Scalar;
 use indexmap::IndexMap;
@@ -484,7 +482,7 @@ mod datafusion {
     use ::datafusion::physical_plan::Accumulator;
     use arrow::compute::concat_batches;
     use arrow_arith::aggregate::sum;
-    use arrow_array::{ArrayRef, BooleanArray, Int64Array};
+    use arrow_array::{ArrayRef, BooleanArray, Int64Array, UInt64Array};
     use arrow_schema::DataType as ArrowDataType;
     use datafusion_common::scalar::ScalarValue;
     use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
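
Note: the UInt64Array import moves rather than disappears: it is only needed by the DataFusion-specific statistics code, so its use lands inside the feature-gated mod datafusion. A minimal sketch of that scoping pattern (names here are illustrative, not the crate's actual items):

// Keep an import local to the cfg-gated module that uses it, so builds
// without the feature don't hit unused-import warnings.
pub struct FileStats;

#[cfg(feature = "datafusion")]
mod datafusion {
    use super::FileStats;

    pub fn num_records(_stats: &FileStats) -> u64 {
        0 // placeholder body for the sketch
    }
}
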
43 changes: 9 additions & 34 deletions crates/core/src/kernel/snapshot/mod.rs
@@ -220,24 +220,14 @@ impl Snapshot {
         schema_actions.insert(ActionType::Add);
         let checkpoint_stream = self.log_segment.checkpoint_stream(
             store.clone(),
-            &StructType::new(
-                schema_actions
-                    .iter()
-                    .map(|a| a.schema_field().clone())
-                    .collect(),
-            ),
+            &StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())),
             &self.config,
         );
 
         schema_actions.insert(ActionType::Remove);
         let log_stream = self.log_segment.commit_stream(
             store.clone(),
-            &StructType::new(
-                schema_actions
-                    .iter()
-                    .map(|a| a.schema_field().clone())
-                    .collect(),
-            ),
+            &StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())),
             &self.config,
         )?;
 
@@ -460,12 +450,8 @@ impl EagerSnapshot {
             // NOTE: we don't need to add the visitor-relevant data here, as it is represented in the state already
             futures::stream::iter(files.into_iter().map(Ok)).boxed()
         } else {
-            let read_schema = StructType::new(
-                schema_actions
-                    .iter()
-                    .map(|a| a.schema_field().clone())
-                    .collect(),
-            );
+            let read_schema =
+                StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
             new_slice
                 .checkpoint_stream(
                     log_store.object_store(),
@@ -476,12 +462,7 @@
         };
 
         schema_actions.insert(ActionType::Remove);
-        let read_schema = StructType::new(
-            schema_actions
-                .iter()
-                .map(|a| a.schema_field().clone())
-                .collect(),
-        );
+        let read_schema = StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
         let log_stream = new_slice.commit_stream(
             log_store.object_store().clone(),
             &read_schema,
@@ -618,12 +599,7 @@ impl EagerSnapshot {
         let mut schema_actions: HashSet<_> =
             visitors.iter().flat_map(|v| v.required_actions()).collect();
         schema_actions.extend([ActionType::Add, ActionType::Remove]);
-        let read_schema = StructType::new(
-            schema_actions
-                .iter()
-                .map(|a| a.schema_field().clone())
-                .collect(),
-        );
+        let read_schema = StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
         let actions = self.snapshot.log_segment.advance(
             send,
             &self.table_root(),
@@ -712,7 +688,7 @@ fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult<StructType> {
         StructField::new("maxValues", StructType::new(stats_fields.clone()), true),
         StructField::new(
             "nullCount",
-            StructType::new(stats_fields.iter().filter_map(to_count_field).collect()),
+            StructType::new(stats_fields.iter().filter_map(to_count_field)),
             true,
         ),
     ]))
@@ -751,8 +727,7 @@ fn stats_field(idx: usize, num_indexed_cols: i32, field: &StructField) -> Option<StructField> {
             StructType::new(
                 dt_struct
                     .fields()
-                    .flat_map(|f| stats_field(idx, num_indexed_cols, f))
-                    .collect(),
+                    .flat_map(|f| stats_field(idx, num_indexed_cols, f)),
             ),
             true,
         )),
@@ -769,7 +744,7 @@ fn to_count_field(field: &StructField) -> Option<StructField> {
         DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
         DataType::Struct(s) => Some(StructField::new(
             field.name(),
-            StructType::new(s.fields().filter_map(to_count_field).collect::<Vec<_>>()),
+            StructType::new(s.fields().filter_map(to_count_field)),
             true,
         )),
         _ => Some(StructField::new(field.name(), DataType::LONG, true)),
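
Note: every hunk in this file makes the same mechanical change: the intermediate .collect() call disappears. This tracks the delta_kernel 0.4 schema API, where StructType::new evidently accepts any impl IntoIterator<Item = StructField> rather than a collected Vec. A small before/after sketch (field names invented; the constructor bound is inferred from this diff):

use delta_kernel::schema::{DataType, StructField, StructType};

fn stats_like_schema() -> StructType {
    let names = ["numRecords", "tightBounds"];
    // 0.3 style: collect into a Vec first.
    // StructType::new(names.iter().map(|n| StructField::new(*n, DataType::LONG, true)).collect())

    // 0.4 style: pass the iterator straight through.
    StructType::new(
        names
            .iter()
            .map(|n| StructField::new(*n, DataType::LONG, true)),
    )
}
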
5 changes: 3 additions & 2 deletions crates/core/src/operations/optimize.rs
@@ -40,7 +40,6 @@ use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
 use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
 use tracing::*;
-use url::Url;
 
 use super::transaction::PROTOCOL;
 use super::writer::{PartitionWriter, PartitionWriterConfig};
@@ -1213,6 +1212,8 @@ pub(super) mod zorder {
     #[cfg(feature = "datafusion")]
     pub(super) mod datafusion {
         use super::*;
+        use url::Url;
+
         use ::datafusion::{
             execution::{
                 memory_pool::FairSpillPool,
@@ -1245,7 +1246,7 @@ pub(super) mod zorder {
 
             let memory_pool = FairSpillPool::new(max_spill_size);
             let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
-            let runtime = Arc::new(RuntimeEnv::new(config)?);
+            let runtime = Arc::new(RuntimeEnv::try_new(config)?);
             runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
 
             let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
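
Note: besides moving use url::Url; into the DataFusion-gated zorder module, this file adapts to a DataFusion rename: the fallible constructor RuntimeEnv::new(config) became RuntimeEnv::try_new(config). A self-contained sketch of the affected setup, assuming a DataFusion version where try_new exists (roughly v43 and later):

use std::sync::Arc;

use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

// Build a context whose memory use is capped, spilling to disk past the
// limit, as the z-order optimize path does.
fn session_with_spill_pool(max_spill_size: usize) -> datafusion::error::Result<SessionContext> {
    let memory_pool = FairSpillPool::new(max_spill_size);
    let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
    let runtime = Arc::new(RuntimeEnv::try_new(config)?);
    Ok(SessionContext::new_with_config_rt(SessionConfig::default(), runtime))
}
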
1 change: 0 additions & 1 deletion crates/core/src/operations/update.rs
@@ -517,7 +517,6 @@ mod tests {
         get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
     };
     use crate::{DeltaTable, TableProperty};
-    use arrow::array::types::Int32Type;
     use arrow::array::{Int32Array, ListArray, StringArray};
     use arrow::datatypes::Schema as ArrowSchema;
     use arrow::datatypes::{Field, Schema};
3 changes: 1 addition & 2 deletions crates/core/src/test_utils/factories/actions.rs
@@ -76,8 +76,7 @@ impl ActionFactory {
             schema
                 .fields()
                 .filter(|f| !partition_columns.contains(f.name()))
-                .cloned()
-                .collect(),
+                .cloned(),
         );
 
         let batch = DataFactory::record_batch(&data_schema, 10, &bounds).unwrap();
