diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index 6045bf410d..69076bd066 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -9,8 +9,9 @@ use itertools::Itertools;
 use lazy_static::lazy_static;
 use object_store::path::Path;
 use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use parquet::arrow::ProjectionMask;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use tracing::debug;
@@ -250,19 +251,45 @@ impl LogSegment {
     pub(super) fn checkpoint_stream(
         &self,
         store: Arc<dyn ObjectStore>,
-        _read_schema: &Schema,
+        read_schema: &Schema,
         config: &DeltaTableConfig,
     ) -> BoxStream<'_, DeltaResult<RecordBatch>> {
         let batch_size = config.log_batch_size;
+        let read_schema = Arc::new(read_schema.clone());
         futures::stream::iter(self.checkpoint_files.clone())
             .map(move |meta| {
                 let store = store.clone();
+                let read_schema = read_schema.clone();
                 async move {
-                    let reader = ParquetObjectReader::new(store, meta);
-                    let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
-                    let builder =
-                        ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
-                    builder.with_batch_size(batch_size).build()
+                    let mut reader = ParquetObjectReader::new(store, meta);
+                    let options = ArrowReaderOptions::new();
+                    let reader_meta = ArrowReaderMetadata::load_async(&mut reader, options).await?;
+
+                    // Create a projection selecting the read_schema fields from the parquet file's arrow schema
+                    let projection = reader_meta
+                        .schema()
+                        .fields
+                        .iter()
+                        .enumerate()
+                        .filter_map(|(i, f)| {
+                            if read_schema.fields.contains_key(f.name()) {
+                                Some(i)
+                            } else {
+                                None
+                            }
+                        })
+                        .collect::<Vec<_>>();
+                    let projection =
+                        ProjectionMask::roots(reader_meta.parquet_schema(), projection);
+
+                    // Note: the batches in the output stream contain all-null rows for action
+                    // types not present in the projection. When a RowFilter was used to remove
+                    // them, performance got worse when projecting all fields and was no better
+                    // when projecting a subset; the all-null rows are filtered out downstream anyway.
+                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, reader_meta)
+                        .with_projection(projection.clone())
+                        .with_batch_size(batch_size)
+                        .build()
                 }
             })
             .buffered(config.log_buffer_size)
@@ -514,7 +541,13 @@ pub(super) mod tests {
     use deltalake_test::utils::*;
     use tokio::task::JoinHandle;
 
-    use crate::checkpoints::create_checkpoint_from_table_uri_and_cleanup;
+    use crate::{
+        checkpoints::{create_checkpoint_for, create_checkpoint_from_table_uri_and_cleanup},
+        kernel::{Action, Add, Format, Remove},
+        operations::transaction::{CommitBuilder, TableReference},
+        protocol::{DeltaOperation, SaveMode},
+        DeltaTableBuilder,
+    };
 
     use super::*;
 
@@ -737,4 +770,94 @@ pub(super) mod tests {
             assert!(!path.is_commit_file());
         }
     }
+
+    #[tokio::test]
+    async fn test_checkpoint_stream_parquet_read() {
+        let metadata = Metadata {
+            id: "test".to_string(),
+            format: Format::new("parquet".to_string(), None),
+            schema_string: r#"{"type":"struct", "fields": []}"#.to_string(),
+            ..Default::default()
+        };
+        let protocol = Protocol::default();
+
+        let mut actions = vec![Action::Metadata(metadata), Action::Protocol(protocol)];
+        for i in 0..10 {
+            actions.push(Action::Add(Add {
+                path: format!("part-{}.parquet", i),
+                modification_time: chrono::Utc::now().timestamp_millis(),
+                ..Default::default()
+            }));
+        }
+
+        let log_store = DeltaTableBuilder::from_uri("memory:///".to_string())
+            .build_storage()
+            .unwrap();
+        let op = DeltaOperation::Write {
+            mode: SaveMode::Overwrite,
+            partition_by: None,
+            predicate: None,
+        };
+        let commit = CommitBuilder::default()
+            .with_actions(actions)
+            .build(None, log_store.clone(), op)
+            .await
+            .unwrap();
+
+        let mut actions = Vec::new();
+        // remove all but one file
+        for i in 0..9 {
+            actions.push(Action::Remove(Remove {
+                path: format!("part-{}.parquet", i),
+                deletion_timestamp: Some(chrono::Utc::now().timestamp_millis()),
+                ..Default::default()
+            }))
+        }
+
+        let op = DeltaOperation::Delete { predicate: None };
+        let table_data = &commit.snapshot as &dyn TableReference;
+        let commit = CommitBuilder::default()
+            .with_actions(actions)
+            .build(Some(table_data), log_store.clone(), op)
+            .await
+            .unwrap();
+
+        create_checkpoint_for(commit.version, &commit.snapshot, log_store.as_ref())
+            .await
+            .unwrap();
+
+        let batches = LogSegment::try_new(
+            &Path::default(),
+            Some(commit.version),
+            log_store.object_store().as_ref(),
+        )
+        .await
+        .unwrap()
+        .checkpoint_stream(
+            log_store.object_store(),
+            &StructType::new(vec![
+                ActionType::Metadata.schema_field().clone(),
+                ActionType::Protocol.schema_field().clone(),
+                ActionType::Add.schema_field().clone(),
+            ]),
+            &Default::default(),
+        )
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter()).unwrap();
+
+        // There are 9 remove-action rows whose columns are all null because
+        // `remove` is not projected in the schema. These get filtered out
+        // upstream, and applying a row filter here showed no perf benefit.
+        // In addition there is 1 add row, 1 metadata row, and 1 protocol row,
+        // for 12 rows total.
+        assert_eq!(batch.num_rows(), 12);
+
+        assert_eq!(batch.schema().fields().len(), 3);
+        assert!(batch.schema().field_with_name("metaData").is_ok());
+        assert!(batch.schema().field_with_name("protocol").is_ok());
+        assert!(batch.schema().field_with_name("add").is_ok());
+    }
 }
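For reference, here is a minimal standalone sketch (not part of the patch) of the same root-level projection technique, using the `parquet` crate's synchronous `ParquetRecordBatchReaderBuilder` rather than the async object-store reader above. The file path and the `keep` column list are hypothetical placeholders:

// Standalone sketch: read a checkpoint-style parquet file with a root-level
// ProjectionMask, mirroring the projection built in checkpoint_stream above.
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("checkpoint.parquet")?; // hypothetical input file
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Select root columns by name, like the read_schema filter in checkpoint_stream.
    let keep = ["metaData", "protocol", "add"]; // placeholder column list
    let indices: Vec<usize> = builder
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter_map(|(i, f)| keep.contains(&f.name().as_str()).then_some(i))
        .collect();
    let mask = ProjectionMask::roots(builder.parquet_schema(), indices);

    let reader = builder
        .with_projection(mask)
        .with_batch_size(1024)
        .build()?;
    for batch in reader {
        let batch = batch?;
        // Rows for unprojected action types come back all-null, as noted above.
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}

ProjectionMask::roots selects whole top-level columns (each action struct together with all of its nested leaves), which is why the patch only needs to match the file's arrow schema against read_schema by root field name.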