Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing with database and pytest #52

Open
husio opened this issue Sep 6, 2019 · 6 comments
Open

Testing with database and pytest #52

husio opened this issue Sep 6, 2019 · 6 comments

Comments

@husio
Copy link

husio commented Sep 6, 2019

I am using pytest-django to test my application and test setup like descripbed in the README file. I have a task that registers a user in database and to test it I have the following code:

# tasks.py
@dramatiq.actor
def register_user(email):
    user = User.objects.create(email=email)
    return user.id

# tests.py
@pytest.mark.django_db
def test_register_user_task(broker, worker):
    tasks.register_user.send("[email protected]")
    broker.join(tasks.register_user.queue_name)
    worker.join()
    assert User.objects.count() == 1

The test fails, because no user can be found. I believe this is because each test is running in its own database isolation. I believe because dramatiq is using threads for its workers, each worker is getting a separate view of the database state, that is not shared with the main test process. Is this correct?

If my understanding is correct, would it make sense to provide a task worker that runs synchronously in the same thread/process as the test? I have wrote a prototype for such approach and it seems to be working as expected. Would it be helpful to include it in either dramatiq or django_dramatiq?

@Bogdanp
Copy link
Owner

Bogdanp commented Sep 7, 2019

IIRC the default mode in tests is to handle db ops inside a transaction and then roll it back. In order for data to be visible outside of the transaction (i.e. in the worker threads), the transaction needs to be committed. What you need to do, I think, is pass the transaction=True parameter to django_db, like so:

@pytest.mark.django_db(transaction=True)
def test_register_user_task(broker, worker):
    tasks.register_user.send("[email protected]")
    broker.join(tasks.register_user.queue_name)
    worker.join()
    assert User.objects.count() == 1

would it make sense to provide a task worker that runs synchronously in the same thread/process as the test

This has been brought up a couple of times, and I'm still not sure exactly where I sit on the issue. On the one hand, yes, the way things work right now does make testing harder than it otherwise would be. On the other, the way things work right now means that the tests better reflect the real world behavior of your application, which, IME, is more valuable than ease of use.

@husio
Copy link
Author

husio commented Sep 9, 2019

I was trying to reproduce my issues with the database using a freshly created Django test application. Background processing, task joining and database operations all work as advertised in my test application. I was not able to reproduce the problem, so I tried the same in my production application. This time everything seems to be working as expected. Adding transaction=True is important and I don't know what I did before that the test was not passing.

I think having a synchronous task worker would be a nice addition. It might be confusing for the user why there are two worker implementations and which one is the right choice for testing. I think this is a library design decision that you can do much better 😉 Please close this issue if you decide to not add synchronous tasks processing.

This is the code that I wrote in order to process the tasks synchronously. Calling wait_for_tasks is equal to processing all queued tasks. I don't think is a proper implementation. I leave it here because maybe it will provide any value for future discussion or reference.

# conftest.py
@pytest.fixture()
def wait_for_task():
    broker = dramatiq.get_broker()
    broker.flush_all()
    worker = SynchronousWorker(broker, worker_timeout=100)
    worker.start()

    def wait_for_task(*task_funcs):
        queue_names = set(fn.queue_name for fn in task_funcs)
        worker.process_sync()
        broker.join(*queue_names, fail_fast=True)

    yield wait_for_task


class SynchronousWorker(worker.Worker):
    def _add_worker(self):
        worker = WorkerLocalThread(
                broker=self.broker,
                consumers=self.consumers,
                work_queue=self.work_queue,
                worker_timeout=self.worker_timeout
                )
        worker.start()
        self.workers.append(worker)

    def process_sync(self):
        for w in self.workers:
            w.process_sync()


class WorkerLocalThread(worker._WorkerThread):
    def start(self):
        # Overwrite this method and avoid creating a separate thread.
        # No super().start()
        pass

    def process_sync(self):
        while not self.work_queue.empty():
            _, message = self.work_queue.get()
            self.process_message(message)
        self.broker.emit_before("worker_thread_shutdown", self)

    def join(self, timeout=0):
        pass

@hassek
Copy link

hassek commented Mar 20, 2020

Hi,

I believe it would be very useful to mention in the documentation the transaction=True, this is not something that obvious and would help many (including me) avoid spending hours debugging the issue.

@Guest007
Copy link

Guest007 commented Apr 30, 2021

How can I use wait_for_task fixture in tests? Please, give an example. Thank you

@Guest007
Copy link

Guest007 commented May 7, 2021

My private working solution:

from dramatiq import actor as dramatiq_actor

class ActorForTest:
    def __init__(self, *args, **kwargs):
        self.fn = args[0]

    def send(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

def actor(*args, actor_class=None, **kwargs):
    if TEST_DRAMATIQ:
        return in_test_actor(*args, **kwargs)
    return dramatiq_actor(*args, actor_class=actor_class, **kwargs)

def in_test_actor(fn=None, *, actor_class=ActorForTest, actor_name=None, queue_name='default', priority=0, broker=None, **options):
    def decorator(fn):
        actor_name = fn.__name__
        return actor_class(
            fn, actor_name=actor_name, queue_name=queue_name,
            priority=priority, broker='broker', options=options,
        )
    if fn is None:
        return decorator
    return decorator(fn)

Where TEST_DRAMATIQ is an environment variable.

@vsevolodgarkusha
Copy link

My solution:

class DramatiqStub(StubBroker):
    def enqueue(self, message, *, delay=None):
        return self.actors[message.actor_name].fn(*message.args, **message.kwargs)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants