Skip to content

Commit

Permalink
fix: checkpoint features format below v3,7 (#2307)
Browse files Browse the repository at this point in the history
# Description
Misread the protocol before but we the columns readerFeatures or
writerFeatures should be null and not empty list when you are below v3
or v7.

Also will immediately push a patch release since checkpoints with python
v0.16.1 breaks reader compatibility with Spark
  • Loading branch information
ion-elgreco authored Mar 20, 2024
1 parent b001186 commit e729bf4
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 34 deletions.
81 changes: 48 additions & 33 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use tracing::{debug, error};

use super::{time_utils, ProtocolError};
use crate::kernel::arrow::delta_log_schema_for_table;
use crate::kernel::{Action, Add as AddAction, DataType, PrimitiveType, Remove, StructField, Txn};
use crate::kernel::{
Action, Add as AddAction, DataType, PrimitiveType, Protocol, Remove, StructField, Txn,
};
use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
Expand Down Expand Up @@ -274,39 +276,52 @@ fn parquet_bytes_from_state(
}
let files = state.file_actions().unwrap();
// protocol
let jsons = std::iter::once(Action::Protocol(state.protocol().clone()))
// metaData
.chain(std::iter::once(Action::Metadata(current_metadata.clone())))
// txns
.chain(
state
.app_transaction_version()
.iter()
.map(|(app_id, version)| {
Action::Txn(Txn {
app_id: app_id.clone(),
version: *version,
last_updated: None,
})
}),
)
// removes
.chain(tombstones.iter().map(|r| {
let mut r = (*r).clone();

// As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
// https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
if r.extended_file_metadata.is_none() {
r.extended_file_metadata = Some(false);
}
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.protocol().min_reader_version,
min_writer_version: state.protocol().min_writer_version,
writer_features: if state.protocol().min_writer_version >= 7 {
Some(state.protocol().writer_features.clone().unwrap_or_default())
} else {
None
},
reader_features: if state.protocol().min_reader_version >= 3 {
Some(state.protocol().reader_features.clone().unwrap_or_default())
} else {
None
},
}))
// metaData
.chain(std::iter::once(Action::Metadata(current_metadata.clone())))
// txns
.chain(
state
.app_transaction_version()
.iter()
.map(|(app_id, version)| {
Action::Txn(Txn {
app_id: app_id.clone(),
version: *version,
last_updated: None,
})
}),
)
// removes
.chain(tombstones.iter().map(|r| {
let mut r = (*r).clone();

// As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
// https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
if r.extended_file_metadata.is_none() {
r.extended_file_metadata = Some(false);
}

Action::Remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
}));
Action::Remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
}));

// Create the arrow schema that represents the Checkpoint parquet file.
let arrow_schema = delta_log_schema_for_table(
Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.16.1"
version = "0.16.2"
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down
34 changes: 34 additions & 0 deletions python/tests/pyspark_integration/test_writer_readable.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,37 @@ def test_issue_1591_roundtrip_special_characters(tmp_path: pathlib.Path):

loaded = DeltaTable(spark_path).to_pandas()
assert loaded.shape == data.shape


@pytest.mark.pyspark
@pytest.mark.integration
def test_read_checkpointed_table(tmp_path: pathlib.Path):
data = pa.table(
{
"int": pa.array([1]),
}
)
write_deltalake(tmp_path, data)

dt = DeltaTable(tmp_path)
dt.create_checkpoint()

assert_spark_read_equal(data, str(tmp_path), ["int"])


@pytest.mark.pyspark
@pytest.mark.integration
def test_read_checkpointed_features_table(tmp_path: pathlib.Path):
from datetime import datetime

data = pa.table(
{
"timestamp": pa.array([datetime(2010, 1, 1)]),
}
)
write_deltalake(tmp_path, data)

dt = DeltaTable(tmp_path)
dt.create_checkpoint()

assert_spark_read_equal(data, str(tmp_path), ["timestamp"])
29 changes: 29 additions & 0 deletions python/tests/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pathlib

import pyarrow as pa
import pyarrow.parquet as pq

from deltalake import DeltaTable, write_deltalake

Expand Down Expand Up @@ -105,3 +106,31 @@ def test_features_maintained_after_checkpoint(tmp_path: pathlib.Path):

assert protocol_after_checkpoint.reader_features == ["timestampNtz"]
assert current_protocol == protocol_after_checkpoint


def test_features_null_on_below_v3_v7(tmp_path: pathlib.Path):
data = pa.table(
{
"int": pa.array([1]),
}
)
write_deltalake(tmp_path, data)

dt = DeltaTable(tmp_path)
current_protocol = dt.protocol()

dt.create_checkpoint()

dt = DeltaTable(tmp_path)
protocol_after_checkpoint = dt.protocol()

assert protocol_after_checkpoint.reader_features is None
assert protocol_after_checkpoint.writer_features is None
assert current_protocol == protocol_after_checkpoint

checkpoint = pq.read_table(
os.path.join(tmp_path, "_delta_log/00000000000000000000.checkpoint.parquet")
)

assert checkpoint["protocol"][0]["writerFeatures"].as_py() is None
assert checkpoint["protocol"][0]["readerFeatures"].as_py() is None

0 comments on commit e729bf4

Please sign in to comment.