Unable to append to delta table without datafusion feature #2204

Closed
jhoekx opened this issue Feb 22, 2024 · 3 comments

jhoekx commented Feb 22, 2024

Environment

Delta-rs version:
0.17

Binding:
rust

Environment:

  • Cloud provider: -
  • OS: Linux
  • Other: -

Bug

What happened:

First, create a Delta table in Rust and write a RecordBatch to it.
Then open the same table and try to write to it again.

This fails with:

Error: Transaction { source: UnsupportedWriterFeatures([Invariants]) }

What you expected to happen:

I can append new record batches to the table I just created and already wrote to.

How to reproduce it:

Given Cargo.toml:

[package]
name = "delta-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deltalake = { version = "0.17" }
tokio = "1"

Run src/main.rs to write an empty record batch twice:

use std::{future::IntoFuture, sync::Arc};

use deltalake::{
    arrow::array::{ArrayRef, Float64Builder, RecordBatch, TimestampMicrosecondBuilder},
    kernel::{DataType, PrimitiveType, StructField},
    open_table,
    operations::create::CreateBuilder,
    writer::{DeltaWriter, RecordBatchWriter},
    DeltaTable, DeltaTableError,
};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let ts: ArrayRef = Arc::new(TimestampMicrosecondBuilder::new().finish());
    let value: ArrayRef = Arc::new(Float64Builder::new().finish());
    let batch = RecordBatch::try_from_iter(vec![("ts", ts), ("value", value)]).unwrap();

    let mut table = create_or_get_table("./data/write").await?;

    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    let mut table = create_or_get_table("./data/write").await?;

    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    Ok(())
}

async fn create_or_get_table(table_uri: &str) -> Result<DeltaTable, DeltaTableError> {
    let table = match open_table(table_uri).await {
        Ok(table) => table,
        Err(err) => {
            if let DeltaTableError::NotATable(_) = err {
                let schema = vec![
                    StructField::new(
                        "ts".to_string(),
                        DataType::Primitive(PrimitiveType::Timestamp),
                        false,
                    ),
                    StructField::new(
                        "value".to_string(),
                        DataType::Primitive(PrimitiveType::Double),
                        false,
                    ),
                ];
                CreateBuilder::new()
                    .with_location(table_uri)
                    .with_columns(schema)
                    .into_future()
                    .await?
            } else {
                return Err(err);
            }
        }
    };
    Ok(table)
}

More details:

The reproduction case passes with features = ["datafusion"].

I would expect that I can perform the basic operation of adding a new record batch to the table without needing to pull in datafusion. This was possible in 0.16.

@jhoekx jhoekx added the bug Something isn't working label Feb 22, 2024
@rtyler rtyler self-assigned this Mar 2, 2024
@rtyler rtyler added the binding/rust Issues for the Rust crate label Mar 2, 2024
@rtyler rtyler added this to the Rust v0.18 milestone Mar 2, 2024

rtyler commented Mar 7, 2024

I cannot get this to reproduce inside of our integration tests but can easily reproduce it outside of them 🙃


jhoekx commented May 7, 2024

@rtyler The example code is still producing the same error as before with:

deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "81593e9" }

Has this been fixed on another branch?

Due to the recent timestamp changes, the empty array of timestamps in my example should now be created using:

    let ts: ArrayRef = Arc::new(
        TimestampMicrosecondBuilder::new()
            .with_timezone("UTC")
            .finish(),
    );


jhoekx commented May 19, 2024

Commit 507c3a3 makes it possible to write to a Delta table without enabling the datafusion feature.

The idea is to create the table with the highest reader and writer protocol versions (reader 3, writer 7). At those versions no features are implied by the version number, so the Invariants feature, which requires datafusion, is not part of the set of features required for writing.

    let mut table = CreateBuilder::new()
        .with_location("memory:")
        .with_columns(schema)
        .with_configuration_property(deltalake::DeltaConfigKey::MinReaderVersion, Some("3"))
        .with_configuration_property(deltalake::DeltaConfigKey::MinWriterVersion, Some("7"))
        .into_future()
        .await?;
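
To illustrate why raising the protocol versions helps, here is a simplified model of the feature check. It is not delta-rs code: `Protocol`, `required_writer_features` and `unsupported_features` are hypothetical names, and the mapping from legacy writer versions to implied features is my reading of the Delta protocol (writer version 2 implies appendOnly and invariants, version 3 adds checkConstraints). It also assumes a table created without explicit versions ends up on a legacy writer version.

```rust
use std::collections::BTreeSet;

/// A subset of the Delta protocol's writer features, enough for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum WriterFeature {
    AppendOnly,
    Invariants,
    CheckConstraints,
}

/// Hypothetical stand-in for a table's protocol action.
struct Protocol {
    min_writer_version: u32,
    /// Only consulted when `min_writer_version >= 7`.
    writer_features: Vec<WriterFeature>,
}

/// Features a writer must support before it may commit to the table.
fn required_writer_features(p: &Protocol) -> BTreeSet<WriterFeature> {
    if p.min_writer_version >= 7 {
        // Table-features protocol: only explicitly listed features apply.
        return p.writer_features.iter().copied().collect();
    }
    // Legacy versions: features are implied by the version number
    // (assumed mapping, see the paragraph above).
    let mut implied = BTreeSet::new();
    if p.min_writer_version >= 2 {
        implied.insert(WriterFeature::AppendOnly);
        implied.insert(WriterFeature::Invariants);
    }
    if p.min_writer_version >= 3 {
        implied.insert(WriterFeature::CheckConstraints);
    }
    implied
}

/// Returns the required features the writer lacks; empty means the write may proceed.
fn unsupported_features(
    p: &Protocol,
    supported: &BTreeSet<WriterFeature>,
) -> Vec<WriterFeature> {
    required_writer_features(p)
        .difference(supported)
        .copied()
        .collect()
}
```

In this model, a writer that supports only `AppendOnly` is rejected with `[Invariants]` on a writer-version-2 table, which mirrors the `UnsupportedWriterFeatures([Invariants])` error above. The same writer is accepted on a writer-version-7 table that lists no features.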

Complete example:

use std::{future::IntoFuture, sync::Arc};

use deltalake::{
    arrow::array::{ArrayRef, Float64Builder, RecordBatch, TimestampMicrosecondBuilder},
    kernel::{DataType, PrimitiveType, StructField},
    operations::create::CreateBuilder,
    writer::{DeltaWriter, RecordBatchWriter},
    DeltaConfigKey, DeltaTableError,
};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let schema = vec![
        StructField::new(
            "ts".to_string(),
            DataType::Primitive(PrimitiveType::Timestamp),
            false,
        ),
        StructField::new(
            "value".to_string(),
            DataType::Primitive(PrimitiveType::Double),
            false,
        ),
    ];
    let mut table = CreateBuilder::new()
        .with_location("memory:")
        .with_columns(schema)
        .with_configuration_property(DeltaConfigKey::MinReaderVersion, Some("3"))
        .with_configuration_property(DeltaConfigKey::MinWriterVersion, Some("7"))
        .into_future()
        .await?;

    let ts: ArrayRef = Arc::new(
        TimestampMicrosecondBuilder::new()
            .finish()
            .with_timezone("UTC"),
    );
    let value: ArrayRef = Arc::new(Float64Builder::new().finish());
    let batch = RecordBatch::try_from_iter(vec![("ts", ts), ("value", value)]).unwrap();

    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch.clone()).await?;
    writer.flush_and_commit(&mut table).await?;

    Ok(())
}

cc @kallydev
