-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multiplexing command batches and control signals #408
Conversation
# Repeatedly wait on one of `(batch_task, signal_task)` to finish. | ||
# If it's the first one, retrieve a batch from its result and handle it. | ||
# If it's the second -- retrieve and handle a signal. | ||
# Any finished task is replaced with a new one, so there are always two. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if batch_task is None: | ||
batch_task = loop.create_task(handler.__anext__()) | ||
if signal_task is None: | ||
signal_task = loop.create_task(instance.control_queue.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should remove ServiceInstance.get_control_signal
then ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed in 648c114
yapapi/executor/services.py
Outdated
if batch_task: | ||
batch_task.cancel() | ||
await batch_task | ||
if signal_task: | ||
signal_task.cancel() | ||
await signal_task | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should go before the above logger message, and maybe a new .debug
one should be added above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, 648c114
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comments above ^
@@ -299,7 +290,7 @@ async def _run_instance(self, ctx: WorkContext): | |||
instance = ServiceInstance(service=self._service_class(self, ctx)) | |||
self.__instances.append(instance) | |||
|
|||
logger.info(f"{instance.service} commissioned") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR implements processing of a stream of command batches generated by a service state handler, interleaved with a stream control signals.
Resolves #396