Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement polling for table updates. (#413) #550

Merged
merged 2 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 74 additions & 26 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use serde_json::{Map, Value};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
use std::io::{BufRead, BufReader, Cursor};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::max, cmp::Ordering, collections::HashSet};
use uuid::Uuid;
Expand Down Expand Up @@ -178,9 +179,19 @@ pub enum DeltaTableError {
"Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
)]
InvalidVacuumRetentionPeriod,
/// Error returned when a line from log record is invalid.
#[error("Failed to read line from log record")]
Io {
/// Source error details returned while reading the log record.
#[from]
source: std::io::Error,
},
/// Error returned when a transaction fails to commit because the given version already exists.
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(DeltaDataTypeVersion),
/// Error returned when user attempts to commit actions that don't belong to the next version.
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion),
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
Expand Down Expand Up @@ -407,7 +418,7 @@ fn extract_rel_path<'a, 'b>(
pub enum DeltaVersion {
/// load the newest version
Newest,
/// specify the version to laod
/// specify the version to load
Version(DeltaDataTypeVersion),
/// specify the timestamp in UTC
Timestamp(DateTime<Utc>),
Expand Down Expand Up @@ -535,6 +546,17 @@ impl DeltaTableBuilder {
}
}

/// The result of peeking at the next commit available from the underlying storage.
///
/// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit
#[derive(Debug)]
pub enum PeekCommit {
/// A newer commit exists: its version and the associated actions
New(DeltaDataTypeVersion, Vec<Action>),
/// No newer commit was found; the provided version is up to date
UpToDate,
}

/// In memory representation of a Delta Table
pub struct DeltaTable {
/// The version of the table as of the most recent loaded Delta log entry.
Expand Down Expand Up @@ -755,6 +777,46 @@ impl DeltaTable {
self.update().await
}

/// Look up the commit that immediately follows `current_version` without modifying the table.
///
/// Returns `PeekCommit::UpToDate` when storage reports the next commit log as not found,
/// otherwise `PeekCommit::New` carrying the next version and the actions parsed from each
/// line of that commit log.
///
/// # Errors
///
/// Any other storage error, a failure while reading a line, or a line that cannot be
/// deserialized into an action is returned to the caller.
pub async fn peek_next_commit(
    &self,
    current_version: DeltaDataTypeVersion,
) -> Result<PeekCommit, DeltaTableError> {
    let next_version = current_version + 1;
    let commit_uri = self.commit_uri_from_version(next_version);

    // A missing log object is not an error here: it means nothing newer has been committed.
    let commit_log_bytes = match self.storage.get_obj(&commit_uri).await {
        Ok(bytes) => bytes,
        Err(StorageError::NotFound) => return Ok(PeekCommit::UpToDate),
        Err(storage_err) => return Err(storage_err.into()),
    };

    // Each line of the commit log is parsed as one JSON action; stop at the first failure.
    let actions = BufReader::new(Cursor::new(commit_log_bytes))
        .lines()
        .map(|line| -> Result<action::Action, DeltaTableError> {
            Ok(serde_json::from_str(line?.as_str())?)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(PeekCommit::New(next_version, actions))
}

/// Apply the actions associated with a `PeekCommit::New` to this DeltaTable.
///
/// `new_version` must be exactly one greater than the table's current version. On success
/// the state built from `actions` is merged into the table state and the table version is
/// set to `new_version`.
///
/// # Errors
///
/// Returns `DeltaTableError::VersionMismatch` when `new_version` does not directly follow
/// the current version, or the converted error when a state cannot be built from `actions`.
/// In both cases the table is left unchanged.
pub fn apply_actions(
    &mut self,
    new_version: DeltaDataTypeVersion,
    actions: Vec<Action>,
) -> Result<(), DeltaTableError> {
    let expected_version = self.version + 1;
    if new_version != expected_version {
        return Err(DeltaTableError::VersionMismatch(new_version, self.version));
    }

    // Build the incoming state first so a failure leaves `self` untouched.
    let incoming_state = DeltaTableState::from_actions(actions)?;
    self.state
        .merge(incoming_state, self.config.require_tombstones);
    self.version = new_version;

    Ok(())
}

/// Updates the DeltaTable to the most recent state committed to the transaction log by
/// loading the last checkpoint and incrementally applying each version since.
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
Expand All @@ -777,33 +839,19 @@ impl DeltaTable {
/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version`.
///
/// Newer commits are applied one at a time until no further commit is found.
///
/// # Errors
///
/// Returns `DeltaTableError::NotATable` when `self.version` is `-1` once no further commit
/// is found; any other error raised while reading or applying a commit is returned as is.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
self.version += 1;
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? {
self.apply_actions(version, actions)?;
}

loop {
match self.apply_log(self.version).await {
Ok(_) => {
self.version += 1;
}
Err(e) => {
match e {
ApplyLogError::EndOfLog => {
self.version -= 1;
if self.version == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
);
return Err(DeltaTableError::NotATable(err));
}
}
_ => {
return Err(DeltaTableError::from(e));
}
}
return Ok(());
}
}
if self.version == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
);
return Err(DeltaTableError::NotATable(err));
}

Ok(())
}

/// Loads the DeltaTable state for the given version.
Expand Down
11 changes: 10 additions & 1 deletion rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
ApplyLogError, CheckPoint, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable,
DeltaTableError, DeltaTableMetaData,
};
use crate::action;
use crate::action::{self, Action};
use crate::delta_config;

/// State snapshot currently held by the Delta Table instance.
Expand Down Expand Up @@ -54,6 +54,15 @@ impl DeltaTableState {
Ok(new_state)
}

/// Build a delta table state by processing `actions`, in order, on top of a default state.
///
/// # Errors
///
/// Stops at, and returns, the first error raised while processing an action.
pub fn from_actions(actions: Vec<Action>) -> Result<Self, ApplyLogError> {
    actions.into_iter().try_fold(
        DeltaTableState::default(),
        |mut state, action| -> Result<Self, ApplyLogError> {
            state.process_action(action, true)?;
            Ok(state)
        },
    )
}

/// Construct a delta table state object from checkpoint.
pub async fn from_checkpoint(
table: &DeltaTable,
Expand Down
37 changes: 37 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate deltalake;
use chrono::Utc;
use deltalake::storage::file::FileStorageBackend;
use deltalake::DeltaTableBuilder;
use deltalake::PeekCommit;
use deltalake::StorageBackend;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
Expand Down Expand Up @@ -469,3 +470,39 @@ async fn test_table_history() {
.expect("Cannot get table history");
assert_eq!(history3.len(), 5);
}

/// Polls a table opened at version 9: the first peek must yield commit 10 with two actions,
/// applying it must add the new data file, and a second peek must report the table up to date.
#[tokio::test]
async fn test_poll_table_commits() {
    // Data file that is expected to appear only once commit 10 has been applied.
    const NEW_FILE: &str = "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet";

    let path = "./tests/data/simple_table_with_checkpoint";
    let mut table = deltalake::open_table_with_version(path, 9).await.unwrap();

    let first_peek = table.peek_next_commit(table.version).await.unwrap();
    let is_new = match first_peek {
        PeekCommit::New(version, actions) => {
            // Before applying: still at version 9 and the new file is absent.
            assert_eq!(table.version, 9);
            assert!(!table.get_files_iter().any(|f| f == NEW_FILE));

            assert_eq!(version, 10);
            assert_eq!(actions.len(), 2);

            table.apply_actions(version, actions).unwrap();

            // After applying: version advanced and the new file is listed.
            assert_eq!(table.version, 10);
            assert!(table.get_files_iter().any(|f| f == NEW_FILE));

            true
        }
        PeekCommit::UpToDate => false,
    };
    assert!(is_new);

    let second_peek = table.peek_next_commit(table.version).await.unwrap();
    let is_up_to_date = matches!(second_peek, PeekCommit::UpToDate);
    assert!(is_up_to_date);
}