Skip to content

Commit

Permalink
Merge pull request #506 from davidbrochart/async_client
Browse files Browse the repository at this point in the history
Add async API
  • Loading branch information
SylvainCorlay authored Feb 12, 2020
2 parents c1b85ea + e0c8db0 commit 284fea0
Show file tree
Hide file tree
Showing 6 changed files with 460 additions and 2 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ python:
- "3.7"
- "3.6"
- "3.5"
- "2.7"
install:
- pip install --upgrade setuptools pip
- pip install --upgrade --upgrade-strategy eager --pre -e .[test] pytest-cov codecov 'coverage<5'
Expand Down
1 change: 1 addition & 0 deletions jupyter_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
from .client import KernelClient
from .manager import KernelManager, run_kernel
from .blocking import BlockingKernelClient
from .asynchronous import AsyncKernelClient
from .multikernelmanager import MultiKernelManager
1 change: 1 addition & 0 deletions jupyter_client/asynchronous/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .client import AsyncKernelClient
82 changes: 82 additions & 0 deletions jupyter_client/asynchronous/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Async channels"""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from queue import Queue, Empty


class ZMQSocketChannel(object):
"""A ZMQ socket in an async API"""
session = None
socket = None
stream = None
_exiting = False
proxy_methods = []

def __init__(self, socket, session, loop=None):
"""Create a channel.
Parameters
----------
socket : :class:`zmq.asyncio.Socket`
The ZMQ socket to use.
session : :class:`session.Session`
The session to use.
loop
Unused here, for other implementations
"""
super(ZMQSocketChannel, self).__init__()

self.socket = socket
self.session = session

async def _recv(self, **kwargs):
msg = await self.socket.recv_multipart(**kwargs)
ident,smsg = self.session.feed_identities(msg)
return self.session.deserialize(smsg)

async def get_msg(self, timeout=None):
""" Gets a message if there is one that is ready. """
if timeout is not None:
timeout *= 1000 # seconds to ms
ready = await self.socket.poll(timeout)

if ready:
return await self._recv()
else:
raise Empty

async def get_msgs(self):
""" Get all messages that are currently ready. """
msgs = []
while True:
try:
msgs.append(await self.get_msg())
except Empty:
break
return msgs

async def msg_ready(self):
""" Is there a message that has been received? """
return bool(await self.socket.poll(timeout=0))

def close(self):
if self.socket is not None:
try:
self.socket.close(linger=0)
except Exception:
pass
self.socket = None
stop = close

def is_alive(self):
return (self.socket is not None)

def send(self, msg):
"""Pass a message to the ZMQ socket to send
"""
self.session.send(self.socket, msg)

def start(self):
pass
Loading

0 comments on commit 284fea0

Please sign in to comment.