From dee867553cdc0d3cb52998946cc5da901d179d60 Mon Sep 17 00:00:00 2001 From: Simeon Romanov Date: Tue, 31 Oct 2023 20:45:19 +0300 Subject: [PATCH 1/4] fix bench --- Cargo.toml | 8 ++- src/benchmark/bin.rs | 117 +++++++++++++++++++++++----------------- src/benchmark/writer.rs | 17 +++--- 3 files changed, 84 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ce5ad33b55..ab9ef2cb87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,10 @@ codegen-units = 1 [dependencies] # Don't update without checking for backwards compatibility!!! -ahash = "=0.7.6" +# ahash = "=0.7.6" + +# ahash = 0.7.6 release +ahash = { git = 'https://github.com/tkaitchuck/aHash/', rev = 'e77cab8c1e15bfc9f54dfd28bd8820c2a7bb27c4' } anyhow = "1.0" async-trait = "0.1" @@ -50,13 +53,14 @@ async-lock = "2.7" # Benchmark only dependencies clap = { version = "3.2", optional = true } env_logger = { version = "0.9", optional = true } +rand = { version = "0.8", optional = true } [dependencies.tokio] version = "1.28" features = ["fs", "io-util", "sync", "time", "rt", "macros", "rt-multi-thread"] [features] -benchmark = ["dep:clap", "dep:env_logger"] +benchmark = ["dep:clap", "dep:env_logger", "dep:rand"] async-io-rio = ["dep:rio"] [lib] diff --git a/src/benchmark/bin.rs b/src/benchmark/bin.rs index f4f1b97f84..6fe5888b7e 100644 --- a/src/benchmark/bin.rs +++ b/src/benchmark/bin.rs @@ -23,7 +23,7 @@ mod prelude { stream::{FuturesUnordered, StreamExt}, }; pub(crate) use log::{Level, LevelFilter}; - pub(crate) use pearl::{Builder, Key, Storage}; + pub(crate) use pearl::{BlobRecordTimestamp, Builder, Key, RefKey, Storage}; pub(crate) use rand::{rngs::ThreadRng, RngCore}; pub(crate) use std::{ io::Write, @@ -73,62 +73,63 @@ async fn start_app() { info!("Start write cycle"); let (tx, rx) = channel::(1024); - let writer = Arc::new(writer); let mut counter = 0; let futures_limit: usize = matches.value_of("futures_limit").unwrap().parse().unwrap(); let prepared = (0..futures_limit).map(|_| generator.next().unwrap()); - - let mut futures_pool: FuturesUnordered<_> = prepared - .into_iter() - .map(|(key, data)| { - let ltx = tx.clone(); - counter += 1; - writer.write(key, data, ltx) - }) - .collect(); - println!( - "{:<10}{:<10}{:<10}{:<10}{:<10}", - "Completed", "Active", "Limit", "Total", "%" - ); - let write_limit = limit * 1000 / value_size_kb; - let mut prev_p = 0; - while futures_pool.next().await.is_some() { - debug!("#{}/{} future ready", counter, futures_pool.len()); - let percent = counter * 1000 / write_limit; - if prev_p != percent { - print!( - "\r{:<10}{:<10}{:<10}{:<10}{:<10}", - counter, - futures_pool.len(), - futures_limit, - write_limit, - percent / 10 - ); - if percent % 50 == 0 { - println!(); - } - } - prev_p = percent; - if futures_pool.len() < futures_limit { - if let Some((key, data)) = generator.next() { + { + let arc_writer = Arc::new(&writer); + let mut futures_pool: FuturesUnordered<_> = prepared + .into_iter() + .map(|(key, data)| { let ltx = tx.clone(); counter += 1; - futures_pool.push(writer.write(key.into(), data, ltx)); + arc_writer.write(key, data, ltx) + }) + .collect(); + println!( + "{:<10}{:<10}{:<10}{:<10}{:<10}", + "Completed", "Active", "Limit", "Total", "%" + ); + let write_limit = limit * 1000 / value_size_kb; + let mut prev_p = 0; + while futures_pool.next().await.is_some() { + debug!("#{}/{} future ready", counter, futures_pool.len()); + let percent = counter * 1000 / write_limit; + if prev_p != percent { + print!( + "\r{:<10}{:<10}{:<10}{:<10}{:<10}", + counter, + futures_pool.len(), + futures_limit, + write_limit, + percent / 10 + ); + if percent % 50 == 0 { + println!(); + } } + prev_p = percent; + if futures_pool.len() < futures_limit { + if let Some((key, data)) = generator.next() { + let ltx = tx.clone(); + counter += 1; + futures_pool.push(arc_writer.write(key.into(), data, ltx)); + } + } + debug!("#{}/{} next await", counter, futures_pool.len()); } - debug!("#{}/{} next await", counter, futures_pool.len()); - } - info!("start await "); - let _ = rx - .take(counter as usize) - .map(|r| statistics.add(r)) - .collect::>() - .await; - info!("end await "); - statistics.display(); + info!("start await "); + let _ = rx + .take(counter as usize) + .map(|r| statistics.add(r)) + .collect::>() + .await; + info!("end await "); + statistics.display(); + } writer.close().await; } @@ -206,8 +207,21 @@ fn init_logger() { #[derive(Debug, Default, PartialOrd, Ord, PartialEq, Eq, Clone)] pub struct Key128(Vec); -impl Key for Key128 { +#[derive(PartialEq, Eq, PartialOrd, Ord)] +pub struct RefKeyType<'a>(&'a [u8]); + +impl<'a> From<&'a [u8]> for RefKeyType<'a> { + fn from(v: &'a [u8]) -> Self { + Self(v) + } +} + +impl<'a> RefKey<'a> for RefKeyType<'a> {} + +impl<'a> Key<'a> for Key128 { const LEN: u16 = 8; + const MEM_SIZE: usize = 16 * 8; + type Ref = RefKeyType<'a>; } impl AsRef for Key128 { @@ -223,6 +237,13 @@ impl From> for Key128 { } } +impl<'a> From<&'a [u8]> for Key128 { + fn from(v: &'a [u8]) -> Self { + assert_eq!(Self::LEN as usize, v.len()); + Self(v.to_vec()) + } +} + impl AsRef<[u8]> for Key128 { fn as_ref(&self) -> &[u8] { self.0.as_ref() diff --git a/src/benchmark/writer.rs b/src/benchmark/writer.rs index 72eb7ee679..3982de768e 100644 --- a/src/benchmark/writer.rs +++ b/src/benchmark/writer.rs @@ -1,11 +1,10 @@ use super::prelude::*; -use pearl::rio; -pub struct Writer { +pub struct Writer Key<'a> + 'static> { storage: Storage, } -impl Writer { +impl Key<'a>> Writer { pub fn new( tmp_dir: &Path, max_blob_size: u64, @@ -22,8 +21,7 @@ impl Writer { builder = builder.allow_duplicates(); } - let rio = rio::new().unwrap(); - let storage = builder.enable_aio(rio).build().unwrap(); + let storage = builder.build().unwrap(); Self { storage } } @@ -35,14 +33,17 @@ impl Writer { let kbuf: &[u8] = key.as_ref().as_ref(); let mut report = Report::new(kbuf.len(), data.len()); let now = Instant::now(); - self.storage.write(key, data).await.unwrap(); + self.storage + .write(key, data.into(), BlobRecordTimestamp::now()) + .await + .unwrap(); debug!("write finished"); report.set_latency(now); tx.try_send(report).unwrap(); debug!("report sent"); } - pub async fn close(&self) { - self.storage.clone().close().await.unwrap(); + pub async fn close(self) { + self.storage.close().await.unwrap(); } } From 6c9c1b7ec281dee70eab61309e7e530560ad6e0d Mon Sep 17 00:00:00 2001 From: Simeon Romanov Date: Tue, 31 Oct 2023 20:49:39 +0300 Subject: [PATCH 2/4] update Changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34f668ed8d..202d0464bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ Pearl changelog #### Fixed - +- Benchmark build error (#296) #### Updated From c1ff8563dcf0717f2b02ed03d5d29aa3aac2e47d Mon Sep 17 00:00:00 2001 From: Simeon Romanov Date: Tue, 31 Oct 2023 21:09:17 +0300 Subject: [PATCH 3/4] add explicit io_driver setting --- src/benchmark/writer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/benchmark/writer.rs b/src/benchmark/writer.rs index 3982de768e..09c469108b 100644 --- a/src/benchmark/writer.rs +++ b/src/benchmark/writer.rs @@ -15,7 +15,8 @@ impl Key<'a>> Writer { .blob_file_name_prefix("benchmark") .max_blob_size(max_blob_size) .max_data_in_blob(max_data_in_blob) - .work_dir(tmp_dir.join("pearl_benchmark")); + .work_dir(tmp_dir.join("pearl_benchmark")) + .set_io_driver(pearl::IoDriver::new()); if allow_duplicates { info!("duplicates allowed"); builder = builder.allow_duplicates(); From b07b25bc3e0c3ee48755a93453378caa57d6be55 Mon Sep 17 00:00:00 2001 From: Simeon Romanov Date: Sat, 16 Dec 2023 15:03:42 +0300 Subject: [PATCH 4/4] post review fixes --- .github/workflows/build.yml | 5 +++++ src/benchmark/bin.rs | 6 ++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 841aa06e89..54bff34747 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,6 +29,11 @@ jobs: with: command: build args: --features async-io-rio + - name: Build benchmarks + uses: actions-rs/cargo@v1 + with: + command: build + args: --features benchmark build-windows: runs-on: windows-latest diff --git a/src/benchmark/bin.rs b/src/benchmark/bin.rs index 6fe5888b7e..ad4ce79ac9 100644 --- a/src/benchmark/bin.rs +++ b/src/benchmark/bin.rs @@ -29,7 +29,6 @@ mod prelude { io::Write, ops::Add, path::{Path, PathBuf}, - sync::Arc, time::{Duration, Instant}, }; } @@ -79,13 +78,12 @@ async fn start_app() { let prepared = (0..futures_limit).map(|_| generator.next().unwrap()); { - let arc_writer = Arc::new(&writer); let mut futures_pool: FuturesUnordered<_> = prepared .into_iter() .map(|(key, data)| { let ltx = tx.clone(); counter += 1; - arc_writer.write(key, data, ltx) + writer.write(key, data, ltx) }) .collect(); println!( @@ -115,7 +113,7 @@ async fn start_app() { if let Some((key, data)) = generator.next() { let ltx = tx.clone(); counter += 1; - futures_pool.push(arc_writer.write(key.into(), data, ltx)); + futures_pool.push(writer.write(key.into(), data, ltx)); } } debug!("#{}/{} next await", counter, futures_pool.len());