Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: no longer load full table into ram in write #2265

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) -
.await
.unwrap();

let tbl = table.collect().await.unwrap();
let _schema = tbl[0].schema().clone();

DeltaOps::try_from_uri(table_path)
.await
.unwrap()
.write(table.collect().await.unwrap())
.write(tbl.into())
.with_partition_columns(vec!["wr_returned_date_sk"])
.await
.unwrap();
Expand Down Expand Up @@ -551,7 +554,7 @@ async fn main() {
]));

let batch = RecordBatch::try_new(
schema,
schema.clone(),
vec![
Arc::new(StringArray::from(group_ids)),
Arc::new(StringArray::from(name)),
Expand All @@ -565,7 +568,7 @@ async fn main() {
DeltaOps::try_from_uri(output)
.await
.unwrap()
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,7 @@ impl From<Column> for DeltaColumn {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, Schema};
Expand Down Expand Up @@ -1911,7 +1912,7 @@ mod tests {
.unwrap();
// write some data
let table = crate::DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -1975,7 +1976,7 @@ mod tests {
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ mod tests {
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use datafusion_expr::{col, lit};

use crate::operations::write::WriteData;
use crate::writer::test_utils::{create_bare_table, get_arrow_schema, get_record_batch};
use crate::{DeltaOps, DeltaResult, DeltaTable};

Expand Down Expand Up @@ -244,7 +245,7 @@ mod tests {
async fn add_constraint_with_invalid_data() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -260,7 +261,7 @@ mod tests {
async fn add_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -285,7 +286,7 @@ mod tests {
// Add constraint by providing a datafusion expression.
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand Down Expand Up @@ -328,7 +329,7 @@ mod tests {
)
.unwrap();

let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();
let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap();

let mut table = DeltaOps(table)
.add_constraint()
Expand All @@ -351,7 +352,7 @@ mod tests {
async fn add_conflicting_named_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -373,7 +374,7 @@ mod tests {
async fn write_data_that_violates_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;

let table = DeltaOps(write)
Expand All @@ -387,7 +388,7 @@ mod tests {
Arc::new(StringArray::from(vec!["2021-02-02"])),
];
let batch = RecordBatch::try_new(get_arrow_schema(&None), invalid_values)?;
let err = table.write(vec![batch]).await;
let err = table.write(batch.into()).await;
assert!(err.is_err());
Ok(())
}
Expand All @@ -396,11 +397,11 @@ mod tests {
async fn write_data_that_does_not_violate_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

let err = table.write(vec![batch]).await;
let err = table.write(batch.into()).await;

assert!(err.is_ok());
Ok(())
Expand Down
18 changes: 11 additions & 7 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl std::future::IntoFuture for DeleteBuilder {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
Expand Down Expand Up @@ -396,7 +397,7 @@ mod tests {
.unwrap();
// write some data
let table = DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -458,7 +459,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand All @@ -482,7 +483,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -549,7 +550,7 @@ mod tests {
)
.unwrap();

DeltaOps::new_in_memory().write(vec![batch]).await.unwrap()
DeltaOps::new_in_memory().write(batch.into()).await.unwrap()
}

// Validate behaviour of greater than
Expand Down Expand Up @@ -638,7 +639,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -696,7 +697,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -765,7 +766,10 @@ mod tests {
];
let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];

let table = DeltaOps::new_in_memory().write(batches).await.unwrap();
let table = DeltaOps::new_in_memory()
.write(batches.into())
.await
.unwrap();

let (table, _metrics) = DeltaOps(table)
.delete()
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl std::future::IntoFuture for DropConstraintBuilder {
#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::writer::test_utils::{create_bare_table, get_record_batch};
use crate::{DeltaOps, DeltaResult, DeltaTable};

Expand All @@ -121,7 +122,7 @@ mod tests {
async fn drop_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -145,7 +146,7 @@ mod tests {
async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;

let table = DeltaOps(write)
Expand All @@ -161,7 +162,7 @@ mod tests {
async fn drop_invalid_constraint_ignore() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.write(batch.clone().into())
.await?;

let version = write.version();
Expand Down
9 changes: 7 additions & 2 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl std::future::IntoFuture for LoadBuilder {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;
use crate::operations::{collect_sendable_stream, DeltaOps};
use crate::writer::test_utils::{get_record_batch, TestResult};
use crate::DeltaTableBuilder;
Expand Down Expand Up @@ -115,7 +116,9 @@ mod tests {
#[tokio::test]
async fn test_write_load() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
let table = DeltaOps::new_in_memory()
.write(batch.clone().into())
.await?;

let (_table, stream) = DeltaOps(table).load().await?;
let data = collect_sendable_stream(stream).await?;
Expand Down Expand Up @@ -146,7 +149,9 @@ mod tests {
#[tokio::test]
async fn test_load_with_columns() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
let table = DeltaOps::new_in_memory()
.write(batch.clone().into())
.await?;

let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
let data = collect_sendable_stream(stream).await?;
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,7 @@ mod tests {
use crate::kernel::StructField;
use crate::operations::merge::generalize_filter;
use crate::operations::merge::try_construct_early_filter;
use crate::operations::write::WriteData;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
Expand Down Expand Up @@ -1604,7 +1605,7 @@ mod tests {
.unwrap();
// write some data
DeltaOps(table)
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(SaveMode::Append)
.await
.unwrap()
Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use self::{
};
#[cfg(feature = "datafusion")]
pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
#[cfg(feature = "datafusion")]
use arrow::record_batch::RecordBatch;

use optimize::OptimizeBuilder;
use restore::RestoreBuilder;

Expand Down Expand Up @@ -137,8 +136,8 @@ impl DeltaOps {
/// Write data to Delta table
#[cfg(feature = "datafusion")]
#[must_use]
pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
WriteBuilder::new(self.0.log_store, self.0.state).with_input_batches(batches)
pub fn write(self, data: write::WriteData) -> WriteBuilder {
WriteBuilder::new(self.0.log_store, self.0.state).with_data(data)
}

/// Vacuum stale files from delta table
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,8 @@ pub(super) mod zorder {

#[cfg(test)]
mod tests {
use crate::operations::write::WriteData;

use super::*;
use ::datafusion::assert_batches_eq;
use arrow_array::{Int32Array, StringArray};
Expand Down Expand Up @@ -1357,7 +1359,7 @@ pub(super) mod zorder {
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ mod tests {
use crate::kernel::PrimitiveType;
use crate::kernel::StructField;
use crate::kernel::StructType;
use crate::operations::write::WriteData;
use crate::operations::DeltaOps;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::datafusion::write_batch;
Expand Down Expand Up @@ -525,7 +526,7 @@ mod tests {
)
.unwrap();

DeltaOps::new_in_memory().write(vec![batch]).await.unwrap()
DeltaOps::new_in_memory().write(batch.into()).await.unwrap()
}

#[tokio::test]
Expand Down
Loading
Loading