Skip to content

Commit

Permalink
Implement polling for table updates (#550)
Browse files Browse the repository at this point in the history
fix #413
  • Loading branch information
Blajda authored Jan 27, 2022
1 parent 1a4b7a0 commit 80a7148
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 27 deletions.
100 changes: 74 additions & 26 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use serde_json::{Map, Value};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
use std::io::{BufRead, BufReader, Cursor};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::max, cmp::Ordering, collections::HashSet};
use uuid::Uuid;
Expand Down Expand Up @@ -178,9 +179,19 @@ pub enum DeltaTableError {
"Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
)]
InvalidVacuumRetentionPeriod,
/// Error returned when a line from log record is invalid.
#[error("Failed to read line from log record")]
Io {
/// Source error details returned while reading the log record.
#[from]
source: std::io::Error,
},
/// Error returned when transaction is failed to be committed because given version already exists.
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(DeltaDataTypeVersion),
/// Error returned when user attempts to commit actions that don't belong to the next version.
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion),
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
Expand Down Expand Up @@ -407,7 +418,7 @@ fn extract_rel_path<'a, 'b>(
pub enum DeltaVersion {
/// load the newest version
Newest,
/// specify the version to laod
/// specify the version to load
Version(DeltaDataTypeVersion),
/// specify the timestamp in UTC
Timestamp(DateTime<Utc>),
Expand Down Expand Up @@ -535,6 +546,17 @@ impl DeltaTableBuilder {
}
}

/// The next commit that's available from underlying storage
/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit
///
#[derive(Debug)]
pub enum PeekCommit {
/// The next commit version and assoicated actions
New(DeltaDataTypeVersion, Vec<Action>),
/// Provided DeltaVersion is up to date
UpToDate,
}

/// In memory representation of a Delta Table
pub struct DeltaTable {
/// The version of the table as of the most recent loaded Delta log entry.
Expand Down Expand Up @@ -755,6 +777,46 @@ impl DeltaTable {
self.update().await
}

/// Get the list of actions for the next commit
pub async fn peek_next_commit(
&self,
current_version: DeltaDataTypeVersion,
) -> Result<PeekCommit, DeltaTableError> {
let next_version = current_version + 1;
let commit_uri = self.commit_uri_from_version(next_version);
let commit_log_bytes = self.storage.get_obj(&commit_uri).await;
let commit_log_bytes = match commit_log_bytes {
Err(StorageError::NotFound) => return Ok(PeekCommit::UpToDate),
_ => commit_log_bytes?,
};

let reader = BufReader::new(Cursor::new(commit_log_bytes));

let mut actions = Vec::new();
for line in reader.lines() {
let action: action::Action = serde_json::from_str(line?.as_str())?;
actions.push(action);
}
Ok(PeekCommit::New(next_version, actions))
}

///Apply any actions assoicated with the PeekCommit to the DeltaTable
pub fn apply_actions(
&mut self,
new_version: DeltaDataTypeVersion,
actions: Vec<Action>,
) -> Result<(), DeltaTableError> {
if self.version + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(new_version, self.version));
}

let s = DeltaTableState::from_actions(actions)?;
self.state.merge(s, self.config.require_tombstones);
self.version = new_version;

Ok(())
}

/// Updates the DeltaTable to the most recent state committed to the transaction log by
/// loading the last checkpoint and incrementally applying each version since.
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
Expand All @@ -777,33 +839,19 @@ impl DeltaTable {
/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version`.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
self.version += 1;
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? {
self.apply_actions(version, actions)?;
}

loop {
match self.apply_log(self.version).await {
Ok(_) => {
self.version += 1;
}
Err(e) => {
match e {
ApplyLogError::EndOfLog => {
self.version -= 1;
if self.version == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
);
return Err(DeltaTableError::NotATable(err));
}
}
_ => {
return Err(DeltaTableError::from(e));
}
}
return Ok(());
}
}
if self.version == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
);
return Err(DeltaTableError::NotATable(err));
}

Ok(())
}

/// Loads the DeltaTable state for the given version.
Expand Down
11 changes: 10 additions & 1 deletion rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
ApplyLogError, CheckPoint, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable,
DeltaTableError, DeltaTableMetaData,
};
use crate::action;
use crate::action::{self, Action};
use crate::delta_config;

/// State snapshot currently held by the Delta Table instance.
Expand Down Expand Up @@ -54,6 +54,15 @@ impl DeltaTableState {
Ok(new_state)
}

/// Construct a delta table state object from a list of actions
pub fn from_actions(actions: Vec<Action>) -> Result<Self, ApplyLogError> {
let mut new_state = DeltaTableState::default();
for action in actions {
new_state.process_action(action, true)?;
}
Ok(new_state)
}

/// Construct a delta table state object from checkpoint.
pub async fn from_checkpoint(
table: &DeltaTable,
Expand Down
37 changes: 37 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate deltalake;
use chrono::Utc;
use deltalake::storage::file::FileStorageBackend;
use deltalake::DeltaTableBuilder;
use deltalake::PeekCommit;
use deltalake::StorageBackend;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
Expand Down Expand Up @@ -469,3 +470,39 @@ async fn test_table_history() {
.expect("Cannot get table history");
assert_eq!(history3.len(), 5);
}

#[tokio::test]
async fn test_poll_table_commits() {
let path = "./tests/data/simple_table_with_checkpoint";
let mut table = deltalake::open_table_with_version(path, 9).await.unwrap();
let peek = table.peek_next_commit(table.version).await.unwrap();

let is_new = if let PeekCommit::New(version, actions) = peek {
assert_eq!(table.version, 9);
assert!(!table
.get_files_iter()
.any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"));

assert_eq!(version, 10);
assert_eq!(actions.len(), 2);

table.apply_actions(version, actions).unwrap();

assert_eq!(table.version, 10);
assert!(table
.get_files_iter()
.any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"));

true
} else {
false
};
assert!(is_new);

let peek = table.peek_next_commit(table.version).await.unwrap();
let is_up_to_date = match peek {
PeekCommit::UpToDate => true,
_ => false,
};
assert!(is_up_to_date);
}

0 comments on commit 80a7148

Please sign in to comment.