diff --git a/golem-test-framework/src/components/worker_executor/mod.rs b/golem-test-framework/src/components/worker_executor/mod.rs index 0c4c974d97..f989636a8b 100644 --- a/golem-test-framework/src/components/worker_executor/mod.rs +++ b/golem-test-framework/src/components/worker_executor/mod.rs @@ -94,7 +94,8 @@ fn env_vars( ("GOLEM__COMPONENT_SERVICE__CONFIG__HOST" , &component_service.private_host()), ("GOLEM__COMPONENT_SERVICE__CONFIG__PORT" , &component_service.private_grpc_port().to_string()), ("GOLEM__COMPONENT_SERVICE__CONFIG__ACCESS_TOKEN", "2A354594-7A63-4091-A46B-CC58D379F677"), - ("GOLEM__COMPILED_COMPONENT_SERVICE__TYPE" , "Disabled"), + ("GOLEM__COMPILED_COMPONENT_SERVICE__TYPE", "Local"), + ("GOLEM__COMPILED_COMPONENT_SERVICE__CONFIG__ROOT", "/tmp/ittest-local-object-store/golem"), ("GOLEM__BLOB_STORE_SERVICE__TYPE" , "InMemory"), ("GOLEM__SHARD_MANAGER_SERVICE__TYPE" , "Grpc"), ("GOLEM__SHARD_MANAGER_SERVICE__CONFIG__HOST" , &shard_manager.private_host()), diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index 7ac5170d43..8e134bb417 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -53,3 +53,16 @@ path = "src/benchmarks/cold_start_medium.rs" [[bin]] name = "benchmark_cold_start_large" path = "src/benchmarks/cold_start_large.rs" + +[[bin]] +name = "benchmark_latency_small" +path = "src/benchmarks/latency_small.rs" + +[[bin]] +name = "benchmark_latency_medium" +path = "src/benchmarks/latency_medium.rs" + +[[bin]] +name = "benchmark_latency_large" +path = "src/benchmarks/latency_large.rs" + diff --git a/integration-tests/src/benchmarks/cold_start_large.rs b/integration-tests/src/benchmarks/cold_start_large.rs index 9aac1f5a64..7fb091c0b7 100644 --- a/integration-tests/src/benchmarks/cold_start_large.rs +++ b/integration-tests/src/benchmarks/cold_start_large.rs @@ -13,10 +13,13 @@ // limitations under the License. use async_trait::async_trait; +use golem_common::model::WorkerId; use golem_test_framework::config::{CliParams, TestDependencies}; use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; -use integration_tests::benchmarks::{run_benchmark, run_echo, setup, Context}; +use integration_tests::benchmarks::{ + get_worker_ids, run_benchmark, run_echo, setup, start, Context, +}; struct ColdStartEchoLarge { config: CliParams, @@ -35,13 +38,23 @@ impl Benchmark for ColdStartEchoLarge { } async fn setup_iteration(&self) -> Self::IterationContext { - setup(self.config.clone(), "py-echo").await + setup(self.config.clone(), "py-echo", false).await } - async fn warmup(&self, _: &Self::IterationContext) {} + async fn warmup(&self, context: &Self::IterationContext) { + // warmup with other workers + if let Some(WorkerId { component_id, .. }) = context.worker_ids.clone().first() { + start( + get_worker_ids(context.worker_ids.len(), component_id, "warmup-worker"), + context.deps.clone(), + ) + .await + } + } async fn run(&self, context: &Self::IterationContext, recorder: BenchmarkRecorder) { - run_echo(self.config.benchmark_config.length, context, recorder).await + // config.benchmark_config.length is not used, we want to have only one invocation per worker in this benchmark + run_echo(1, context, recorder).await } async fn cleanup_iteration(&self, context: Self::IterationContext) { diff --git a/integration-tests/src/benchmarks/cold_start_medium.rs b/integration-tests/src/benchmarks/cold_start_medium.rs index f702fc3e35..13d922ecfa 100644 --- a/integration-tests/src/benchmarks/cold_start_medium.rs +++ b/integration-tests/src/benchmarks/cold_start_medium.rs @@ -13,10 +13,13 @@ // limitations under the License. use async_trait::async_trait; +use golem_common::model::WorkerId; use golem_test_framework::config::{CliParams, TestDependencies}; use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; -use integration_tests::benchmarks::{run_benchmark, run_echo, setup, Context}; +use integration_tests::benchmarks::{ + get_worker_ids, run_benchmark, run_echo, setup, start, Context, +}; struct ColdStartEchoMedium { config: CliParams, @@ -35,13 +38,23 @@ impl Benchmark for ColdStartEchoMedium { } async fn setup_iteration(&self) -> Self::IterationContext { - setup(self.config.clone(), "js-echo").await + setup(self.config.clone(), "js-echo", false).await } - async fn warmup(&self, _: &Self::IterationContext) {} + async fn warmup(&self, context: &Self::IterationContext) { + // warmup with other workers + if let Some(WorkerId { component_id, .. }) = context.worker_ids.clone().first() { + start( + get_worker_ids(context.worker_ids.len(), component_id, "warmup-worker"), + context.deps.clone(), + ) + .await + } + } async fn run(&self, context: &Self::IterationContext, recorder: BenchmarkRecorder) { - run_echo(self.config.benchmark_config.length, context, recorder).await + // config.benchmark_config.length is not used, we want to have only one invocation per worker in this benchmark + run_echo(1, context, recorder).await } async fn cleanup_iteration(&self, context: Self::IterationContext) { diff --git a/integration-tests/src/benchmarks/cold_start_small.rs b/integration-tests/src/benchmarks/cold_start_small.rs index c87e15c094..79c1b610a1 100644 --- a/integration-tests/src/benchmarks/cold_start_small.rs +++ b/integration-tests/src/benchmarks/cold_start_small.rs @@ -13,10 +13,13 @@ // limitations under the License. use async_trait::async_trait; +use golem_common::model::WorkerId; use golem_test_framework::config::{CliParams, TestDependencies}; use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; -use integration_tests::benchmarks::{run_benchmark, run_echo, setup, Context}; +use integration_tests::benchmarks::{ + get_worker_ids, run_benchmark, run_echo, setup, start, Context, +}; struct ColdStartEchoSmall { config: CliParams, @@ -35,13 +38,23 @@ impl Benchmark for ColdStartEchoSmall { } async fn setup_iteration(&self) -> Self::IterationContext { - setup(self.config.clone(), "rust-echo").await + setup(self.config.clone(), "rust-echo", false).await } - async fn warmup(&self, _: &Self::IterationContext) {} + async fn warmup(&self, context: &Self::IterationContext) { + // warmup with other workers + if let Some(WorkerId { component_id, .. }) = context.worker_ids.clone().first() { + start( + get_worker_ids(context.worker_ids.len(), component_id, "warmup-worker"), + context.deps.clone(), + ) + .await + } + } async fn run(&self, context: &Self::IterationContext, recorder: BenchmarkRecorder) { - run_echo(self.config.benchmark_config.length, context, recorder).await + // config.benchmark_config.length is not used, we want to have only one invocation per worker in this benchmark + run_echo(1, context, recorder).await } async fn cleanup_iteration(&self, context: Self::IterationContext) { diff --git a/integration-tests/src/benchmarks/latency_large.rs b/integration-tests/src/benchmarks/latency_large.rs new file mode 100644 index 0000000000..dd2277ab11 --- /dev/null +++ b/integration-tests/src/benchmarks/latency_large.rs @@ -0,0 +1,57 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; + +use golem_test_framework::config::{CliParams, TestDependencies}; +use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; +use integration_tests::benchmarks::{run_benchmark, run_echo, setup, warmup_echo, Context}; + +struct WorkerLatencyLarge { + config: CliParams, +} + +#[async_trait] +impl Benchmark for WorkerLatencyLarge { + type IterationContext = Context; + + fn name() -> &'static str { + "latency-large" + } + + async fn create(config: CliParams) -> Self { + Self { config } + } + + async fn setup_iteration(&self) -> Self::IterationContext { + setup(self.config.clone(), "py-echo", true).await + } + + async fn warmup(&self, context: &Self::IterationContext) { + warmup_echo(context).await + } + + async fn run(&self, context: &Self::IterationContext, recorder: BenchmarkRecorder) { + run_echo(self.config.benchmark_config.length, context, recorder).await + } + + async fn cleanup_iteration(&self, context: Self::IterationContext) { + context.deps.kill_all(); + } +} + +#[tokio::main] +async fn main() { + run_benchmark::().await; +} diff --git a/integration-tests/src/benchmarks/latency_medium.rs b/integration-tests/src/benchmarks/latency_medium.rs new file mode 100644 index 0000000000..921ba67c1a --- /dev/null +++ b/integration-tests/src/benchmarks/latency_medium.rs @@ -0,0 +1,57 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; + +use golem_test_framework::config::{CliParams, TestDependencies}; +use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; +use integration_tests::benchmarks::{run_benchmark, run_echo, setup, warmup_echo, Context}; + +struct WorkerLatencyMedium { + config: CliParams, +} + +#[async_trait] +impl Benchmark for WorkerLatencyMedium { + type IterationContext = Context; + + fn name() -> &'static str { + "latency-medium" + } + + async fn create(config: CliParams) -> Self { + Self { config } + } + + async fn setup_iteration(&self) -> Self::IterationContext { + setup(self.config.clone(), "js-echo", true).await + } + + async fn warmup(&self, context: &Self::IterationContext) { + warmup_echo(context).await + } + + async fn run(&self, context: &Self::IterationContext, recorder: BenchmarkRecorder) { + run_echo(self.config.benchmark_config.length, context, recorder).await + } + + async fn cleanup_iteration(&self, context: Self::IterationContext) { + context.deps.kill_all(); + } +} + +#[tokio::main] +async fn main() { + run_benchmark::().await; +} diff --git a/integration-tests/src/benchmarks/latency_small.rs b/integration-tests/src/benchmarks/latency_small.rs new file mode 100644 index 0000000000..16fef10d7b --- /dev/null +++ b/integration-tests/src/benchmarks/latency_small.rs @@ -0,0 +1,57 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; + +use golem_test_framework::config::{CliParams, TestDependencies}; +use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; +use integration_tests::benchmarks::{run_benchmark, run_echo, setup, warmup_echo, Context}; + +struct WorkerLatencySmall { + config: CliParams, +} + +#[async_trait] +impl Benchmark for WorkerLatencySmall { + type IterationContext = Context; + + fn name() -> &'static str { + "latency-small" + } + + async fn create(config: CliParams) -> Self { + Self { config } + } + + async fn setup_iteration(&self) -> Self::IterationContext { + setup(self.config.clone(), "rust-echo", true).await + } + + async fn warmup(&self, context: &Self::IterationContext) { + warmup_echo(context).await + } + + async fn run(&self, context: &Self::IterationContext, recorder: BenchmarkRecorder) { + run_echo(self.config.benchmark_config.length, context, recorder).await + } + + async fn cleanup_iteration(&self, context: Self::IterationContext) { + context.deps.kill_all(); + } +} + +#[tokio::main] +async fn main() { + run_benchmark::().await; +} diff --git a/integration-tests/src/benchmarks/mod.rs b/integration-tests/src/benchmarks/mod.rs index 087b4582a5..e853605b54 100644 --- a/integration-tests/src/benchmarks/mod.rs +++ b/integration-tests/src/benchmarks/mod.rs @@ -17,7 +17,7 @@ use std::time::SystemTime; use clap::Parser; use golem_wasm_rpc::Value; -use golem_common::model::WorkerId; +use golem_common::model::{ComponentId, WorkerId}; use golem_test_framework::config::{CliParams, CliTestDependencies}; use golem_test_framework::dsl::benchmark::{BenchmarkApi, BenchmarkRecorder}; use golem_test_framework::dsl::TestDsl; @@ -28,25 +28,83 @@ pub struct Context { pub worker_ids: Vec, } -pub async fn setup(config: CliParams, component_name: &str) -> Context { +pub fn get_worker_ids(size: usize, component_id: &ComponentId, prefix: &str) -> Vec { + let mut worker_ids = Vec::new(); + for i in 0..size { + let worker_name = format!("{prefix}-{i}"); + worker_ids.push(WorkerId { + component_id: component_id.clone(), + worker_name, + }); + } + worker_ids +} + +pub async fn setup_with( + size: usize, + component_name: &str, + start_workers: bool, + deps: CliTestDependencies, +) -> Context { // Initialize infrastructure - let deps = CliTestDependencies::new(config.clone()).await; // Upload test component let component_id = deps.store_component(component_name).await; - let mut worker_ids = Vec::new(); - // Create 'size' workers - for i in 0..config.benchmark_config.size { - let worker_id = deps - .start_worker(&component_id, &format!("worker-{i}")) - .await; - worker_ids.push(worker_id); + let worker_ids = get_worker_ids(size, &component_id, "worker"); + + if start_workers { + start(worker_ids.clone(), deps.clone()).await } Context { deps, worker_ids } } +pub async fn start(worker_ids: Vec, deps: CliTestDependencies) { + for worker_id in worker_ids { + let _ = deps + .start_worker(&worker_id.component_id, &worker_id.worker_name) + .await; + } +} + +pub async fn setup(config: CliParams, component_name: &str, start_workers: bool) -> Context { + // Initialize infrastructure + let deps = CliTestDependencies::new(config.clone()).await; + + setup_with( + config.benchmark_config.size, + component_name, + start_workers, + deps, + ) + .await +} + +pub async fn warmup_echo(context: &Context) { + let mut fibers = Vec::new(); + for worker_id in &context.worker_ids { + let context_clone = context.clone(); + let worker_id_clone = worker_id.clone(); + let fiber = tokio::task::spawn(async move { + context_clone + .deps + .invoke_and_await( + &worker_id_clone, + "golem:it/api/echo", + vec![Value::String("hello".to_string())], + ) + .await + .expect("invoke_and_await failed"); + }); + fibers.push(fiber); + } + + for fiber in fibers { + fiber.await.expect("fiber failed"); + } +} + pub async fn run_echo(length: usize, context: &Context, recorder: BenchmarkRecorder) { // Invoke each worker a 'length' times in parallel and record the duration let mut fibers = Vec::new(); diff --git a/integration-tests/src/benchmarks/simple_worker_echo.rs b/integration-tests/src/benchmarks/simple_worker_echo.rs index 1ab0022049..ff681596eb 100644 --- a/integration-tests/src/benchmarks/simple_worker_echo.rs +++ b/integration-tests/src/benchmarks/simple_worker_echo.rs @@ -39,7 +39,7 @@ impl Benchmark for SimpleWorkerEcho { } async fn setup_iteration(&self) -> Self::IterationContext { - setup(self.config.clone(), "option-service").await + setup(self.config.clone(), "option-service", true).await } async fn warmup(&self, context: &Self::IterationContext) { diff --git a/integration-tests/src/benchmarks/suspend_worker.rs b/integration-tests/src/benchmarks/suspend_worker.rs index b3bc79ce95..2e790414f6 100644 --- a/integration-tests/src/benchmarks/suspend_worker.rs +++ b/integration-tests/src/benchmarks/suspend_worker.rs @@ -15,19 +15,12 @@ use std::time::SystemTime; use async_trait::async_trait; -use clap::Parser; use golem_wasm_rpc::Value; -use golem_common::model::WorkerId; -use golem_test_framework::config::{CliParams, CliTestDependencies, TestDependencies}; -use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkApi, BenchmarkRecorder}; +use golem_test_framework::config::{CliParams, TestDependencies}; +use golem_test_framework::dsl::benchmark::{Benchmark, BenchmarkRecorder}; use golem_test_framework::dsl::TestDsl; - -#[derive(Clone)] -struct Context { - deps: CliTestDependencies, - worker_ids: Vec, -} +use integration_tests::benchmarks::{run_benchmark, setup, Context}; struct SuspendWorkerLatency { config: CliParams, @@ -46,22 +39,7 @@ impl Benchmark for SuspendWorkerLatency { } async fn setup_iteration(&self) -> Self::IterationContext { - // Initialize infrastructure - let deps = CliTestDependencies::new(self.config.clone()).await; - - // Upload test component - let component_id = deps.store_component("clocks").await; - let mut worker_ids = Vec::new(); - - // Create 'size' workers - for i in 0..self.config.benchmark_config.size { - let worker_id = deps - .start_worker(&component_id, &format!("worker-{i}")) - .await; - worker_ids.push(worker_id); - } - - Self::IterationContext { deps, worker_ids } + setup(self.config.clone(), "clocks", true).await } async fn warmup(&self, context: &Self::IterationContext) { @@ -121,8 +99,5 @@ impl Benchmark for SuspendWorkerLatency { #[tokio::main] async fn main() { - let params = CliParams::parse(); - CliTestDependencies::init_logging(¶ms); - let result = SuspendWorkerLatency::run_benchmark(params).await; - println!("{}", result); + run_benchmark::().await; }