fix: replace BTreeMap with IndexMap to preserve insertion order #2150

Merged · 1 commit · Feb 1, 2024
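The gist of the change: `BTreeMap` always iterates its keys in sorted order, while `IndexMap` iterates in insertion order, so partition values can keep the order of the table's partition columns. A minimal standalone sketch of the difference (not part of the PR; it only assumes the `indexmap` dependency added below):

```rust
use std::collections::BTreeMap;

use indexmap::IndexMap; // indexmap = "2.2.1", as added to Cargo.toml below

fn main() {
    // Partition columns in table-definition order: year, then month.
    let cols = [("year", "2024"), ("month", "01")];

    let sorted: BTreeMap<_, _> = cols.iter().copied().collect();
    let ordered: IndexMap<_, _> = cols.iter().copied().collect();

    // BTreeMap iterates keys in sorted order: month before year.
    assert_eq!(sorted.keys().copied().collect::<Vec<_>>(), ["month", "year"]);
    // IndexMap iterates keys in insertion order: year before month.
    assert_eq!(ordered.keys().copied().collect::<Vec<_>>(), ["year", "month"]);
}
```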
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -77,6 +77,7 @@ errno = "0.3"
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
hyper = { version = "0.14", optional = true }
indexmap = "2.2.1"
itertools = "0.12"
lazy_static = "1"
libc = ">=0.2.90, <1"
23 changes: 11 additions & 12 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -1,9 +1,10 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
use chrono::{NaiveDateTime, TimeZone, Utc};
use indexmap::IndexMap;
use object_store::path::Path;
use object_store::ObjectMeta;
use percent_encoding::percent_decode_str;
@@ -19,37 +20,35 @@ const COL_MIN_VALUES: &str = "minValues";
const COL_MAX_VALUES: &str = "maxValues";
const COL_NULL_COUNT: &str = "nullCount";

pub(crate) type PartitionFields<'a> = Arc<BTreeMap<&'a str, &'a StructField>>;
pub(crate) type PartitionValues<'a> = BTreeMap<&'a str, Scalar>;
pub(crate) type PartitionFields<'a> = Arc<IndexMap<&'a str, &'a StructField>>;
pub(crate) type PartitionValues<'a> = IndexMap<&'a str, Scalar>;

pub(crate) trait PartitionsExt {
fn hive_partition_path(&self) -> String;
}

impl PartitionsExt for BTreeMap<&str, Scalar> {
impl PartitionsExt for IndexMap<&str, Scalar> {
fn hive_partition_path(&self) -> String {
let mut fields = self
let fields = self
.iter()
.map(|(k, v)| {
let encoded = v.serialize_encoded();
format!("{k}={encoded}")
})
.collect::<Vec<_>>();
fields.reverse();
fields.join("/")
}
}

impl PartitionsExt for BTreeMap<String, Scalar> {
impl PartitionsExt for IndexMap<String, Scalar> {
fn hive_partition_path(&self) -> String {
let mut fields = self
let fields = self
.iter()
.map(|(k, v)| {
let encoded = v.serialize_encoded();
format!("{k}={encoded}")
})
.collect::<Vec<_>>();
fields.reverse();
fields.join("/")
}
}
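Note on the two `hive_partition_path` impls above: the `BTreeMap` versions sorted keys alphabetically and then called `fields.reverse()`; with `IndexMap` the reverse step is dropped because iteration already follows insertion order. A simplified, self-contained sketch of the resulting behavior (plain `String` values stand in for the crate's `Scalar` and its `serialize_encoded`):

```rust
use indexmap::IndexMap;

/// Build a Hive-style partition path ("k1=v1/k2=v2/...") in the order the
/// values were inserted, which mirrors the table's partition column order.
fn hive_partition_path(values: &IndexMap<&str, String>) -> String {
    values
        .iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join("/")
}

fn main() {
    let mut values = IndexMap::new();
    values.insert("year", "2024".to_string());
    values.insert("month", "01".to_string());
    // Insertion order is preserved; no reverse() workaround is needed.
    assert_eq!(hive_partition_path(&values), "year=2024/month=01");
}
```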
@@ -192,7 +191,7 @@ impl LogicalFile<'_> {
/// The partition values for this logical file.
pub fn partition_values(&self) -> DeltaResult<PartitionValues<'_>> {
if self.partition_fields.is_empty() {
return Ok(BTreeMap::new());
return Ok(IndexMap::new());
}
let map_value = self.partition_values.value(self.index);
let keys = map_value
@@ -237,7 +236,7 @@ impl LogicalFile<'_> {
.unwrap_or(Scalar::Null(f.data_type.clone()));
Ok((*k, val))
})
.collect::<DeltaResult<BTreeMap<_, _>>>()
.collect::<DeltaResult<IndexMap<_, _>>>()
}

/// Defines a deletion vector
@@ -355,7 +354,7 @@ impl<'a> FileStatsAccessor<'a> {
.partition_columns
.iter()
.map(|c| Ok((c.as_str(), schema.field_with_name(c.as_str())?)))
.collect::<DeltaResult<BTreeMap<_, _>>>()?,
.collect::<DeltaResult<IndexMap<_, _>>>()?,
);
let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
let deletion_vector = deletion_vector.and_then(|dv| {
19 changes: 10 additions & 9 deletions crates/core/src/operations/optimize.rs
@@ -29,6 +29,7 @@ use arrow_array::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{Future, StreamExt, TryStreamExt};
use indexmap::IndexMap;
use itertools::Itertools;
use num_cpus;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
@@ -308,7 +309,7 @@ impl From<OptimizeInput> for DeltaOperation {

fn create_remove(
path: &str,
partitions: &BTreeMap<String, Scalar>,
partitions: &IndexMap<String, Scalar>,
size: i64,
) -> Result<Action, DeltaTableError> {
// NOTE unwrap is safe since UNIX_EPOCH will always be earlier than now.
@@ -353,11 +354,11 @@ enum OptimizeOperations {
///
/// Bins are determined by the bin-packing algorithm to reach an optimal size.
/// Files that are large enough already are skipped. Bins of size 1 are dropped.
Compact(HashMap<String, (BTreeMap<String, Scalar>, Vec<MergeBin>)>),
Compact(HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)>),
/// Plan to Z-order each partition
ZOrder(
Vec<String>,
HashMap<String, (BTreeMap<String, Scalar>, MergeBin)>,
HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
),
// TODO: Sort
}
@@ -401,7 +402,7 @@ impl MergePlan {
/// collected during the operation.
async fn rewrite_files<F>(
task_parameters: Arc<MergeTaskParameters>,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
files: MergeBin,
object_store: ObjectStoreRef,
read_stream: F,
@@ -849,7 +850,7 @@ fn build_compaction_plan(
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

let mut partition_files: HashMap<String, (BTreeMap<String, Scalar>, Vec<ObjectMeta>)> =
let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, Vec<ObjectMeta>)> =
HashMap::new();
for add in snapshot.get_active_add_actions_by_partitions(filters)? {
let add = add?;
@@ -863,7 +864,7 @@
.partition_values()?
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<BTreeMap<_, _>>();
.collect::<IndexMap<_, _>>();

partition_files
.entry(add.partition_values()?.hive_partition_path())
@@ -877,7 +878,7 @@
file.sort_by(|a, b| b.size.cmp(&a.size));
}

let mut operations: HashMap<String, (BTreeMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
for (part, (partition, files)) in partition_files {
let mut merge_bins = vec![MergeBin::new()];

@@ -955,14 +956,14 @@ fn build_zorder_plan(
// For now, just be naive and optimize all files in each selected partition.
let mut metrics = Metrics::default();

let mut partition_files: HashMap<String, (BTreeMap<String, Scalar>, MergeBin)> = HashMap::new();
let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, MergeBin)> = HashMap::new();
for add in snapshot.get_active_add_actions_by_partitions(filters)? {
let add = add?;
let partition_values = add
.partition_values()?
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<BTreeMap<_, _>>();
.collect::<IndexMap<_, _>>();
metrics.total_considered_files += 1;
let object_meta = ObjectMeta::try_from(&add)?;
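For context on `build_compaction_plan` and `build_zorder_plan` above: files are grouped into a `HashMap` keyed by their hive partition path, with the ordered partition values kept next to the file list so they can be written back out later. A rough standalone sketch of that grouping pattern (the `group_by_partition` helper is hypothetical, and `String` paths and values stand in for `Add` actions and `Scalar`s):

```rust
use std::collections::HashMap;

use indexmap::IndexMap;

/// Group (partition values, file path) pairs by their hive partition path,
/// keeping the ordered partition values alongside each group's files.
fn group_by_partition(
    files: Vec<(IndexMap<String, String>, String)>,
) -> HashMap<String, (IndexMap<String, String>, Vec<String>)> {
    let mut groups: HashMap<String, (IndexMap<String, String>, Vec<String>)> = HashMap::new();
    for (partition_values, path) in files {
        // Because the map is ordered, the key is stable, e.g. "year=2024/month=01".
        let key = partition_values
            .iter()
            .map(|(k, v)| format!("{k}={v}"))
            .collect::<Vec<_>>()
            .join("/");
        groups
            .entry(key)
            .or_insert_with(|| (partition_values.clone(), Vec::new()))
            .1
            .push(path);
    }
    groups
}
```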

11 changes: 6 additions & 5 deletions crates/core/src/operations/writer.rs
@@ -1,11 +1,12 @@
//! Abstractions and implementations for writing data to delta tables

use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
@@ -155,7 +156,7 @@ impl DeltaWriter {
pub async fn write_partition(
&mut self,
record_batch: RecordBatch,
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
) -> DeltaResult<()> {
let partition_key = Path::parse(partition_values.hive_partition_path())?;

@@ -217,7 +218,7 @@ pub(crate) struct PartitionWriterConfig {
/// Prefix applied to all paths
prefix: Path,
/// Values for all partition columns
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
/// Properties passed to underlying parquet writer
writer_properties: WriterProperties,
/// Size above which we will write a buffered parquet file to disk.
@@ -230,7 +231,7 @@ impl PartitionWriterConfig {
impl PartitionWriterConfig {
pub fn try_new(
file_schema: ArrowSchemaRef,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
@@ -514,7 +515,7 @@ mod tests {
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
BTreeMap::new(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
15 changes: 8 additions & 7 deletions crates/core/src/writer/json.rs
@@ -1,11 +1,12 @@
//! Main writer API to write json messages to delta table
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::*;
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::{
@@ -45,7 +46,7 @@ pub(crate) struct DataArrowWriter {
writer_properties: WriterProperties,
buffer: ShareableBuffer,
arrow_writer: ArrowWriter<ShareableBuffer>,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
buffered_record_batch_count: usize,
}

@@ -153,7 +154,7 @@ impl DataArrowWriter {
writer_properties.clone(),
)?;

let partition_values = BTreeMap::new();
let partition_values = IndexMap::new();
let buffered_record_batch_count = 0;

Ok(Self {
@@ -397,8 +398,8 @@ fn quarantine_failed_parquet_rows(
fn extract_partition_values(
partition_cols: &[String],
record_batch: &RecordBatch,
) -> Result<BTreeMap<String, Scalar>, DeltaWriterError> {
let mut partition_values = BTreeMap::new();
) -> Result<IndexMap<String, Scalar>, DeltaWriterError> {
let mut partition_values = IndexMap::new();

for col_name in partition_cols.iter() {
let arrow_schema = record_batch.schema();
@@ -499,15 +500,15 @@ mod tests {
&record_batch
)
.unwrap(),
BTreeMap::from([
IndexMap::from([
(String::from("col1"), Scalar::Integer(1)),
(String::from("col2"), Scalar::Integer(2)),
(String::from("col3"), Scalar::Null(DataType::INTEGER)),
])
);
assert_eq!(
extract_partition_values(&[String::from("col1")], &record_batch).unwrap(),
BTreeMap::from([(String::from("col1"), Scalar::Integer(1)),])
IndexMap::from([(String::from("col1"), Scalar::Integer(1)),])
);
assert!(extract_partition_values(&[String::from("col4")], &record_batch).is_err())
}
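The test above depends on `extract_partition_values` returning columns in the order they were requested (`col1`, `col2`, `col3`), which `IndexMap` guarantees and `BTreeMap` did not. A simplified sketch of that shape (a plain `HashMap` row stands in for reading the first value of each Arrow column, and errors are plain `String`s):

```rust
use std::collections::HashMap;

use indexmap::IndexMap;

/// Collect partition values in the order the partition columns are listed,
/// so any path built from the result follows that same order.
fn extract_partition_values(
    partition_cols: &[String],
    row: &HashMap<String, String>,
) -> Result<IndexMap<String, String>, String> {
    let mut partition_values = IndexMap::new();
    for col in partition_cols {
        let value = row
            .get(col)
            .ok_or_else(|| format!("missing partition column: {col}"))?;
        partition_values.insert(col.clone(), value.clone());
    }
    Ok(partition_values)
}
```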
12 changes: 6 additions & 6 deletions crates/core/src/writer/record_batch.rs
@@ -5,7 +5,6 @@
//! the writer. Once written, add actions are returned by the writer. It's the user's responsibility
//! to create the transaction using those actions.

use std::collections::BTreeMap;
use std::{collections::HashMap, sync::Arc};

use arrow::array::{Array, UInt32Array};
@@ -15,6 +14,7 @@ use arrow_array::ArrayRef;
use arrow_row::{RowConverter, SortField};
use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::{arrow::ArrowWriter, errors::ParquetError};
use parquet::{basic::Compression, file::properties::WriterProperties};
@@ -127,7 +127,7 @@ impl RecordBatchWriter {
pub async fn write_partition(
&mut self,
record_batch: RecordBatch,
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
) -> Result<(), DeltaTableError> {
let arrow_schema =
arrow_schema_without_partitions(&self.arrow_schema_ref, &self.partition_columns);
@@ -212,7 +212,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
#[derive(Clone, Debug)]
pub struct PartitionResult {
/// values found in partition columns
pub partition_values: BTreeMap<String, Scalar>,
pub partition_values: IndexMap<String, Scalar>,
/// remaining dataset with partition column values removed
pub record_batch: RecordBatch,
}
@@ -222,14 +222,14 @@ struct PartitionWriter {
writer_properties: WriterProperties,
pub(super) buffer: ShareableBuffer,
pub(super) arrow_writer: ArrowWriter<ShareableBuffer>,
pub(super) partition_values: BTreeMap<String, Scalar>,
pub(super) partition_values: IndexMap<String, Scalar>,
pub(super) buffered_record_batch_count: usize,
}

impl PartitionWriter {
pub fn new(
arrow_schema: Arc<ArrowSchema>,
partition_values: BTreeMap<String, Scalar>,
partition_values: IndexMap<String, Scalar>,
writer_properties: WriterProperties,
) -> Result<Self, ParquetError> {
let buffer = ShareableBuffer::default();
@@ -302,7 +302,7 @@ pub(crate) fn divide_by_partition_values(

if partition_columns.is_empty() {
partitions.push(PartitionResult {
partition_values: BTreeMap::new(),
partition_values: IndexMap::new(),
record_batch: values.clone(),
});
return Ok(partitions);
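`divide_by_partition_values` returns one `PartitionResult` per distinct combination of partition values, each carrying its values in partition-column order. A simplified sketch of the splitting step (plain `Vec<String>` rows stand in for Arrow `RecordBatch` slices, and unknown columns simply panic):

```rust
use indexmap::IndexMap;

type Row = Vec<String>; // one value per column, in `columns` order

/// Split rows into groups that share the same partition values; each group's
/// values keep the order given by `partition_columns`.
fn divide_by_partition_values(
    columns: &[String],
    partition_columns: &[String],
    rows: &[Row],
) -> Vec<(IndexMap<String, String>, Vec<Row>)> {
    let mut partitions: Vec<(IndexMap<String, String>, Vec<Row>)> = Vec::new();
    for row in rows {
        let values: IndexMap<String, String> = partition_columns
            .iter()
            .map(|pc| {
                let idx = columns
                    .iter()
                    .position(|c| c == pc)
                    .expect("unknown partition column");
                (pc.clone(), row[idx].clone())
            })
            .collect();
        match partitions.iter_mut().find(|(v, _)| *v == values) {
            Some((_, group)) => group.push(row.clone()),
            None => partitions.push((values, vec![row.clone()])),
        }
    }
    partitions
}
```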
6 changes: 3 additions & 3 deletions crates/core/src/writer/stats.rs
@@ -1,8 +1,8 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, ops::AddAssign};

use indexmap::IndexMap;
use parquet::format::FileMetaData;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{basic::LogicalType, errors::ParquetError};
@@ -17,7 +17,7 @@ use crate::protocol::{ColumnValueStat, Stats};

/// Creates an [`Add`] log action struct.
pub fn create_add(
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
path: String,
size: i64,
file_metadata: &FileMetaData,
@@ -59,7 +59,7 @@ }
}

fn stats_from_file_metadata(
partition_values: &BTreeMap<String, Scalar>,
partition_values: &IndexMap<String, Scalar>,
file_metadata: &FileMetaData,
) -> Result<Stats, DeltaWriterError> {
let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());