refactor: move checkpoint and errors into separate module #1430

Merged · 6 commits · Jun 3, 2023
26 changes: 13 additions & 13 deletions python/src/error.rs
@@ -1,6 +1,6 @@
use arrow_schema::ArrowError;
use deltalake::checkpoints::CheckpointError;
use deltalake::{DeltaTableError, ObjectStoreError};
use deltalake::action::ProtocolError;
use deltalake::{errors::DeltaTableError, ObjectStoreError};
use pyo3::exceptions::{
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
};
@@ -59,16 +59,16 @@ fn arrow_to_py(err: ArrowError) -> PyErr {
}
}

fn checkpoint_to_py(err: CheckpointError) -> PyErr {
fn checkpoint_to_py(err: ProtocolError) -> PyErr {
match err {
CheckpointError::Io { source } => PyIOError::new_err(source.to_string()),
CheckpointError::Arrow { source } => arrow_to_py(source),
CheckpointError::DeltaTable { source } => inner_to_py_err(source),
CheckpointError::ObjectStore { source } => object_store_to_py(source),
CheckpointError::MissingMetaData => DeltaProtocolError::new_err("Table metadata missing"),
CheckpointError::PartitionValueNotParseable(err) => PyValueError::new_err(err),
CheckpointError::JSONSerialization { source } => PyValueError::new_err(source.to_string()),
CheckpointError::Parquet { source } => PyIOError::new_err(source.to_string()),
ProtocolError::Arrow { source } => arrow_to_py(source),
ProtocolError::ObjectStore { source } => object_store_to_py(source),
ProtocolError::NoMetaData => DeltaProtocolError::new_err("Table metadata missing"),
ProtocolError::InvalidField(err) => PyValueError::new_err(err),
ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
ProtocolError::Generic(msg) => DeltaError::new_err(msg),
}
}

@@ -81,7 +81,7 @@ pub enum PythonError {
#[error("Error in arrow")]
Arrow(#[from] ArrowError),
#[error("Error in checkpoint")]
Checkpoint(#[from] CheckpointError),
Protocol(#[from] ProtocolError),
}

impl From<PythonError> for pyo3::PyErr {
@@ -90,7 +90,7 @@ impl From<PythonError> for pyo3::PyErr {
PythonError::DeltaTable(err) => inner_to_py_err(err),
PythonError::ObjectStore(err) => object_store_to_py(err),
PythonError::Arrow(err) => arrow_to_py(err),
PythonError::Checkpoint(err) => checkpoint_to_py(err),
PythonError::Protocol(err) => checkpoint_to_py(err),
}
}
}
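
For orientation (not part of this diff): binding code that previously produced a CheckpointError now hands a ProtocolError to this module, and the conversion chain ProtocolError -> PythonError -> PyErr falls out of the #[from] attribute and the From impl above. A minimal sketch with a hypothetical helper name, assuming it lives alongside PythonError in error.rs:

use deltalake::action::ProtocolError;
use pyo3::PyResult;

// Hypothetical helper: surface a protocol/checkpoint failure to Python.
fn protocol_result_to_py(res: Result<(), ProtocolError>) -> PyResult<()> {
    // ProtocolError -> PythonError via #[from]; `?` then applies the
    // From<PythonError> for PyErr impl shown above.
    res.map_err(PythonError::from)?;
    Ok(())
}
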
5 changes: 3 additions & 2 deletions python/src/lib.rs
@@ -23,6 +23,7 @@ use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
@@ -174,7 +175,7 @@ impl RawDeltaTable {
&self,
partitions_filters: Vec<(&str, &str, PartitionFilterValue)>,
) -> PyResult<Vec<String>> {
let partition_filters: Result<Vec<PartitionFilter<&str>>, deltalake::DeltaTableError> =
let partition_filters: Result<Vec<PartitionFilter<&str>>, DeltaTableError> =
partitions_filters
.into_iter()
.map(|filter| match filter {
@@ -520,7 +521,7 @@ impl RawDeltaTable {

fn convert_partition_filters<'a>(
partitions_filters: Vec<(&'a str, &'a str, PartitionFilterValue<'a>)>,
) -> Result<Vec<PartitionFilter<&'a str>>, deltalake::DeltaTableError> {
) -> Result<Vec<PartitionFilter<&'a str>>, DeltaTableError> {
partitions_filters
.into_iter()
.map(|filter| match filter {
2 changes: 1 addition & 1 deletion rust/examples/basic_operations.rs
@@ -37,7 +37,7 @@ fn get_table_batches() -> RecordBatch {
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::DeltaTableError> {
async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
// Create a delta operations client pointing at an un-initialized in-memory location.
// In a production environment this would be created with "try_new" and point at
// a real storage location.
2 changes: 1 addition & 1 deletion rust/examples/read_delta_table.rs
@@ -1,5 +1,5 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::DeltaTableError> {
async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
let table_path = "./tests/data/delta-0.8.0";
let table = deltalake::open_table(table_path).await?;
println!("{table}");
1 change: 1 addition & 0 deletions rust/examples/recordbatch-writer.rs
@@ -10,6 +10,7 @@
use chrono::prelude::*;
use deltalake::arrow::array::*;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::errors::DeltaTableError;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::*;
use log::*;
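
Taken together, the example updates show the downstream migration this refactor implies: DeltaTableError is now spelled out as deltalake::errors::DeltaTableError rather than referenced at the crate root. A minimal sketch with a hypothetical helper (not part of the examples):

// Before this PR: use deltalake::DeltaTableError;
use deltalake::errors::DeltaTableError;

// Hypothetical helper illustrating the new import path.
async fn print_version(table_uri: &str) -> Result<(), DeltaTableError> {
    let table = deltalake::open_table(table_uri).await?;
    println!("version: {}", table.version());
    Ok(())
}
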
123 changes: 47 additions & 76 deletions rust/src/checkpoints.rs → rust/src/action/checkpoints.rs
@@ -7,7 +7,7 @@ use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
use regex::Regex;
@@ -17,89 +17,62 @@ use std::convert::TryFrom;
use std::iter::Iterator;
use std::ops::Add;

use super::action;
use super::delta_arrow::delta_log_schema_for_table;
use super::open_table_with_version;
use super::schema::*;
use super::storage::DeltaObjectStore;
use super::table_state::DeltaTableState;
use super::time_utils;
use super::DeltaTable;
use super::{CheckPoint, DeltaTableError};
use super::{Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn};
use crate::delta_arrow::delta_log_schema_for_table;
use crate::schema::*;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::{open_table_with_version, time_utils, CheckPoint, DeltaTable};

type SchemaPath = Vec<String>;

/// Error returned when there is an error during checkpoint creation.
#[derive(thiserror::Error, Debug)]
pub enum CheckpointError {
/// Error returned when the DeltaTableState does not contain a metadata action.
#[error("DeltaTableMetadata not present in DeltaTableState")]
MissingMetaData,
enum CheckpointError {
/// Error returned when a string formatted partition value cannot be parsed to its appropriate
/// data type.
#[error("Partition value {0} cannot be parsed from string.")]
PartitionValueNotParseable(String),
/// Passthrough error returned when calling DeltaTable.
#[error("DeltaTableError: {source}")]
DeltaTable {
/// The source DeltaTableError.
#[from]
source: DeltaTableError,
},

/// Error returned when the parquet writer fails while writing the checkpoint.
#[error("Failed to write parquet: {}", .source)]
Parquet {
/// Parquet error details returned when writing the checkpoint failed.
#[from]
source: ParquetError,
},

/// Error returned when converting the schema to Arrow format failed.
#[error("Failed to convert into Arrow schema: {}", .source)]
Arrow {
/// Arrow error details returned when converting the schema in Arrow format failed
#[from]
source: ArrowError,
},
/// Passthrough error returned when calling ObjectStore.
#[error("ObjectStoreError: {source}")]
ObjectStore {
/// The source ObjectStoreError.
#[from]
source: ObjectStoreError,
},
/// Passthrough error returned by serde_json.
#[error("serde_json::Error: {source}")]
JSONSerialization {
/// The source serde_json::Error.
#[from]
source: serde_json::Error,
},
/// Passthrough error returned when doing std::io operations
#[error("std::io::Error: {source}")]
Io {
/// The source std::io::Error
#[from]
source: std::io::Error,
},
}

/// The record batch size for checkpoint parquet file
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

impl From<CheckpointError> for ArrowError {
fn from(error: CheckpointError) -> Self {
ArrowError::from_external_error(Box::new(error))
impl From<CheckpointError> for ProtocolError {
fn from(value: CheckpointError) -> Self {
match value {
CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()),
CheckpointError::Arrow { source } => Self::Arrow { source },
CheckpointError::Parquet { source } => Self::ParquetParseError { source },
}
}
}

/// The record batch size for checkpoint parquet file
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

/// Creates checkpoint at current table version
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> {
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> {
create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?;

Ok(())
}
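
A usage sketch (not part of this diff), assuming the checkpoints module stays re-exported at deltalake::checkpoints as the unchanged import in python/src/lib.rs suggests; with this change a failure surfaces as ProtocolError rather than the removed CheckpointError:

// Sketch only: create a checkpoint for a local table and propagate the new error type.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let table = deltalake::open_table("./tests/data/delta-0.8.0").await?;
    deltalake::checkpoints::create_checkpoint(&table).await?; // Result<(), ProtocolError>
    Ok(())
}
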

/// Deletes expired log files before the given version from the table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError> {
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, ProtocolError> {
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
@@ -117,8 +90,10 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
table_uri: &str,
version: i64,
cleanup: Option<bool>,
) -> Result<(), CheckpointError> {
let table = open_table_with_version(table_uri, version).await?;
) -> Result<(), ProtocolError> {
let table = open_table_with_version(table_uri, version)
.await
.map_err(|err| ProtocolError::Generic(err.to_string()))?;
create_checkpoint_for(version, table.get_state(), table.storage.as_ref()).await?;

let enable_expired_log_cleanup =
@@ -136,7 +111,7 @@ async fn create_checkpoint_for(
version: i64,
state: &DeltaTableState,
storage: &DeltaObjectStore,
) -> Result<(), CheckpointError> {
) -> Result<(), ProtocolError> {
// TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for
// an appropriate split point yet though so only writing a single part currently.
// See https://github.com/delta-io/delta-rs/issues/288
@@ -169,7 +144,7 @@ async fn flush_delete_files<T: Fn(&(i64, ObjectMeta)) -> bool>(
maybe_delete_files: &mut Vec<(i64, ObjectMeta)>,
files_to_delete: &mut Vec<(i64, ObjectMeta)>,
should_delete_file: T,
) -> Result<i32, DeltaTableError> {
) -> Result<i32, ProtocolError> {
if !maybe_delete_files.is_empty() && should_delete_file(maybe_delete_files.last().unwrap()) {
files_to_delete.append(maybe_delete_files);
}
@@ -179,7 +154,7 @@ async fn flush_delete_files<T: Fn(&(i64, ObjectMeta)) -> bool>(
.map(|file| async move {
match storage.delete(&file.1.location).await {
Ok(_) => Ok(1),
Err(e) => Err(DeltaTableError::from(e)),
Err(e) => Err(ProtocolError::from(e)),
}
})
.collect::<Vec<_>>();
@@ -202,7 +177,7 @@ pub async fn cleanup_expired_logs_for(
until_version: i64,
storage: &DeltaObjectStore,
log_retention_timestamp: i64,
) -> Result<i32, DeltaTableError> {
) -> Result<i32, ProtocolError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
@@ -306,10 +281,8 @@ pub async fn cleanup_expired_logs_for(
}
}

fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
let current_metadata = state
.current_metadata()
.ok_or(CheckpointError::MissingMetaData)?;
fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, ProtocolError> {
let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?;

let partition_col_data_types = current_metadata.get_partition_col_data_types();

@@ -339,21 +312,21 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
}

// protocol
let jsons = std::iter::once(action::Action::protocol(action::Protocol {
let jsons = std::iter::once(Action::protocol(Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
}))
// metaData
.chain(std::iter::once(action::Action::metaData(
action::MetaData::try_from(current_metadata.clone())?,
)))
.chain(std::iter::once(Action::metaData(MetaData::try_from(
current_metadata.clone(),
)?)))
// txns
.chain(
state
.app_transaction_version()
.iter()
.map(|(app_id, version)| {
action::Action::txn(action::Txn {
Action::txn(Txn {
app_id: app_id.clone(),
version: *version,
last_updated: None,
@@ -370,9 +343,9 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
r.extended_file_metadata = Some(false);
}

action::Action::remove(r)
Action::remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(|err| ArrowError::JsonError(err.to_string())))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(state.files().iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
@@ -392,7 +365,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons: Vec<serde_json::Value> = jsons.map(|r| r.unwrap()).collect();
let jsons = jsons.collect::<Result<Vec<serde_json::Value>, _>>()?;
decoder.serialize(&jsons)?;

while let Some(batch) = decoder.flush()? {
@@ -406,11 +379,11 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
}

fn checkpoint_add_from_state(
add: &action::Add,
add: &AddAction,
partition_col_data_types: &[(&str, &SchemaDataType)],
stats_conversions: &[(SchemaPath, SchemaDataType)],
) -> Result<Value, ArrowError> {
let mut v = serde_json::to_value(action::Action::add(add.clone()))
) -> Result<Value, ProtocolError> {
let mut v = serde_json::to_value(Action::add(add.clone()))
.map_err(|err| ArrowError::JsonError(err.to_string()))?;

v["add"]["dataChange"] = Value::Bool(false);
@@ -457,7 +430,7 @@ fn checkpoint_add_from_state(
fn typed_partition_value_from_string(
string_value: &str,
data_type: &SchemaDataType,
) -> Result<Value, CheckpointError> {
) -> Result<Value, ProtocolError> {
match data_type {
SchemaDataType::primitive(primitive_type) => match primitive_type.as_str() {
"string" | "binary" => Ok(string_value.to_owned().into()),
@@ -504,7 +477,7 @@ fn typed_partition_value_from_string(
fn typed_partition_value_from_option_string(
string_value: &Option<String>,
data_type: &SchemaDataType,
) -> Result<Value, CheckpointError> {
) -> Result<Value, ProtocolError> {
match string_value {
Some(s) => {
if s.is_empty() {
@@ -517,8 +490,6 @@ fn typed_partition_value_from_option_string(
}
}

type SchemaPath = Vec<String>;

fn collect_stats_conversions(
paths: &mut Vec<(SchemaPath, SchemaDataType)>,
fields: &[SchemaField],