Commit

config write worker num
jiacai2050 committed Dec 19, 2024
1 parent aac2e8b commit 4ba2c1b
Showing 2 changed files with 37 additions and 30 deletions.
src/server/src/config.rs: 2 changes (2 additions, 0 deletions)
@@ -22,13 +22,15 @@ use serde::{Deserialize, Serialize};
 #[serde(default)]
 pub struct Config {
     pub port: u16,
+    pub write_worker_num: usize, // for test
     pub metric_engine: MetricEngineConfig,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
             port: 5000,
+            write_worker_num: 4,
             metric_engine: MetricEngineConfig::default(),
         }
     }
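
Aside, not part of the commit: because Config carries a container-level #[serde(default)] attribute, a config file that omits the new field is filled in from the Default impl, so existing configs keep the previous hard-coded value of 4 write workers. Below is a minimal sketch of that behavior; parsing with the toml crate is an assumption for illustration, not necessarily how the server loads its config.

// Hedged sketch, not part of the commit. Field names mirror the Config above.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(default)]
struct Config {
    port: u16,
    write_worker_num: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            port: 5000,
            write_worker_num: 4,
        }
    }
}

fn main() {
    // Field present in the config file: the parsed value wins.
    let explicit: Config = toml::from_str("port = 5000\nwrite_worker_num = 8").unwrap();
    assert_eq!(explicit.write_worker_num, 8);

    // Field omitted: #[serde(default)] fills it from Default::default(),
    // so the benchmark keeps its old worker count of 4.
    let omitted: Config = toml::from_str("port = 5000").unwrap();
    assert_eq!(omitted.write_worker_num, 4);
}
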
src/server/src/main.rs: 65 changes (35 additions, 30 deletions)
@@ -74,10 +74,7 @@ pub fn main() {
     info!(config = ?config, "Config loaded");
 
     let port = config.port;
-    let rt = tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .expect("build main runtime");
+    let rt = build_multi_runtime("main", 1);
     let manifest_compact_runtime = build_multi_runtime(
         "manifest-compact",
         config.metric_engine.manifest.background_thread_num,
@@ -91,7 +88,8 @@
         StorageConfig::Local(v) => v,
         StorageConfig::S3Like(_) => panic!("S3 not support yet"),
     };
-    let write_rt = build_multi_runtime("write", 4);
+    let write_worker_num = config.write_worker_num;
+    let write_rt = build_multi_runtime("write", write_worker_num);
     let _ = rt.block_on(async move {
         let store = Arc::new(LocalFileSystem::new());
         let storage = Arc::new(
@@ -107,7 +105,9 @@
             .await
             .unwrap(),
         );
-        bench_write(write_rt.clone(), storage.clone());
+
+        bench_write(write_rt.clone(), write_worker_num, storage.clone());
+
         let app_state = Data::new(AppState { storage });
         info!(port, "Start HoraeDB http server...");
         HttpServer::new(move || {
@@ -143,35 +143,40 @@ fn build_schema() -> SchemaRef {
         Field::new("value", DataType::Int64, true),
     ]))
 }
-fn bench_write(rt: RuntimeRef, storage: TimeMergeStorageRef) {
+
+fn bench_write(rt: RuntimeRef, workers: usize, storage: TimeMergeStorageRef) {
     let schema = Arc::new(Schema::new(vec![
         Field::new("pk1", DataType::Int64, true),
         Field::new("pk2", DataType::Int64, true),
         Field::new("pk3", DataType::Int64, true),
         Field::new("value", DataType::Int64, true),
     ]));
-    rt.spawn(async move {
-        loop {
-            let pk1: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
-            let pk2: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
-            let pk3: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
-            let value: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
-            let batch = RecordBatch::try_new(
-                schema.clone(),
-                vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)],
-            )
-            .unwrap();
-            let now = common::now();
-            if let Err(e) = storage
-                .write(WriteRequest {
-                    batch,
-                    enable_check: false,
-                    time_range: (now..now + 1).into(),
-                })
-                .await
-            {
-                error!("write failed, err:{}", e);
+    for _ in 0..workers {
+        let storage = storage.clone();
+        let schema = schema.clone();
+        rt.spawn(async move {
+            loop {
+                let pk1: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let pk2: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let pk3: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let value: Int64Array = repeat_with(rand::random::<i64>).take(1000).collect();
+                let batch = RecordBatch::try_new(
+                    schema.clone(),
+                    vec![Arc::new(pk1), Arc::new(pk2), Arc::new(pk3), Arc::new(value)],
+                )
+                .unwrap();
+                let now = common::now();
+                if let Err(e) = storage
+                    .write(WriteRequest {
+                        batch,
+                        enable_check: false,
+                        time_range: (now..now + 1).into(),
+                    })
+                    .await
+                {
+                    error!("write failed, err:{}", e);
+                }
             }
-        }
-    });
+        });
+    }
 }
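
Aside, not part of the commit: the reworked bench_write spawns one long-running write loop per configured worker instead of a single task. Each iteration clones the storage and schema handles because an async move task takes ownership of its captures, and cloning an Arc only copies a pointer. The sketch below reproduces that fan-out pattern with tokio's runtime builder directly (build_multi_runtime is a project helper whose body is not shown in this diff) and with placeholder state standing in for TimeMergeStorageRef; the names and values are illustrative assumptions.

// Hedged sketch of the fan-out pattern: N worker tasks on a multi-threaded
// Tokio runtime, each owning its own Arc clone of the shared state.
use std::sync::Arc;

fn main() {
    let workers = 4; // stands in for config.write_worker_num
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all()
        .build()
        .expect("build write runtime");

    let storage = Arc::new(vec![1_i64, 2, 3]); // stands in for the storage handle
    let mut handles = Vec::new();
    for id in 0..workers {
        let storage = storage.clone(); // cheap pointer copy, one per task
        handles.push(rt.spawn(async move {
            // The real benchmark loops forever issuing writes; this only shows
            // that each task independently owns its clone.
            println!("worker {id} sees {} rows", storage.len());
        }));
    }

    // Wait for the demo tasks; the benchmark's loops never finish.
    rt.block_on(async move {
        for handle in handles {
            handle.await.expect("worker task panicked");
        }
    });
}
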
