Skip to content

Commit

Permalink
Merge pull request #408 from golemfactory/az/batch+signal-multiplexer
Browse files Browse the repository at this point in the history
Multiplexing command batches and control signals
  • Loading branch information
azawlocki authored May 28, 2021
2 parents e51c4d2 + 648c114 commit 112c3ba
Showing 1 changed file with 61 additions and 46 deletions.
107 changes: 61 additions & 46 deletions yapapi/executor/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


from .. import rest
from ..executor import Golem, Job, Task
from ..executor import Golem, Job
from ..executor.ctx import WorkContext
from ..payload import Payload
from ..props import NodeInfo
Expand Down Expand Up @@ -181,15 +181,6 @@ class ServiceInstance:
def state(self) -> ServiceState:
return self.service_state.current_state

def get_control_signal(self) -> Optional[ControlSignal]:
try:
return self.control_queue.get_nowait()
except asyncio.QueueEmpty:
return None

def send_control_signal(self, signal: ControlSignal):
self.control_queue.put_nowait(signal)


class Cluster(AsyncContextManager):
"""
Expand Down Expand Up @@ -294,52 +285,76 @@ def _get_handler(instance: ServiceInstance):
return handler()

async def _run_instance(self, ctx: WorkContext):

loop = asyncio.get_event_loop()
instance = ServiceInstance(service=self._service_class(self, ctx))
self.__instances.append(instance)

logger.info(f"{instance.service} commissioned")
logger.info("%s commissioned", instance.service)

handler = self._get_handler(instance)
batch = None

batch_task: Optional[asyncio.Task] = None
signal_task: Optional[asyncio.Task] = None

while handler:
try:
if batch:
r = yield batch
fr = loop.create_future()
fr.set_result(await r)
batch = await handler.asend(fr)
else:
batch = await handler.__anext__()
except StopAsyncIteration:
instance.service_state.lifecycle()
handler = self._get_handler(instance)
batch = None
logger.debug(f"{instance.service} state changed to {instance.state.value}")

# TODO
#
# two potential issues:
# * awaiting a batch makes us lose an ability to interpret a signal (await on generator won't return)
# * we may be losing a `batch` when we act on the control signal
#
# potential solution:
# * use `aiostream.stream.merge`

ctl = instance.get_control_signal()
if ctl == ControlSignal.stop:
if instance.state == ServiceState.running:
instance.service_state.stop()
else:
instance.service_state.terminate()

logger.debug(f"{instance.service} state changed to {instance.state.value}")
# Repeatedly wait on one of `(batch_task, signal_task)` to finish.
# If it's the first one, retrieve a batch from its result and handle it.
# If it's the second -- retrieve and handle a signal.
# Any finished task is replaced with a new one, so there are always two.

if batch_task is None:
batch_task = loop.create_task(handler.__anext__())
if signal_task is None:
signal_task = loop.create_task(instance.control_queue.get())

done, _ = await asyncio.wait(
(batch_task, signal_task), return_when=asyncio.FIRST_COMPLETED
)

if batch_task in done:
# Process a batch
try:
batch = batch_task.result()
fut_result = yield batch
result = await fut_result
wrapped_results = loop.create_future()
wrapped_results.set_result(result)
batch_task = loop.create_task(handler.asend(wrapped_results))
except StopAsyncIteration:
instance.service_state.lifecycle()
handler = None
batch_task = None

if signal_task in done:
# Process a signal
ctl = signal_task.result()
logger.debug("Processing control signal %s", ctl)
if ctl == ControlSignal.stop:
if instance.state == ServiceState.running:
instance.service_state.stop()
else:
instance.service_state.terminate()
handler = None
if batch_task:
batch_task.cancel()
batch_task = None
signal_task = None

if handler is None:
handler = self._get_handler(instance)
batch = None
logger.debug("%s state changed to %s", instance.service, instance.state.value)

logger.debug("No handler for %s in state %s", instance.service, instance.state.value)

if batch_task:
batch_task.cancel()
await batch_task
if signal_task:
signal_task.cancel()
await signal_task

logger.info(f"{instance.service} decomissioned")
logger.info("%s decomissioned", instance.service)

async def spawn_instance(self):
logger.debug("spawning instance within %s", self)
Expand Down Expand Up @@ -411,7 +426,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->

def stop_instance(self, service: Service):
instance = self.__get_service_instance(service)
instance.send_control_signal(ControlSignal.stop)
instance.control_queue.put_nowait(ControlSignal.stop)

def spawn_instances(self, num_instances: Optional[int] = None) -> None:
"""
Expand Down

0 comments on commit 112c3ba

Please sign in to comment.