Skip to content

Commit

Permalink
add runtime bench
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Dec 9, 2024
1 parent cdd79b9 commit 5d72e75
Show file tree
Hide file tree
Showing 7 changed files with 465 additions and 371 deletions.
6 changes: 5 additions & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ name = "bench_state_table"
harness = false

[[bench]]
name = "stream_hash_join"
name = "stream_hash_join_rt"
harness = false

[[bench]]
name = "stream_hash_join_mem"
harness = false

[lints]
Expand Down
3 changes: 1 addition & 2 deletions src/stream/benches/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::field_generator::VarcharProperty;
Expand Down
207 changes: 0 additions & 207 deletions src/stream/benches/stream_hash_join.rs

This file was deleted.

55 changes: 55 additions & 0 deletions src/stream/benches/stream_hash_join_mem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

//! To run this benchmark you can use the following command:
//! ```sh
//! cargo bench --features dhat-heap --bench stream_hash_join
//! ```
//!
//! You may also specify the amplification size, e.g. 40000
//! ```sh
//! cargo bench --features dhat-heap --bench stream_hash_join -- 40000
//! ```
use std::env;

use risingwave_stream::executor::test_utils::hash_join_executor::*;

risingwave_expr_impl::enable!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[tokio::main]
async fn main() {
let args: Vec<_> = env::args().collect();
let amp = if let Some(raw_arg) = args.get(1)
&& let Ok(arg) = raw_arg.parse()
{
arg
} else {
100_000
};
let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp).await;
{
// Start the profiler later, after we have ingested the data for hash join build-side.
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

handle_streams(amp, tx_l, tx_r, out).await;
}
}
47 changes: 47 additions & 0 deletions src/stream/benches/stream_hash_join_rt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

//! To run this benchmark you can use the following command:
//! ```sh
//! cargo bench --bench stream_hash_join_rt
//! ```
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use futures::executor::block_on;
use risingwave_stream::executor::test_utils::hash_join_executor::*;
use tokio::runtime::Runtime;

risingwave_expr_impl::enable!();

fn bench_hash_join(c: &mut Criterion) {
let mut group = c.benchmark_group("benchmark_hash_join");
group.sample_size(10);

let rt = Runtime::new().unwrap();
for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] {
let name = format!("hash_join_rt_{}", amp);
group.bench_function(&name, |b| {
b.to_async(&rt).iter_batched(
|| block_on(setup_bench_stream_hash_join(amp)),
|(tx_l, tx_r, out)| handle_streams(amp, tx_l, tx_r, out),
BatchSize::SmallInput,
)
});
}
}

criterion_group!(benches, bench_hash_join);
criterion_main!(benches);
Loading

0 comments on commit 5d72e75

Please sign in to comment.