fix: handle replacecommit for loading file slices (#53)
Filter out file slices that belong to replaced file groups. 

Also add a full-table test case, `v6_simplekeygen_nonhivestyle_overwritetable`.
xushiyan authored Jul 7, 2024 · 1 parent 4fa2440 · commit 9920d5c
Showing 8 changed files with 241 additions and 28 deletions.
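The gist of the change: collect the file groups named in completed `replacecommit` instants into a `HashSet<FileGroup>`, then skip those groups while assembling file slices for a snapshot read. Below is a minimal, self-contained sketch of that idea; the struct and field names are simplified stand-ins for illustration, not the crate's actual `FileGroup`/`FileSlice` types.

```rust
use std::collections::HashSet;

// Simplified stand-ins for the crate's FileGroup / FileSlice (illustration only).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GroupKey {
    id: String,
    partition_path: Option<String>,
}

#[derive(Debug, Clone)]
struct Slice {
    group: GroupKey,
    base_file: String,
}

/// Keep only slices whose file group was not replaced by a replacecommit.
fn filter_replaced(slices: Vec<Slice>, replaced: &HashSet<GroupKey>) -> Vec<Slice> {
    slices
        .into_iter()
        .filter(|s| !replaced.contains(&s.group))
        .collect()
}

fn main() {
    let fg_a = GroupKey { id: "fg-a".into(), partition_path: Some("10".into()) };
    let fg_b = GroupKey { id: "fg-b".into(), partition_path: Some("30".into()) };

    let slices = vec![
        Slice { group: fg_a.clone(), base_file: "fg-a_base_20240401.parquet".into() },
        Slice { group: fg_b.clone(), base_file: "fg-b_base_20240404.parquet".into() },
    ];

    // Suppose a replacecommit (e.g. from INSERT OVERWRITE) replaced fg-a.
    let replaced: HashSet<GroupKey> = [fg_a].into_iter().collect();

    let visible = filter_replaced(slices, &replaced);
    assert_eq!(visible.len(), 1);
    assert_eq!(visible[0].group, fg_b);
}
```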
16 changes: 16 additions & 0 deletions crates/core/src/file_group/mod.rs
@@ -20,6 +20,7 @@
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;

use anyhow::{anyhow, Result};
@@ -116,6 +117,21 @@ pub struct FileGroup {
pub file_slices: BTreeMap<String, FileSlice>,
}

impl PartialEq for FileGroup {
fn eq(&self, other: &Self) -> bool {
self.id == other.id && self.partition_path == other.partition_path
}
}

impl Eq for FileGroup {}

impl Hash for FileGroup {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
self.partition_path.hash(state);
}
}

impl fmt::Display for FileGroup {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str(
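Worth noting why these impls exist: identity for a `FileGroup` is keyed on `id` and `partition_path` only, so a group can serve as a `HashSet` key even though its `file_slices` map plays no part in (and is unsuitable for) hashing. A small sketch of the same pattern on a simplified stand-in type:

```rust
use std::collections::{BTreeMap, HashSet};
use std::hash::{Hash, Hasher};

// Simplified stand-in mirroring the FileGroup shape shown in the diff.
struct Group {
    id: String,
    partition_path: Option<String>,
    // Payload that should not participate in identity.
    file_slices: BTreeMap<String, String>,
}

impl PartialEq for Group {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id && self.partition_path == other.partition_path
    }
}

impl Eq for Group {}

impl Hash for Group {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.partition_path.hash(state);
    }
}

fn main() {
    // A group discovered by listing the partition: it carries file slices.
    let listed = Group {
        id: "fg-1".into(),
        partition_path: Some("10".into()),
        file_slices: BTreeMap::from([("20240401".to_string(), "base-v1.parquet".to_string())]),
    };
    // The "same" group rebuilt from replacecommit metadata: no slices attached.
    let replaced = Group {
        id: "fg-1".into(),
        partition_path: Some("10".into()),
        file_slices: BTreeMap::new(),
    };

    let excludes: HashSet<Group> = HashSet::from([replaced]);
    // The lookup succeeds because identity is keyed on (id, partition_path) only.
    assert!(excludes.contains(&listed));
}
```

In the read path below, the exclude set built from replacecommit metadata carries no slices, yet `contains` still matches the fully loaded groups discovered from storage listing.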
25 changes: 21 additions & 4 deletions crates/core/src/table/fs_view.rs
@@ -17,7 +17,7 @@
* under the License.
*/

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Result};
@@ -121,11 +121,18 @@ impl FileSystemView {
Ok(file_groups)
}

pub fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
pub fn get_file_slices_as_of(
&self,
timestamp: &str,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.iter() {
let fgs_ref = fgs.value();
for fg in fgs_ref {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
// TODO: pass ref instead of copying
file_slices.push(fsl.clone());
@@ -135,10 +142,17 @@
Ok(file_slices)
}

pub async fn load_file_slices_stats_as_of(&self, timestamp: &str) -> Result<()> {
pub async fn load_file_slices_stats_as_of(
&self,
timestamp: &str,
exclude_file_groups: &HashSet<FileGroup>,
) -> Result<()> {
for mut fgs in self.partition_to_file_groups.iter_mut() {
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
if exclude_file_groups.contains(fg) {
continue;
}
if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) {
file_slice
.load_stats(&self.storage)
@@ -215,7 +229,10 @@ mod tests {
.await
.unwrap();

let file_slices = fs_view.get_file_slices_as_of("20240418173551906").unwrap();
let excludes = HashSet::new();
let file_slices = fs_view
.get_file_slices_as_of("20240418173551906", &excludes)
.unwrap();
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
6 changes: 4 additions & 2 deletions crates/core/src/table/mod.rs
@@ -182,11 +182,13 @@ impl Table {
}

async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
let excludes = self.timeline.get_replaced_file_groups().await?;
self.file_system_view
.load_file_slices_stats_as_of(timestamp)
.load_file_slices_stats_as_of(timestamp, &excludes)
.await
.context("Fail to load file slice stats.")?;
self.file_system_view.get_file_slices_as_of(timestamp)
self.file_system_view
.get_file_slices_as_of(timestamp, &excludes)
}

pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
84 changes: 63 additions & 21 deletions crates/core/src/table/timeline.rs
@@ -17,8 +17,8 @@
* under the License.
*/

use std::cmp::Ordering;
use std::collections::HashMap;
use std::cmp::{Ordering, PartialOrd};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
@@ -30,6 +30,7 @@ use serde_json::{Map, Value};
use url::Url;

use crate::config::HudiConfigs;
use crate::file_group::FileGroup;
use crate::storage::utils::split_filename;
use crate::storage::Storage;

@@ -72,6 +73,19 @@ impl Instant {
pub fn file_name(&self) -> String {
format!("{}.{}{}", self.timestamp, self.action, self.state_suffix())
}

pub fn relative_path(&self) -> Result<String> {
let mut commit_file_path = PathBuf::from(".hoodie");
commit_file_path.push(self.file_name());
commit_file_path
.to_str()
.ok_or(anyhow!("Failed to get file path for {:?}", self))
.map(|s| s.to_string())
}

pub fn is_replacecommit(&self) -> bool {
self.action == "replacecommit"
}
}

#[derive(Clone, Debug)]
@@ -89,23 +103,23 @@ impl Timeline {
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
let instants = Self::load_completed_commit_instants(&storage).await?;
let instants = Self::load_completed_commits(&storage).await?;
Ok(Self {
storage,
configs,
instants,
})
}

async fn load_completed_commit_instants(storage: &Storage) -> Result<Vec<Instant>> {
async fn load_completed_commits(storage: &Storage) -> Result<Vec<Instant>> {
let mut completed_commits = Vec::new();
for file_info in storage.list_files(Some(".hoodie")).await? {
let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
if file_ext == "commit" {
if matches!(file_ext.as_str(), "commit" | "replacecommit") {
completed_commits.push(Instant {
state: State::Completed,
timestamp: file_stem,
action: "commit".to_owned(),
action: file_ext.to_owned(),
})
}
}
@@ -120,23 +134,22 @@
.map(|instant| instant.timestamp.as_str())
}

async fn get_commit_metadata(&self, instant: &Instant) -> Result<Map<String, Value>> {
let bytes = self
.storage
.get_file_data(instant.relative_path()?.as_str())
.await?;
let json: Value = serde_json::from_slice(&bytes)?;
let commit_metadata = json
.as_object()
.ok_or_else(|| anyhow!("Expected JSON object"))?
.clone();
Ok(commit_metadata)
}

async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
match self.instants.iter().next_back() {
Some(instant) => {
let mut commit_file_path = PathBuf::from(".hoodie");
commit_file_path.push(instant.file_name());
let relative_path = commit_file_path.to_str().ok_or(anyhow!(
"Failed to get commit file path for instant: {:?}",
instant
))?;
let bytes = self.storage.get_file_data(relative_path).await?;
let json: Value = serde_json::from_slice(&bytes)?;
let commit_metadata = json
.as_object()
.ok_or_else(|| anyhow!("Expected JSON object"))?
.clone();
Ok(commit_metadata)
}
Some(instant) => self.get_commit_metadata(instant).await,
None => Ok(Map::new()),
}
}
@@ -167,6 +180,35 @@
))
}
}

pub async fn get_replaced_file_groups(&self) -> Result<HashSet<FileGroup>> {
let mut fgs: HashSet<FileGroup> = HashSet::new();
for instant in self.instants.iter().filter(|i| i.is_replacecommit()) {
let commit_metadata = self.get_commit_metadata(instant).await?;
if let Some(ptn_to_replaced) = commit_metadata.get("partitionToReplaceFileIds") {
for (ptn, fg_ids) in ptn_to_replaced
.as_object()
.expect("partitionToReplaceFileIds should be a map")
{
let fg_ids = fg_ids
.as_array()
.expect("file group ids should be an array")
.iter()
.map(|fg_id| fg_id.as_str().expect("file group id should be a string"));

let ptn = Some(ptn.to_string()).filter(|s| !s.is_empty());

for fg_id in fg_ids {
fgs.insert(FileGroup::new(fg_id.to_string(), ptn.clone()));
}
}
}
}

// TODO: return file group and instants, and handle multi-writer fg id conflicts

Ok(fgs)
}
}

#[cfg(test)]
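For context on what `get_replaced_file_groups` parses: a replacecommit's metadata carries a `partitionToReplaceFileIds` object mapping each partition path to the ids of the file groups it replaced, with an empty key standing in for a non-partitioned table. Here is a hedged sketch of that extraction with `serde_json`, using a hand-written metadata snippet; real commit metadata has many more fields, and the ids below are made up.

```rust
use std::collections::HashSet;

use serde_json::Value;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hand-written snippet shaped like the field read by get_replaced_file_groups.
    let commit_metadata: Value = serde_json::from_str(
        r#"{
            "partitionToReplaceFileIds": {
                "30": ["254f9e66-a00e-4d7d-9b81-ae8c79fca4c1-0"],
                "": ["0d8a4c36-2f0a-4ba7-9b3c-0a9d0f9e8b11-0"]
            }
        }"#,
    )?;

    // Collect (partition, file group id) pairs; an empty partition string becomes None.
    let mut replaced: HashSet<(Option<String>, String)> = HashSet::new();
    if let Some(ptn_to_replaced) = commit_metadata.get("partitionToReplaceFileIds") {
        for (ptn, fg_ids) in ptn_to_replaced.as_object().expect("should be a map") {
            let ptn = Some(ptn.to_string()).filter(|s| !s.is_empty());
            for fg_id in fg_ids.as_array().expect("should be an array") {
                let fg_id = fg_id.as_str().expect("should be a string").to_string();
                replaced.insert((ptn.clone(), fg_id));
            }
        }
    }

    assert_eq!(replaced.len(), 2);
    assert!(replaced.contains(&(None, "0d8a4c36-2f0a-4ba7-9b3c-0a9d0f9e8b11-0".to_string())));
    Ok(())
}
```

The actual implementation goes one step further and turns each pair into a `FileGroup`, so the resulting set can be matched directly against the groups held by the file system view.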
41 changes: 40 additions & 1 deletion crates/datafusion/src/lib.rs
@@ -138,7 +138,8 @@ mod tests {
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
V6ComplexkeygenHivestyle, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle, V6TimebasedkeygenNonhivestyle,
V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
V6TimebasedkeygenNonhivestyle,
};
use hudi_tests::{utils, TestTable};
use utils::{get_bool_column, get_i32_column, get_str_column};
@@ -228,4 +229,42 @@
verify_data(&ctx, &sql, test_table.as_ref()).await
}
}

async fn verify_data_with_replacecommits(ctx: &SessionContext, sql: &str, table_name: &str) {
let df = ctx.sql(sql).await.unwrap();
let rb = df.collect().await.unwrap();
let rb = rb.first().unwrap();
assert_eq!(get_i32_column(rb, "id"), &[4]);
assert_eq!(get_str_column(rb, "name"), &["Diana"]);
assert_eq!(get_bool_column(rb, "isActive"), &[false]);
assert_eq!(
get_i32_column(rb, &format!("{}.structField[field2]", table_name)),
&[50]
);
}

#[tokio::test]
async fn datafusion_read_hudi_table_with_replacecommits() {
for (test_table, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
{
println!(">>> testing for {}", test_table.as_ref());
let ctx = prepare_session_context(
test_table,
&[(InputPartitions.as_ref().to_string(), "2".to_string())],
)
.await;

let sql = format!(
r#"
SELECT id, name, isActive, structField.field2
FROM {} WHERE id % 2 = 0
AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
test_table.as_ref()
);

verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
}
}
}
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

CREATE TABLE v6_simplekeygen_nonhivestyle_overwritetable (
id INT,
name STRING,
isActive BOOLEAN,
shortField SHORT,
intField INT,
longField LONG,
floatField FLOAT,
doubleField DOUBLE,
decimalField DECIMAL(10,5),
dateField DATE,
timestampField TIMESTAMP,
binaryField BINARY,
arrayField ARRAY<STRUCT<arr_struct_f1: STRING, arr_struct_f2: INT>>, -- Array of structs
mapField MAP<STRING, STRUCT<map_field_value_struct_f1: DOUBLE, map_field_value_struct_f2: BOOLEAN>>, -- Map with struct values
structField STRUCT<
field1: STRING,
field2: INT,
child_struct: STRUCT<
child_field1: DOUBLE,
child_field2: BOOLEAN
>
>,
byteField BYTE
)
USING HUDI
TBLPROPERTIES (
type = 'cow',
primaryKey = 'id',
preCombineField = 'longField',
'hoodie.metadata.enable' = 'false',
'hoodie.datasource.write.hive_style_partitioning' = 'false',
'hoodie.datasource.write.drop.partition.columns' = 'false'
)
PARTITIONED BY (byteField);

INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
(1, 'Alice', true, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
STRUCT('Alice', 30, STRUCT(123.456, true)),
10
),
(2, 'Bob', false, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345, CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), CAST('more binary data' AS BINARY),
ARRAY(STRUCT('yellow', 400), STRUCT('purple', 500)),
MAP('key3', STRUCT(234.567, true), 'key4', STRUCT(567.890, false)),
STRUCT('Bob', 40, STRUCT(789.012, false)),
20
),
(3, 'Carol', true, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222, CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), CAST('even more binary data' AS BINARY),
ARRAY(STRUCT('black', 600), STRUCT('white', 700), STRUCT('pink', 800)),
MAP('key5', STRUCT(345.678, true), 'key6', STRUCT(654.321, false)),
STRUCT('Carol', 25, STRUCT(456.789, true)),
10
);

INSERT INTO v6_simplekeygen_nonhivestyle_overwritetable VALUES
(1, 'Alice', false, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890, CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), CAST('binary data' AS BINARY),
ARRAY(STRUCT('red', 100), STRUCT('blue', 200), STRUCT('green', 300)),
MAP('key1', STRUCT(123.456, true), 'key2', STRUCT(789.012, false)),
STRUCT('Alice', 30, STRUCT(123.456, true)),
10
),
(4, 'Diana', true, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
STRUCT('Diana', 50, STRUCT(987.654, true)),
30
);

INSERT OVERWRITE TABLE v6_simplekeygen_nonhivestyle_overwritetable SELECT
4, 'Diana', false, 500, 45000, 987654321, 4.0, 2.468, 65432.12345, CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), CAST('new binary data' AS BINARY),
ARRAY(STRUCT('orange', 900), STRUCT('gray', 1000)),
MAP('key7', STRUCT(456.789, true), 'key8', STRUCT(123.456, false)),
STRUCT('Diana', 50, STRUCT(987.654, true)),
30
;