Skip to content

Commit

Permalink
feat(python): expose rust writer as additional engine v2 (#1891)
Browse files Browse the repository at this point in the history
# Description
- Adds rust writer as additional engine in python
- Adds overwrite schema functionality to the rust writer. @roeap feel
free to point out improvements 😄

A couple of gaps will remain between the current Rust writer and the pyarrow writer.
We will have to close them in a later PR:
- Replacewhere (partition filter / predicate) overwrite  
(users can, however, work around this by calling DeltaTable.delete and then
appending)

# Related Issue(s)
- closes #1861

---------

Signed-off-by: Nikolay Ulmasov <[email protected]>
Co-authored-by: Robert Pack <[email protected]>
Co-authored-by: Robert Pack <[email protected]>
Co-authored-by: David Blajda <[email protected]>
Co-authored-by: Nikolay Ulmasov <[email protected]>
Co-authored-by: Matthew Powers <[email protected]>
Co-authored-by: Thomas Frederik Hoeck <[email protected]>
Co-authored-by: Adrian Ehrsam <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Co-authored-by: Marijn Valk <[email protected]>
  • Loading branch information
10 people authored Nov 29, 2023
1 parent 733b5ff commit e6ad2e0
Show file tree
Hide file tree
Showing 13 changed files with 743 additions and 267 deletions.
1 change: 1 addition & 0 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#![deny(warnings)]
#![deny(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]
#![allow(clippy::nonminimal_bool)]

#[cfg(all(feature = "parquet", feature = "parquet2"))]
compile_error!(
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async fn excute_non_empty_expr(
None,
writer_properties,
false,
false,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
)
.await?;

Expand Down
17 changes: 17 additions & 0 deletions crates/deltalake-core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use self::vacuum::VacuumBuilder;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::table::builder::DeltaTableBuilder;
use crate::DeltaTable;
use std::collections::HashMap;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod convert_to_delta;
Expand Down Expand Up @@ -73,6 +74,22 @@ impl DeltaOps {
}
}

/// Create a new [`DeltaOps`] instance for the table at `uri`, passing
/// `storage_options` to the underlying table builder.
///
/// The location does not need to hold an initialized table: a
/// [`DeltaTableError::NotATable`] error from loading is tolerated so that the
/// returned instance can be used to create the table.
///
/// # Errors
///
/// Returns an error if the table cannot be built, or if loading fails for any
/// reason other than the location not being a table.
pub async fn try_from_uri_with_storage_options(
    uri: impl AsRef<str>,
    storage_options: HashMap<String, String>,
) -> DeltaResult<Self> {
    let builder = DeltaTableBuilder::from_uri(uri).with_storage_options(storage_options);
    let mut table = builder.build()?;
    // We allow for uninitialized locations, since we may want to create the table
    match table.load().await {
        Ok(_) | Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
        Err(other) => Err(other),
    }
}

/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
)
.await?;

Expand Down
99 changes: 91 additions & 8 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use super::writer::{DeltaWriter, WriterConfig};
use super::{transaction::commit, CreateBuilder};
use crate::delta_datafusion::DeltaDataChecker;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, Remove, StructType};
use crate::kernel::{Action, Add, Metadata, Remove, StructType};
use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
Expand Down Expand Up @@ -103,12 +103,20 @@ pub struct WriteBuilder {
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
/// whether to overwrite the schema
overwrite_schema: bool,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
safe_cast: bool,
/// Parquet writer properties
writer_properties: Option<WriterProperties>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
/// Name of the table, only used when table doesn't exist yet
name: Option<String>,
/// Description of the table, only used when table doesn't exist yet
description: Option<String>,
/// Configurations of the delta table, only used when table doesn't exist
configuration: HashMap<String, Option<String>>,
}

impl WriteBuilder {
Expand All @@ -126,8 +134,12 @@ impl WriteBuilder {
write_batch_size: None,
batches: None,
safe_cast: false,
overwrite_schema: false,
writer_properties: None,
app_metadata: None,
name: None,
description: None,
configuration: Default::default(),
}
}

Expand All @@ -137,6 +149,12 @@ impl WriteBuilder {
self
}

/// Set whether the write should overwrite the schema.
pub fn with_overwrite_schema(self, overwrite_schema: bool) -> Self {
    let mut builder = self;
    builder.overwrite_schema = overwrite_schema;
    builder
}

/// When using `Overwrite` mode, replace data that matches a predicate
pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self {
self.predicate = Some(predicate.into());
Expand Down Expand Up @@ -205,6 +223,31 @@ impl WriteBuilder {
self
}

/// Specify the table name, optionally qualified with a database name
/// (`[database_name.] table_name`).
pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
    let table_name: String = name.into();
    self.name = Some(table_name);
    self
}

/// Set a comment describing the table.
pub fn with_description(mut self, description: impl Into<String>) -> Self {
    let comment: String = description.into();
    self.description = Some(comment);
    self
}

/// Set configuration on created table
pub fn with_configuration(
mut self,
configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
) -> Self {
self.configuration = configuration
.into_iter()
.map(|(k, v)| (k.into(), v.map(|s| s.into())))
.collect();
self
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
match self.log_store.is_delta_table_location().await? {
true => {
Expand All @@ -229,10 +272,20 @@ impl WriteBuilder {
}?;
let mut builder = CreateBuilder::new()
.with_log_store(self.log_store.clone())
.with_columns(schema.fields().clone());
.with_columns(schema.fields().clone())
.with_configuration(self.configuration.clone());
if let Some(partition_columns) = self.partition_columns.as_ref() {
builder = builder.with_partition_columns(partition_columns.clone())
}

if let Some(name) = self.name.as_ref() {
builder = builder.with_table_name(name.clone());
};

if let Some(desc) = self.description.as_ref() {
builder = builder.with_comment(desc.clone());
};

let (_, actions, _) = builder.into_table_and_actions()?;
Ok(actions)
}
Expand All @@ -251,14 +304,19 @@ pub(crate) async fn write_execution_plan(
write_batch_size: Option<usize>,
writer_properties: Option<WriterProperties>,
safe_cast: bool,
overwrite_schema: bool,
) -> DeltaResult<Vec<Add>> {
let invariants = snapshot
.current_metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();

// Use input schema to prevent wrapping partitions columns into a dictionary.
let schema = snapshot.input_schema().unwrap_or(plan.schema());
let schema: ArrowSchemaRef = if overwrite_schema {
plan.schema()
} else {
snapshot.input_schema().unwrap_or(plan.schema())
};

let checker = DeltaDataChecker::new(invariants);

Expand Down Expand Up @@ -339,23 +397,26 @@ impl std::future::IntoFuture for WriteBuilder {
Ok(this.partition_columns.unwrap_or_default())
}?;

let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into();
let plan = if let Some(plan) = this.input {
Ok(plan)
} else if let Some(batches) = this.batches {
if batches.is_empty() {
Err(WriteError::MissingData)
} else {
let schema = batches[0].schema();
schema = batches[0].schema();
let table_schema = this
.snapshot
.physical_arrow_schema(this.log_store.object_store().clone())
.await
.or_else(|_| this.snapshot.arrow_schema())
.unwrap_or(schema.clone());

if !can_cast_batch(schema.fields(), table_schema.fields()) {
if !can_cast_batch(schema.fields(), table_schema.fields())
&& !(this.overwrite_schema && matches!(this.mode, SaveMode::Overwrite))
{
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
"Schema of data does not match table schema".to_string(),
));
};

Expand Down Expand Up @@ -390,7 +451,7 @@ impl std::future::IntoFuture for WriteBuilder {
vec![batches]
};

Ok(Arc::new(MemoryExec::try_new(&data, schema, None)?)
Ok(Arc::new(MemoryExec::try_new(&data, schema.clone(), None)?)
as Arc<dyn ExecutionPlan>)
}
} else {
Expand All @@ -415,12 +476,31 @@ impl std::future::IntoFuture for WriteBuilder {
this.write_batch_size,
this.writer_properties,
this.safe_cast,
this.overwrite_schema,
)
.await?;
actions.extend(add_actions.into_iter().map(Action::Add));

// Collect remove actions if we are overwriting the table
if matches!(this.mode, SaveMode::Overwrite) {
// Update metadata with new schema
let table_schema = this
.snapshot
.physical_arrow_schema(this.log_store.object_store().clone())
.await
.or_else(|_| this.snapshot.arrow_schema())
.unwrap_or(schema.clone());

if schema != table_schema {
let mut metadata = this
.snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?
.clone();
metadata.schema = schema.clone().try_into()?;
let metadata_action = Metadata::try_from(metadata)?;
actions.push(Action::Metadata(metadata_action));
}
// This should never error, since now() will always be larger than UNIX_EPOCH
let deletion_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -445,7 +525,10 @@ impl std::future::IntoFuture for WriteBuilder {

match this.predicate {
Some(_pred) => {
todo!("Overwriting data based on predicate is not yet implemented")
return Err(DeltaTableError::Generic(
"Overwriting data based on predicate is not yet implemented"
.to_string(),
));
}
_ => {
let remove_actions = this
Expand Down
25 changes: 11 additions & 14 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,22 +468,19 @@ fn apply_stats_conversion(
data_type: &DataType,
) {
if path.len() == 1 {
match data_type {
DataType::Primitive(PrimitiveType::Timestamp) => {
let v = context.get_mut(&path[0]);

if let Some(v) = v {
let ts = v
.as_str()
.and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
.map(|n| Value::Number(serde_json::Number::from(n)));

if let Some(ts) = ts {
*v = ts;
}
if let DataType::Primitive(PrimitiveType::Timestamp) = data_type {
let v = context.get_mut(&path[0]);

if let Some(v) = v {
let ts = v
.as_str()
.and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
.map(|n| Value::Number(serde_json::Number::from(n)));

if let Some(ts) = ts {
*v = ts;
}
}
_ => { /* noop */ }
}
} else {
let next_context = context.get_mut(&path[0]).and_then(|v| v.as_object_mut());
Expand Down
14 changes: 9 additions & 5 deletions crates/deltalake-core/src/writer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use std::io::Write;
use std::sync::Arc;

use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
as_boolean_array, as_generic_binary_array, as_largestring_array, as_primitive_array,
as_string_array, Array,
};
use arrow::datatypes::{
DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Int8Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::json::ReaderBuilder;
use arrow::record_batch::*;
Expand Down Expand Up @@ -184,7 +185,10 @@ pub(crate) fn stringified_partition_value(
DataType::UInt16 => as_primitive_array::<UInt16Type>(arr).value(0).to_string(),
DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
DataType::Float32 => as_primitive_array::<Float32Type>(arr).value(0).to_string(),
DataType::Float64 => as_primitive_array::<Float64Type>(arr).value(0).to_string(),
DataType::Utf8 => as_string_array(arr).value(0).to_string(),
DataType::LargeUtf8 => as_largestring_array(arr).value(0).to_string(),
DataType::Boolean => as_boolean_array(arr).value(0).to_string(),
DataType::Date32 => as_primitive_array::<Date32Type>(arr)
.value_as_date(0)
Expand Down
13 changes: 13 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ def write_new_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
# Type stub for `write_to_deltalake`: takes a table URI and a
# `pyarrow.RecordBatchReader` plus write options, and returns None.
# NOTE(review): presumably this is the entry point for the Rust writer engine
# exposed by this change; the implementation is not shown here — confirm
# parameter semantics against the native module.
def write_to_deltalake(
table_uri: str,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
max_rows_per_group: int,
overwrite_schema: bool,
predicate: Optional[str],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
partition_by: Optional[pyarrow.Schema],
Expand Down
Loading

0 comments on commit e6ad2e0

Please sign in to comment.