diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b058c8d25..592945544 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -133,7 +133,7 @@ impl ArrowReader { |(file_scan_task, file_io, tx)| async move { match file_scan_task { Ok(task) => { - let file_path = task.data_file_path().to_string(); + let file_path = task.data_file_path.to_string(); spawn(async move { Self::process_file_scan_task( @@ -171,7 +171,7 @@ impl ArrowReader { ) -> Result<()> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within - let parquet_file = file_io.new_input(task.data_file_path())?; + let parquet_file = file_io.new_input(&task.data_file_path)?; let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?; let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); @@ -187,8 +187,8 @@ impl ArrowReader { // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response let projection_mask = Self::get_arrow_projection_mask( - task.project_field_ids(), - task.schema(), + &task.project_field_ids, + &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), )?; @@ -198,7 +198,7 @@ impl ArrowReader { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } - if let Some(predicate) = task.predicate() { + if let Some(predicate) = &task.predicate { let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map( record_batch_stream_builder.parquet_schema(), predicate, @@ -218,7 +218,7 @@ impl ArrowReader { predicate, record_batch_stream_builder.metadata(), &field_id_map, - task.schema(), + &task.schema, )?; selected_row_groups = Some(result); diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 45d7d4fd1..f1cb86ab3 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::runtime::spawn; use crate::spec::{ - DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema, - SchemaRef, SnapshotRef, TableMetadataRef, + DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile, + ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::utils::available_parallelism; @@ -529,14 +529,19 @@ impl ManifestEntryContext { /// created from it fn into_file_scan_task(self) -> FileScanTask { FileScanTask { - data_file_path: self.manifest_entry.file_path().to_string(), start: 0, length: self.manifest_entry.file_size_in_bytes(), + record_count: Some(self.manifest_entry.record_count()), + + data_file_path: self.manifest_entry.file_path().to_string(), + data_file_content: self.manifest_entry.content_type(), + data_file_format: self.manifest_entry.file_format(), + + schema: self.snapshot_schema, project_field_ids: self.field_ids.to_vec(), predicate: self .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - schema: self.snapshot_schema, } } } @@ -854,35 +859,30 @@ impl ExpressionEvaluatorCache { /// A task to scan part of file. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileScanTask { - data_file_path: String, - start: u64, - length: u64, - project_field_ids: Vec, + /// The start offset of the file to scan. + pub start: u64, + /// The length of the file to scan. + pub length: u64, + /// The number of records in the file to scan. + /// + /// This is an optional field, and only available if we are + /// reading the entire data file. + pub record_count: Option, + + /// The data file path corresponding to the task. + pub data_file_path: String, + /// The content type of the file to scan. + pub data_file_content: DataContentType, + /// The format of the file to scan. + pub data_file_format: DataFileFormat, + + /// The schema of the file to scan. + pub schema: SchemaRef, + /// The field ids to project. + pub project_field_ids: Vec, + /// The predicate to filter. #[serde(skip_serializing_if = "Option::is_none")] - predicate: Option, - schema: SchemaRef, -} - -impl FileScanTask { - /// Returns the data file path of this file scan task. - pub fn data_file_path(&self) -> &str { - &self.data_file_path - } - - /// Returns the project field id of this file scan task. - pub fn project_field_ids(&self) -> &[i32] { - &self.project_field_ids - } - - /// Returns the predicate of this file scan task. - pub fn predicate(&self) -> Option<&BoundPredicate> { - self.predicate.as_ref() - } - - /// Returns the schema id of this file scan task. - pub fn schema(&self) -> &Schema { - &self.schema - } + pub predicate: Option, } #[cfg(test)] @@ -1219,17 +1219,17 @@ mod tests { assert_eq!(tasks.len(), 2); - tasks.sort_by_key(|t| t.data_file_path().to_string()); + tasks.sort_by_key(|t| t.data_file_path.to_string()); // Check first task is added data file assert_eq!( - tasks[0].data_file_path(), + tasks[0].data_file_path, format!("{}/1.parquet", &fixture.table_location) ); // Check second task is existing data file assert_eq!( - tasks[1].data_file_path(), + tasks[1].data_file_path, format!("{}/3.parquet", &fixture.table_location) ); } @@ -1582,22 +1582,28 @@ mod tests { ); let task = FileScanTask { data_file_path: "data_file_path".to_string(), + data_file_content: DataContentType::Data, start: 0, length: 100, project_field_ids: vec![1, 2, 3], predicate: None, schema: schema.clone(), + record_count: Some(100), + data_file_format: DataFileFormat::Parquet, }; test_fn(task); // with predicate let task = FileScanTask { data_file_path: "data_file_path".to_string(), + data_file_content: DataContentType::Data, start: 0, length: 100, project_field_ids: vec![1, 2, 3], predicate: Some(BoundPredicate::AlwaysTrue), schema, + record_count: None, + data_file_format: DataFileFormat::Avro, }; test_fn(task); } diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index e1fe33c73..f0dfdf47c 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -23,7 +23,9 @@ use std::sync::Arc; use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; use bytes::Bytes; +use serde_derive::{Deserialize, Serialize}; use serde_json::to_vec; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use typed_builder::TypedBuilder; use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; @@ -866,6 +868,12 @@ impl ManifestEntry { &self.data_file.file_path } + /// Data file record count of the manifest entry. + #[inline] + pub fn record_count(&self) -> u64 { + self.data_file.record_count + } + /// Inherit data from manifest list, such as snapshot id, sequence number. pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) { if self.snapshot_id.is_none() { @@ -1141,7 +1149,7 @@ impl DataFile { } /// Type of content stored by the data file: data, equality deletes, or /// position deletes (all v1 files are data files) -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] pub enum DataContentType { /// value: 0 Data = 0, @@ -1168,7 +1176,7 @@ impl TryFrom for DataContentType { } /// Format of this data. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)] pub enum DataFileFormat { /// Avro file format: Avro,