Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove LoadCheckpointError and ApplyLogError #1432

Merged
merged 4 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
match err {
ProtocolError::Arrow { source } => arrow_to_py(source),
ProtocolError::ObjectStore { source } => object_store_to_py(source),
ProtocolError::EndOfLog => DeltaProtocolError::new_err("End of log"),
ProtocolError::NoMetaData => DeltaProtocolError::new_err("Table metadata missing"),
ProtocolError::CheckpointNotFound => DeltaProtocolError::new_err(err.to_string()),
ProtocolError::InvalidField(err) => PyValueError::new_err(err),
ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
ProtocolError::IO { source } => PyIOError::new_err(source.to_string()),
ProtocolError::Generic(msg) => DeltaError::new_err(msg),
}
}
Expand Down
115 changes: 104 additions & 11 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ mod parquet_read;

#[cfg(all(feature = "arrow"))]
use arrow_schema::ArrowError;
use object_store::Error as ObjectStoreError;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use percent_encoding::percent_decode;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
Expand All @@ -21,7 +25,8 @@ use std::hash::{Hash, Hasher};

use crate::delta_config::IsolationLevel;
use crate::errors::DeltaResult;
use crate::{schema::*, DeltaTableMetaData};
use crate::storage::ObjectStoreRef;
use crate::{delta::CheckPoint, schema::*, DeltaTableMetaData};

/// Error returned when an invalid Delta log action is encountered.
#[allow(missing_docs)]
Expand All @@ -30,6 +35,12 @@ pub enum ProtocolError {
#[error("Table state does not contain metadata")]
NoMetaData,

#[error("Checkpoint file not found")]
CheckpointNotFound,

#[error("End of transaction log")]
EndOfLog,

/// The action contains an invalid field.
#[error("Invalid action field: {0}")]
InvalidField(String),
Expand All @@ -42,20 +53,15 @@ pub enum ProtocolError {
#[error("Generic action error: {0}")]
Generic(String),

#[cfg(feature = "parquet2")]
#[error("Failed to parse parquet checkpoint: {}", .source)]
/// Error returned when parsing checkpoint parquet using the parquet2 crate.
/// Error returned when parsing checkpoint parquet using the parquet crate.
#[error("Failed to parse parquet checkpoint: {source}")]
ParquetParseError {
/// Parquet error details returned when parsing the checkpoint parquet
#[cfg(feature = "parquet2")]
#[from]
source: parquet2::error::Error,
},

#[cfg(feature = "parquet")]
#[error("Failed to parse parquet checkpoint: {}", .source)]
/// Error returned when parsing checkpoint parquet using the parquet crate.
ParquetParseError {
/// Parquet error details returned when parsing the checkpoint parquet
#[cfg(feature = "parquet")]
#[from]
source: parquet::errors::ParquetError,
},
Expand Down Expand Up @@ -84,6 +90,12 @@ pub enum ProtocolError {
#[from]
source: ObjectStoreError,
},

#[error("Io: {source}")]
IO {
#[from]
source: std::io::Error,
},
}

fn decode_path(raw_path: &str) -> Result<String, ProtocolError> {
Expand Down Expand Up @@ -722,6 +734,87 @@ pub enum OutputMode {
Update,
}

pub(crate) async fn get_last_checkpoint(
object_store: &ObjectStoreRef,
) -> Result<CheckPoint, ProtocolError> {
let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
debug!("loading checkpoint from {last_checkpoint_path}");
match object_store.get(&last_checkpoint_path).await {
Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?),
Err(ObjectStoreError::NotFound { .. }) => {
match find_latest_check_point_for_version(object_store, i64::MAX).await {
Ok(Some(cp)) => Ok(cp),
_ => Err(ProtocolError::CheckpointNotFound),
}
}
Err(err) => Err(ProtocolError::ObjectStore { source: err }),
}
}

pub(crate) async fn find_latest_check_point_for_version(
object_store: &ObjectStoreRef,
version: i64,
) -> Result<Option<CheckPoint>, ProtocolError> {
lazy_static! {
static ref CHECKPOINT_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap();
static ref CHECKPOINT_PARTS_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap();
}

let mut cp: Option<CheckPoint> = None;
let mut stream = object_store.list(Some(object_store.log_path())).await?;

while let Some(obj_meta) = stream.next().await {
// Exit early if any objects can't be listed.
// We exclude the special case of a not found error on some of the list entities.
// This error mainly occurs for local stores when a temporary file has been deleted by
// concurrent writers or if the table is vacuumed by another client.
let obj_meta = match obj_meta {
Ok(meta) => Ok(meta),
Err(ObjectStoreError::NotFound { .. }) => continue,
Err(err) => Err(err),
}?;
if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) {
let curr_ver_str = captures.get(1).unwrap().as_str();
let curr_ver: i64 = curr_ver_str.parse().unwrap();
if curr_ver > version {
// skip checkpoints newer than max version
continue;
}
if cp.is_none() || curr_ver > cp.unwrap().version {
cp = Some(CheckPoint {
version: curr_ver,
size: 0,
parts: None,
});
}
continue;
}

if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) {
let curr_ver_str = captures.get(1).unwrap().as_str();
let curr_ver: i64 = curr_ver_str.parse().unwrap();
if curr_ver > version {
// skip checkpoints newer than max version
continue;
}
if cp.is_none() || curr_ver > cp.unwrap().version {
let parts_str = captures.get(2).unwrap().as_str();
let parts = parts_str.parse().unwrap();
cp = Some(CheckPoint {
version: curr_ver,
size: 0,
parts: Some(parts),
});
}
continue;
}
}

Ok(cp)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading