diff --git a/golem-worker-executor-base/src/grpc.rs b/golem-worker-executor-base/src/grpc.rs index c678654ed2..a18af5bf64 100644 --- a/golem-worker-executor-base/src/grpc.rs +++ b/golem-worker-executor-base/src/grpc.rs @@ -67,7 +67,7 @@ use crate::model::{InterruptKind, LastError, ListDirectoryResult, ReadFileResult use crate::services::events::Event; use crate::services::worker_activator::{DefaultWorkerActivator, LazyWorkerActivator}; use crate::services::worker_event::WorkerEventReceiver; -use crate::services::worker_fork::{DefaultWorkerFork, WorkerFork}; +use crate::services::worker_fork::{DefaultWorkerFork, WorkerForkService}; use crate::services::{ All, HasActiveWorkers, HasAll, HasComponentService, HasEvents, HasOplogService, HasPlugins, HasPromiseService, HasRunningWorkerEnumerationService, HasShardManagerService, HasShardService, @@ -142,7 +142,6 @@ pub struct WorkerExecutorImpl< > { /// Reference to all the initialized services services: Svcs, - worker_fork: Arc, ctx: PhantomData, } @@ -152,7 +151,6 @@ impl + UsesAllDeps + Send + Sync + fn clone(&self) -> Self { Self { services: self.services.clone(), - worker_fork: self.worker_fork.clone(), ctx: PhantomData, } } @@ -169,12 +167,8 @@ impl + UsesAllDeps + Send + Sync + lazy_worker_activator: Arc>, port: u16, ) -> Result { - let worker_fork: Arc = - Arc::new(DefaultWorkerFork::new(services.clone())); - let worker_executor = WorkerExecutorImpl { services: services.clone(), - worker_fork, ctx: PhantomData, }; let worker_activator = Arc::new(DefaultWorkerActivator::new(services.clone())); @@ -460,7 +454,8 @@ impl + UsesAllDeps + Send + Sync + let owned_source_worker_id = OwnedWorkerId::new(&account_id, &source_worker_id); - self.worker_fork + self.services + .worker_fork_service() .fork( &owned_source_worker_id, &owned_target_worker_id.worker_id, diff --git a/golem-worker-executor-base/src/services/mod.rs b/golem-worker-executor-base/src/services/mod.rs index d68ff06701..4314e69444 100644 --- a/golem-worker-executor-base/src/services/mod.rs +++ b/golem-worker-executor-base/src/services/mod.rs @@ -63,6 +63,10 @@ pub trait HasConfig { fn config(&self) -> Arc; } +pub trait HasWorkerForkService { + fn worker_fork_service(&self) -> Arc; +} + pub trait HasWorkerService { fn worker_service(&self) -> Arc; } @@ -154,6 +158,7 @@ pub trait HasAll: HasActiveWorkers + HasComponentService + HasConfig + + HasWorkerForkService + HasWorkerService + HasWorkerEnumerationService + HasRunningWorkerEnumerationService @@ -182,6 +187,7 @@ impl< T: HasActiveWorkers + HasComponentService + HasConfig + + HasWorkerForkService + HasWorkerService + HasWorkerEnumerationService + HasRunningWorkerEnumerationService @@ -215,6 +221,7 @@ pub struct All { runtime: Handle, component_service: Arc, shard_manager_service: Arc, + worker_fork: Arc, worker_service: Arc, worker_enumeration_service: Arc, running_worker_enumeration_service: @@ -249,6 +256,7 @@ impl Clone for All { runtime: self.runtime.clone(), component_service: self.component_service.clone(), shard_manager_service: self.shard_manager_service.clone(), + worker_fork: self.worker_fork.clone(), worker_service: self.worker_service.clone(), worker_enumeration_service: self.worker_enumeration_service.clone(), running_worker_enumeration_service: self.running_worker_enumeration_service.clone(), @@ -280,6 +288,7 @@ impl All { runtime: Handle, component_service: Arc, shard_manager_service: Arc, + worker_fork: Arc, worker_service: Arc, worker_enumeration_service: Arc< dyn worker_enumeration::WorkerEnumerationService + Send + Sync, @@ -314,6 +323,7 @@ impl All { runtime, component_service, shard_manager_service, + worker_fork, worker_service, worker_enumeration_service, running_worker_enumeration_service, @@ -343,6 +353,7 @@ impl All { this.runtime(), this.component_service(), this.shard_manager_service(), + this.worker_fork_service(), this.worker_service(), this.worker_enumeration_service(), this.running_worker_enumeration_service(), @@ -403,6 +414,12 @@ impl> HasConfig for T { } } +impl> HasWorkerForkService for T { + fn worker_fork_service(&self) -> Arc { + self.all().worker_fork.clone() + } +} + impl> HasWorkerService for T { fn worker_service(&self) -> Arc { self.all().worker_service.clone() diff --git a/golem-worker-executor-base/src/services/worker_fork.rs b/golem-worker-executor-base/src/services/worker_fork.rs index 07a3d0bdc3..aa99ce7fb3 100644 --- a/golem-worker-executor-base/src/services/worker_fork.rs +++ b/golem-worker-executor-base/src/services/worker_fork.rs @@ -13,23 +13,49 @@ // limitations under the License. use std::marker::PhantomData; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; -use crate::error::GolemError; use crate::metrics::workers::record_worker_call; use crate::model::ExecutionStatus; use crate::services::oplog::CommitLevel; -use crate::services::{HasAll, HasOplog}; -use crate::worker::Worker; -use crate::workerctx::WorkerCtx; -use async_trait::async_trait; +use crate::services::rpc::{DirectWorkerInvocationRpc, RemoteInvocationRpc, Rpc}; +use crate::services::{rpc, HasAll, HasOplog, HasWorkerForkService}; use golem_common::model::oplog::{OplogIndex, OplogIndexRange}; -use golem_common::model::{ - AccountId, OwnedWorkerId, Timestamp, WorkerId, WorkerMetadata, WorkerStatusRecord, +use golem_common::model::{AccountId, Timestamp, WorkerMetadata, WorkerStatusRecord}; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; +use std::sync::Arc; + +use async_trait::async_trait; +use bincode::{Decode, Encode}; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; +use golem_wasm_rpc::WitValue; +use tokio::runtime::Handle; +use tracing::debug; + +use super::file_loader::FileLoader; +use crate::error::GolemError; +use crate::services::events::Events; +use crate::services::oplog::plugin::OplogProcessorPlugin; +use crate::services::plugins::Plugins; +use crate::services::shard::ShardService; +use crate::services::worker_proxy::{WorkerProxy, WorkerProxyError}; +use crate::services::{ + active_workers, blob_store, component, golem_config, key_value, oplog, promise, scheduler, + shard, shard_manager, worker, worker_activator, worker_enumeration, HasActiveWorkers, + HasBlobStoreService, HasComponentService, HasConfig, HasEvents, HasExtraDeps, HasFileLoader, + HasKeyValueService, HasOplogProcessorPlugin, HasOplogService, HasPlugins, HasPromiseService, + HasRpc, HasRunningWorkerEnumerationService, HasSchedulerService, HasShardManagerService, + HasShardService, HasWasmtimeEngine, HasWorkerActivator, HasWorkerEnumerationService, + HasWorkerProxy, HasWorkerService, }; +use crate::worker::Worker; +use crate::workerctx::WorkerCtx; +use golem_common::model::component::ComponentOwner; +use golem_common::model::{IdempotencyKey, OwnedWorkerId, TargetWorkerId, WorkerId}; #[async_trait] -pub trait WorkerFork { +pub trait WorkerForkService { async fn fork( &self, source_worker_id: &OwnedWorkerId, @@ -38,17 +64,288 @@ pub trait WorkerFork { ) -> Result<(), GolemError>; } -#[derive(Clone)] -pub struct DefaultWorkerFork> { - all: Svcs, - ctx: PhantomData, +pub struct DefaultWorkerFork { + rpc: Arc, + active_workers: Arc>, + engine: Arc, + linker: Arc>, + runtime: Handle, + component_service: Arc, + shard_manager_service: Arc, + worker_service: Arc, + worker_proxy: Arc, + worker_enumeration_service: Arc, + running_worker_enumeration_service: + Arc, + promise_service: Arc, + golem_config: Arc, + shard_service: Arc, + key_value_service: Arc, + blob_store_service: Arc, + oplog_service: Arc, + scheduler_service: Arc, + worker_activator: Arc + Send + Sync>, + events: Arc, + file_loader: Arc, + plugins: Arc< + dyn Plugins<::PluginOwner, Ctx::PluginScope> + + Send + + Sync, + >, + oplog_processor_plugin: Arc, + extra_deps: Ctx::ExtraDeps, +} + +impl HasEvents for DefaultWorkerFork { + fn events(&self) -> Arc { + self.events.clone() + } +} + +impl HasActiveWorkers for DefaultWorkerFork { + fn active_workers(&self) -> Arc> { + self.active_workers.clone() + } +} + +impl HasComponentService for DefaultWorkerFork { + fn component_service(&self) -> Arc { + self.component_service.clone() + } +} + +impl HasConfig for DefaultWorkerFork { + fn config(&self) -> Arc { + self.golem_config.clone() + } } -impl> DefaultWorkerFork { - pub fn new(all: Svcs) -> Self { +impl HasWorkerService for DefaultWorkerFork { + fn worker_service(&self) -> Arc { + self.worker_service.clone() + } +} + +impl HasWorkerEnumerationService for DefaultWorkerFork { + fn worker_enumeration_service( + &self, + ) -> Arc { + self.worker_enumeration_service.clone() + } +} + +impl HasRunningWorkerEnumerationService for DefaultWorkerFork { + fn running_worker_enumeration_service( + &self, + ) -> Arc { + self.running_worker_enumeration_service.clone() + } +} + +impl HasPromiseService for DefaultWorkerFork { + fn promise_service(&self) -> Arc { + self.promise_service.clone() + } +} + +impl HasWasmtimeEngine for DefaultWorkerFork { + fn engine(&self) -> Arc { + self.engine.clone() + } + + fn linker(&self) -> Arc> { + self.linker.clone() + } + + fn runtime(&self) -> Handle { + self.runtime.clone() + } +} + +impl HasKeyValueService for DefaultWorkerFork { + fn key_value_service(&self) -> Arc { + self.key_value_service.clone() + } +} + +impl HasBlobStoreService for DefaultWorkerFork { + fn blob_store_service(&self) -> Arc { + self.blob_store_service.clone() + } +} + +impl HasSchedulerService for DefaultWorkerFork { + fn scheduler_service(&self) -> Arc { + self.scheduler_service.clone() + } +} + +impl HasOplogService for DefaultWorkerFork { + fn oplog_service(&self) -> Arc { + self.oplog_service.clone() + } +} + +impl HasWorkerForkService for DefaultWorkerFork { + fn worker_fork_service(&self) -> Arc { + Arc::new(self.clone()) + } +} + +impl HasRpc for DefaultWorkerFork { + fn rpc(&self) -> Arc { + self.rpc.clone() + } +} + +impl HasExtraDeps for DefaultWorkerFork { + fn extra_deps(&self) -> Ctx::ExtraDeps { + self.extra_deps.clone() + } +} + +impl HasShardService for DefaultWorkerFork { + fn shard_service(&self) -> Arc { + self.shard_service.clone() + } +} + +impl HasShardManagerService for DefaultWorkerFork { + fn shard_manager_service(&self) -> Arc { + self.shard_manager_service.clone() + } +} + +impl HasWorkerActivator for DefaultWorkerFork { + fn worker_activator(&self) -> Arc + Send + Sync> { + self.worker_activator.clone() + } +} + +impl HasWorkerProxy for DefaultWorkerFork { + fn worker_proxy(&self) -> Arc { + self.worker_proxy.clone() + } +} + +impl HasFileLoader for DefaultWorkerFork { + fn file_loader(&self) -> Arc { + self.file_loader.clone() + } +} + +impl + HasPlugins<::PluginOwner, Ctx::PluginScope> + for DefaultWorkerFork +{ + fn plugins( + &self, + ) -> Arc< + dyn Plugins<::PluginOwner, Ctx::PluginScope> + + Send + + Sync, + > { + self.plugins.clone() + } +} + +impl HasOplogProcessorPlugin for DefaultWorkerFork { + fn oplog_processor_plugin(&self) -> Arc { + self.oplog_processor_plugin.clone() + } +} + +impl Clone for DefaultWorkerFork { + fn clone(&self) -> Self { Self { - all, - ctx: PhantomData, + rpc: self.rpc.clone(), + active_workers: self.active_workers.clone(), + engine: self.engine.clone(), + linker: self.linker.clone(), + runtime: self.runtime.clone(), + component_service: self.component_service.clone(), + shard_manager_service: self.shard_manager_service.clone(), + worker_service: self.worker_service.clone(), + worker_proxy: self.worker_proxy.clone(), + worker_enumeration_service: self.worker_enumeration_service.clone(), + running_worker_enumeration_service: self.running_worker_enumeration_service.clone(), + promise_service: self.promise_service.clone(), + golem_config: self.golem_config.clone(), + shard_service: self.shard_service.clone(), + key_value_service: self.key_value_service.clone(), + blob_store_service: self.blob_store_service.clone(), + oplog_service: self.oplog_service.clone(), + scheduler_service: self.scheduler_service.clone(), + worker_activator: self.worker_activator.clone(), + events: self.events.clone(), + file_loader: self.file_loader.clone(), + plugins: self.plugins.clone(), + oplog_processor_plugin: self.oplog_processor_plugin.clone(), + extra_deps: self.extra_deps.clone(), + } + } +} + +impl DefaultWorkerFork { + pub fn new( + rpc: Arc, + active_workers: Arc>, + engine: Arc, + linker: Arc>, + runtime: Handle, + component_service: Arc, + shard_manager_service: Arc, + worker_service: Arc, + worker_proxy: Arc, + worker_enumeration_service: Arc< + dyn worker_enumeration::WorkerEnumerationService + Send + Sync, + >, + running_worker_enumeration_service: Arc< + dyn worker_enumeration::RunningWorkerEnumerationService + Send + Sync, + >, + promise_service: Arc, + golem_config: Arc, + shard_service: Arc, + key_value_service: Arc, + blob_store_service: Arc, + oplog_service: Arc, + scheduler_service: Arc, + worker_activator: Arc + Send + Sync>, + events: Arc, + file_loader: Arc, + plugins: Arc< + dyn Plugins<::PluginOwner, Ctx::PluginScope> + + Send + + Sync, + >, + oplog_processor_plugin: Arc, + extra_deps: Ctx::ExtraDeps, + ) -> Self { + Self { + rpc, + active_workers, + engine, + linker, + runtime, + component_service, + shard_manager_service, + worker_service, + worker_proxy, + worker_enumeration_service, + running_worker_enumeration_service, + promise_service, + golem_config, + shard_service, + key_value_service, + blob_store_service, + oplog_service, + scheduler_service, + worker_activator, + events, + file_loader, + plugins, + oplog_processor_plugin, + extra_deps, } } @@ -69,7 +366,7 @@ impl> DefaultWorkerFork { let owned_target_worker_id = OwnedWorkerId::new(account_id, target_worker_id); - let target_metadata = self.all.worker_service().get(&owned_target_worker_id).await; + let target_metadata = self.worker_service.get(&owned_target_worker_id).await; // We allow forking only if the target worker does not exist if target_metadata.is_some() { @@ -77,12 +374,11 @@ impl> DefaultWorkerFork { } // We assume the source worker belongs to this executor - self.all.shard_service().check_worker(source_worker_id)?; + self.shard_service.check_worker(source_worker_id)?; let owned_source_worker_id = OwnedWorkerId::new(account_id, source_worker_id); - self.all - .worker_service() + self.worker_service .get(&owned_source_worker_id) .await .ok_or(GolemError::worker_not_found(source_worker_id.clone()))?; @@ -92,9 +388,7 @@ impl> DefaultWorkerFork { } #[async_trait] -impl + Send + Sync + 'static> WorkerFork - for DefaultWorkerFork -{ +impl WorkerForkService for DefaultWorkerFork { async fn fork( &self, source_worker_id: &OwnedWorkerId, @@ -115,15 +409,9 @@ impl + Send + Sync + 'static> WorkerFork let target_worker_id = owned_target_worker_id.worker_id.clone(); let account_id = owned_target_worker_id.account_id.clone(); - let source_worker_instance = Worker::get_or_create_suspended( - &self.all, - &owned_source_worker_id, - None, - None, - None, - None, - ) - .await?; + let source_worker_instance = + Worker::get_or_create_suspended(&self, &owned_source_worker_id, None, None, None, None) + .await?; let source_worker_metadata = source_worker_instance.get_metadata().await?; @@ -152,8 +440,7 @@ impl + Send + Sync + 'static> WorkerFork ))?; let new_oplog = self - .all - .oplog_service() + .oplog_service .create( &owned_target_worker_id, target_initial_oplog_entry, @@ -179,8 +466,7 @@ impl + Send + Sync + 'static> WorkerFork // as we need to make sure as it may live in another worker executor, // depending on sharding. // This will replay until the fork point in the forked worker - self.all - .worker_proxy() + self.worker_proxy .resume(&target_worker_id, true) .await .map_err(|err| { diff --git a/golem-worker-executor/src/lib.rs b/golem-worker-executor/src/lib.rs index 57dba3806a..5d65de235b 100644 --- a/golem-worker-executor/src/lib.rs +++ b/golem-worker-executor/src/lib.rs @@ -44,6 +44,7 @@ use golem_worker_executor_base::services::worker_activator::WorkerActivator; use golem_worker_executor_base::services::worker_enumeration::{ RunningWorkerEnumerationService, WorkerEnumerationService, }; +use golem_worker_executor_base::services::worker_fork::DefaultWorkerFork; use golem_worker_executor_base::services::worker_proxy::WorkerProxy; use golem_worker_executor_base::services::{plugins, All}; use golem_worker_executor_base::wasi_host::create_linker; @@ -139,6 +140,33 @@ impl Bootstrap for ServerBootstrap { additional_deps.clone(), )); + let worker_fork = Arc::new(DefaultWorkerFork::new( + rpc.clone(), + active_workers.clone(), + engine.clone(), + linker.clone(), + runtime.clone(), + component_service.clone(), + shard_manager_service.clone(), + worker_service.clone(), + worker_proxy.clone(), + worker_enumeration_service.clone(), + running_worker_enumeration_service.clone(), + promise_service.clone(), + golem_config.clone(), + shard_service.clone(), + key_value_service.clone(), + blob_store_service.clone(), + oplog_service.clone(), + scheduler_service.clone(), + worker_activator.clone(), + events.clone(), + file_loader.clone(), + plugins.clone(), + oplog_processor_plugin.clone(), + additional_deps.clone(), + )); + Ok(All::new( active_workers, engine, @@ -146,6 +174,7 @@ impl Bootstrap for ServerBootstrap { runtime.clone(), component_service, shard_manager_service, + worker_fork, worker_service, worker_enumeration_service, running_worker_enumeration_service,