test: add test cases for different table setup (#36)
- run test cases against datafusion APIs
  - v6_nonpartitioned
  - v6_simplekeygen_hivestyle_no_metafields
  - v6_simplekeygen_nonhivestyle
  - v6_timebasedkeygen_nonhivestyle
- organize test data and improve test utils
xushiyan authored Jul 4, 2024
1 parent e8fde26 commit 6080d4d
Showing 28 changed files with 178 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .licenserc.yaml
@@ -23,7 +23,7 @@ header:
paths-ignore:
- 'LICENSE'
- 'NOTICE'
- '**/fixtures/**'
- '**/data/**'

comment: on-failure

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -60,6 +60,8 @@ serde_json = "1"
anyhow = { version = "1.0.86" }
bytes = { version = "1" }
chrono = { version = "=0.4.34", default-features = false, features = ["clock"] }
strum = { version = "0.26.3", features = ["derive"] }
strum_macros = "0.26.4"
tracing = { version = "0.1", features = ["log"] }
regex = { version = "1" }
url = { version = "2" }
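
A note on the new strum dependencies: the DataFusion tests below refer to TestTable variants by name via as_ref(), which is what a strum-derived enum would produce. A hypothetical sketch, assuming hudi-tests defines TestTable roughly like this (the real definition is not shown in this diff):

// Hypothetical sketch, not part of this commit: one way hudi-tests could define
// TestTable so that as_ref() yields the snake_case names listed in the commit
// message (e.g. "v6_nonpartitioned", "v6_simplekeygen_hivestyle_no_metafields").
use strum_macros::AsRefStr;

#[derive(Debug, Clone, Copy, AsRefStr)]
#[strum(serialize_all = "snake_case")]
pub enum TestTable {
    V6ComplexkeygenHivestyle,
    V6Nonpartitioned,
    V6SimplekeygenHivestyleNoMetafields,
    V6SimplekeygenNonhivestyle,
    V6TimebasedkeygenNonhivestyle,
}
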
14 changes: 7 additions & 7 deletions crates/core/src/storage/mod.rs
@@ -213,7 +213,7 @@ mod tests {
#[tokio::test]
async fn storage_list_dirs() {
let base_url = Url::from_directory_path(
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
@@ -235,7 +235,7 @@
#[tokio::test]
async fn storage_list_dirs_as_paths() {
let base_url = Url::from_directory_path(
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
@@ -258,7 +258,7 @@
#[tokio::test]
async fn storage_list_files() {
let base_url = Url::from_directory_path(
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
@@ -317,7 +317,7 @@ mod tests {
#[tokio::test]
async fn use_storage_to_get_leaf_dirs() {
let base_url = Url::from_directory_path(
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
@@ -331,7 +331,7 @@
#[tokio::test]
async fn use_storage_to_get_leaf_dirs_for_leaf_dir() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures/leaf_dir")).unwrap())
Url::from_directory_path(canonicalize(Path::new("tests/data/leaf_dir")).unwrap())
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
@@ -345,7 +345,7 @@
#[tokio::test]
async fn storage_get_file_info() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let file_info = storage.get_file_info("a.parquet").await.unwrap();
assert_eq!(file_info.name, "a.parquet");
@@ -359,7 +359,7 @@
#[tokio::test]
async fn storage_get_parquet_file_data() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let file_data = storage.get_parquet_file_data("a.parquet").await.unwrap();
assert_eq!(file_data.num_rows(), 5);
8 changes: 5 additions & 3 deletions crates/core/src/table/mod.rs
@@ -57,7 +57,7 @@ impl Table {

let props = Self::load_properties(base_url.clone(), storage_options.clone())
.await
.context("Failed to create a table")?;
.context("Failed to load table properties")?;

let props = Arc::new(props);
let timeline = Timeline::new(base_url.clone(), storage_options.clone(), props.clone())
@@ -412,8 +412,10 @@ mod tests {

#[tokio::test]
async fn hudi_table_get_table_metadata() {
let base_path =
canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
let base_path = canonicalize(Path::new(
"tests/data/table_metadata/sample_table_properties",
))
.unwrap();
let table = Table::new(base_path.to_str().unwrap(), HashMap::new())
.await
.unwrap();
7 changes: 4 additions & 3 deletions crates/core/src/table/timeline.rs
@@ -215,9 +215,10 @@ mod tests {

#[tokio::test]
async fn init_commits_timeline() {
let base_url =
Url::from_file_path(canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap())
.unwrap();
let base_url = Url::from_file_path(
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
let timeline = Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
File renamed without changes.
File renamed without changes.
4 changes: 3 additions & 1 deletion crates/datafusion/Cargo.toml
@@ -24,7 +24,6 @@ rust-version.workspace = true

[dependencies]
hudi-core = { path = "../core" }
hudi-tests = { path = "../tests" }
# arrow
arrow = { workspace = true }
arrow-arith = { workspace = true }
@@ -67,3 +66,6 @@ chrono = { workspace = true, default-features = false, features = ["clock"] }
regex = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }

[dev-dependencies]
hudi-tests = { path = "../tests" }
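
With hudi-tests moved to dev-dependencies, the integration below is exercised only from tests. For orientation, a condensed sketch of the pattern those tests use (the table path and registered name are placeholders; inside this crate the provider is reached as crate::HudiDataSource):

// Minimal usage sketch, assuming a local Hudi table path and default options:
// register a HudiDataSource as a DataFusion table provider and query it with SQL.
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_common::Result;

use crate::HudiDataSource;

async fn query_hudi_table(table_path: &str) -> Result<()> {
    let ctx = SessionContext::new();
    let hudi = HudiDataSource::new(table_path, HashMap::new()).await?;
    ctx.register_table("hudi_table", Arc::new(hudi))?;
    let batches = ctx.sql("SELECT * FROM hudi_table LIMIT 10").await?.collect().await?;
    println!("fetched {} record batches", batches.len());
    Ok(())
}
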
136 changes: 73 additions & 63 deletions crates/datafusion/src/lib.rs
@@ -133,92 +133,102 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{DataFusionError, Result, ScalarValue};

use hudi_core::config::HudiConfig;
use hudi_tests::TestTable;
use datafusion_common::ScalarValue;

use hudi_core::config::HudiConfig::ReadInputPartitions;
use hudi_tests::utils::get_bool_column;
use hudi_tests::TestTable::{
V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields, V6SimplekeygenNonhivestyle,
V6TimebasedkeygenNonhivestyle,
};
use hudi_tests::{utils, TestTable};
use utils::{get_i32_column, get_str_column};
use TestTable::V6ComplexkeygenHivestyle;

use crate::HudiDataSource;

#[tokio::test]
async fn datafusion_read_hudi_table() -> Result<(), DataFusionError> {
async fn prepare_session_context(
test_table: &TestTable,
options: &[(String, String)],
) -> SessionContext {
let config = SessionConfig::new().set(
"datafusion.sql_parser.enable_ident_normalization",
ScalarValue::from(false),
);
let ctx = SessionContext::new_with_config(config);
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi = HudiDataSource::new(
base_url.as_str(),
HashMap::from([(
HudiConfig::ReadInputPartitions.as_ref().to_string(),
"2".to_string(),
)]),
)
.await?;
ctx.register_table("hudi_table_complexkeygen", Arc::new(hudi))?;
let sql = r#"
SELECT _hoodie_file_name, id, name, structField.field2
FROM hudi_table_complexkeygen WHERE id % 2 = 0
AND structField.field2 > 30 ORDER BY name LIMIT 10"#;

// verify plan
let explaining_df = ctx.sql(sql).await?.explain(false, true).unwrap();
let explaining_rb = explaining_df.collect().await?;
let base_url = test_table.url();
let options = HashMap::from_iter(options.iter().cloned().collect::<HashMap<_, _>>());
let hudi = HudiDataSource::new(base_url.as_str(), options)
.await
.unwrap();
ctx.register_table(test_table.as_ref(), Arc::new(hudi))
.unwrap();
ctx
}

async fn verify_plan(
ctx: &SessionContext,
sql: &str,
table_name: &str,
planned_input_partitioned: &i32,
) {
let explaining_df = ctx.sql(sql).await.unwrap().explain(false, true).unwrap();
let explaining_rb = explaining_df.collect().await.unwrap();
let explaining_rb = explaining_rb.first().unwrap();
let plan = get_str_column(explaining_rb, "plan").join("");
let plan_lines: Vec<&str> = plan.lines().map(str::trim).collect();
assert!(plan_lines[2].starts_with("SortExec: TopK(fetch=10)"));
assert!(plan_lines[3].starts_with("ProjectionExec: expr=[_hoodie_file_name@0 as _hoodie_file_name, id@1 as id, name@2 as name, get_field(structField@3, field2) as hudi_table_complexkeygen.structField[field2]]"));
assert!(plan_lines[3].starts_with(&format!(
"ProjectionExec: expr=[id@0 as id, name@1 as name, isActive@2 as isActive, \
get_field(structField@3, field2) as {}.structField[field2]]",
table_name
)));
assert!(plan_lines[5].starts_with(
"FilterExec: CAST(id@1 AS Int64) % 2 = 0 AND get_field(structField@3, field2) > 30"
"FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND get_field(structField@3, field2) > 30"
));
assert!(plan_lines[6].contains("input_partitions=2"));
assert!(plan_lines[6].contains(&format!("input_partitions={}", planned_input_partitioned)));
}

// verify data
let df = ctx.sql(sql).await?;
let rb = df.collect().await?;
async fn verify_data(ctx: &SessionContext, sql: &str, table_name: &str) {
let df = ctx.sql(sql).await.unwrap();
let rb = df.collect().await.unwrap();
let rb = rb.first().unwrap();
assert_eq!(
get_str_column(rb, "_hoodie_file_name"),
&[
"bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
"4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet"
]
);
assert_eq!(get_i32_column(rb, "id"), &[2, 4]);
assert_eq!(get_str_column(rb, "name"), &["Bob", "Diana"]);
assert_eq!(get_bool_column(rb, "isActive"), &[false, true]);
assert_eq!(
get_i32_column(rb, "hudi_table_complexkeygen.structField[field2]"),
get_i32_column(rb, &format!("{}.structField[field2]", table_name)),
&[40, 50]
);

Ok(())
}

fn get_str_column<'a>(record_batch: &'a RecordBatch, name: &str) -> Vec<&'a str> {
record_batch
.column_by_name(name)
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>()
}

fn get_i32_column(record_batch: &RecordBatch, name: &str) -> Vec<i32> {
record_batch
.column_by_name(name)
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>()
#[tokio::test]
async fn datafusion_read_hudi_table() {
for (test_table, planned_input_partitions) in &[
(V6ComplexkeygenHivestyle, 2),
(V6Nonpartitioned, 1),
(V6SimplekeygenNonhivestyle, 2),
(V6SimplekeygenHivestyleNoMetafields, 2),
(V6TimebasedkeygenNonhivestyle, 2),
] {
println!(">>> testing for {}", test_table.as_ref());
let ctx = prepare_session_context(
test_table,
&[(ReadInputPartitions.as_ref().to_string(), "2".to_string())],
)
.await;

let sql = format!(
r#"
SELECT id, name, isActive, structField.field2
FROM {} WHERE id % 2 = 0
AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
test_table.as_ref()
);

verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
verify_data(&ctx, &sql, test_table.as_ref()).await
}
}
}
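
The get_str_column and get_i32_column helpers deleted above now come from hudi_tests::utils, alongside a get_bool_column used by verify_data. That helper's body is not part of this diff; a plausible shape, mirroring the removed helpers:

// Assumed analogue of the removed get_str_column/get_i32_column, for boolean columns.
use arrow_array::{Array, BooleanArray, RecordBatch};

pub fn get_bool_column(record_batch: &RecordBatch, name: &str) -> Vec<bool> {
    record_batch
        .column_by_name(name)
        .unwrap()
        .as_any()
        .downcast_ref::<BooleanArray>()
        .unwrap()
        .iter()
        .map(|v| v.unwrap())
        .collect::<Vec<_>>()
}
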
13 changes: 13 additions & 0 deletions crates/tests/Cargo.toml
@@ -23,6 +23,19 @@ license.workspace = true
rust-version.workspace = true

[dependencies]
arrow = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true, features = ["chrono-tz"] }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ipc = { workspace = true }
arrow-json = { workspace = true }
arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
tempfile = "3.10.1"
zip-extract = "0.1.3"
url = { workspace = true }
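
The pre-existing tempfile and zip-extract dependencies suggest how TestTable::url() materializes the reorganized test data: a zipped fixture table is unpacked into a temporary directory and exposed as a file:// URL. A rough sketch under that assumption (the fixture file name and layout are illustrative only, not taken from this commit):

// Hypothetical sketch: unpack a zipped fixture table into a temp dir and return its base URL.
use std::io::Cursor;
use url::Url;

fn extract_test_table(zip_bytes: &[u8]) -> Url {
    // into_path() detaches the directory from TempDir so it outlives this call.
    let target = tempfile::tempdir().unwrap().into_path();
    zip_extract::extract(Cursor::new(zip_bytes), &target, true).unwrap();
    Url::from_directory_path(&target).unwrap()
}

// e.g. let base_url = extract_test_table(include_bytes!("data/tables/v6_nonpartitioned.zip"));
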