Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use message queue for abort_queues #853

Merged
merged 1 commit into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ async def execute_request(self, stream, ident, parent):
self.log.debug("%s", reply_msg)

if not silent and reply_msg['content']['status'] == 'error' and stop_on_error:
await self._abort_queues()
self._abort_queues()

def do_execute(self, code, silent, store_history=True,
user_expressions=None, allow_stdin=False):
Expand Down Expand Up @@ -974,13 +974,31 @@ def _topic(self, topic):

_aborting = Bool(False)

async def _abort_queues(self):
self.shell_stream.flush()
def _abort_queues(self):
# while this flag is true,
# execute requests will be aborted
self._aborting = True
self.log.info("Aborting queue")

# flush streams, so all currently waiting messages
# are added to the queue
self.shell_stream.flush()

# Callback to signal that we are done aborting
def stop_aborting():
self.log.info("Finishing abort")
self._aborting = False
asyncio.get_event_loop().call_later(self.stop_on_error_timeout, stop_aborting)

# put the stop-aborting event on the message queue
# so that all messages already waiting in the queue are aborted
# before we reset the flag
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

# if we have a delay, give messages this long to arrive on the queue
# before we stop aborting requests
asyncio.get_event_loop().call_later(
self.stop_on_error_timeout, schedule_stop_aborting
)

def _send_abort_reply(self, stream, msg, idents):
"""Send a reply to an aborted request"""
Expand Down
22 changes: 15 additions & 7 deletions ipykernel/tests/test_message_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,23 @@ def test_execute_stop_on_error():
"""execute request should not abort execution queue with stop_on_error False"""
flush_channels()

fail = '\n'.join([
# sleep to ensure subsequent message is waiting in the queue to be aborted
'import time',
'time.sleep(0.5)',
'raise ValueError',
])
fail = "\n".join(
[
# sleep to ensure subsequent message is waiting in the queue to be aborted
# async sleep to ensure coroutines are processing while this happens
"import asyncio",
"await asyncio.sleep(1)",
"raise ValueError()",
]
)
KC.execute(code=fail)
KC.execute(code='print("Hello")')
KC.get_shell_msg(timeout=TIMEOUT)
KC.execute(code='print("world")')
reply = KC.get_shell_msg(timeout=TIMEOUT)
print(reply)
reply = KC.get_shell_msg(timeout=TIMEOUT)
assert reply["content"]["status"] == "aborted"
# second message, too
reply = KC.get_shell_msg(timeout=TIMEOUT)
assert reply['content']['status'] == 'aborted'

Expand Down