diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 701297bed3..4d39c90275 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -357,10 +357,10 @@ fn parquet_bytes_from_state( let buf = serde_json::to_string(&j?).unwrap(); let _ = decoder.decode(buf.as_bytes())?; + total_actions += 1; while let Some(batch) = decoder.flush()? { writer.write(&batch)?; } - total_actions += 1; } let _ = writer.close()?; @@ -1095,4 +1095,65 @@ mod tests { } }); } + + #[ignore = "This test is only useful if the batch size has been made small"] + #[tokio::test] + async fn test_checkpoint_large_table() -> crate::DeltaResult<()> { + use crate::writer::test_utils::get_arrow_schema; + + let table_schema = get_delta_schema(); + let temp_dir = tempfile::tempdir()?; + let table_path = temp_dir.path().to_str().unwrap(); + let mut table = DeltaOps::try_from_uri(&table_path) + .await? + .create() + .with_columns(table_schema.fields().cloned()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let count = 20; + + for _ in 0..count { + table.load().await?; + let batch = RecordBatch::try_new( + Arc::clone(&get_arrow_schema(&None)), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-02", + "2021-02-03", + "2021-02-02", + "2021-02-04", + ])), + ], + ) + .unwrap(); + let _ = DeltaOps(table.clone()).write(vec![batch]).await?; + } + + table.load().await?; + assert_eq!(table.version(), count, "Expected {count} transactions"); + let pre_checkpoint_actions = table.snapshot()?.file_actions()?; + + let before = table.version(); + let res = create_checkpoint(&table).await; + assert!(res.is_ok(), "Failed to create the checkpoint! {res:#?}"); + + let table = crate::open_table(&table_path).await?; + assert_eq!( + before, + table.version(), + "Why on earth did a checkpoint creata version?" + ); + + let post_checkpoint_actions = table.snapshot()?.file_actions()?; + + assert_eq!( + pre_checkpoint_actions.len(), + post_checkpoint_actions.len(), + "The number of actions read from the table after checkpointing is wrong!" + ); + Ok(()) + } }