Skip to content

Commit

Permalink
refactor: remove LoadCheckpointError and ApplyLogError (#1432)
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap authored Jun 6, 2023
1 parent 27043ea commit 2db7f9b
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 238 deletions.
3 changes: 3 additions & 0 deletions python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
match err {
ProtocolError::Arrow { source } => arrow_to_py(source),
ProtocolError::ObjectStore { source } => object_store_to_py(source),
ProtocolError::EndOfLog => DeltaProtocolError::new_err("End of log"),
ProtocolError::NoMetaData => DeltaProtocolError::new_err("Table metadata missing"),
ProtocolError::CheckpointNotFound => DeltaProtocolError::new_err(err.to_string()),
ProtocolError::InvalidField(err) => PyValueError::new_err(err),
ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
ProtocolError::IO { source } => PyIOError::new_err(source.to_string()),
ProtocolError::Generic(msg) => DeltaError::new_err(msg),
}
}
Expand Down
115 changes: 104 additions & 11 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ mod parquet_read;

#[cfg(all(feature = "arrow"))]
use arrow_schema::ArrowError;
use object_store::Error as ObjectStoreError;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use percent_encoding::percent_decode;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
Expand All @@ -21,7 +25,8 @@ use std::hash::{Hash, Hasher};

use crate::delta_config::IsolationLevel;
use crate::errors::DeltaResult;
use crate::{schema::*, DeltaTableMetaData};
use crate::storage::ObjectStoreRef;
use crate::{delta::CheckPoint, schema::*, DeltaTableMetaData};

/// Error returned when an invalid Delta log action is encountered.
#[allow(missing_docs)]
Expand All @@ -30,6 +35,12 @@ pub enum ProtocolError {
#[error("Table state does not contain metadata")]
NoMetaData,

#[error("Checkpoint file not found")]
CheckpointNotFound,

#[error("End of transaction log")]
EndOfLog,

/// The action contains an invalid field.
#[error("Invalid action field: {0}")]
InvalidField(String),
Expand All @@ -42,20 +53,15 @@ pub enum ProtocolError {
#[error("Generic action error: {0}")]
Generic(String),

#[cfg(feature = "parquet2")]
#[error("Failed to parse parquet checkpoint: {}", .source)]
/// Error returned when parsing checkpoint parquet using the parquet2 crate.
/// Error returned when parsing checkpoint parquet using the parquet crate.
#[error("Failed to parse parquet checkpoint: {source}")]
ParquetParseError {
/// Parquet error details returned when parsing the checkpoint parquet
#[cfg(feature = "parquet2")]
#[from]
source: parquet2::error::Error,
},

#[cfg(feature = "parquet")]
#[error("Failed to parse parquet checkpoint: {}", .source)]
/// Error returned when parsing checkpoint parquet using the parquet crate.
ParquetParseError {
/// Parquet error details returned when parsing the checkpoint parquet
#[cfg(feature = "parquet")]
#[from]
source: parquet::errors::ParquetError,
},
Expand Down Expand Up @@ -84,6 +90,12 @@ pub enum ProtocolError {
#[from]
source: ObjectStoreError,
},

#[error("Io: {source}")]
IO {
#[from]
source: std::io::Error,
},
}

fn decode_path(raw_path: &str) -> Result<String, ProtocolError> {
Expand Down Expand Up @@ -722,6 +734,87 @@ pub enum OutputMode {
Update,
}

/// Loads the checkpoint descriptor stored at `_delta_log/_last_checkpoint`.
///
/// When that object does not exist, the log directory is scanned for the newest
/// checkpoint file instead (no upper bound on the version).
///
/// # Errors
/// * [`ProtocolError::CheckpointNotFound`] if `_last_checkpoint` is missing and the
///   fallback scan either fails or finds no checkpoint. Note that any error raised
///   by the scan is reported as "not found".
/// * [`ProtocolError::ObjectStore`] for any other failure while fetching
///   `_last_checkpoint`.
/// * Errors from reading the object's bytes or decoding its JSON are propagated
///   through `?`.
pub(crate) async fn get_last_checkpoint(
    object_store: &ObjectStoreRef,
) -> Result<CheckPoint, ProtocolError> {
    let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
    debug!("loading checkpoint from {last_checkpoint_path}");

    let data = match object_store.get(&last_checkpoint_path).await {
        Ok(data) => data,
        Err(ObjectStoreError::NotFound { .. }) => {
            // No pointer file: fall back to listing the log for the latest checkpoint.
            return find_latest_check_point_for_version(object_store, i64::MAX)
                .await
                .ok()
                .flatten()
                .ok_or(ProtocolError::CheckpointNotFound);
        }
        Err(err) => return Err(ProtocolError::ObjectStore { source: err }),
    };

    let bytes = data.bytes().await?;
    let checkpoint = serde_json::from_slice(&bytes)?;
    Ok(checkpoint)
}

/// Finds the newest checkpoint in the log directory whose version is at most `version`.
///
/// Every object listed under the store's log path is matched against the
/// single-file (`<version>.checkpoint.parquet`) and multi-part
/// (`<version>.checkpoint.<part>.<parts>.parquet`) checkpoint name patterns.
/// The candidate with the highest version not exceeding `version` wins; when
/// several objects share that version, the first one listed is kept.
///
/// The returned [`CheckPoint`] always has `size` set to `0`; `parts` is only
/// populated for multi-part checkpoints. `Ok(None)` is returned when no
/// checkpoint qualifies.
///
/// Objects whose version or part count does not fit the target integer type are
/// ignored rather than causing a panic: the patterns accept any 20 (resp. 10)
/// digits, which can exceed what the numeric fields are able to hold.
///
/// # Errors
/// Returns a [`ProtocolError`] if the listing cannot be started or if any listed
/// entry yields an error other than "not found".
pub(crate) async fn find_latest_check_point_for_version(
    object_store: &ObjectStoreRef,
    version: i64,
) -> Result<Option<CheckPoint>, ProtocolError> {
    lazy_static! {
        static ref CHECKPOINT_REGEX: Regex =
            Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap();
        static ref CHECKPOINT_PARTS_REGEX: Regex =
            Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap();
    }

    let mut cp: Option<CheckPoint> = None;
    let mut stream = object_store.list(Some(object_store.log_path())).await?;

    while let Some(obj_meta) = stream.next().await {
        // Exit early if any objects can't be listed.
        // We exclude the special case of a not found error on some of the list entities.
        // This error mainly occurs for local stores when a temporary file has been deleted by
        // concurrent writers or if the table is vacuumed by another client.
        let obj_meta = match obj_meta {
            Ok(meta) => meta,
            Err(ObjectStoreError::NotFound { .. }) => continue,
            Err(err) => return Err(err.into()),
        };
        let location: &str = obj_meta.location.as_ref();

        // Single-file checkpoints take precedence over the multi-part pattern,
        // anything else in the log directory is not a checkpoint.
        let (captures, is_multi_part) = if let Some(found) = CHECKPOINT_REGEX.captures(location) {
            (found, false)
        } else if let Some(found) = CHECKPOINT_PARTS_REGEX.captures(location) {
            (found, true)
        } else {
            continue;
        };

        // Group 1 always participates in a match. Skip versions that overflow
        // `i64` as well as checkpoints newer than the requested max version.
        let curr_ver = match captures[1].parse::<i64>() {
            Ok(ver) if ver <= version => ver,
            _ => continue,
        };

        // Only a strictly newer version replaces the current candidate.
        let is_newer = cp.as_ref().map_or(true, |latest| curr_ver > latest.version);
        if !is_newer {
            continue;
        }

        let parts = if is_multi_part {
            // Group 2 holds the total number of parts; ignore the object if the
            // number does not fit the field's integer type.
            match captures[2].parse() {
                Ok(count) => Some(count),
                Err(_) => continue,
            }
        } else {
            None
        };

        cp = Some(CheckPoint {
            version: curr_ver,
            size: 0,
            parts,
        });
    }

    Ok(cp)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 2db7f9b

Please sign in to comment.