The simple and slick way to run async tasks in Django.
- Django-friendly API
- Easy to start and stop
Based on django-queue.
With a healthy mix of vegetables such as Celery and Carrot already in the midst, what does `django-leek` bring?
The most "lightweight" library so far has "install Redis" as step one. Although Redis is fantastic software, sometimes you just want a simple way to offload the web server and run a task asynchronously, such as sending an email.
Here `django-leek` comes to the rescue. Usage and architecture could hardly be simpler, and with so few moving parts it should be very stable, although it is not yet as battle-tested as, for example, Celery.
With `django-leek` you can get up and running quickly. The more complex distributed queues can wait until the website has a lot of traffic and the scalability is really required.
- Install `django-leek` with pip:

      $ pip install django-leek

- Add `django_leek` to `INSTALLED_APPS` in your `settings.py` file.
- Create the tables needed:

      $ python manage.py migrate

- Make sure the django-leek server is running:

      $ python manage.py leek

- Go nuts:

      # Leek is assumed to be importable from django_leek.api, like push_task_to_queue.
      from django_leek.api import Leek, push_task_to_queue

      leek = Leek()

      @leek.task
      def send_mail(to):
          do_what_ever()

      send_mail.offload(to='[email protected]')

  You can also use the "old" API as found in `django-queue`:

      push_task_to_queue(send_mail, to='[email protected]')
It's easy to unit test code that in production offloads work to the Leek server.

```python
from unittest.mock import patch

def _invoke(a_callable, *args, **kwargs):
    # Call the task directly instead of sending it to the Leek server.
    a_callable(*args, **kwargs)

@patch('django_leek.api.push_task_to_queue', _invoke)
def test_mytest():
    send_mail.offload(to='[email protected]')  # now runs synchronously, like a normal function
```
There is a test application you can play around with when developing on `django-leek`. Example:
- `./manage.sh test_app runserver` starts the test app.
- `./manage.sh test_app leek` starts a leek instance for the test app.
- `./manage.sh django_leek test` runs the test suite.
In a nutshell, a Python SocketServer runs in the background, listening on a TCP socket. SocketServer receives the request to run a task from its socket and puts the task on a Queue. A Worker thread picks tasks from this Queue and runs them one by one.
- A Python SocketServer that listens to a TCP socket.
- A Worker thread.
- A Python Queue.
The workflow that runs an async task:
- When `SocketServer` starts, it initializes the `Worker` thread.
- `SocketServer` listens to requests.
- When `SocketServer` receives a request (a callable with args and kwargs), it puts the request on a Python `Queue`.
- The `Worker` thread picks a task from the `Queue`.
- The `Worker` thread runs the task.
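
The workflow above can be sketched with nothing but the standard library. This is a minimal illustration of the pattern, not django-leek's actual code: the names `TaskHandler`, `start_server`, `offload`, and `results` are hypothetical, and the wire format (a pickled tuple followed by a short acknowledgement) is an assumption made for the example.

```python
import pickle
import queue
import socket
import socketserver
import threading

task_queue = queue.Queue()
results = []  # kept only so this sketch has something observable


def worker():
    """Take tasks off the queue and run them one by one."""
    while True:
        func, args, kwargs = task_queue.get()
        try:
            results.append(func(*args, **kwargs))
        except Exception as exc:  # a failing task must not kill the worker
            results.append(exc)
        finally:
            task_queue.task_done()


class TaskHandler(socketserver.StreamRequestHandler):
    def handle(self):
        payload = self.rfile.read()  # read until the client closes its write side
        # Unpickling is only acceptable here because the socket is local and trusted.
        task_queue.put(pickle.loads(payload))
        self.wfile.write(b"ok")  # acknowledge that the task has been queued


def start_server(host="127.0.0.1", port=0):
    """Start the worker thread, then serve requests in a background thread."""
    threading.Thread(target=worker, daemon=True).start()
    server = socketserver.TCPServer((host, port), TaskHandler)  # port 0 = any free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def offload(address, func, *args, **kwargs):
    """Send a callable with its arguments to the server and wait for the ack."""
    with socket.create_connection(address) as conn:
        conn.sendall(pickle.dumps((func, args, kwargs)))
        conn.shutdown(socket.SHUT_WR)  # signal the end of the request
        return conn.recv(2)
```

Calling `offload(server.server_address, pow, 2, 5)` after `server = start_server()` returns as soon as the task is queued; the worker thread then appends the result to `results`. Because the callable is pickled by reference, it has to be importable on the server side.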
Depends on the traffic: SocketServer is simple but solid, and as the site gets more traffic it's possible to move the django-leek server to another machine, a separate database, etc. At some point it's probably better to pick Celery. Until then, django-leek is a simple, solid, and no-hassle solution.
To change the default django-leek settings, add a `LEEK` dictionary to your project's main `settings.py` file.
This is the dictionary and the defaults:
    LEEK = {
        'bind': "localhost:8002",
        'host': "localhost",
        'port': 8002}
bind
The leek server will bind here.
host
The django server will connect to this host when notifying leek of jobs.
port
The django server will connect to this port when notifying leek of jobs.
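
As a rough illustration of how these three keys fit together, the sketch below merges partial overrides with the defaults listed above and splits the `bind` string into a host and a port. The helpers `resolve_leek_settings` and `split_bind` are hypothetical names invented for this example; django-leek's own settings lookup may work differently.

```python
# Defaults copied from the dictionary above.
DEFAULTS = {
    'bind': "localhost:8002",
    'host': "localhost",
    'port': 8002,
}


def resolve_leek_settings(overrides=None):
    """Return the defaults updated with any user-supplied LEEK overrides.

    In a Django project the overrides would typically come from
    getattr(settings, 'LEEK', {}); that lookup is left out to keep
    the sketch runnable without Django.
    """
    merged = dict(DEFAULTS)
    merged.update(overrides or {})
    return merged


def split_bind(bind):
    """Split a 'host:port' string into a (host, port) tuple."""
    host, _, port = bind.rpartition(':')
    return host, int(port)
```

For example, `resolve_leek_settings({'port': 9000})` keeps the default `bind` and `host` while changing only the port the Django side connects to.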
The following models are used.
QueuedTasks
The model saves every task pushed to the queue.
The task is pickled as a `tasks_queue.tasks.Task` object, which is a simple class with `callable`, `args` and `kwargs` attributes, and one method: `run()`.
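
A class matching that description could look roughly like the sketch below. It is written from the description alone and is an assumption about the shape of the class, not a copy of the library's code.

```python
class Task:
    """A callable bundled with the arguments it should be run with."""

    def __init__(self, a_callable, *args, **kwargs):
        self.callable = a_callable
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # Invoke the stored callable with the stored arguments.
        return self.callable(*self.args, **self.kwargs)


task = Task(divmod, 7, 2)
```

Since `pickle` stores functions by their qualified name rather than by value, a task like this can only be pickled if its callable is defined at module level and importable where it is unpickled.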
SuccessTasks
The Worker thread saves to this model the `task_id` of every task that was carried out successfully. `task_id` is the task's `QueuedTasks` id.
FailedTasks
After the Worker tries to run a task and it fails by raising an exception, the Worker saves it to this model. The failed task is saved by its `task_id`, together with the exception message. Only the exception from the last run is saved.
Depending on your project's needs, you can purge tasks that the Worker completed successfully.
The SQL to delete these tasks:
    DELETE queued,success
    FROM tasks_queue_queuedtasks queued
    INNER JOIN tasks_queue_successtasks success
    ON success.task_id = queued.id;
In a similar way, delete the failed tasks. You can run a cron script, or other script, to purge the tasks.
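
The multi-table `DELETE ... JOIN` form above is MySQL syntax. On databases without it, the same purge can be done with two statements in one transaction. The sketch below demonstrates this against an in-memory SQLite database. The table names are taken from the SQL above, while the column layout (including `pickled_task`) and the function name `purge_successful` are assumptions made for the example.

```python
import sqlite3


def purge_successful(conn):
    """Delete queued tasks that have a success row, then the success rows.

    Returns the number of queued tasks removed.
    """
    with conn:  # one transaction: commit on success, roll back on error
        cur = conn.execute(
            "DELETE FROM tasks_queue_queuedtasks "
            "WHERE id IN (SELECT task_id FROM tasks_queue_successtasks)"
        )
        # Every remaining success row now points at a deleted task.
        conn.execute("DELETE FROM tasks_queue_successtasks")
        return cur.rowcount


# Demo data: three queued tasks, two of which completed successfully.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tasks_queue_queuedtasks (id INTEGER PRIMARY KEY, pickled_task BLOB);
    CREATE TABLE tasks_queue_successtasks (id INTEGER PRIMARY KEY, task_id INTEGER);
    INSERT INTO tasks_queue_queuedtasks (id) VALUES (1), (2), (3);
    INSERT INTO tasks_queue_successtasks (task_id) VALUES (1), (2);
""")
deleted = purge_successful(conn)
remaining = [row[0] for row in conn.execute("SELECT id FROM tasks_queue_queuedtasks")]
```

A function like this could be called from the cron script mentioned above.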
- Checkout master branch.
- Make sure virtual environment is activated:

      source venv/bin/activate

- Make sure version in `setup.py` is correct:

      grep version setup.py

- Make sure setuptools, twine, and wheel are installed and up to date:

      pip install "setuptools>=38.6.0" "twine>=1.11.0" "wheel>=0.31.0"

- Clean out any old dist packages:

      rm -r dist/

- Build source and wheel dists:

      python setup.py sdist bdist_wheel

- Upload to PyPI:

      twine upload dist/*

- Profit!
Aviah, Silvia Scalisi and Samuel Carlsson
See contributors for full list.