Skip to content

Commit

Permalink
fix: derive record merge strategy based on table configs
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Jan 23, 2025
1 parent 3527f54 commit 39d641b
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 24 deletions.
74 changes: 71 additions & 3 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ impl ConfigParser for HudiTableConfig {
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
Self::RecordMergeStrategy => Some(HudiConfigValue::String(
RecordMergeStrategyValue::default().as_ref().to_string(),
)),
Self::TimelineTimezone => Some(HudiConfigValue::String(
TimelineTimezoneValue::UTC.as_ref().to_string(),
)),
Expand Down Expand Up @@ -238,6 +235,41 @@ impl ConfigParser for HudiTableConfig {
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
}
}

/// Returns the value parsed from `configs` for this config, or a fallback when parsing fails.
///
/// For [`HudiTableConfig::RecordMergeStrategy`] the fallback is derived from the other table
/// configs instead of being a fixed default:
/// - append-only when `PopulatesMetaFields` resolves to `false`;
/// - append-only when `configs` has no entry for `PrecombineField`;
/// - overwrite-with-latest otherwise.
///
/// Every other config falls back to [`Self::default_value`].
///
/// # Panics
/// Panics if parsing fails for a config that has no default value.
fn parse_value_or_default(&self, configs: &HashMap<String, String>) -> Self::Output {
    self.parse_value(configs).unwrap_or_else(|_| match self {
        Self::RecordMergeStrategy => {
            // When meta fields are not populated, meta fields such as record key and
            // partition path are null, so the table is supposed to be append-only.
            let has_meta_fields = HudiTableConfig::PopulatesMetaFields
                .parse_value_or_default(configs)
                .to::<bool>();
            // When the precombine field is not available, the table is treated as
            // append-only as well.
            let has_precombine_field =
                configs.contains_key(HudiTableConfig::PrecombineField.as_ref());

            let derived = if has_meta_fields && has_precombine_field {
                RecordMergeStrategyValue::OverwriteWithLatest
            } else {
                RecordMergeStrategyValue::AppendOnly
            };
            HudiConfigValue::String(derived.as_ref().to_string())
        }
        _ => self
            .default_value()
            .unwrap_or_else(|| panic!("No default value for config '{}'", self.as_ref())),
    })
}
}

/// Config value for [HudiTableConfig::TableType].
Expand Down Expand Up @@ -310,6 +342,7 @@ impl FromStr for TimelineTimezoneValue {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::HudiConfigs;

#[test]
fn create_table_type() {
Expand Down Expand Up @@ -438,4 +471,39 @@ mod tests {
"hoodie.table.name"
);
}

/// Checks that the record merge strategy, when not set explicitly, is derived from
/// `PopulatesMetaFields` and the presence of `PrecombineField`.
#[test]
fn test_derive_record_merger_strategy() {
    // Builds configs from the given options and resolves the merge strategy as a string.
    let derive_strategy = |options: Vec<(HudiTableConfig, &str)>| {
        HudiConfigs::new(options)
            .get_or_default(HudiTableConfig::RecordMergeStrategy)
            .to::<String>()
    };

    // Meta fields disabled: append-only even though a precombine field is configured.
    let derived = derive_strategy(vec![
        (HudiTableConfig::PopulatesMetaFields, "false"),
        (HudiTableConfig::PrecombineField, "ts"),
    ]);
    assert_eq!(
        derived,
        RecordMergeStrategyValue::AppendOnly.as_ref(),
        "Should derive as append-only due to populatesMetaFields=false"
    );

    // Meta fields enabled but no precombine field: still append-only.
    let derived = derive_strategy(vec![(HudiTableConfig::PopulatesMetaFields, "true")]);
    assert_eq!(
        derived,
        RecordMergeStrategyValue::AppendOnly.as_ref(),
        "Should derive as append-only due to missing precombine field"
    );

    // Meta fields enabled and precombine field present: overwrite-with-latest.
    let derived = derive_strategy(vec![
        (HudiTableConfig::PopulatesMetaFields, "true"),
        (HudiTableConfig::PrecombineField, "ts"),
    ]);
    assert_eq!(
        derived,
        RecordMergeStrategyValue::OverwriteWithLatest.as_ref(),
    );
}
}
6 changes: 0 additions & 6 deletions crates/core/src/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ pub enum RecordMergeStrategyValue {
OverwriteWithLatest,
}

impl Default for RecordMergeStrategyValue {
fn default() -> Self {
Self::OverwriteWithLatest
}
}

impl FromStr for RecordMergeStrategyValue {
type Err = ConfigError;

Expand Down
11 changes: 0 additions & 11 deletions crates/core/src/merge/record_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,6 @@ pub struct RecordMerger {

impl RecordMerger {
/// Validates the given [HudiConfigs] against the [RecordMergeStrategy].
///
/// # Notes
/// This should be ideally called during table creation. However, an empty
/// table could have no precombine field being set, and we also want to keep
/// the default merge strategy as [OverwriteWithLatest] to fulfill the
/// snapshot read semantics out-of-the-box. This would conflict with
/// having no precombine field.
///
/// TODO: We should derive merge strategy dynamically if not set by user.
pub fn validate_configs(hudi_configs: &HudiConfigs) -> ConfigResult<()> {
let merge_strategy = hudi_configs
.get_or_default(RecordMergeStrategy)
Expand Down Expand Up @@ -93,8 +84,6 @@ impl RecordMerger {
schema: &SchemaRef,
batches: &[RecordBatch],
) -> Result<RecordBatch> {
Self::validate_configs(&self.hudi_configs)?;

if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::config::table::HudiTableConfig::{
use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
use crate::config::{HudiConfigs, HUDI_CONF_DIR};
use crate::error::CoreError;
use crate::merge::record_merger::RecordMerger;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::Table;
Expand Down Expand Up @@ -276,6 +277,8 @@ impl TableBuilder {
)));
}

RecordMerger::validate_configs(hudi_configs)?;

Ok(())
}

Expand Down
24 changes: 23 additions & 1 deletion crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ mod tests {

#[tokio::test]
async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
for base_url in &[SampleTable::V6SimplekeygenNonhivestyle.url_to_mor()] {
for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
Expand All @@ -1038,6 +1038,28 @@ mod tests {
}
Ok(())
}

/// Reads a snapshot of each `V6SimplekeygenHivestyleNoMetafields` sample table and checks
/// that the four seeded rows come back, ordered by id.
#[tokio::test]
async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
    for base_url in SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
        let table = Table::new(base_url.path()).await?;
        let batches = table.read_snapshot(&[]).await?;

        // Merge all returned batches into one, using the schema of the first batch.
        let schema = batches[0].schema();
        let merged = concat_batches(&schema, &batches)?;

        assert_eq!(
            SampleTable::sample_data_order_by_id(&merged),
            vec![
                (1, "Alice", false),
                (2, "Bob", false),
                (3, "Carol", true),
                (4, "Diana", true),
            ]
        );
    }
    Ok(())
}
}

mod test_incremental_queries {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ CREATE TABLE v6_complexkeygen_hivestyle (
shortField SHORT
)
USING HUDI
LOCATION '/opt/data/external_tables/v6_complexkeygen_hivestyle'
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id,name',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

-- Sample Hudi table used as a test fixture: a 'mor' table keyed by `id`, with `longField` as the
-- precombine field, partitioned by `byteField`, and created with Hudi meta fields turned off.
CREATE TABLE v6_simplekeygen_hivestyle_no_metafields (
id INT,
name STRING,
isActive BOOLEAN,
shortField SHORT,
intField INT,
longField LONG,
floatField FLOAT,
doubleField DOUBLE,
decimalField DECIMAL(10,5),
dateField DATE,
timestampField TIMESTAMP,
binaryField BINARY,
arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, -- Array of structs
mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>, -- Map with struct values
structField STRUCT<
field1: STRING,
field2: INT,
child_struct: STRUCT<
child_field1: DOUBLE,
child_field2: BOOLEAN
>
>,
byteField BYTE
)
USING HUDI
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id',
preCombineField = 'longField',
'hoodie.metadata.enable' = 'false',
-- Hive-style partitioning is on and the partition column is kept in the data files.
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hoodie.datasource.write.drop.partition.columns' = 'false',
-- Meta fields are not populated; this is the setting the table name ("no_metafields") refers to.
'hoodie.populate.meta.fields' = 'false',
-- Log files and their data blocks are both written as Parquet.
'hoodie.table.log.file.format' = 'PARQUET',
'hoodie.logfile.data.block.format' = 'parquet',
'hoodie.datasource.write.record.merger.impls' = 'org.apache.hudi.HoodieSparkRecordMerger',
'hoodie.parquet.small.file.limit' = '0'
)
PARTITIONED BY (byteField);

-- Seed data: four rows with ids 1-4 (Alice, Bob, Carol, Diana).
-- Values are listed in the table's declared column order, so the last value of each row is
-- byteField, the partition column (10, 20, 10, 30).
INSERT INTO v6_simplekeygen_hivestyle_no_metafields VALUES
(1, 'Alice', false, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
STRUCT('Alice', 30, STRUCT(123.456, true)),
10
),
(2, 'Bob', false, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
STRUCT('Bob', 40, STRUCT(789.012, false)),
20
),
(3, 'Carol', true, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS BINARY),
ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
STRUCT('Carol', 25, STRUCT(456.789, true)),
10
),
(4, 'Diana', true, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
STRUCT('Diana', 50, STRUCT(987.654, true)),
30
);
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ CREATE TABLE v6_simplekeygen_nonhivestyle (
byteField BYTE
)
USING HUDI
LOCATION '/opt/data/external_tables/v6_simplekeygen_nonhivestyle'
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
byteField BYTE
)
USING HUDI
LOCATION '/opt/data/external_tables/v6_simplekeygen_nonhivestyle_overwritetable'
TBLPROPERTIES (
type = 'mor',
primaryKey = 'id',
Expand Down

0 comments on commit 39d641b

Please sign in to comment.