Skip to content

Commit

Permalink
refactor: increase metadata action usage
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 4, 2024
1 parent 7981b95 commit 3701460
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 79 deletions.
10 changes: 3 additions & 7 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,11 +588,7 @@ impl<'a> DeltaScanBuilder<'a> {
// However we may want to do some additional balancing in case we are far off from the above.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();

let table_partition_cols = &self
.snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;
let table_partition_cols = &self.snapshot.metadata()?.partition_columns;

for action in files.iter() {
let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
Expand Down Expand Up @@ -1095,7 +1091,7 @@ impl DeltaDataChecker {

/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
let metadata = snapshot.metadata();
let metadata = snapshot.delta_metadata();

let invariants = metadata
.and_then(|meta| meta.schema.get_invariants().ok())
Expand Down Expand Up @@ -1539,7 +1535,7 @@ pub async fn find_files<'a>(
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata()?;

match &predicate {
Some(predicate) => {
Expand Down
14 changes: 4 additions & 10 deletions crates/deltalake-core/src/kernel/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,26 +137,20 @@ impl TryFrom<&DataType> for ArrowDataType {
DataType::Struct(s) => Ok(ArrowDataType::Struct(
s.fields()
.iter()
.map(<ArrowField as TryFrom<&StructField>>::try_from)
.map(TryInto::try_into)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?
.into(),
)),
DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(<ArrowField as TryFrom<
&ArrayType,
>>::try_from(a)?))),
DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(a.as_ref().try_into()?))),
DataType::Map(m) => Ok(ArrowDataType::Map(
Arc::new(ArrowField::new(
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new(
MAP_KEYS_NAME,
<ArrowDataType as TryFrom<&DataType>>::try_from(m.key_type())?,
false,
),
ArrowField::new(MAP_KEYS_NAME, m.key_type().try_into()?, false),
ArrowField::new(
MAP_VALUES_NAME,
<ArrowDataType as TryFrom<&DataType>>::try_from(m.value_type())?,
m.value_type().try_into()?,
m.value_contains_null(),
),
]
Expand Down
10 changes: 3 additions & 7 deletions crates/deltalake-core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext,
};
use crate::kernel::{Action, CommitInfo, IsolationLevel, Metadata, Protocol};
use crate::kernel::{Action, CommitInfo, IsolationLevel, Protocol};
use crate::logstore::LogStoreRef;
use crate::operations::datafusion_utils::Expression;
use crate::operations::transaction::commit;
Expand Down Expand Up @@ -86,11 +86,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
.expr
.ok_or_else(|| DeltaTableError::Generic("No Expresion provided".to_string()))?;

let mut metadata = this
.snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.clone();
let mut metadata = this.snapshot.metadata()?.clone();
let configuration_key = format!("delta.constraints.{}", name);

if metadata.configuration.contains_key(&configuration_key) {
Expand Down Expand Up @@ -190,7 +186,7 @@ impl std::future::IntoFuture for ConstraintBuilder {

let actions = vec![
Action::CommitInfo(commit_info),
Action::Metadata(Metadata::try_from(metadata)?),
Action::Metadata(metadata),
Action::Protocol(protocol),
];

Expand Down
8 changes: 2 additions & 6 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use super::datafusion_utils::Expression;
use super::transaction::PROTOCOL;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder, DeltaSessionContext};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
use crate::operations::transaction::commit;
use crate::operations::write::write_execution_plan;
Expand Down Expand Up @@ -138,11 +138,7 @@ async fn excute_non_empty_expr(
let input_schema = snapshot.input_schema()?;
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let table_partition_cols = snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns
.clone();
let table_partition_cols = snapshot.metadata()?.partition_columns.clone();

let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
.with_files(rewrite)
Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ async fn try_construct_early_filter(
) -> DeltaResult<Option<Expr>> {
let table_metadata = table_snapshot.metadata();

if table_metadata.is_none() {
if table_metadata.is_err() {
return Ok(None);
}

Expand Down Expand Up @@ -921,7 +921,7 @@ async fn execute(
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();

let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata()?;

// TODO: Given the join predicate, remove any expression that involve the
// source table and keep expressions that only involve the target table.
Expand Down
15 changes: 4 additions & 11 deletions crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,7 @@ pub fn create_merge_plan(
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());

let partitions_keys = &snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;
let partitions_keys = &snapshot.metadata()?.partition_columns;

let (operations, metrics) = match optimize_type {
OptimizeType::Compact => {
Expand All @@ -792,10 +789,7 @@ pub fn create_merge_plan(
let file_schema = arrow_schema_without_partitions(
&Arc::new(
<ArrowSchema as TryFrom<&crate::kernel::StructType>>::try_from(
&snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.schema,
&snapshot.metadata()?.schema()?,
)?,
),
partitions_keys,
Expand Down Expand Up @@ -945,9 +939,8 @@ fn build_zorder_plan(
)));
}
let field_names = snapshot
.metadata()
.unwrap()
.schema
.metadata()?
.schema()?
.fields()
.iter()
.map(|field| field.name().to_string())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ impl<'a> ConflictChecker<'a> {
.txn_info
.read_snapshot
.metadata()
.ok_or(CommitConflictError::NoMetadata)?
.map_err(|_|CommitConflictError::NoMetadata)?
.partition_columns;
AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
.predicate_matches(predicate.clone())
Expand Down
8 changes: 4 additions & 4 deletions crates/deltalake-core/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl DeltaTableState {
}

fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
let meta = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let meta = self.delta_metadata().ok_or(DeltaTableError::NoMetadata)?;
let fields = meta
.schema
.fields()
Expand Down Expand Up @@ -298,7 +298,7 @@ impl PruningStatistics for DeltaTableState {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.metadata()?.partition_columns;
let partition_columns = &self.metadata().ok()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.min_values(column)
Expand All @@ -307,7 +307,7 @@ impl PruningStatistics for DeltaTableState {
/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.metadata()?.partition_columns;
let partition_columns = &self.metadata().ok()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.max_values(column)
Expand All @@ -324,7 +324,7 @@ impl PruningStatistics for DeltaTableState {
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.metadata()?.partition_columns;
let partition_columns = &self.metadata().ok()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.null_counts(column)
Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::kernel::{Action, Remove};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
use crate::{DeltaResult, DeltaTable};

/// Updates records in the Delta Table.
/// See this module's documentation for more information
Expand Down Expand Up @@ -209,7 +209,7 @@ async fn execute(
})
.collect::<Result<HashMap<Column, Expr>, _>>()?;

let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata()?;
let table_partition_cols = current_metadata.partition_columns.clone();

let scan_start = Instant::now();
Expand Down
9 changes: 4 additions & 5 deletions crates/deltalake-core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ enum VacuumError {
/// Error returned
#[error(transparent)]
DeltaTable(#[from] DeltaTableError),

#[error(transparent)]
Protocol(#[from] crate::protocol::ProtocolError),
}

impl From<VacuumError> for DeltaTableError {
Expand Down Expand Up @@ -191,11 +194,7 @@ impl VacuumBuilder {
let mut file_sizes = vec![];
let object_store = self.log_store.object_store();
let mut all_files = object_store.list(None);
let partition_columns = &self
.snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;
let partition_columns = &self.snapshot.metadata()?.partition_columns;

while let Some(obj_meta) = all_files.next().await {
// TODO should we allow NotFound here in case we have a temporary commit file in the list
Expand Down
16 changes: 6 additions & 10 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::writer::{DeltaWriter, WriterConfig};
use super::{transaction::commit, CreateBuilder};
use crate::delta_datafusion::DeltaDataChecker;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, Metadata, Remove, StructType};
use crate::kernel::{Action, Add, Remove, StructType};
use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
Expand Down Expand Up @@ -376,7 +376,7 @@ impl std::future::IntoFuture for WriteBuilder {

let active_partitions = this
.snapshot
.metadata()
.delta_metadata()
.map(|meta| meta.partition_columns.clone());

// validate partition columns
Expand Down Expand Up @@ -492,14 +492,10 @@ impl std::future::IntoFuture for WriteBuilder {
.unwrap_or(schema.clone());

if schema != table_schema {
let mut metadata = this
.snapshot
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.clone();
metadata.schema = schema.clone().try_into()?;
let metadata_action = Metadata::try_from(metadata)?;
actions.push(Action::Metadata(metadata_action));
let mut metadata = this.snapshot.metadata()?.clone();
let delta_schema: StructType = schema.as_ref().try_into()?;
metadata.schema_string = serde_json::to_string(&delta_schema)?;
actions.push(Action::Metadata(metadata));
}
// This should never error, since now() will always be larger than UNIX_EPOCH
let deletion_timestamp = SystemTime::now()
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub async fn cleanup_expired_logs_for(
fn parquet_bytes_from_state(
state: &DeltaTableState,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
let current_metadata = state.metadata().ok_or(ProtocolError::NoMetaData)?;
let current_metadata = state.delta_metadata().ok_or(ProtocolError::NoMetaData)?;

let partition_col_data_types = current_metadata.get_partition_col_data_types();

Expand Down
14 changes: 8 additions & 6 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,15 @@ impl DeltaTable {

/// Returns the metadata associated with the loaded state.
pub fn metadata(&self) -> Result<&Metadata, DeltaTableError> {
Ok(self.state.metadata_action()?)
Ok(self.state.metadata()?)
}

/// Returns the metadata associated with the loaded state.
#[deprecated(since = "0.17.0", note = "use metadata() instead")]
pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> {
self.state.metadata().ok_or(DeltaTableError::NoMetadata)
self.state
.delta_metadata()
.ok_or(DeltaTableError::NoMetadata)
}

/// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log).
Expand Down Expand Up @@ -855,7 +857,7 @@ impl DeltaTable {
pub fn get_configurations(&self) -> Result<&HashMap<String, Option<String>>, DeltaTableError> {
Ok(self
.state
.metadata()
.delta_metadata()
.ok_or(DeltaTableError::NoMetadata)?
.get_configuration())
}
Expand Down Expand Up @@ -905,10 +907,10 @@ impl fmt::Display for DeltaTable {
writeln!(f, "DeltaTable({})", self.table_uri())?;
writeln!(f, "\tversion: {}", self.version())?;
match self.state.metadata() {
Some(metadata) => {
writeln!(f, "\tmetadata: {metadata}")?;
Ok(metadata) => {
writeln!(f, "\tmetadata: {metadata:?}")?;
}
None => {
_ => {
writeln!(f, "\tmetadata: None")?;
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/deltalake-core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ impl DeltaTableState {
}

/// The most recent metadata of the table.
pub fn metadata_action(&self) -> Result<&Metadata, ProtocolError> {
pub fn metadata(&self) -> Result<&Metadata, ProtocolError> {
self.metadata.as_ref().ok_or(ProtocolError::NoMetaData)
}

/// The most recent metadata of the table.
pub fn metadata(&self) -> Option<&DeltaTableMetaData> {
pub fn delta_metadata(&self) -> Option<&DeltaTableMetaData> {
self.current_metadata.as_ref()
}

Expand Down Expand Up @@ -328,7 +328,7 @@ impl DeltaTableState {
&'a self,
filters: &'a [PartitionFilter],
) -> Result<impl Iterator<Item = &Add> + '_, DeltaTableError> {
let current_metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = self.delta_metadata().ok_or(DeltaTableError::NoMetadata)?;

let nonpartitioned_columns: Vec<String> = filters
.iter()
Expand Down
8 changes: 4 additions & 4 deletions crates/deltalake-core/src/table/state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl DeltaTableState {
(Cow::Borrowed("data_change"), Arc::new(data_change)),
];

let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let metadata = self.metadata()?;

if !metadata.partition_columns.is_empty() {
let partition_cols_batch = self.partition_columns_as_batch(flatten)?;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl DeltaTableState {
&self,
flatten: bool,
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let metadata = self.delta_metadata().ok_or(DeltaTableError::NoMetadata)?;
let column_mapping_mode = self.table_config().column_mapping_mode();
let partition_column_types: Vec<arrow::datatypes::DataType> = metadata
.partition_columns
Expand Down Expand Up @@ -413,8 +413,8 @@ impl DeltaTableState {
.map(|maybe_stat| maybe_stat.as_ref().map(|stat| stat.num_records))
.collect::<Vec<Option<i64>>>(),
);
let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let schema = &metadata.schema;
let metadata = self.metadata()?;
let schema = &metadata.schema()?;

#[derive(Debug)]
struct ColStats<'a> {
Expand Down

0 comments on commit 3701460

Please sign in to comment.