Skip to content

Commit

Permalink
test: add benchmark for tonbo on s3
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould authored and ethe committed Nov 19, 2024
1 parent d10083e commit d26bd9f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 5 deletions.
92 changes: 92 additions & 0 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use async_stream::stream;
use fusio_dispatch::FsOptions;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet::data_type::AsBytes;
Expand Down Expand Up @@ -184,6 +185,97 @@ pub trait BenchReader {
) -> impl Stream<Item = ProjectionResult> + 'a;
}

pub struct TonboS3BenchDataBase {
db: tonbo::DB<Customer, TokioExecutor>,
}

impl TonboS3BenchDataBase {
#[allow(dead_code)]
pub fn new(db: tonbo::DB<Customer, TokioExecutor>) -> Self {
TonboS3BenchDataBase { db }
}
}

impl BenchDatabase for TonboS3BenchDataBase {
type W<'db>
= TonboBenchWriteTransaction<'db>
where
Self: 'db;
type R<'db>
= TonboBenchReadTransaction<'db>
where
Self: 'db;

fn db_type_name() -> &'static str {
"tonbo on s3"
}

async fn write_transaction(&self) -> Self::W<'_> {
TonboBenchWriteTransaction {
txn: self.db.transaction().await,
}
}

async fn read_transaction(&self) -> Self::R<'_> {
TonboBenchReadTransaction {
txn: self.db.transaction().await,
}
}

async fn build(path: impl AsRef<Path>) -> Self {
create_dir_all(path.as_ref()).await.unwrap();

let fs_options = FsOptions::S3 {
bucket: "data".to_string(),
credential: Some(fusio::remotes::aws::credential::AwsCredential {
key_id: "user".to_string(),
secret_key: "password".to_string(),
token: None,
}),
endpoint: Some("http://localhost:9000".to_string()),
sign_payload: None,
checksum: None,
region: None,
};

let path = fusio::path::Path::from_filesystem_path(path.as_ref()).unwrap();
let option = DbOption::from(path.clone())
.level_path(
0,
fusio::path::Path::from_url_path("/l0").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
1,
fusio::path::Path::from_url_path("/l1").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
2,
fusio::path::Path::from_url_path("/l2").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
3,
fusio::path::Path::from_url_path("/l3").unwrap(),
fs_options.clone(),
)
.unwrap()
.level_path(
4,
fusio::path::Path::from_url_path("/l4").unwrap(),
fs_options.clone(),
)
.unwrap()
.disable_wal();

TonboS3BenchDataBase::new(tonbo::DB::new(option, TokioExecutor::new()).await.unwrap())
}
}

pub struct TonboBenchDataBase {
db: tonbo::DB<Customer, TokioExecutor>,
}
Expand Down
14 changes: 11 additions & 3 deletions benches/read_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use tokio::{fs, io::AsyncWriteExt};

use crate::common::{
read_tbl, BenchDatabase, BenchReadTransaction, BenchReader, RedbBenchDatabase,
RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, ITERATIONS, NUM_SCAN, READ_TIMES,
RocksdbBenchDatabase, SledBenchDatabase, TonboBenchDataBase, TonboS3BenchDataBase, ITERATIONS,
NUM_SCAN, READ_TIMES,
};

async fn benchmark<T: BenchDatabase + Send + Sync>(
Expand Down Expand Up @@ -152,26 +153,33 @@ async fn main() {

load::<TonboBenchDataBase>(&tbl_path, data_dir.join("tonbo")).await;
load::<RocksdbBenchDatabase>(&tbl_path, data_dir.join("rocksdb")).await;
load::<TonboS3BenchDataBase>(&tbl_path, data_dir.join("tonbo_s3")).await;
}

let tonbo_latency_results = { benchmark::<TonboBenchDataBase>(data_dir.join("tonbo")).await };
let rocksdb_results = { benchmark::<RocksdbBenchDatabase>(data_dir.join("rocksdb")).await };
let tonbo_s3_latency_results =
{ benchmark::<TonboS3BenchDataBase>(data_dir.join("tonbo_s3")).await };

let mut rows: Vec<Vec<String>> = Vec::new();

for (benchmark, _duration) in &tonbo_latency_results {
rows.push(vec![benchmark.to_string()]);
}

for results in [tonbo_latency_results, rocksdb_results] {
for results in [
tonbo_latency_results,
rocksdb_results,
tonbo_s3_latency_results,
] {
for (i, (_benchmark, duration)) in results.iter().enumerate() {
rows[i].push(format!("{}ms", duration.as_millis()));
}
}

let mut table = comfy_table::Table::new();
table.set_width(100);
table.set_header(["", "tonbo", "rocksdb"]);
table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]);
for row in rows {
table.add_row(row);
}
Expand Down
12 changes: 10 additions & 2 deletions benches/write_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ async fn main() {
let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap();
benchmark::<RocksdbBenchDatabase>(tmp_file.path()).await
};
let tonbo_s3_latency_results = {
let tmp_file: TempDir = tempfile::tempdir_in(&tmpdir).unwrap();
benchmark::<TonboS3BenchDataBase>(tmp_file.path()).await
};

let _ = fs::remove_dir_all(&tmpdir);

Expand All @@ -210,15 +214,19 @@ async fn main() {
rows.push(vec![benchmark.to_string()]);
}

for results in [tonbo_latency_results, rocksdb_results] {
for results in [
tonbo_latency_results,
rocksdb_results,
tonbo_s3_latency_results,
] {
for (i, (_benchmark, duration)) in results.iter().enumerate() {
rows[i].push(format!("{}ms", duration.as_millis()));
}
}

let mut table = comfy_table::Table::new();
table.set_width(100);
table.set_header(["", "tonbo", "rocksdb"]);
table.set_header(["", "tonbo", "rocksdb", "tonbo_s3"]);
for row in rows {
table.add_row(row);
}
Expand Down

0 comments on commit d26bd9f

Please sign in to comment.