A simple worker task queue with async support
```bash
$ pip install async_q
```
- Submit tasks asynchronously
- Task routing
- Distributed async worker processes
In scenarios where you need to execute I/O-bound tasks asynchronously within a single application, the `async_q` library provides a straightforward solution. This guide illustrates how to set up and use the library for that purpose.
Before using `async_q`, ensure you have Python 3.8 and Redis (version 5.0 or newer) installed, along with the required dependencies.
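If you don't already have a Redis server running, one convenient option (an illustration, not a requirement; any Redis 5.0+ instance reachable on port 6379 will do) is to start one with Docker:

```bash
$ docker run -d --name async-q-redis -p 6379:6379 redis
```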
Begin by creating an instance of `AsyncTaskQueue` to set up your application. In this example, we will name our app `async_q_app` and configure it to use Redis as the message broker.
```python
# main_app.py
from async_q import AsyncTaskQueue, RedisBuilder

# Define the async task queue app
async_q_app = AsyncTaskQueue(
    redis_builder=RedisBuilder(
        port='6379',
    )
)
```
Next, define a task function that will be submitted to the queue for processing. In this example, we define a task function named `my_task`, which simulates I/O waiting with a specified delay.
```python
# my_task.py
import asyncio
import logging

from async_q import submit_task

# Importing the app initializes it for this module
from main_app import async_q_app


async def my_task(idx, delay=2, *args, **kwargs):
    # Simulate I/O waiting
    await asyncio.sleep(delay)
    logging.info(f'{idx} has finished the task. Task ID: {kwargs.get("task_id")}')
```
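Any awaitable I/O fits the same pattern. As a sketch, here is a hypothetical task that fetches a URL; it assumes the third-party `aiohttp` package is installed, and the function name and parameters are illustrative:

```python
# fetch_task.py (hypothetical)
import logging

import aiohttp  # assumed to be installed separately

from main_app import async_q_app  # importing initializes the app


async def fetch_url(url, *args, **kwargs):
    # Real I/O instead of asyncio.sleep: fetch a page over HTTP
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            body = await resp.text()
    logging.info(f'Fetched {len(body)} bytes from {url}. Task ID: {kwargs.get("task_id")}')
```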
To submit tasks for processing, use the `submit_task` function. In this example, we add a `__main__` block to `my_task.py` that submits 20 tasks to the queue.
```python
if __name__ == '__main__':
    for i in range(20):
        submit_task(my_task, kwargs={'idx': i, 'delay': 10})
```
The `submit_task` function also accepts a `queue` argument, which defaults to `'default'`. This gives you the flexibility to route a particular task to a different queue; when no queue is specified, as in the previous example, tasks go to the `default` queue.
```python
if __name__ == '__main__':
    for i in range(20):
        # Explicitly submit to the default queue
        submit_task(my_task, kwargs={'idx': i, 'delay': 10}, queue='default')
```
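For example, a hypothetical `send_email` task could be routed to a dedicated `mail_ps` queue (the task name and its parameters are illustrative; `mail_ps` matches the worker example below):

```python
from async_q import submit_task


async def send_email(to, *args, **kwargs):
    ...  # hypothetical mail-sending coroutine


# Route this task to the 'mail_ps' queue instead of 'default'
submit_task(send_email, kwargs={'to': 'user@example.com'}, queue='mail_ps')
```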
To handle incoming tasks efficiently, you need to set up and launch worker processes. The level of concurrency is controlled with the `-c` flag. In the following example, we start five workers targeting the `async_q_app` instance defined in `main_app.py`. By default, these workers process tasks from the `default` queue only.
```bash
$ python -m async_q -a main_app.py:async_q_app -c 5
```
However, you can also dedicate specific workers to a different queue by using the `-q` flag, as demonstrated below:
```bash
$ python -m async_q -a main_app.py:async_q_app -c 5 -q mail_ps
```
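If you need to serve both queues at the same time, one option (shown here as a sketch; the concurrency values are arbitrary) is to launch two worker pools side by side:

```bash
$ python -m async_q -a main_app.py:async_q_app -c 5 &
$ python -m async_q -a main_app.py:async_q_app -c 2 -q mail_ps &
```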
By making these adjustments, you can tailor the behavior of your worker processes to suit your application's specific requirements.
With the worker processes running, you can now submit tasks for processing. Run the `my_task.py` script to submit tasks to the queue:
```bash
$ python my_task.py
```
The worker processes will asynchronously process the submitted tasks with the specified delays. You can monitor the progress and completion of tasks through the log messages generated by the `my_task` function. The `async_q` library is suitable for I/O-bound workloads that benefit from asynchronous processing within a single application.
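Note that Python's root logger suppresses INFO-level messages by default, so if the task logs don't appear, you may need to raise the log level in the module that defines your tasks. This is standard-library configuration, not an `async_q` feature:

```python
import logging

# Surface the INFO-level messages emitted by my_task
logging.basicConfig(level=logging.INFO)
```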
- Test and verify backward compatibility