diff --git a/neovim/api/nvim.py b/neovim/api/nvim.py index e34c48d0..e800c20d 100644 --- a/neovim/api/nvim.py +++ b/neovim/api/nvim.py @@ -322,6 +322,20 @@ def new_highlight_source(self): """Return new src_id for use with Buffer.add_highlight.""" return self.current.buffer.add_highlight("", 0, src_id=0) + def _error_wrapper(self, fn, call_point, *args, **kwargs): + if fn is None: + return None + def handler(): + try: + fn(*args, **kwargs) + except Exception as err: + msg = ("error caught while executing async callback:\n" + "{0!r}\n{1}\n \nthe call was requested at\n{2}" + .format(err, format_exc_skip(1, 5), call_point)) + self._err_cb(msg) + raise + return handler + def async_call(self, fn, *args, **kwargs): """Schedule `fn` to be called by the event loop soon. @@ -333,18 +347,33 @@ def async_call(self, fn, *args, **kwargs): that shouldn't block neovim. """ call_point = ''.join(format_stack(None, 5)[:-1]) + handler = self._error_wrapper(fn, call_point, *args, **kwargs) - def handler(): - try: - fn(*args, **kwargs) - except Exception as err: - msg = ("error caught while executing async callback:\n" - "{0!r}\n{1}\n \nthe call was requested at\n{2}" - .format(err, format_exc_skip(1, 5), call_point)) - self._err_cb(msg) - raise self._session.threadsafe_call(handler) + def poll_fd(self, fd, on_readable=None, on_writable=None, greenlet=True): + """ + Invoke callbacks when the fd is ready for reading and/or writing. if + `on_readable` is not None, it should be callback, which will be invoked + (with no arguments) when the fd is ready for writing. Similarily if + `on_writable` is not None it will be invoked when the fd is ready for + writing. + + Only one callback (of each kind) can be registered on the same fd at a + time. If both readability and writability should be monitored, both + callbacks must be registered by the same `poll_fd` call. + + By default, the function is invoked in a greenlet, just like a callback + scheduled by async_call. + + Returns a function that deactivates the callback(s). + """ + call_point = ''.join(format_stack(None, 5)[:-1]) + on_readable = self._error_wrapper(on_readable, call_point) + on_writable = self._error_wrapper(on_writable, call_point) + return self._session.poll_fd(fd, on_readable, on_writable, greenlet) + + class Buffers(object): diff --git a/neovim/msgpack_rpc/async_session.py b/neovim/msgpack_rpc/async_session.py index aa95088c..b2ac35e4 100644 --- a/neovim/msgpack_rpc/async_session.py +++ b/neovim/msgpack_rpc/async_session.py @@ -32,6 +32,10 @@ def threadsafe_call(self, fn): """Wrapper around `MsgpackStream.threadsafe_call`.""" self._msgpack_stream.threadsafe_call(fn) + def poll_fd(self, fd, on_readable, on_writable): + """Wrapper around `BaseEventLoop.poll_fd`.""" + return self._msgpack_stream.poll_fd(fd, on_readable, on_writable) + def request(self, method, args, response_cb): """Send a msgpack-rpc request to Nvim. diff --git a/neovim/msgpack_rpc/event_loop/asyncio.py b/neovim/msgpack_rpc/event_loop/asyncio.py index 1b281354..7a4eafc1 100644 --- a/neovim/msgpack_rpc/event_loop/asyncio.py +++ b/neovim/msgpack_rpc/event_loop/asyncio.py @@ -113,6 +113,18 @@ def _stop(self): def _threadsafe_call(self, fn): self._loop.call_soon_threadsafe(fn) + def _poll_fd(self, fd, on_readable, on_writable): + if on_readable is not None: + self._loop.add_reader(fd, on_readable) + if on_writable is not None: + self._loop.add_writer(fd, on_writable) + def cancel(): + if on_readable is not None: + self._loop.remove_reader(fd) + if on_writable is not None: + self._loop.remove_writer(fd) + return cancel + def _setup_signals(self, signals): if os.name == 'nt': # add_signal_handler is not supported in win32 diff --git a/neovim/msgpack_rpc/event_loop/base.py b/neovim/msgpack_rpc/event_loop/base.py index a05299e9..ba9dbe00 100644 --- a/neovim/msgpack_rpc/event_loop/base.py +++ b/neovim/msgpack_rpc/event_loop/base.py @@ -121,6 +121,24 @@ def threadsafe_call(self, fn): """ self._threadsafe_call(fn) + def poll_fd(self, fd, on_readable=None, on_writable=None): + """ + Invoke callbacks when the fd is ready for reading and/or writing. if + `on_readable` is not None, it should be callback, which will be invoked + (with no arguments) when the fd is ready for writing. Similarily if + `on_writable` is not None it will be invoked when the fd is ready for + writing. + + Only one callback (of each kind) can be registered on the same fd at a + time. If both readability and writability should be monitored, both + callbacks must be registered by the same `poll_fd` call. + + Returns a function that deactivates the callback(s). + """ + if on_readable is None and on_writable is None: + raise ValueError("poll_fd: At least one of `on_readable` and `on_writable` must be present") + return self._poll_fd(fd, on_readable, on_writable) + def run(self, data_cb): """Run the event loop.""" if self._error: diff --git a/neovim/msgpack_rpc/event_loop/uv.py b/neovim/msgpack_rpc/event_loop/uv.py index 73daab42..1d6a7454 100644 --- a/neovim/msgpack_rpc/event_loop/uv.py +++ b/neovim/msgpack_rpc/event_loop/uv.py @@ -106,6 +106,22 @@ def _on_async(self, handle): while self._callbacks: self._callbacks.popleft()() + def _poll_fd(self, fd, on_readable, on_writable): + poll = pyuv.Poll(self._loop, fd) + events = 0 + if on_readable is not None: + events |= pyuv.UV_READABLE + if on_writable is not None: + events |= pyuv.UV_WRITABLE + def callback(poll_handle, evts, errorno): + if evts & pyuv.UV_READABLE: + on_readable() + if evts & pyuv.UV_WRITABLE: + on_writable() + + poll.start(events, callback) + return poll.stop + def _setup_signals(self, signals): self._signal_handles = [] diff --git a/neovim/msgpack_rpc/msgpack_stream.py b/neovim/msgpack_rpc/msgpack_stream.py index 62f85597..d0cf6825 100644 --- a/neovim/msgpack_rpc/msgpack_stream.py +++ b/neovim/msgpack_rpc/msgpack_stream.py @@ -28,6 +28,10 @@ def threadsafe_call(self, fn): """Wrapper around `BaseEventLoop.threadsafe_call`.""" self._event_loop.threadsafe_call(fn) + def poll_fd(self, fd, on_readable, on_writable): + """Wrapper around `BaseEventLoop.poll_fd`.""" + return self._event_loop.poll_fd(fd, on_readable, on_writable) + def send(self, msg): """Queue `msg` for sending to Nvim.""" debug('sent %s', msg) diff --git a/neovim/msgpack_rpc/session.py b/neovim/msgpack_rpc/session.py index e2a49946..695cd774 100644 --- a/neovim/msgpack_rpc/session.py +++ b/neovim/msgpack_rpc/session.py @@ -28,8 +28,10 @@ def __init__(self, async_session): self._is_running = False self._setup_exception = None - def threadsafe_call(self, fn, *args, **kwargs): - """Wrapper around `AsyncSession.threadsafe_call`.""" + def _wrap_greenlet(self, fn, *args, **kwargs): + if fn is None: + return None + def handler(): try: fn(*args, **kwargs) @@ -41,8 +43,23 @@ def greenlet_wrapper(): gr = greenlet.greenlet(handler) gr.switch() + return greenlet_wrapper + + def threadsafe_call(self, fn, *args, **kwargs): + """Wrapper around `AsyncSession.threadsafe_call`.""" + + greenlet_wrapper = self._wrap_greenlet(fn, *args, **kwargs) self._async_session.threadsafe_call(greenlet_wrapper) + def poll_fd(self, fd, on_readable, on_writable, greenlet=True): + """Wrapper around `AsyncSession.threadsafe_call`.""" + if greenlet: + on_readable = self._wrap_greenlet(on_readable) + on_writable = self._wrap_greenlet(on_writable) + + return self._async_session.poll_fd(fd, on_readable, on_writable) + + def next_message(self): """Block until a message(request or notification) is available.