Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial table features implementation #1796

Merged
merged 15 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ impl CreateBuilder {
.unwrap_or_else(|| Protocol {
min_reader_version: MAX_SUPPORTED_READER_VERSION,
min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
writer_features: None,
reader_features: None,
});

let metadata = DeltaTableMetaData::new(
Expand Down Expand Up @@ -399,6 +401,8 @@ mod tests {
let protocol = Protocol {
min_reader_version: 0,
min_writer_version: 0,
writer_features: None,
reader_features: None,
};
let table = CreateBuilder::new()
.with_location("memory://")
Expand Down
12 changes: 12 additions & 0 deletions crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ async fn execute(
Protocol {
min_reader_version: table.get_min_reader_version(),
min_writer_version: table.get_min_writer_version(),
writer_features: if snapshot.min_writer_version() < 7 {
None
} else {
table.get_writer_features().cloned()
},
reader_features: if snapshot.min_reader_version() < 3 {
None
} else {
table.get_reader_features().cloned()
},
}
} else {
Protocol {
Expand All @@ -216,6 +226,8 @@ async fn execute(
table.get_min_writer_version(),
snapshot.min_writer_version(),
),
writer_features: snapshot.writer_features().cloned(),
reader_features: snapshot.reader_features().cloned(),
}
};
actions.push(Action::protocol(protocol));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub fn create_protocol_action(max_reader: Option<i32>, max_writer: Option<i32>)
let protocol = Protocol {
min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION),
min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION),
writer_features: None,
reader_features: None,
};
Action::protocol(protocol)
}
Expand Down Expand Up @@ -134,6 +136,8 @@ pub async fn create_initialized_table(
protocol: Protocol {
min_reader_version: 1,
min_writer_version: 1,
writer_features: None,
reader_features: None,
},
metadata: DeltaTableMetaData::new(
None,
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ fn parquet_bytes_from_state(
let jsons = std::iter::once(Action::protocol(Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
writer_features: None,
reader_features: None,
}))
// metaData
.chain(std::iter::once(Action::metaData(MetaData::try_from(
Expand Down
186 changes: 185 additions & 1 deletion crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
Expand Down Expand Up @@ -640,6 +640,190 @@ pub struct Protocol {
/// Minimum version of the Delta write protocol a client must implement to correctly write to
/// the table.
pub min_writer_version: i32,
/// Reader table features listed by this protocol action.
/// `None` when the action carries no reader feature list (older protocol versions have none).
#[serde(skip_serializing_if = "Option::is_none")]
pub reader_features: Option<HashSet<ReaderFeatures>>,
/// Writer table features listed by this protocol action.
/// `None` when the action carries no writer feature list (older protocol versions have none).
#[serde(skip_serializing_if = "Option::is_none")]
pub writer_features: Option<HashSet<WriterFeatures>>,
}

/// Table features relevant to readers of a table.
///
/// Known features are (de)serialized under their camelCase protocol names
/// (e.g. `columnMapping`). `#[serde(alias = ...)]` alone only affects deserialization, so
/// each variant uses `rename` to make sure the same name is also what gets *written*.
/// The upper-case Rust identifier is kept as an alias so values serialized before this
/// change still deserialize to the matching variant.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
pub enum ReaderFeatures {
    /// Mapping of one column to another
    #[serde(rename = "columnMapping", alias = "COLUMN_MAPPING")]
    COLUMN_MAPPING,
    /// Deletion vectors for merge, update, delete
    #[serde(rename = "deletionVectors", alias = "DELETION_VECTORS")]
    DELETION_VECTORS,
    /// Timestamps without timezone support
    #[serde(rename = "timestampNtz", alias = "TIMESTAMP_WITHOUT_TIMEZONE")]
    TIMESTAMP_WITHOUT_TIMEZONE,
    /// Version 2 of checkpointing
    #[serde(rename = "v2Checkpoint", alias = "V2_CHECKPOINT")]
    V2_CHECKPOINT,
    /// Fallback holding the raw name of any feature that matches none of the variants above
    #[serde(untagged)]
    OTHER(String),
}

/// Converts a reader feature into the number associated with it.
///
/// NOTE(review): the numbers look like the minimum reader protocol version at which each
/// feature is available — confirm against the callers. Unrecognised features
/// ([`ReaderFeatures::OTHER`]) map to `0`.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard blanket impl
/// still provides `Into<usize>` for existing `.into()` callers, and `usize::from(feature)`
/// becomes available as well.
impl From<ReaderFeatures> for usize {
    fn from(value: ReaderFeatures) -> Self {
        match value {
            ReaderFeatures::OTHER(_) => 0,
            ReaderFeatures::COLUMN_MAPPING => 2,
            ReaderFeatures::DELETION_VECTORS
            | ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE
            | ReaderFeatures::V2_CHECKPOINT => 3,
        }
    }
}

#[cfg(all(not(feature = "parquet2"), feature = "parquet"))]
impl From<&parquet::record::Field> for ReaderFeatures {
    /// Builds a reader feature from a parquet record field.
    ///
    /// String fields are matched against the known camelCase feature names; an unknown
    /// name, or a field that is not a string, ends up in [`ReaderFeatures::OTHER`]
    /// carrying the field's textual form.
    fn from(value: &parquet::record::Field) -> Self {
        // Anything that is not a string cannot name a known feature.
        let name = match value {
            parquet::record::Field::Str(name) => name.as_str(),
            other => return ReaderFeatures::OTHER(other.to_string()),
        };
        match name {
            "columnMapping" => ReaderFeatures::COLUMN_MAPPING,
            "deletionVectors" => ReaderFeatures::DELETION_VECTORS,
            "timestampNtz" => ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
            "v2Checkpoint" => ReaderFeatures::V2_CHECKPOINT,
            unknown => ReaderFeatures::OTHER(unknown.to_string()),
        }
    }
}

/// Parses a reader feature from its camelCase name.
///
/// Names that match no known feature are preserved verbatim in
/// [`ReaderFeatures::OTHER`].
impl From<String> for ReaderFeatures {
    fn from(value: String) -> Self {
        match value.as_str() {
            "columnMapping" => ReaderFeatures::COLUMN_MAPPING,
            "deletionVectors" => ReaderFeatures::DELETION_VECTORS,
            "timestampNtz" => ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
            "v2Checkpoint" => ReaderFeatures::V2_CHECKPOINT,
            // The borrowed `&str` is not bound by this arm, so the owned string can be
            // moved into the fallback instead of being copied into a fresh allocation.
            _ => ReaderFeatures::OTHER(value),
        }
    }
}

/// Table features relevant to writers of a table.
///
/// Known features are (de)serialized under their camelCase protocol names
/// (e.g. `appendOnly`). `#[serde(alias = ...)]` alone only affects deserialization, so
/// each variant uses `rename` to make sure the same name is also what gets *written*.
/// The upper-case Rust identifier is kept as an alias so values serialized before this
/// change still deserialize to the matching variant.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
pub enum WriterFeatures {
    /// Append Only Tables
    #[serde(rename = "appendOnly", alias = "APPEND_ONLY")]
    APPEND_ONLY,
    /// Table invariants
    #[serde(rename = "invariants", alias = "INVARIANTS")]
    INVARIANTS,
    /// Check constraints on columns
    #[serde(rename = "checkConstraints", alias = "CHECK_CONSTRAINTS")]
    CHECK_CONSTRAINTS,
    /// CDF on a table
    #[serde(rename = "changeDataFeed", alias = "CHANGE_DATA_FEED")]
    CHANGE_DATA_FEED,
    /// Columns with generated values
    #[serde(rename = "generatedColumns", alias = "GENERATED_COLUMNS")]
    GENERATED_COLUMNS,
    /// Mapping of one column to another
    #[serde(rename = "columnMapping", alias = "COLUMN_MAPPING")]
    COLUMN_MAPPING,
    /// ID Columns
    #[serde(rename = "identityColumns", alias = "IDENTITY_COLUMNS")]
    IDENTITY_COLUMNS,
    /// Deletion vectors for merge, update, delete
    #[serde(rename = "deletionVectors", alias = "DELETION_VECTORS")]
    DELETION_VECTORS,
    /// Row tracking on tables
    #[serde(rename = "rowTracking", alias = "ROW_TRACKING")]
    ROW_TRACKING,
    /// Timestamps without timezone support
    #[serde(rename = "timestampNtz", alias = "TIMESTAMP_WITHOUT_TIMEZONE")]
    TIMESTAMP_WITHOUT_TIMEZONE,
    /// Domain specific metadata
    #[serde(rename = "domainMetadata", alias = "DOMAIN_METADATA")]
    DOMAIN_METADATA,
    /// Version 2 of checkpointing
    #[serde(rename = "v2Checkpoint", alias = "V2_CHECKPOINT")]
    V2_CHECKPOINT,
    /// Iceberg compatibility support
    #[serde(rename = "icebergCompatV1", alias = "ICEBERG_COMPAT_V1")]
    ICEBERG_COMPAT_V1,
    /// Fallback holding the raw name of any feature that matches none of the variants above
    #[serde(untagged)]
    OTHER(String),
}

/// Converts a writer feature into the number associated with it.
///
/// NOTE(review): the numbers look like the minimum writer protocol version at which each
/// feature is available — confirm against the callers. Unrecognised features
/// ([`WriterFeatures::OTHER`]) map to `0`.
/// NOTE(review): `IDENTITY_COLUMNS` is grouped with the version-7 features here; verify
/// against the Delta protocol specification that this is the intended value.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard blanket impl
/// still provides `Into<usize>` for existing `.into()` callers, and `usize::from(feature)`
/// becomes available as well.
impl From<WriterFeatures> for usize {
    fn from(value: WriterFeatures) -> Self {
        match value {
            WriterFeatures::OTHER(_) => 0,
            WriterFeatures::APPEND_ONLY | WriterFeatures::INVARIANTS => 2,
            WriterFeatures::CHECK_CONSTRAINTS => 3,
            WriterFeatures::CHANGE_DATA_FEED | WriterFeatures::GENERATED_COLUMNS => 4,
            WriterFeatures::COLUMN_MAPPING => 5,
            WriterFeatures::IDENTITY_COLUMNS
            | WriterFeatures::DELETION_VECTORS
            | WriterFeatures::ROW_TRACKING
            | WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE
            | WriterFeatures::DOMAIN_METADATA
            | WriterFeatures::V2_CHECKPOINT
            | WriterFeatures::ICEBERG_COMPAT_V1 => 7,
        }
    }
}

/// Parses a writer feature from its camelCase name.
///
/// Names that match no known feature are preserved verbatim in
/// [`WriterFeatures::OTHER`].
impl From<String> for WriterFeatures {
    fn from(value: String) -> Self {
        match value.as_str() {
            "appendOnly" => WriterFeatures::APPEND_ONLY,
            "invariants" => WriterFeatures::INVARIANTS,
            "checkConstraints" => WriterFeatures::CHECK_CONSTRAINTS,
            "changeDataFeed" => WriterFeatures::CHANGE_DATA_FEED,
            "generatedColumns" => WriterFeatures::GENERATED_COLUMNS,
            "columnMapping" => WriterFeatures::COLUMN_MAPPING,
            "identityColumns" => WriterFeatures::IDENTITY_COLUMNS,
            "deletionVectors" => WriterFeatures::DELETION_VECTORS,
            "rowTracking" => WriterFeatures::ROW_TRACKING,
            "timestampNtz" => WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
            "domainMetadata" => WriterFeatures::DOMAIN_METADATA,
            "v2Checkpoint" => WriterFeatures::V2_CHECKPOINT,
            "icebergCompatV1" => WriterFeatures::ICEBERG_COMPAT_V1,
            // The borrowed `&str` is not bound by this arm, so the owned string can be
            // moved into the fallback instead of being copied into a fresh allocation.
            _ => WriterFeatures::OTHER(value),
        }
    }
}

#[cfg(all(not(feature = "parquet2"), feature = "parquet"))]
impl From<&parquet::record::Field> for WriterFeatures {
    /// Builds a writer feature from a parquet record field.
    ///
    /// String fields are matched against the known camelCase feature names; an unknown
    /// name, or a field that is not a string, ends up in [`WriterFeatures::OTHER`]
    /// carrying the field's textual form.
    fn from(value: &parquet::record::Field) -> Self {
        // Anything that is not a string cannot name a known feature.
        let name = match value {
            parquet::record::Field::Str(name) => name.as_str(),
            other => return WriterFeatures::OTHER(other.to_string()),
        };
        match name {
            "appendOnly" => WriterFeatures::APPEND_ONLY,
            "invariants" => WriterFeatures::INVARIANTS,
            "checkConstraints" => WriterFeatures::CHECK_CONSTRAINTS,
            "changeDataFeed" => WriterFeatures::CHANGE_DATA_FEED,
            "generatedColumns" => WriterFeatures::GENERATED_COLUMNS,
            "columnMapping" => WriterFeatures::COLUMN_MAPPING,
            "identityColumns" => WriterFeatures::IDENTITY_COLUMNS,
            "deletionVectors" => WriterFeatures::DELETION_VECTORS,
            "rowTracking" => WriterFeatures::ROW_TRACKING,
            "timestampNtz" => WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
            "domainMetadata" => WriterFeatures::DOMAIN_METADATA,
            "v2Checkpoint" => WriterFeatures::V2_CHECKPOINT,
            "icebergCompatV1" => WriterFeatures::ICEBERG_COMPAT_V1,
            unknown => WriterFeatures::OTHER(unknown.to_string()),
        }
    }
}

/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
Expand Down
28 changes: 26 additions & 2 deletions crates/deltalake-core/src/protocol/parquet2_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use parquet2::read::decompress;
use parquet2::read::get_page_iterator;
use parquet2::read::levels::get_bit_width;

use super::ProtocolError;
use super::{ProtocolError, ReaderFeatures, WriterFeatures};
use crate::protocol::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn};
use crate::schema::Guid;
use boolean::for_each_boolean_field_value;
Expand All @@ -26,7 +26,7 @@ mod stats;
mod string;
mod validity;

/// Parquet deserilization error
/// Parquet deserialization error
#[derive(thiserror::Error, Debug)]
pub enum ParseError {
/// Generic parsing error
Expand Down Expand Up @@ -614,6 +614,30 @@ fn deserialize_protocol_column_page(
|action: &mut Protocol, v: i32| action.min_writer_version = v,
)?;
}
"readerFeatures" => {
for_each_repeated_string_field_value(
actions,
page,
dict,
descriptor,
|action: &mut Protocol, v: Vec<String>| {
action.reader_features =
Some(v.into_iter().map(ReaderFeatures::from).collect());
},
)?;
}
"writerFeatures" => {
for_each_repeated_string_field_value(
actions,
page,
dict,
descriptor,
|action: &mut Protocol, v: Vec<String>| {
action.writer_features =
Some(v.into_iter().map(WriterFeatures::from).collect());
},
)?;
}
_ => {
warn!("Unexpected field `{}` in protocol", f);
}
Expand Down
12 changes: 12 additions & 0 deletions crates/deltalake-core/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,18 @@ impl Protocol {
gen_action_type_error("protocol", "minWriterVersion", "int")
})?;
}
"readerFeatures" => {
re.reader_features = record
.get_list(i)
.map(|l| l.elements().iter().map(From::from).collect())
.ok()
}
"writerFeatures" => {
re.writer_features = record
.get_list(i)
.map(|l| l.elements().iter().map(From::from).collect())
.ok()
}
_ => {
log::debug!(
"Unexpected field name `{}` for protocol action: {:?}",
Expand Down
15 changes: 14 additions & 1 deletion crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use self::builder::DeltaTableConfig;
use self::state::DeltaTableState;
use crate::errors::DeltaTableError;
use crate::partitions::PartitionFilter;
use crate::protocol::{self, find_latest_check_point_for_version, get_last_checkpoint, Action};
use crate::protocol::{
self, find_latest_check_point_for_version, get_last_checkpoint, Action, ReaderFeatures,
WriterFeatures,
};
use crate::protocol::{Add, ProtocolError, Stats};
use crate::schema::*;
use crate::storage::{commit_uri_from_version, ObjectStoreRef};
Expand Down Expand Up @@ -818,6 +821,16 @@ impl DeltaTable {
self.state.min_writer_version()
}

/// Returns the reader features reported by this table's current state, if any.
///
/// Forwards directly to `self.state.reader_features()`; `None` means the state reports
/// no reader feature set.
pub fn get_reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.state.reader_features()
}

/// Returns the writer features reported by this table's current state, if any.
///
/// Forwards directly to `self.state.writer_features()`; `None` means the state reports
/// no writer feature set.
pub fn get_writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.state.writer_features()
}

/// Return table schema parsed from transaction log. Return None if table hasn't been loaded or
/// no metadata was found in the log.
pub fn schema(&self) -> Option<&Schema> {
Expand Down
Loading
Loading