Skip to content

Commit

Permalink
Address CR suggestions (minor)
Browse files Browse the repository at this point in the history
  • Loading branch information
azawlocki committed May 28, 2021
1 parent 915b015 commit 648c114
Showing 1 changed file with 12 additions and 16 deletions.
28 changes: 12 additions & 16 deletions yapapi/executor/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,6 @@ class ServiceInstance:
def state(self) -> ServiceState:
return self.service_state.current_state

def get_control_signal(self) -> Optional[ControlSignal]:
try:
return self.control_queue.get_nowait()
except asyncio.QueueEmpty:
return None

def send_control_signal(self, signal: ControlSignal):
self.control_queue.put_nowait(signal)


class Cluster(AsyncContextManager):
"""
Expand Down Expand Up @@ -299,7 +290,7 @@ async def _run_instance(self, ctx: WorkContext):
instance = ServiceInstance(service=self._service_class(self, ctx))
self.__instances.append(instance)

logger.info(f"{instance.service} commissioned")
logger.info("%s commissioned", instance.service)

handler = self._get_handler(instance)

Expand Down Expand Up @@ -332,26 +323,29 @@ async def _run_instance(self, ctx: WorkContext):
batch_task = loop.create_task(handler.asend(wrapped_results))
except StopAsyncIteration:
instance.service_state.lifecycle()
handler = self._get_handler(instance)
logger.debug(f"{instance.service} state changed to {instance.state.value}")
handler = None
batch_task = None

if signal_task in done:
# Process a signal
ctl = signal_task.result()
logger.debug(f"Processing control signal {ctl}")
logger.debug("Processing control signal %s", ctl)
if ctl == ControlSignal.stop:
if instance.state == ServiceState.running:
instance.service_state.stop()
else:
instance.service_state.terminate()
handler = self._get_handler(instance)
handler = None
if batch_task:
batch_task.cancel()
batch_task = None
signal_task = None

logger.info(f"{instance.service} decomissioned")
if handler is None:
handler = self._get_handler(instance)
logger.debug("%s state changed to %s", instance.service, instance.state.value)

logger.debug("No handler for %s in state %s", instance.service, instance.state.value)

if batch_task:
batch_task.cancel()
Expand All @@ -360,6 +354,8 @@ async def _run_instance(self, ctx: WorkContext):
signal_task.cancel()
await signal_task

logger.info("%s decomissioned", instance.service)

async def spawn_instance(self):
logger.debug("spawning instance within %s", self)
spawned = False
Expand Down Expand Up @@ -430,7 +426,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->

def stop_instance(self, service: Service):
instance = self.__get_service_instance(service)
instance.send_control_signal(ControlSignal.stop)
instance.control_queue.put_nowait(ControlSignal.stop)

def spawn_instances(self, num_instances: Optional[int] = None) -> None:
"""
Expand Down

0 comments on commit 648c114

Please sign in to comment.