Skip to content

Commit

Permalink
Make worker_fork as part of all
Browse files Browse the repository at this point in the history
  • Loading branch information
afsalthaj committed Jan 9, 2025
1 parent 583b9ce commit 66edbbb
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 45 deletions.
11 changes: 3 additions & 8 deletions golem-worker-executor-base/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::model::{InterruptKind, LastError, ListDirectoryResult, ReadFileResult
use crate::services::events::Event;
use crate::services::worker_activator::{DefaultWorkerActivator, LazyWorkerActivator};
use crate::services::worker_event::WorkerEventReceiver;
use crate::services::worker_fork::{DefaultWorkerFork, WorkerFork};
use crate::services::worker_fork::{DefaultWorkerFork, WorkerForkService};
use crate::services::{
All, HasActiveWorkers, HasAll, HasComponentService, HasEvents, HasOplogService, HasPlugins,
HasPromiseService, HasRunningWorkerEnumerationService, HasShardManagerService, HasShardService,
Expand Down Expand Up @@ -142,7 +142,6 @@ pub struct WorkerExecutorImpl<
> {
/// Reference to all the initialized services
services: Svcs,
worker_fork: Arc<dyn WorkerFork + Sync + Send>,
ctx: PhantomData<Ctx>,
}

Expand All @@ -152,7 +151,6 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
fn clone(&self) -> Self {
Self {
services: self.services.clone(),
worker_fork: self.worker_fork.clone(),
ctx: PhantomData,
}
}
Expand All @@ -169,12 +167,8 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
lazy_worker_activator: Arc<LazyWorkerActivator<Ctx>>,
port: u16,
) -> Result<Self, Error> {
let worker_fork: Arc<dyn WorkerFork + Sync + Send> =
Arc::new(DefaultWorkerFork::new(services.clone()));

let worker_executor = WorkerExecutorImpl {
services: services.clone(),
worker_fork,
ctx: PhantomData,
};
let worker_activator = Arc::new(DefaultWorkerActivator::new(services.clone()));
Expand Down Expand Up @@ -460,7 +454,8 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +

let owned_source_worker_id = OwnedWorkerId::new(&account_id, &source_worker_id);

self.worker_fork
self.services
.worker_fork_service()
.fork(
&owned_source_worker_id,
&owned_target_worker_id.worker_id,
Expand Down
17 changes: 17 additions & 0 deletions golem-worker-executor-base/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub trait HasConfig {
fn config(&self) -> Arc<golem_config::GolemConfig>;
}

pub trait HasWorkerForkService {
fn worker_fork_service(&self) -> Arc<dyn worker_fork::WorkerForkService + Send + Sync>;
}

pub trait HasWorkerService {
fn worker_service(&self) -> Arc<dyn worker::WorkerService + Send + Sync>;
}
Expand Down Expand Up @@ -154,6 +158,7 @@ pub trait HasAll<Ctx: WorkerCtx>:
HasActiveWorkers<Ctx>
+ HasComponentService
+ HasConfig
+ HasWorkerForkService
+ HasWorkerService
+ HasWorkerEnumerationService
+ HasRunningWorkerEnumerationService
Expand Down Expand Up @@ -182,6 +187,7 @@ impl<
T: HasActiveWorkers<Ctx>
+ HasComponentService
+ HasConfig
+ HasWorkerForkService
+ HasWorkerService
+ HasWorkerEnumerationService
+ HasRunningWorkerEnumerationService
Expand Down Expand Up @@ -215,6 +221,7 @@ pub struct All<Ctx: WorkerCtx> {
runtime: Handle,
component_service: Arc<dyn component::ComponentService + Send + Sync>,
shard_manager_service: Arc<dyn shard_manager::ShardManagerService + Send + Sync>,
worker_fork: Arc<dyn worker_fork::WorkerForkService + Send + Sync>,
worker_service: Arc<dyn worker::WorkerService + Send + Sync>,
worker_enumeration_service: Arc<dyn worker_enumeration::WorkerEnumerationService + Send + Sync>,
running_worker_enumeration_service:
Expand Down Expand Up @@ -249,6 +256,7 @@ impl<Ctx: WorkerCtx> Clone for All<Ctx> {
runtime: self.runtime.clone(),
component_service: self.component_service.clone(),
shard_manager_service: self.shard_manager_service.clone(),
worker_fork: self.worker_fork.clone(),
worker_service: self.worker_service.clone(),
worker_enumeration_service: self.worker_enumeration_service.clone(),
running_worker_enumeration_service: self.running_worker_enumeration_service.clone(),
Expand Down Expand Up @@ -280,6 +288,7 @@ impl<Ctx: WorkerCtx> All<Ctx> {
runtime: Handle,
component_service: Arc<dyn component::ComponentService + Send + Sync>,
shard_manager_service: Arc<dyn shard_manager::ShardManagerService + Send + Sync>,
worker_fork: Arc<dyn worker_fork::WorkerForkService + Send + Sync>,
worker_service: Arc<dyn worker::WorkerService + Send + Sync>,
worker_enumeration_service: Arc<
dyn worker_enumeration::WorkerEnumerationService + Send + Sync,
Expand Down Expand Up @@ -314,6 +323,7 @@ impl<Ctx: WorkerCtx> All<Ctx> {
runtime,
component_service,
shard_manager_service,
worker_fork,
worker_service,
worker_enumeration_service,
running_worker_enumeration_service,
Expand Down Expand Up @@ -343,6 +353,7 @@ impl<Ctx: WorkerCtx> All<Ctx> {
this.runtime(),
this.component_service(),
this.shard_manager_service(),
this.worker_fork_service(),
this.worker_service(),
this.worker_enumeration_service(),
this.running_worker_enumeration_service(),
Expand Down Expand Up @@ -403,6 +414,12 @@ impl<Ctx: WorkerCtx, T: UsesAllDeps<Ctx = Ctx>> HasConfig for T {
}
}

impl<Ctx: WorkerCtx, T: UsesAllDeps<Ctx = Ctx>> HasWorkerForkService for T {
fn worker_fork_service(&self) -> Arc<dyn worker_fork::WorkerForkService + Send + Sync> {
self.all().worker_fork.clone()
}
}

impl<Ctx: WorkerCtx, T: UsesAllDeps<Ctx = Ctx>> HasWorkerService for T {
fn worker_service(&self) -> Arc<dyn worker::WorkerService + Send + Sync> {
self.all().worker_service.clone()
Expand Down
Loading

0 comments on commit 66edbbb

Please sign in to comment.