feat(python, rust): add feature operation #2712

Merged: 2 commits, Sep 18, 2024
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.2.1"
version = "0.3.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.19.1", path = "../core" }
deltalake-core = { version = "0.20.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.2.0"
version = "0.3.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.19.1", path = "../core" }
deltalake-core = { version = "0.20.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.19.1", path = "../core" }
deltalake-core = { version = "0.20.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.19.1"
version = "0.20.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
131 changes: 124 additions & 7 deletions crates/core/src/kernel/models/actions.rs
@@ -9,7 +9,7 @@ use url::Url;

use super::schema::StructType;
use crate::kernel::{error::Error, DeltaResult};
use crate::DeltaConfigKey;
use crate::TableProperty;

/// Defines a file format used in table
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
@@ -240,10 +240,10 @@ impl Protocol {
new_properties: &HashMap<String, String>,
raise_if_not_exists: bool,
) -> DeltaResult<Protocol> {
let mut parsed_properties: HashMap<DeltaConfigKey, String> = HashMap::new();
let mut parsed_properties: HashMap<TableProperty, String> = HashMap::new();

for (key, value) in new_properties {
if let Ok(parsed_key) = key.parse::<DeltaConfigKey>() {
if let Ok(parsed_key) = key.parse::<TableProperty>() {
parsed_properties.insert(parsed_key, value.to_string());
} else if raise_if_not_exists {
return Err(Error::Generic(format!(
@@ -254,7 +254,7 @@
}

// Check and update delta.minReaderVersion
if let Some(min_reader_version) = parsed_properties.get(&DeltaConfigKey::MinReaderVersion) {
if let Some(min_reader_version) = parsed_properties.get(&TableProperty::MinReaderVersion) {
let new_min_reader_version = min_reader_version.parse::<i32>();
match new_min_reader_version {
Ok(version) => match version {
@@ -280,7 +280,7 @@
}

// Check and update delta.minWriterVersion
if let Some(min_writer_version) = parsed_properties.get(&DeltaConfigKey::MinWriterVersion) {
if let Some(min_writer_version) = parsed_properties.get(&TableProperty::MinWriterVersion) {
let new_min_writer_version = min_writer_version.parse::<i32>();
match new_min_writer_version {
Ok(version) => match version {
Expand All @@ -306,7 +306,7 @@ impl Protocol {
}

// Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7
if let Some(enable_cdf) = parsed_properties.get(&DeltaConfigKey::EnableChangeDataFeed) {
if let Some(enable_cdf) = parsed_properties.get(&TableProperty::EnableChangeDataFeed) {
let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::<bool>();
match if_enable_cdf {
Ok(true) => {
@@ -335,7 +335,7 @@
}
}

if let Some(enable_dv) = parsed_properties.get(&DeltaConfigKey::EnableDeletionVectors) {
if let Some(enable_dv) = parsed_properties.get(&TableProperty::EnableDeletionVectors) {
let if_enable_dv = enable_dv.to_ascii_lowercase().parse::<bool>();
match if_enable_dv {
Ok(true) => {
@@ -377,6 +377,97 @@
}
}

/// High level table features
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum TableFeatures {
Review comment (Member): What should the relation between this enum and the DeltaConfigKey in src/table/config.rs be? 🤔 😕

/// Mapping of one column to another
ColumnMapping,
/// Deletion vectors for merge, update, delete
DeletionVectors,
/// timestamps without timezone support
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
/// version 2 of checkpointing
V2Checkpoint,
/// Append Only Tables
AppendOnly,
/// Table invariants
Invariants,
/// Check constraints on columns
CheckConstraints,
/// CDF on a table
ChangeDataFeed,
/// Columns with generated values
GeneratedColumns,
/// ID Columns
IdentityColumns,
/// Row tracking on tables
RowTracking,
/// domain specific metadata
DomainMetadata,
/// Iceberg compatibility support
IcebergCompatV1,
}

impl FromStr for TableFeatures {
type Err = ();

fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"columnMapping" => Ok(TableFeatures::ColumnMapping),
"deletionVectors" => Ok(TableFeatures::DeletionVectors),
"timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone),
"v2Checkpoint" => Ok(TableFeatures::V2Checkpoint),
"appendOnly" => Ok(TableFeatures::AppendOnly),
"invariants" => Ok(TableFeatures::Invariants),
"checkConstraints" => Ok(TableFeatures::CheckConstraints),
"changeDataFeed" => Ok(TableFeatures::ChangeDataFeed),
"generatedColumns" => Ok(TableFeatures::GeneratedColumns),
"identityColumns" => Ok(TableFeatures::IdentityColumns),
"rowTracking" => Ok(TableFeatures::RowTracking),
"domainMetadata" => Ok(TableFeatures::DomainMetadata),
"icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1),
_ => Err(()),
}
}
}

impl AsRef<str> for TableFeatures {
fn as_ref(&self) -> &str {
match self {
TableFeatures::ColumnMapping => "columnMapping",
TableFeatures::DeletionVectors => "deletionVectors",
TableFeatures::TimestampWithoutTimezone => "timestampNtz",
TableFeatures::V2Checkpoint => "v2Checkpoint",
TableFeatures::AppendOnly => "appendOnly",
TableFeatures::Invariants => "invariants",
TableFeatures::CheckConstraints => "checkConstraints",
TableFeatures::ChangeDataFeed => "changeDataFeed",
TableFeatures::GeneratedColumns => "generatedColumns",
TableFeatures::IdentityColumns => "identityColumns",
TableFeatures::RowTracking => "rowTracking",
TableFeatures::DomainMetadata => "domainMetadata",
TableFeatures::IcebergCompatV1 => "icebergCompatV1",
}
}
}

impl fmt::Display for TableFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
}
}

impl TableFeatures {
    /// Convert a table feature to the corresponding reader and/or writer feature
pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
let reader_feature = ReaderFeatures::try_from(self).ok();
let writer_feature = WriterFeatures::try_from(self).ok();
(reader_feature, writer_feature)
}
}

/// Features table readers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
@@ -449,6 +540,19 @@ impl fmt::Display for ReaderFeatures {
}
}

impl TryFrom<&TableFeatures> for ReaderFeatures {
type Error = String;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match ReaderFeatures::from(value.as_ref()) {
ReaderFeatures::Other(_) => {
Err(format!("Table feature {} is not a reader feature", value))
}
value => Ok(value),
}
}
}

/// Features table writers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
@@ -540,6 +644,19 @@ impl fmt::Display for WriterFeatures {
}
}

impl TryFrom<&TableFeatures> for WriterFeatures {
type Error = String;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match WriterFeatures::from(value.as_ref()) {
WriterFeatures::Other(_) => {
Err(format!("Table feature {} is not a writer feature", value))
}
value => Ok(value),
}
}
}

impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
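For context on the rename in the hunks above: the property-parsing loop in actions.rs now keys off TableProperty (formerly DeltaConfigKey), exported from the crate root as shown in the lib.rs diff below. A minimal sketch of that parsing step, assuming only the re-export; the helper name parse_table_properties is hypothetical and not part of this PR:

```rust
use std::collections::HashMap;

use deltalake_core::TableProperty;

/// Hypothetical helper mirroring the loop in actions.rs: parse user-supplied
/// string keys (e.g. "delta.minReaderVersion") into TableProperty entries.
fn parse_table_properties(
    new_properties: &HashMap<String, String>,
) -> HashMap<TableProperty, String> {
    let mut parsed: HashMap<TableProperty, String> = HashMap::new();
    for (key, value) in new_properties {
        if let Ok(parsed_key) = key.parse::<TableProperty>() {
            parsed.insert(parsed_key, value.to_string());
        }
        // The real method can instead return an error for unknown keys
        // when raise_if_not_exists is true.
    }
    parsed
}

fn main() {
    let props = HashMap::from([(
        "delta.enableChangeDataFeed".to_string(),
        "true".to_string(),
    )]);
    let parsed = parse_table_properties(&props);
    // Known keys parse into variants such as TableProperty::EnableChangeDataFeed,
    // which the protocol-update code above uses to bump minWriterVersion or add
    // WriterFeatures::ChangeDataFeed on writer version 7.
    assert!(parsed.contains_key(&TableProperty::EnableChangeDataFeed));
}
```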
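The new TableFeatures enum round-trips the camelCase feature names and splits into reader and writer features via the TryFrom impls added above. A hedged usage sketch, assuming TableFeatures, ReaderFeatures, and WriterFeatures are reachable under deltalake_core::kernel (exact re-export paths may differ):

```rust
use deltalake_core::kernel::{ReaderFeatures, TableFeatures, WriterFeatures};

fn main() {
    // FromStr accepts the camelCase protocol names.
    let feature: TableFeatures = "deletionVectors".parse().expect("known feature");
    assert_eq!(feature.to_string(), "deletionVectors");

    // Deletion vectors are both a reader and a writer feature, so both sides are Some.
    let (reader, writer) = feature.to_reader_writer_features();
    assert_eq!(reader, Some(ReaderFeatures::DeletionVectors));
    assert_eq!(writer, Some(WriterFeatures::DeletionVectors));

    // appendOnly is writer-only: the ReaderFeatures TryFrom falls through to
    // ReaderFeatures::Other and is rejected, so the reader side comes back as None.
    let (reader, writer) = TableFeatures::AppendOnly.to_reader_writer_features();
    assert!(reader.is_none());
    assert_eq!(writer, Some(WriterFeatures::AppendOnly));
}
```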
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
@@ -93,7 +93,7 @@ pub use self::errors::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use self::table::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
pub use self::table::config::DeltaConfigKey;
pub use self::table::config::TableProperty;
pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
pub use operations::DeltaOps;