Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GunicornWebWorker blocking loop with ZMQEventloop #1223

Closed
ssolari opened this issue Sep 29, 2016 · 11 comments
Closed

GunicornWebWorker blocking loop with ZMQEventloop #1223

ssolari opened this issue Sep 29, 2016 · 11 comments
Labels

Comments

@ssolari
Copy link

ssolari commented Sep 29, 2016

I am seeing something similar to MagicStack/uvloop#26 with the ZMQEventloop. Basically just trying to create the ability to asynchronously use zmq to make rpc type calls based on http requests. With web.run_app everything works, but commenting web.run_app and using gunicorn hangs. In the docs it specifically talks about running ZMQ as a background task, so not sure why this doesn't work. Any help is appreciated.

I tried to create the absolute simplest reproducible example of the zmq setup that hangs below.
Is this a case where ZMQ also needs its own specialized worker?

Setup:
pyzmq 15.4.0
aiohttp 1.0.2
python 3.5.2
gunicorn 19.1.0
macosx 10.11.3

import asyncio
import zmq.asyncio
from aiohttp import web

TMP_ENDPOINT = 'tcp://127.0.0.1:12345'


class EchoServer(object):

    def __init__(self):
        loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)
        self._context = zmq.asyncio.Context()
        self._poller = zmq.asyncio.Poller()
        self._socket = self._context.socket(zmq.ROUTER)
        self._socket.bind(TMP_ENDPOINT)
        self._poller.register(self._socket, zmq.POLLIN)

        loop.run_until_complete(self.poll_sockets())

    async def poll_sockets(self):

        while True:
            events = await self._poller.poll()
            recv_msg = await self._socket.recv_multipart()
            await self._socket.send_multipart(recv_msg)


class ZmqHandler(object):

    def __init__(self):
        loop = zmq.asyncio.ZMQEventLoop()
        asyncio.set_event_loop(loop)
        self._context = zmq.asyncio.Context()
        self._poller = zmq.asyncio.Poller()
        self._socket = self._context.socket(zmq.DEALER)
        self._socket.connect(TMP_ENDPOINT)
        self._poller.register(self._socket, zmq.POLLIN)
        self._future = None

    async def poll_forever(self):
        while True:
            events = await self._poller.poll()
            await self.handle_events(events)

    async def handle_events(self, events):
        recv_msg = await self._socket.recv_multipart()
        self._future.set_result(recv_msg)

    async def send_msg(self, bytesmsg):
        self._future = asyncio.Future()
        await self._socket.send_multipart([b'', bytesmsg])
        await asyncio.wait_for(self._future, timeout=None)
        return self._future.result()


class MyAiohttpHandler(ZmqHandler):

    def __init__(self):
        super(MyAiohttpHandler, self).__init__()

    async def handle(self, request):
        name = request.match_info.get('name', None)
        result = await self.send_msg(name.encode('utf-8'))
        return web.Response(text=result[-1].decode('utf-8'))


async def start_background_task(app):
    app.loop.create_task(app['async_handler'].poll_forever())


# initialize custom handler with async zmq polling
myhandler = MyAiohttpHandler()

# ensure loop initialized by ZmqHanlder is being used
loop = asyncio.get_event_loop()
assert isinstance(loop, zmq.asyncio.ZMQEventLoop)

# start echo server for experiment
import multiprocessing as mp
p = mp.Process(target=EchoServer)
p.start()

# initialize application
app = web.Application(loop=loop)
app.router.add_route('GET', '/{name}', myhandler.handle)
app['async_handler'] = myhandler
app.on_startup.append(start_background_task) # gunicorn needs something like this?

# commenting this line hangs gunicorn
web.run_app(app, host='127.0.0.1', port=8000, ssl_context=None)
@asvetlov
Copy link
Member

Using zmq.asyncio.ZMQEventLoop is a very bad idea.
Because the loop is based on Poller, which in turn doesn't support many sockets.
zmq may be used with regular lops but its author doesn't want to maintain this mode.

Please just run aiozmq with loop-less transports/streams.

@ssolari
Copy link
Author

ssolari commented Sep 29, 2016

We have a working backend system using pyzmq that talks to itself just fine. The issue is really only where we are interfacing with creating external api's. I'm not against looking switching everything to aiozmq. We just need to look at the work involved there. If the above code could be mapped easily to aiozmq then in principle it should be relatively easy to switch for us.

Hiding the event loop in someways makes the paradigm a little different, so I'm not sure how much we have to think how we have things setup, but I guess there is benefit of hiding the loop and using the asyncio eventloop, with the concept of Transports and Streams.

Is it safe to say a Transport is basically a Socket and could be used in the same way?
The other question would be what is the equivalent of the await poll()? Would it be to set up a monitor and then trigger on event_received()?

Is the issue that gunicorn is opening sockets that the zmq.asyncio.ZMQEventLoop can't handle?

Thanks for the help. We are definitely trying to use all your hard work integrating asyncio.

@asvetlov
Copy link
Member

aiozmq is not a drop-in replacement for pyzmq.asyncio but the usage is similar. Take a look on https://aiozmq.readthedocs.io/en/v0.7.0/stream.html

await poll() is not needed, the loop is ran by loop.run_forever().

ZMQ monitor socket is supported by await stream.read_event().

Returning to zmq.asyncio and gunicorn.
zmq.asyncio can handle all UNIX sockets but it just is not effective for massive TCP socket usage (HTTP server is placed exactly in this category).
Technically possible to write a gunicorn worker class for interoperation with zmq.asyncio.ZMQEventLoop but I don't want to encourage bad practices.

@ssolari
Copy link
Author

ssolari commented Sep 29, 2016

While not incorrect, the docs may be hiding some issues. They lead me to believe that #1092 should have solved all the problems I was having with gunicorn. In the background tasks section

For example the background task could listen to ZeroMQ on zmq.SUB socket, process and forward retrieved messages to clients connected...

The use case above works without gunicorn but not with gunicorn. Or maybe an alert 'caution with the zmq.asyncio.ZMQEventLoop'

@ssolari
Copy link
Author

ssolari commented Sep 29, 2016

re: aiozmq. aiozmq.create_zmq_stream creates a ZmqStream that seems to have a 1-1 correspondence to a zmq.Socket. The advantage of the zmq.Poller() is that multiple sockets are registered with a single poller. Then when we have many sockets (that dynamically come and go) we can have an infinite loop listen_to_all_sockets->handle_events->listen_to_all_sockets ( poll_forever() in my example above). In aiozmq, what object monitors multiple streams simultaneously to handle any events that occur? We need to be able to add and subtract streams from the monitor dynamically.

@asvetlov
Copy link
Member

Looks like by zmq monitor we mean different things.
For me monitor is a special ZMQ socket of type ZMQ_PAIR with very specific behavior: http://api.zeromq.org/4-1:zmq-socket-monitor

If you are talking about regular ZMQ sockets (REQ-REP, PUB-SUB and PUSH-PULL) you should create a aiozmq.Stream per socket. Event loop internally processes their activities, no Poller.poll call is needed.

See also zeromq/pyzmq#894

@ssolari
Copy link
Author

ssolari commented Sep 29, 2016

I see the confusion in the monitor. I was referring to the documentation example where i would like to monitor multiple streams simultaneously:
https://aiozmq.readthedocs.io/en/v0.7.0/examples.html#stream-socket-event-monitor

I'm not sure I understand what you mean saying the event loop internally processes their activities.
In the above example there is a def monitor_stream(stream) function, which is registered via asyncio.Task(monitor_stream(dealer)). I assume you mean that on an event in that stream there is an automatic callback to monitor_stream(stream).

In the above, it seems that a call back function needs to be created for every stream(socket) and then 'registered' as an asyncio.Task with an infinite for loop. This is hard if streams are created dynamically. Or could multiple different streams simply trigger the same monitor_stream? i.e.

dealer1 = yield from aiozmq.create_zmq_stream(zmq.DEALER)
yield from dealer1.transport.enable_monitor()
asyncio.Task(monitor_stream(dealer1))
...
asyncio.Task(monitor_stream(dealer2))

Isn't this the functional equivalent of a poller? Does it make sense to provide the conceptual continuity with zmq.asyncio.Poller() with the benefits of the loopless transport?

In essense an object could be created 'aiozmq.Poller' that implements the monitor_stream(stream) behind the scenes and then simply registers streams, ie.

poller = aiozmq.Poller()
poller.register(dealer1)
poller.register(dealer2)
event = await poller.poll()

where the poller is simply yielding the events from monitor_stream as they arise? Am I missing some core concept or difference here?

@asvetlov
Copy link
Member

One question.
Do you want to receive monitor events about ZMQ socket internal state changing like ZMQ_EVENT_CONNECTED and ZMQ_EVENT_ACCEPTED or just receive socket data payload?

@ssolari
Copy link
Author

ssolari commented Sep 29, 2016

For my purpose we just need to receive socket payload.

On Sep 29, 2016, at 2:20 PM, Andrew Svetlov [email protected] wrote:

One question.
Do you want to receive monitor events about ZMQ socket internal state changing like ZMQ_EVENT_CONNECTED and ZMQ_EVENT_ACCEPTED or just receive socket data payload?


You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub, or mute the thread.

@asvetlov
Copy link
Member

In this case just create a task per ZMQ socket -- it's the most reasonable design.
You might build a custom class with poller-like API on top of this but I see no sense in it.

@asvetlov asvetlov closed this as completed Oct 2, 2016
@lock
Copy link

lock bot commented Oct 29, 2019

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a new issue for
related bugs.

If you feel like there's important points made in this discussion,
please include those exceprts into that new issue.

@lock lock bot added the outdated label Oct 29, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 29, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants