Skip to content

Commit

Permalink
feat: performance testing harness and perf tests for scan file plan a…
Browse files Browse the repository at this point in the history
…nd execute
  • Loading branch information
sdd committed Aug 9, 2024
1 parent 80c1399 commit 0c2a071
Show file tree
Hide file tree
Showing 12 changed files with 666 additions and 6 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ Cargo.lock
.vscode
**/.DS_Store
dist/*

**/venv
*.so
*.pyc

crates/iceberg/testdata/performance/raw_data/*
crates/iceberg/testdata/performance/warehouse
11 changes: 11 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,19 @@ url = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
criterion = { version = "0.3", features = ["async_tokio", "async_futures"] }
ctor = { workspace = true }
futures-util = "0.3"
iceberg-catalog-rest = { path = "../catalog/rest" }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }

[[bench]]
name = "table_scan_plan_files"
harness = false

[[bench]]
name = "table_scan_execute_query"
harness = false
122 changes: 122 additions & 0 deletions crates/iceberg/benches/table_scan_execute_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::*;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use tokio::runtime::Runtime;

mod utils;
use utils::{create_file_plan, create_task_stream, exec_plan, setup};

pub fn bench_read_all_files_all_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
let scan = table.scan().build().unwrap();
let tasks = create_file_plan(&runtime, scan);

c.bench_function("scan: read (all files, all rows)", |b| {
b.to_async(&runtime).iter_batched(
|| create_task_stream(tasks.clone()),
|plan| exec_plan(table.clone(), plan),
BatchSize::SmallInput,
)
});
}

pub fn bench_read_all_files_some_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
let scan = table
.scan()
.with_filter(Reference::new("passenger_count").equal_to(Datum::double(1.0)))
.build()
.unwrap();
let tasks = create_file_plan(&runtime, scan);

c.bench_function("scan: read (all files, some rows)", |b| {
b.to_async(&runtime).iter_batched(
|| create_task_stream(tasks.clone()),
|plan| exec_plan(table.clone(), plan),
BatchSize::SmallInput,
)
});
}

pub fn bench_read_some_files_all_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
let scan = table
.scan()
.with_filter(
Reference::new("tpep_pickup_datetime")
.greater_than_or_equal_to(
Datum::timestamptz_from_str("2024-02-01T00:00:00.000 UTC").unwrap(),
)
.and(Reference::new("tpep_pickup_datetime").less_than(
Datum::timestamptz_from_str("2024-02-02T00:00:00.000 UTC").unwrap(),
)),
)
.build()
.unwrap();
let tasks = create_file_plan(&runtime, scan);

c.bench_function("scan: read (some files, all rows)", |b| {
b.to_async(&runtime).iter_batched(
|| create_task_stream(tasks.clone()),
|plan| exec_plan(table.clone(), plan),
BatchSize::SmallInput,
)
});
}

pub fn bench_read_some_files_some_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
let scan =
table
.scan()
.with_filter(
Reference::new("tpep_pickup_datetime")
.greater_than_or_equal_to(
Datum::timestamptz_from_str("2024-02-01T00:00:00.000 UTC").unwrap(),
)
.and(Reference::new("tpep_pickup_datetime").less_than(
Datum::timestamptz_from_str("2024-02-02T00:00:00.000 UTC").unwrap(),
))
.and(Reference::new("passenger_count").equal_to(Datum::double(1.0))),
)
.build()
.unwrap();
let tasks = create_file_plan(&runtime, scan);

c.bench_function("scan: read (some files, some rows)", |b| {
b.to_async(&runtime).iter_batched(
|| create_task_stream(tasks.clone()),
|plan| exec_plan(table.clone(), plan),
BatchSize::SmallInput,
)
});
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = bench_read_some_files_some_rows, bench_read_some_files_all_rows, bench_read_all_files_some_rows, bench_read_all_files_all_rows
}

criterion_main!(benches);
139 changes: 139 additions & 0 deletions crates/iceberg/benches/table_scan_plan_files.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::*;
use futures_util::StreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;
use tokio::runtime::Runtime;
mod utils;
use utils::setup;

async fn all_files_all_rows(table: &Table) {
let scan = table.scan().build().unwrap();
let mut stream = scan.plan_files().await.unwrap();

while let Some(item) = stream.next().await {
black_box(item.unwrap());
}
}

async fn one_file_all_rows(table: &Table) {
let scan = table
.scan()
.with_filter(
Reference::new("tpep_pickup_datetime")
.greater_than_or_equal_to(
Datum::timestamptz_from_str("2024-02-01T00:00:00.000 UTC").unwrap(),
)
.and(Reference::new("tpep_pickup_datetime").less_than(
Datum::timestamptz_from_str("2024-02-02T00:00:00.000 UTC").unwrap(),
)),
)
.build()
.unwrap();
let mut stream = scan.plan_files().await.unwrap();

while let Some(item) = stream.next().await {
black_box(item.unwrap());
}
}

async fn all_files_some_rows(table: &Table) {
let scan = table
.scan()
.with_filter(Reference::new("passenger_count").equal_to(Datum::double(1.0)))
.build()
.unwrap();
let mut stream = scan.plan_files().await.unwrap();

while let Some(item) = stream.next().await {
black_box(item.unwrap());
}
}

async fn one_file_some_rows(table: &Table) {
let scan =
table
.scan()
.with_filter(
Reference::new("tpep_pickup_datetime")
.greater_than_or_equal_to(
Datum::timestamptz_from_str("2024-02-01T00:00:00.000 UTC").unwrap(),
)
.and(Reference::new("tpep_pickup_datetime").less_than(
Datum::timestamptz_from_str("2024-02-02T00:00:00.000 UTC").unwrap(),
))
.and(Reference::new("passenger_count").equal_to(Datum::double(1.0))),
)
.build()
.unwrap();
let mut stream = scan.plan_files().await.unwrap();

while let Some(item) = stream.next().await {
black_box(item.unwrap());
}
}

pub fn bench_all_files_all_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
println!("setup complete");

c.bench_function("scan: plan (all files, all rows)", |b| {
b.to_async(&runtime).iter(|| all_files_all_rows(&table))
});
}

pub fn bench_one_file_all_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
println!("setup complete");

c.bench_function("scan: plan (one file, all rows)", |b| {
b.to_async(&runtime).iter(|| one_file_all_rows(&table))
});
}

pub fn bench_all_files_some_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
println!("setup complete");

c.bench_function("scan: plan (all files, some rows)", |b| {
b.to_async(&runtime).iter(|| all_files_some_rows(&table))
});
}

pub fn bench_one_file_some_rows(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let table = setup(&runtime);
println!("setup complete");

c.bench_function("scan: plan (one file, some rows)", |b| {
b.to_async(&runtime).iter(|| one_file_some_rows(&table))
});
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = bench_all_files_all_rows, bench_all_files_some_rows, bench_one_file_all_rows, bench_one_file_some_rows
}

criterion_main!(benches);
Loading

0 comments on commit 0c2a071

Please sign in to comment.