Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failed task are hanging the test #53

Open
husio opened this issue Sep 12, 2019 · 0 comments
Open

Failed task are hanging the test #53

husio opened this issue Sep 12, 2019 · 0 comments

Comments

@husio
Copy link

husio commented Sep 12, 2019

When running a test that executes a background task that fails, test is hanging forever.

I am expecting that when calling broker.join with fail_fast=True, exception raised in the task will be propagated in the test into the broker.join call. Instead the test never finish.

Here is an example code, I can share the whole example Django project if needed.

# myapp/tests.py
@pytest.mark.django_db(transaction=True)
def test_register_user_task_using_invalid_input(broker, worker):
    tasks.register_user.send("")
    broker.join(tasks.register_user.queue_name, fail_fast=True)
    worker.join()
    assert User.objects.count() == 1

# myapp/tasks.py
@dramatiq.actor
def register_user(email):
    if not email:
        raise Exception("empty")
    user = User.objects.create(email=email)
    return user.id
platform linux -- Python 3.7.3, pytest-5.1.2, py-1.8.0, pluggy-0.13.0
Django settings: testapp.settings (from environment variable)
rootdir: /home/piotr/testapp, inifile: setup.cfg
plugins: django-3.5.1
.venv/lib/python3.7/site-packages/_pytest/python.py:170: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

broker = <dramatiq.brokers.stub.StubBroker object at 0x7efd94f00320>, worker = <dramatiq.worker.Worker object at 0x7efd8efc3c50>

    @pytest.mark.django_db(transaction=True)
    def test_register_user_task_using_invalid_input(broker, worker):
        tasks.register_user.send("")
>       broker.join(tasks.register_user.queue_name, fail_fast=True)

myapp/tests.py:27: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dramatiq.brokers.stub.StubBroker object at 0x7efd94f00320>, queue_name = 'default'

    def join(self, queue_name, *, fail_fast=False, timeout=None):
        """Wait for all the messages on the given queue to be
        processed.  This method is only meant to be used in tests
        to wait for all the messages in a queue to be processed.
    
        Raises:
          QueueJoinTimeout: When the timeout elapses.
          QueueNotFound: If the given queue was never declared.
    
        Parameters:
          queue_name(str): The queue to wait on.
          fail_fast(bool): When this is True and any message gets
            dead-lettered during the join, then an exception will be
            raised.  This will be True by default starting with
            version 2.0.
          timeout(Optional[int]): The max amount of time, in
            milliseconds, to wait on this queue.
        """
        try:
            queues = [
                self.queues[queue_name],
                self.queues[dq_name(queue_name)],
            ]
        except KeyError:
            raise QueueNotFound(queue_name)
    
        deadline = timeout and time.monotonic() + timeout / 1000
        while True:
            for queue in queues:
                timeout = deadline and deadline - time.monotonic()
>               join_queue(queue, timeout=timeout)

.venv/lib/python3.7/site-packages/dramatiq/brokers/stub.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

queue = <queue.Queue object at 0x7efd94ca4ac8>, timeout = None

    def join_queue(queue, timeout=None):
        """The join() method of standard queues in Python doesn't support
        timeouts.  This implements the same functionality as that method,
        with optional timeout support, by depending the internals of
        Queue.
    
        Raises:
          QueueJoinTimeout: When the timeout is reached.
    
        Parameters:
          timeout(Optional[float])
        """
        with queue.all_tasks_done:
            while queue.unfinished_tasks:
>               finished_in_time = queue.all_tasks_done.wait(timeout=timeout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant