From 9178f38482678701e6ab0767404f12d267a68dcb Mon Sep 17 00:00:00 2001 From: Simeon Romanov <68448737+archeoss@users.noreply.github.com> Date: Tue, 19 Dec 2023 01:28:24 +0300 Subject: [PATCH] 296 fix benchmark build errors (#297) * fix bench * update Changelog * add explicit io_driver setting * post review fixes --- .github/workflows/build.yml | 5 ++ CHANGELOG.md | 1 + Cargo.toml | 3 +- src/benchmark/bin.rs | 117 +++++++++++++++++++++--------------- src/benchmark/writer.rs | 20 +++--- 5 files changed, 87 insertions(+), 59 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 841aa06e89..54bff34747 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,6 +29,11 @@ jobs: with: command: build args: --features async-io-rio + - name: Build benchmarks + uses: actions-rs/cargo@v1 + with: + command: build + args: --features benchmark build-windows: runs-on: windows-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a6f6390f8..deafc8083e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Pearl changelog #### Fixed +- Fixed benchmark build error (#296) - Fixed build warnings (#300) - Fix the build by adding yanked aHash implementation (#302) diff --git a/Cargo.toml b/Cargo.toml index 749ce6280f..c32978a0b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,13 +47,14 @@ async-lock = "2.7" # Benchmark only dependencies clap = { version = "3.2", optional = true } env_logger = { version = "0.9", optional = true } +rand = { version = "0.8", optional = true } [dependencies.tokio] version = "1.28" features = ["fs", "io-util", "sync", "time", "rt", "macros", "rt-multi-thread"] [features] -benchmark = ["dep:clap", "dep:env_logger"] +benchmark = ["dep:clap", "dep:env_logger", "dep:rand"] async-io-rio = ["dep:rio"] [lib] diff --git a/src/benchmark/bin.rs b/src/benchmark/bin.rs index f4f1b97f84..ad4ce79ac9 100644 --- a/src/benchmark/bin.rs +++ b/src/benchmark/bin.rs @@ -23,13 +23,12 @@ mod prelude { stream::{FuturesUnordered, StreamExt}, }; pub(crate) use log::{Level, LevelFilter}; - pub(crate) use pearl::{Builder, Key, Storage}; + pub(crate) use pearl::{BlobRecordTimestamp, Builder, Key, RefKey, Storage}; pub(crate) use rand::{rngs::ThreadRng, RngCore}; pub(crate) use std::{ io::Write, ops::Add, path::{Path, PathBuf}, - sync::Arc, time::{Duration, Instant}, }; } @@ -73,62 +72,62 @@ async fn start_app() { info!("Start write cycle"); let (tx, rx) = channel::(1024); - let writer = Arc::new(writer); let mut counter = 0; let futures_limit: usize = matches.value_of("futures_limit").unwrap().parse().unwrap(); let prepared = (0..futures_limit).map(|_| generator.next().unwrap()); - - let mut futures_pool: FuturesUnordered<_> = prepared - .into_iter() - .map(|(key, data)| { - let ltx = tx.clone(); - counter += 1; - writer.write(key, data, ltx) - }) - .collect(); - println!( - "{:<10}{:<10}{:<10}{:<10}{:<10}", - "Completed", "Active", "Limit", "Total", "%" - ); - let write_limit = limit * 1000 / value_size_kb; - let mut prev_p = 0; - while futures_pool.next().await.is_some() { - debug!("#{}/{} future ready", counter, futures_pool.len()); - let percent = counter * 1000 / write_limit; - if prev_p != percent { - print!( - "\r{:<10}{:<10}{:<10}{:<10}{:<10}", - counter, - futures_pool.len(), - futures_limit, - write_limit, - percent / 10 - ); - if percent % 50 == 0 { - println!(); - } - } - prev_p = percent; - if futures_pool.len() < futures_limit { - if let Some((key, data)) = generator.next() { + { + let mut futures_pool: FuturesUnordered<_> = prepared + .into_iter() + .map(|(key, data)| { let ltx = tx.clone(); counter += 1; - futures_pool.push(writer.write(key.into(), data, ltx)); + writer.write(key, data, ltx) + }) + .collect(); + println!( + "{:<10}{:<10}{:<10}{:<10}{:<10}", + "Completed", "Active", "Limit", "Total", "%" + ); + let write_limit = limit * 1000 / value_size_kb; + let mut prev_p = 0; + while futures_pool.next().await.is_some() { + debug!("#{}/{} future ready", counter, futures_pool.len()); + let percent = counter * 1000 / write_limit; + if prev_p != percent { + print!( + "\r{:<10}{:<10}{:<10}{:<10}{:<10}", + counter, + futures_pool.len(), + futures_limit, + write_limit, + percent / 10 + ); + if percent % 50 == 0 { + println!(); + } } + prev_p = percent; + if futures_pool.len() < futures_limit { + if let Some((key, data)) = generator.next() { + let ltx = tx.clone(); + counter += 1; + futures_pool.push(writer.write(key.into(), data, ltx)); + } + } + debug!("#{}/{} next await", counter, futures_pool.len()); } - debug!("#{}/{} next await", counter, futures_pool.len()); - } - info!("start await "); - let _ = rx - .take(counter as usize) - .map(|r| statistics.add(r)) - .collect::>() - .await; - info!("end await "); - statistics.display(); + info!("start await "); + let _ = rx + .take(counter as usize) + .map(|r| statistics.add(r)) + .collect::>() + .await; + info!("end await "); + statistics.display(); + } writer.close().await; } @@ -206,8 +205,21 @@ fn init_logger() { #[derive(Debug, Default, PartialOrd, Ord, PartialEq, Eq, Clone)] pub struct Key128(Vec); -impl Key for Key128 { +#[derive(PartialEq, Eq, PartialOrd, Ord)] +pub struct RefKeyType<'a>(&'a [u8]); + +impl<'a> From<&'a [u8]> for RefKeyType<'a> { + fn from(v: &'a [u8]) -> Self { + Self(v) + } +} + +impl<'a> RefKey<'a> for RefKeyType<'a> {} + +impl<'a> Key<'a> for Key128 { const LEN: u16 = 8; + const MEM_SIZE: usize = 16 * 8; + type Ref = RefKeyType<'a>; } impl AsRef for Key128 { @@ -223,6 +235,13 @@ impl From> for Key128 { } } +impl<'a> From<&'a [u8]> for Key128 { + fn from(v: &'a [u8]) -> Self { + assert_eq!(Self::LEN as usize, v.len()); + Self(v.to_vec()) + } +} + impl AsRef<[u8]> for Key128 { fn as_ref(&self) -> &[u8] { self.0.as_ref() diff --git a/src/benchmark/writer.rs b/src/benchmark/writer.rs index 72eb7ee679..09c469108b 100644 --- a/src/benchmark/writer.rs +++ b/src/benchmark/writer.rs @@ -1,11 +1,10 @@ use super::prelude::*; -use pearl::rio; -pub struct Writer { +pub struct Writer Key<'a> + 'static> { storage: Storage, } -impl Writer { +impl Key<'a>> Writer { pub fn new( tmp_dir: &Path, max_blob_size: u64, @@ -16,14 +15,14 @@ impl Writer { .blob_file_name_prefix("benchmark") .max_blob_size(max_blob_size) .max_data_in_blob(max_data_in_blob) - .work_dir(tmp_dir.join("pearl_benchmark")); + .work_dir(tmp_dir.join("pearl_benchmark")) + .set_io_driver(pearl::IoDriver::new()); if allow_duplicates { info!("duplicates allowed"); builder = builder.allow_duplicates(); } - let rio = rio::new().unwrap(); - let storage = builder.enable_aio(rio).build().unwrap(); + let storage = builder.build().unwrap(); Self { storage } } @@ -35,14 +34,17 @@ impl Writer { let kbuf: &[u8] = key.as_ref().as_ref(); let mut report = Report::new(kbuf.len(), data.len()); let now = Instant::now(); - self.storage.write(key, data).await.unwrap(); + self.storage + .write(key, data.into(), BlobRecordTimestamp::now()) + .await + .unwrap(); debug!("write finished"); report.set_latency(now); tx.try_send(report).unwrap(); debug!("report sent"); } - pub async fn close(&self) { - self.storage.clone().close().await.unwrap(); + pub async fn close(self) { + self.storage.close().await.unwrap(); } }