Skip to content

Commit

Permalink
Replace gevent-websocket with simple-websocket when using gevent
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Sep 9, 2023
1 parent fae2635 commit 614f564
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 129 deletions.
20 changes: 1 addition & 19 deletions docs/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -521,18 +521,13 @@ Gevent

`Gevent <http://gevent.org>`_ is another asynchronous framework based on
coroutines, very similar to eventlet. An Engine.IO server deployed with
gevent has access to the long-polling transport. If project
`gevent-websocket <https://bitbucket.org/Jeffrey/gevent-websocket/>`_ is
installed, the WebSocket transport is also available. Note that when using the
uWSGI server, the native WebSocket implementation of uWSGI can be used instead
of gevent-websocket (see next section for details on this).
gevent has access to the long-polling and websocket transports.

Instances of class ``engineio.Server`` will automatically use gevent for
asynchronous operations if the library is installed and eventlet is not
installed. To request gevent to be selected explicitly, the ``async_mode``
option can be given in the constructor::

# gevent alone or with gevent-websocket
eio = engineio.Server(async_mode='gevent')

A server configured for gevent is deployed as a regular WSGI application
Expand All @@ -542,15 +537,6 @@ using the provided ``engineio.WSGIApp``::
app = engineio.WSGIApp(eio)
pywsgi.WSGIServer(('', 8000), app).serve_forever()

If the WebSocket transport is installed, then the server must be started as
follows::

from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
app = engineio.WSGIApp(eio)
pywsgi.WSGIServer(('', 8000), app,
handler_class=WebSocketHandler).serve_forever()

Gevent with Gunicorn
~~~~~~~~~~~~~~~~~~~~

Expand All @@ -560,10 +546,6 @@ command to launch the application under gunicorn is shown below::

$ gunicorn -k gevent -w 1 module:app

Or to include WebSocket::

$ gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 1 module: app

Same as with eventlet, due to limitations in its load balancing algorithm,
gunicorn can only be used with one worker process, so the ``-w 1`` option is
required. Note that a single gevent worker can handle a large number of
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package_dir =
packages = find:
python_requires = >=3.6
install_requires =
simple-websocket >= 0.10.0

[options.packages.find]
where = src
Expand Down
34 changes: 34 additions & 0 deletions src/engineio/async_drivers/_websocket_wsgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import simple_websocket


class SimpleWebSocketWSGI: # pragma: no cover
"""
This wrapper class provides a threading WebSocket interface that is
compatible with eventlet's implementation.
"""
def __init__(self, handler, server, **kwargs):
self.app = handler
self.server_args = kwargs

def __call__(self, environ, start_response):
self.ws = simple_websocket.Server(environ, **self.server_args)
ret = self.app(self)
if self.ws.mode == 'gunicorn':
raise StopIteration()
return ret

def close(self):
if self.ws.connected:
self.ws.close()

def send(self, message):
try:
return self.ws.send(message)
except simple_websocket.ConnectionClosed:
raise IOError()

def wait(self):
try:
return self.ws.receive()
except simple_websocket.ConnectionClosed:
return None
82 changes: 51 additions & 31 deletions src/engineio/async_drivers/gevent.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from __future__ import absolute_import

import gevent
from gevent import queue
from gevent.event import Event
try:
# use gevent-websocket if installed
import geventwebsocket # noqa
_websocket_available = True
except ImportError:
_websocket_available = False
SimpleWebSocketWSGI = None
except ImportError: # pragma: no cover
# fallback to simple_websocket when gevent-websocket is not installed
from engineio.async_drivers._websocket_wsgi import SimpleWebSocketWSGI


class Thread(gevent.Greenlet): # pragma: no cover
Expand All @@ -22,42 +22,62 @@ def _run(self):
return self.run()


class WebSocketWSGI(object): # pragma: no cover
"""
This wrapper class provides a gevent WebSocket interface that is
compatible with eventlet's implementation.
"""
def __init__(self, handler, server):
self.app = handler
if SimpleWebSocketWSGI is not None:
class WebSocketWSGI(SimpleWebSocketWSGI): # pragma: no cover
"""
This wrapper class provides a gevent WebSocket interface that is
compatible with eventlet's implementation, using the simple-websocket
package.
"""
def __init__(self, handler, server):
# to avoid the requirement that the standard library is
# monkey-patched, here we pass the gevent versions of the
# concurrency and networking classes required by simple-websocket
import gevent.event
import gevent.selectors
super().__init__(handler, server,
thread_class=Thread,
event_class=gevent.event.Event,
selector_class=gevent.selectors.DefaultSelector)
else:
class WebSocketWSGI: # pragma: no cover
"""
This wrapper class provides a gevent WebSocket interface that is
compatible with eventlet's implementation, using the gevent-websocket
package.
"""
def __init__(self, handler, server):
self.app = handler

def __call__(self, environ, start_response):
if 'wsgi.websocket' not in environ:
raise RuntimeError('You need to use the gevent-websocket server. '
'See the Deployment section of the '
'documentation for more information.')
self._sock = environ['wsgi.websocket']
self.environ = environ
self.version = self._sock.version
self.path = self._sock.path
self.origin = self._sock.origin
self.protocol = self._sock.protocol
return self.app(self)
def __call__(self, environ, start_response):
if 'wsgi.websocket' not in environ:
raise RuntimeError('The gevent-websocket server is not '
'configured appropriately. '
'See the Deployment section of the '
'documentation for more information.')
self._sock = environ['wsgi.websocket']
self.environ = environ
self.version = self._sock.version
self.path = self._sock.path
self.origin = self._sock.origin
self.protocol = self._sock.protocol
return self.app(self)

def close(self):
return self._sock.close()
def close(self):
return self._sock.close()

def send(self, message):
return self._sock.send(message)
def send(self, message):
return self._sock.send(message)

def wait(self):
return self._sock.receive()
def wait(self):
return self._sock.receive()


_async = {
'thread': Thread,
'queue': queue.JoinableQueue,
'queue_empty': queue.Empty,
'event': Event,
'websocket': WebSocketWSGI if _websocket_available else None,
'websocket': WebSocketWSGI,
'sleep': gevent.sleep,
}
42 changes: 2 additions & 40 deletions src/engineio/async_drivers/threading.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,19 @@
from __future__ import absolute_import
import queue
import threading
import time

try:
from simple_websocket import Server, ConnectionClosed
_websocket_available = True
except ImportError: # pragma: no cover
_websocket_available = False
from engineio.async_drivers._websocket_wsgi import SimpleWebSocketWSGI


class DaemonThread(threading.Thread): # pragma: no cover
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, daemon=True)


class WebSocketWSGI(object): # pragma: no cover
"""
This wrapper class provides a threading WebSocket interface that is
compatible with eventlet's implementation.
"""
def __init__(self, handler, server):
self.app = handler

def __call__(self, environ, start_response):
self.ws = Server(environ)
ret = self.app(self)
if self.ws.mode == 'gunicorn':
raise StopIteration()
return ret

def close(self):
if self.ws.connected:
self.ws.close()

def send(self, message):
try:
return self.ws.send(message)
except ConnectionClosed:
raise IOError()

def wait(self):
try:
return self.ws.receive()
except ConnectionClosed:
return None


_async = {
'thread': DaemonThread,
'queue': queue.Queue,
'queue_empty': queue.Empty,
'event': threading.Event,
'websocket': WebSocketWSGI if _websocket_available else None,
'websocket': SimpleWebSocketWSGI,
'sleep': time.sleep,
}
40 changes: 1 addition & 39 deletions tests/common/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,10 @@ def test_async_mode_threading(self):

assert s._async['thread'] == async_threading.DaemonThread
assert s._async['queue'] == queue.Queue
assert s._async['websocket'] == async_threading.WebSocketWSGI
assert s._async['websocket'] == async_threading.SimpleWebSocketWSGI
del sys.modules['simple_websocket']
del sys.modules['engineio.async_drivers.threading']

def test_async_mode_threading_without_websocket(self):
s = server.Server(async_mode='threading')
assert s.async_mode == 'threading'

from engineio.async_drivers import threading as async_threading
import queue

assert s._async['thread'] == async_threading.DaemonThread
assert s._async['queue'] == queue.Queue
assert s._async['websocket'] is None
del sys.modules['engineio.async_drivers.threading']

def test_async_mode_eventlet(self):
s = server.Server(async_mode='eventlet')
assert s.async_mode == 'eventlet'
Expand Down Expand Up @@ -229,32 +217,6 @@ def test_async_mode_gevent(self, import_module):
del sys.modules['geventwebsocket']
del sys.modules['engineio.async_drivers.gevent']

@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_without_websocket(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['gevent'].queue = mock.MagicMock()
sys.modules['gevent.queue'] = sys.modules['gevent'].queue
sys.modules['gevent.queue'].JoinableQueue = 'foo'
sys.modules['gevent.queue'].Empty = RuntimeError
sys.modules['gevent.event'] = mock.MagicMock()
sys.modules['gevent.event'].Event = 'bar'
sys.modules['geventwebsocket'] = None
s = server.Server(async_mode='gevent')
assert s.async_mode == 'gevent'

from engineio.async_drivers import gevent as async_gevent

assert s._async['thread'] == async_gevent.Thread
assert s._async['queue'] == 'foo'
assert s._async['queue_empty'] == RuntimeError
assert s._async['event'] == 'bar'
assert s._async['websocket'] is None
del sys.modules['gevent']
del sys.modules['gevent.queue']
del sys.modules['gevent.event']
del sys.modules['geventwebsocket']
del sys.modules['engineio.async_drivers.gevent']

@unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+')
@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_aiohttp(self, import_module):
Expand Down

0 comments on commit 614f564

Please sign in to comment.