diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 953e13dc13..aadefc5b1a 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -15,6 +15,7 @@ use serde_json::{Map, Value}; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; +use std::io::{BufRead, BufReader, Cursor}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{cmp::max, cmp::Ordering, collections::HashSet}; use uuid::Uuid; @@ -178,9 +179,19 @@ pub enum DeltaTableError { "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)" )] InvalidVacuumRetentionPeriod, + /// Error returned when a line from log record is invalid. + #[error("Failed to read line from log record")] + Io { + /// Source error details returned while reading the log record. + #[from] + source: std::io::Error, + }, /// Error returned when transaction is failed to be committed because given version already exists. #[error("Delta transaction failed, version {0} already exists.")] VersionAlreadyExists(DeltaDataTypeVersion), + /// Error returned when user attempts to commit actions that don't belong to the next version. 
+ #[error("Delta transaction failed, version {0} does not follow {1}")] + VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion), /// Generic Delta Table error #[error("Generic DeltaTable error: {0}")] Generic(String), @@ -407,7 +418,7 @@ fn extract_rel_path<'a, 'b>( pub enum DeltaVersion { /// load the newest version Newest, - /// specify the version to laod + /// specify the version to load Version(DeltaDataTypeVersion), /// specify the timestamp in UTC Timestamp(DateTime), @@ -535,6 +546,17 @@ impl DeltaTableBuilder { } } +/// The next commit that's available from underlying storage +/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit +/// +#[derive(Debug)] +pub enum PeekCommit { + /// The next commit version and associated actions + New(DeltaDataTypeVersion, Vec), + /// Provided DeltaVersion is up to date + UpToDate, +} + /// In memory representation of a Delta Table pub struct DeltaTable { /// The version of the table as of the most recent loaded Delta log entry. 
@@ -755,6 +777,46 @@ impl DeltaTable { self.update().await } + /// Get the list of actions for the next commit + pub async fn peek_next_commit( + &self, + current_version: DeltaDataTypeVersion, + ) -> Result { + let next_version = current_version + 1; + let commit_uri = self.commit_uri_from_version(next_version); + let commit_log_bytes = self.storage.get_obj(&commit_uri).await; + let commit_log_bytes = match commit_log_bytes { + Err(StorageError::NotFound) => return Ok(PeekCommit::UpToDate), + _ => commit_log_bytes?, + }; + + let reader = BufReader::new(Cursor::new(commit_log_bytes)); + + let mut actions = Vec::new(); + for line in reader.lines() { + let action: action::Action = serde_json::from_str(line?.as_str())?; + actions.push(action); + } + Ok(PeekCommit::New(next_version, actions)) + } + + /// Apply any actions associated with the PeekCommit to the DeltaTable + pub fn apply_actions( + &mut self, + new_version: DeltaDataTypeVersion, + actions: Vec, + ) -> Result<(), DeltaTableError> { + if self.version + 1 != new_version { + return Err(DeltaTableError::VersionMismatch(new_version, self.version)); + } + + let s = DeltaTableState::from_actions(actions)?; + self.state.merge(s, self.config.require_tombstones); + self.version = new_version; + + Ok(()) + } + /// Updates the DeltaTable to the most recent state committed to the transaction log by /// loading the last checkpoint and incrementally applying each version since. pub async fn update(&mut self) -> Result<(), DeltaTableError> { @@ -777,33 +839,19 @@ impl DeltaTable { /// Updates the DeltaTable to the latest version by incrementally applying newer versions. /// It assumes that the table is already updated to the current version `self.version`. pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> { - self.version += 1; + while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? 
{ + self.apply_actions(version, actions)?; + } - loop { - match self.apply_log(self.version).await { - Ok(_) => { - self.version += 1; - } - Err(e) => { - match e { - ApplyLogError::EndOfLog => { - self.version -= 1; - if self.version == -1 { - let err = format!( - "No snapshot or version 0 found, perhaps {} is an empty dir?", - self.table_uri - ); - return Err(DeltaTableError::NotATable(err)); - } - } - _ => { - return Err(DeltaTableError::from(e)); - } - } - return Ok(()); - } - } + if self.version == -1 { + let err = format!( + "No snapshot or version 0 found, perhaps {} is an empty dir?", + self.table_uri + ); + return Err(DeltaTableError::NotATable(err)); } + + Ok(()) } /// Loads the DeltaTable state for the given version. diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 1edc2fba1f..63152f3b0f 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -15,7 +15,7 @@ use super::{ ApplyLogError, CheckPoint, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, DeltaTableMetaData, }; -use crate::action; +use crate::action::{self, Action}; use crate::delta_config; /// State snapshot currently held by the Delta Table instance. @@ -54,6 +54,15 @@ impl DeltaTableState { Ok(new_state) } + /// Construct a delta table state object from a list of actions + pub fn from_actions(actions: Vec) -> Result { + let mut new_state = DeltaTableState::default(); + for action in actions { + new_state.process_action(action, true)?; + } + Ok(new_state) + } + /// Construct a delta table state object from checkpoint. 
pub async fn from_checkpoint( table: &DeltaTable, diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index 5eabf7ccbd..8ff4dce381 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -3,6 +3,7 @@ extern crate deltalake; use chrono::Utc; use deltalake::storage::file::FileStorageBackend; use deltalake::DeltaTableBuilder; +use deltalake::PeekCommit; use deltalake::StorageBackend; use pretty_assertions::assert_eq; use std::collections::HashMap; @@ -469,3 +470,39 @@ async fn test_table_history() { .expect("Cannot get table history"); assert_eq!(history3.len(), 5); } + +#[tokio::test] +async fn test_poll_table_commits() { + let path = "./tests/data/simple_table_with_checkpoint"; + let mut table = deltalake::open_table_with_version(path, 9).await.unwrap(); + let peek = table.peek_next_commit(table.version).await.unwrap(); + + let is_new = if let PeekCommit::New(version, actions) = peek { + assert_eq!(table.version, 9); + assert!(!table + .get_files_iter() + .any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet")); + + assert_eq!(version, 10); + assert_eq!(actions.len(), 2); + + table.apply_actions(version, actions).unwrap(); + + assert_eq!(table.version, 10); + assert!(table + .get_files_iter() + .any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet")); + + true + } else { + false + }; + assert!(is_new); + + let peek = table.peek_next_commit(table.version).await.unwrap(); + let is_up_to_date = match peek { + PeekCommit::UpToDate => true, + _ => false, + }; + assert!(is_up_to_date); +}