Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiplexing command batches and control signals #408

Merged
merged 4 commits into from
May 28, 2021
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 55 additions & 36 deletions yapapi/executor/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


from .. import rest
from ..executor import Golem, Job, Task
from ..executor import Golem, Job
from ..executor.ctx import WorkContext
from ..payload import Payload
from ..props import NodeInfo
Expand Down Expand Up @@ -294,53 +294,72 @@ def _get_handler(instance: ServiceInstance):
return handler()

async def _run_instance(self, ctx: WorkContext):

loop = asyncio.get_event_loop()
instance = ServiceInstance(service=self._service_class(self, ctx))
self.__instances.append(instance)

logger.info(f"{instance.service} commissioned")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


handler = self._get_handler(instance)
batch = None

batch_task: Optional[asyncio.Task] = None
signal_task: Optional[asyncio.Task] = None

while handler:
try:
if batch:
r = yield batch
fr = loop.create_future()
fr.set_result(await r)
batch = await handler.asend(fr)
else:
batch = await handler.__anext__()
except StopAsyncIteration:
instance.service_state.lifecycle()
handler = self._get_handler(instance)
batch = None
logger.debug(f"{instance.service} state changed to {instance.state.value}")

# TODO
#
# two potential issues:
# * awaiting a batch makes us lose an ability to interpret a signal (await on generator won't return)
# * we may be losing a `batch` when we act on the control signal
#
# potential solution:
# * use `aiostream.stream.merge`

ctl = instance.get_control_signal()
if ctl == ControlSignal.stop:
if instance.state == ServiceState.running:
instance.service_state.stop()
else:
instance.service_state.terminate()

logger.debug(f"{instance.service} state changed to {instance.state.value}")

handler = self._get_handler(instance)
batch = None
# Repeatedly wait on one of `(batch_task, signal_task)` to finish.
# If it's the first one, retrieve a batch from its result and handle it.
# If it's the second -- retrieve and handle a signal.
# Any finished task is replaced with a new one, so there are always two.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL :D

always two


if batch_task is None:
batch_task = loop.create_task(handler.__anext__())
if signal_task is None:
signal_task = loop.create_task(instance.control_queue.get())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should remove ServiceInstance.get_control_signal then ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed in 648c114


done, _ = await asyncio.wait(
(batch_task, signal_task), return_when=asyncio.FIRST_COMPLETED
)

if batch_task in done:
# Process a batch
try:
batch = batch_task.result()
fut_result = yield batch
result = await fut_result
wrapped_results = loop.create_future()
wrapped_results.set_result(result)
batch_task = loop.create_task(handler.asend(wrapped_results))
except StopAsyncIteration:
instance.service_state.lifecycle()
handler = self._get_handler(instance)
logger.debug(f"{instance.service} state changed to {instance.state.value}")
batch_task = None

if signal_task in done:
# Process a signal
ctl = signal_task.result()
logger.debug(f"Processing control signal {ctl}")
if ctl == ControlSignal.stop:
if instance.state == ServiceState.running:
instance.service_state.stop()
else:
instance.service_state.terminate()
handler = self._get_handler(instance)
if batch_task:
batch_task.cancel()
batch_task = None
signal_task = None

logger.info(f"{instance.service} decomissioned")

if batch_task:
batch_task.cancel()
await batch_task
if signal_task:
signal_task.cancel()
await signal_task

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should go before the above logger message, and maybe a new .debug one should be added above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, 648c114

async def spawn_instance(self):
logger.debug("spawning instance within %s", self)
spawned = False
Expand Down