Skip to content

Commit

Permalink
add comment and rename to make code more clear
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Nov 20, 2024
1 parent ac29484 commit 88c7ff6
Showing 1 changed file with 50 additions and 56 deletions.
106 changes: 50 additions & 56 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StructArray};
use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::spec::{DataFile, Struct};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
Expand All @@ -43,7 +44,9 @@ impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {

/// Config for `EqualityDeleteWriter`.
///
/// Bundles the equality field ids, the projector used on incoming data, and a partition value.
pub struct EqualityDeleteWriterConfig {
// Field ids used to determine row equality in equality delete files.
equality_ids: Vec<usize>,
// Projector used to project the data chunk into specific fields.
// NOTE(review): presumably built from `equality_ids`; the construction is not visible here — confirm.
projector: ArrowFieldProjector,
// Partition value kept alongside the config.
// NOTE(review): presumably attached to the produced delete file; usage is outside this view — confirm.
partition_value: Struct,
}
Expand Down Expand Up @@ -78,7 +81,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuil
}
}

/// A writer write data
/// Writer used to write equality delete files.
pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
inner_writer: Option<B::R>,
projector: ArrowFieldProjector,
Expand Down Expand Up @@ -122,32 +125,32 @@ impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
}
}

/// Help to project specific field from `RecordBatch`` according to the column id.
/// Helps project specific fields from a `RecordBatch` according to their field ids.
#[derive(Clone)]
pub struct ArrowFieldProjector {
index_vec_vec: Vec<Vec<usize>>,
// One index path per projected field, locating that field inside the (possibly nested) columns of a batch.
// Each path is stored innermost-first: the last element indexes the top-level column, and the remaining
// elements are applied in reverse order to descend into sub-columns.
// E.g. [[0], [2, 1]] means the first field is the first column itself, while the second field is reached
// from the second column and then from its third subcolumn (second column must be a struct column).
field_indexs: Vec<Vec<usize>>,

Check warning on line 134 in crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

View workflow job for this annotation

GitHub Actions / typos check

"indexs" should be "indexes" or "indices".
// The schema reference after projection. This schema is derived from the original schema based on the given field IDs.
projected_schema: SchemaRef,
}

impl ArrowFieldProjector {
/// Init FieldProjector
pub fn new(
original_schema: SchemaRef,
column_ids: &[usize],
column_id_meta_key: &str,
) -> Result<Self> {
let mut index_vec_vec = Vec::with_capacity(column_ids.len());
let mut fields = Vec::with_capacity(column_ids.len());
for &id in column_ids {
let mut index_vec = vec![];
if let Ok(field) = Self::fetch_column_index(
/// Init ArrowFieldProjector
pub fn new(original_schema: SchemaRef, field_ids: &[usize]) -> Result<Self> {
let mut field_indexs = Vec::with_capacity(field_ids.len());

Check warning on line 142 in crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

View workflow job for this annotation

GitHub Actions / typos check

"indexs" should be "indexes" or "indices".
let mut fields = Vec::with_capacity(field_ids.len());
for &id in field_ids {
let mut field_index = vec![];
if let Ok(field) = Self::fetch_field_index(
original_schema.fields(),
&mut index_vec,
&mut field_index,
id as i64,
column_id_meta_key,
PARQUET_FIELD_ID_META_KEY,
) {
fields.push(field.clone());
index_vec_vec.push(index_vec);
field_indexs.push(field_index);

Check warning on line 153 in crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

View workflow job for this annotation

GitHub Actions / typos check

"indexs" should be "indexes" or "indices".
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
Expand All @@ -160,34 +163,12 @@ impl ArrowFieldProjector {
}
let delete_arrow_schema = Arc::new(Schema::new(fields));
Ok(Self {
index_vec_vec,
field_indexs,

Check warning on line 166 in crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

View workflow job for this annotation

GitHub Actions / typos check

"indexs" should be "indexes" or "indices".
projected_schema: delete_arrow_schema,
})
}

/// Return the reference of projected schema
pub fn projected_schema_ref(&self) -> &SchemaRef {
&self.projected_schema
}

/// Do projection with record batch
pub fn project_bacth(&self, batch: RecordBatch) -> Result<RecordBatch> {
RecordBatch::try_new(
self.projected_schema.clone(),
self.project_column(batch.columns())?,
)
.map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
}

/// Do projection with columns
pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
self.index_vec_vec
.iter()
.map(|index_vec| Self::get_column_by_index_vec(batch, index_vec))
.collect::<Result<Vec<_>>>()
}

fn fetch_column_index(
fn fetch_field_index(
fields: &Fields,
index_vec: &mut Vec<usize>,
col_id: i64,
Expand Down Expand Up @@ -216,7 +197,7 @@ impl ArrowFieldProjector {
}
if let DataType::Struct(inner) = field.data_type() {
let res =
Self::fetch_column_index(inner, index_vec, col_id, column_id_meta_key);
Self::fetch_field_index(inner, index_vec, col_id, column_id_meta_key);
if !index_vec.is_empty() {
index_vec.push(pos);
return res;
Expand All @@ -231,8 +212,30 @@ impl ArrowFieldProjector {
))
}

fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) -> Result<ArrayRef> {
let mut rev_iterator = index_vec.iter().rev();
/// Returns a reference to the projected schema, i.e. the schema that
/// `project_bacth` uses when it builds the projected `RecordBatch`.
pub fn projected_schema_ref(&self) -> &SchemaRef {
&self.projected_schema
}

/// Projects `batch` onto the configured fields and wraps the resulting
/// columns in a new `RecordBatch` that uses the projected schema.
///
/// # Errors
///
/// Propagates any error from `project_column`, and returns
/// `ErrorKind::DataInvalid` when the projected columns cannot be assembled
/// into a batch with the projected schema.
pub fn project_bacth(&self, batch: RecordBatch) -> Result<RecordBatch> {
    let projected_columns = self.project_column(batch.columns())?;
    let schema = self.projected_schema.clone();
    RecordBatch::try_new(schema, projected_columns)
        .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
}

/// Do projection with columns
pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
self.field_indexs

Check warning on line 231 in crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

View workflow job for this annotation

GitHub Actions / typos check

"indexs" should be "indexes" or "indices".
.iter()
.map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
.collect::<Result<Vec<_>>>()
}

fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
let mut rev_iterator = field_index.iter().rev();
let mut array = batch[*rev_iterator.next().unwrap()].clone();
for idx in rev_iterator {
array = array
Expand Down Expand Up @@ -467,8 +470,7 @@ mod test {
let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();

let equality_ids = vec![1, 3];
let projector =
ArrowFieldProjector::new(arrow_schema, &equality_ids, PARQUET_FIELD_ID_META_KEY)?;
let projector = ArrowFieldProjector::new(arrow_schema, &equality_ids)?;
let delete_schema = arrow_schema_to_schema(projector.projected_schema_ref()).unwrap();

// prepare writer
Expand Down Expand Up @@ -524,19 +526,11 @@ mod test {
};

let equality_id_float = vec![0];
let result_float = ArrowFieldProjector::new(
schema.clone(),
&equality_id_float,
PARQUET_FIELD_ID_META_KEY,
);
let result_float = ArrowFieldProjector::new(schema.clone(), &equality_id_float);
assert!(result_float.is_err());

let equality_ids_double = vec![1];
let result_double = ArrowFieldProjector::new(
schema.clone(),
&equality_ids_double,
PARQUET_FIELD_ID_META_KEY,
);
let result_double = ArrowFieldProjector::new(schema.clone(), &equality_ids_double);
assert!(result_double.is_err());

Ok(())
Expand Down

0 comments on commit 88c7ff6

Please sign in to comment.