diff --git a/yapapi/services.py b/yapapi/services.py index dbf93058e..e38605d19 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -438,6 +438,13 @@ def started_successfully(self) -> bool: return ServiceState.running in self.visited_states +class ClusterState(statemachine.StateMachine): + running = statemachine.State("running", initial=True) + stopping = statemachine.State("stopping") + + stop = running.to(stopping) + + class Cluster(AsyncContextManager): """Golem's sub-engine used to spawn and control instances of a single :class:`Service`.""" @@ -483,6 +490,8 @@ def __init__( self._network: Optional[Network] = network + self.__state = ClusterState() + @property def expiration(self) -> datetime: """Return the expiration datetime for agreements related to services in this :class:`Cluster`.""" @@ -761,13 +770,14 @@ async def _worker( service=self._service_class(self, work_context, network_node=node, **params) # type: ignore ) try: - instance_batches = self._run_instance(instance) - try: - await self._engine.process_batches( - self._job.id, agreement.id, activity, instance_batches - ) - except StopAsyncIteration: - pass + if self._state == ClusterState.running: + instance_batches = self._run_instance(instance) + try: + await self._engine.process_batches( + self._job.id, agreement.id, activity, instance_batches + ) + except StopAsyncIteration: + pass self.emit( events.TaskFinished( @@ -781,7 +791,7 @@ async def _worker( await self._engine.accept_payments_for_agreement(self._job.id, agreement.id) await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False) - while instance is None: + while instance is None and self._state == ClusterState.running: agreement_id = None await asyncio.sleep(1.0) task = await self._engine.start_worker(self._job, _worker) @@ -884,5 +894,12 @@ def spawn_instances( def stop(self): """Signal the whole :class:`Cluster` to stop.""" + self.__state.stop() + for s in self.instances: self.stop_instance(s) + + @property + def _state(self): + """Current state of the Cluster.""" + return self.__state.current_state