diff --git a/src/server/src/config.rs b/src/server/src/config.rs index 3e3680a864..41f39d2555 100644 --- a/src/server/src/config.rs +++ b/src/server/src/config.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; #[serde(default)] pub struct Config { pub port: u16, + pub write_worker_num: usize, // for test pub metric_engine: MetricEngineConfig, } @@ -29,6 +30,7 @@ impl Default for Config { fn default() -> Self { Self { port: 5000, + write_worker_num: 4, metric_engine: MetricEngineConfig::default(), } } diff --git a/src/server/src/main.rs b/src/server/src/main.rs index 476683fbdd..0644f5a50c 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -74,10 +74,7 @@ pub fn main() { info!(config = ?config, "Config loaded"); let port = config.port; - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("build main runtime"); + let rt = build_multi_runtime("main", 1); let manifest_compact_runtime = build_multi_runtime( "manifest-compact", config.metric_engine.manifest.background_thread_num, @@ -91,7 +88,8 @@ pub fn main() { StorageConfig::Local(v) => v, StorageConfig::S3Like(_) => panic!("S3 not support yet"), }; - let write_rt = build_multi_runtime("write", 4); + let write_worker_num = config.write_worker_num; + let write_rt = build_multi_runtime("write", write_worker_num); let _ = rt.block_on(async move { let store = Arc::new(LocalFileSystem::new()); let storage = Arc::new( @@ -107,7 +105,9 @@ pub fn main() { .await .unwrap(), ); - bench_write(write_rt.clone(), storage.clone()); + + bench_write(write_rt.clone(), write_worker_num, storage.clone()); + let app_state = Data::new(AppState { storage }); info!(port, "Start HoraeDB http server..."); HttpServer::new(move || { @@ -143,35 +143,40 @@ fn build_schema() -> SchemaRef { Field::new("value", DataType::Int64, true), ])) } -fn bench_write(rt: RuntimeRef, storage: TimeMergeStorageRef) { + +fn bench_write(rt: RuntimeRef, workers: usize, storage: TimeMergeStorageRef) { let schema = Arc::new(Schema::new(vec![ Field::new("pk1", DataType::Int64, true), Field::new("pk2", DataType::Int64, true), Field::new("pk3", DataType::Int64, true), Field::new("value", DataType::Int64, true), ])); - rt.spawn(async move { - loop { - let pk1: Int64Array = repeat_with(rand::random::).take(1000).collect(); - let pk2: Int64Array = repeat_with(rand::random::).take(1000).collect(); - let pk3: Int64Array = repeat_with(rand::random::).take(1000).collect(); - let value: Int64Array = repeat_with(rand::random::).take(1000).collect(); - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)], - ) - .unwrap(); - let now = common::now(); - if let Err(e) = storage - .write(WriteRequest { - batch, - enable_check: false, - time_range: (now..now + 1).into(), - }) - .await - { - error!("write failed, err:{}", e); + for _ in 0..workers { + let storage = storage.clone(); + let schema = schema.clone(); + rt.spawn(async move { + loop { + let pk1: Int64Array = repeat_with(rand::random::).take(1000).collect(); + let pk2: Int64Array = repeat_with(rand::random::).take(1000).collect(); + let pk3: Int64Array = repeat_with(rand::random::).take(1000).collect(); + let value: Int64Array = repeat_with(rand::random::).take(1000).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)], + ) + .unwrap(); + let now = common::now(); + if let Err(e) = storage + .write(WriteRequest { + batch, + enable_check: false, + time_range: (now..now + 1).into(), + }) + .await + { + error!("write failed, err:{}", e); + } } - } - }); + }); + } }