Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use gevent and eventlet wait() functions to remove busy-wait #5974

Merged
merged 6 commits into from
Feb 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions celery/backends/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from weakref import WeakKeyDictionary

from kombu.utils.compat import detect_environment
from kombu.utils.objects import cached_property

from celery import states
from celery.exceptions import TimeoutError
Expand Down Expand Up @@ -44,7 +43,7 @@ def start(self):
def stop(self):
pass

def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
def drain_events_until(self, p, timeout=None, interval=1, on_interval=None, wait=None):
wait = wait or self.result_consumer.drain_events
time_start = monotonic()

Expand All @@ -53,7 +52,7 @@ def drain_events_until(self, p, timeout=None, on_interval=None, wait=None):
if timeout and monotonic() - time_start >= timeout:
raise socket.timeout()
try:
yield self.wait_for(p, wait, timeout=1)
yield self.wait_for(p, wait, timeout=interval)
except socket.timeout:
pass
if on_interval:
Expand Down Expand Up @@ -93,28 +92,36 @@ def stop(self):
self._stopped.set()
self._shutdown.wait(THREAD_TIMEOUT_MAX)

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
sleep(0)


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):

@cached_property
def spawn(self):
from eventlet import spawn
return spawn
def spawn(self, func):
from eventlet import spawn, sleep
g = spawn(func)
sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait argument is only used for the non-greenlet (non-gevent, non-eventlet) flow. Sort of confusing to have it around here.

self.start()
if not p.ready:
self._g._exit_event.wait(timeout=timeout)


@register_drainer('gevent')
class geventDrainer(greenletDrainer):

@cached_property
def spawn(self):
from gevent import spawn
return spawn
def spawn(self, func):
from gevent import spawn, sleep
g = spawn(func)
sleep(0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that I needed to sleep(0) in scenarios where we weren't monkeypatching gevent to yield control back to the calling thread.

return g

def wait_for(self, p, wait, timeout=None):
import gevent
self.start()
if not p.ready:
gevent.wait([self._g], timeout=timeout)


class AsyncBackendMixin(object):
Expand Down
211 changes: 211 additions & 0 deletions t/unit/backends/test_asynchronous.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import os
import socket
import time
import threading

import pytest
from case import patch, skip, Mock
from vine import promise

from celery.backends.asynchronous import BaseResultConsumer
from celery.backends.base import Backend
from celery.utils import cached_property


@pytest.fixture(autouse=True)
def setup_eventlet():
# By default eventlet will patch the DNS resolver when imported.
os.environ.update(EVENTLET_NO_GREENDNS='yes')


class DrainerTests(object):
"""
Base test class for the Default / Gevent / Eventlet drainers.
"""

interval = 0.1 # Check every tenth of a second
MAX_TIMEOUT = 10 # Specify a max timeout so it doesn't run forever

def get_drainer(self, environment):
with patch('celery.backends.asynchronous.detect_environment') as d:
d.return_value = environment
backend = Backend(self.app)
consumer = BaseResultConsumer(backend, self.app, backend.accept,
pending_results={},
pending_messages={})
consumer.drain_events = Mock(side_effect=self.result_consumer_drain_events)
return consumer.drainer

@pytest.fixture(autouse=True)
def setup_drainer(self):
raise NotImplementedError

@cached_property
def sleep(self):
"""
Sleep on the event loop.
"""
raise NotImplementedError

def schedule_thread(self, thread):
"""
Set up a thread that runs on the event loop.
"""
raise NotImplementedError

def teardown_thread(self, thread):
"""
Wait for a thread to stop.
"""
raise NotImplementedError

def result_consumer_drain_events(self, timeout=None):
"""
Subclasses should override this method to define the behavior of
drainer.result_consumer.drain_events.
"""
raise NotImplementedError

def test_drain_checks_on_interval(self):
p = promise()

def fulfill_promise_thread():
self.sleep(self.interval * 2)
p('done')

fulfill_thread = self.schedule_thread(fulfill_promise_thread)

on_interval = Mock()
for _ in self.drainer.drain_events_until(p,
on_interval=on_interval,
interval=self.interval,
timeout=self.MAX_TIMEOUT):
pass

self.teardown_thread(fulfill_thread)

assert p.ready, 'Should have terminated with promise being ready'
assert on_interval.call_count < 20, 'Should have limited number of calls to on_interval'

def test_drain_does_not_block_event_loop(self):
"""
This test makes sure that other greenlets can still operate while drain_events_until is
running.
"""
p = promise()
liveness_mock = Mock()

def fulfill_promise_thread():
self.sleep(self.interval * 2)
p('done')

def liveness_thread():
while 1:
if p.ready:
return
self.sleep(self.interval / 10)
liveness_mock()

fulfill_thread = self.schedule_thread(fulfill_promise_thread)
liveness_thread = self.schedule_thread(liveness_thread)

on_interval = Mock()
for _ in self.drainer.drain_events_until(p,
on_interval=on_interval,
interval=self.interval,
timeout=self.MAX_TIMEOUT):
pass

self.teardown_thread(fulfill_thread)
self.teardown_thread(liveness_thread)

assert p.ready, 'Should have terminated with promise being ready'
assert on_interval.call_count < liveness_mock.call_count, \
'Should have served liveness_mock while waiting for event'

def test_drain_timeout(self):
p = promise()
on_interval = Mock()

with pytest.raises(socket.timeout):
for _ in self.drainer.drain_events_until(p,
on_interval=on_interval,
interval=self.interval,
timeout=self.interval * 5):
pass

assert not p.ready, 'Promise should remain un-fulfilled'
assert on_interval.call_count < 20, 'Should have limited number of calls to on_interval'


@skip.unless_module('eventlet')
class test_EventletDrainer(DrainerTests):
@pytest.fixture(autouse=True)
def setup_drainer(self):
self.drainer = self.get_drainer('eventlet')

@cached_property
def sleep(self):
from eventlet import sleep
return sleep

def result_consumer_drain_events(self, timeout=None):
import eventlet
eventlet.sleep(0)

def schedule_thread(self, thread):
import eventlet
g = eventlet.spawn(thread)
eventlet.sleep(0)
return g

def teardown_thread(self, thread):
thread.wait()


class test_Drainer(DrainerTests):
@pytest.fixture(autouse=True)
def setup_drainer(self):
self.drainer = self.get_drainer('default')

@cached_property
def sleep(self):
from time import sleep
return sleep

def result_consumer_drain_events(self, timeout=None):
time.sleep(timeout)

def schedule_thread(self, thread):
t = threading.Thread(target=thread)
t.start()
return t

def teardown_thread(self, thread):
thread.join()


@skip.unless_module('gevent')
class test_GeventDrainer(DrainerTests):
@pytest.fixture(autouse=True)
def setup_drainer(self):
self.drainer = self.get_drainer('gevent')

@cached_property
def sleep(self):
from gevent import sleep
return sleep

def result_consumer_drain_events(self, timeout=None):
import gevent
gevent.sleep(0)

def schedule_thread(self, thread):
import gevent
g = gevent.spawn(thread)
gevent.sleep(0)
return g

def teardown_thread(self, thread):
import gevent
gevent.wait([thread])