diff --git a/geotrek/common/tasks.py b/geotrek/common/tasks.py index 7e616a4960..bb466d879c 100644 --- a/geotrek/common/tasks.py +++ b/geotrek/common/tasks.py @@ -1,3 +1,4 @@ +import asyncio import importlib from os.path import join @@ -5,6 +6,7 @@ from celery import Task, shared_task, current_task from django.contrib.auth.models import User from django.conf import settings +from django.db import connections from django.utils.translation import gettext_lazy as _ @@ -123,3 +125,39 @@ def progress_cb(progress, line, eid): 'report': parser.report(output_format='html').replace('$celery_id', current_task.request.id), 'name': current_task.name } + + +@shared_task(name='geotrek.common.handle_notification') +def handle_notification(channel, payload): + print("Do something long......") + print(f"I have treated {channel} {payload}") + + +async def read_async(connection): + + loop = asyncio.get_running_loop() + q = asyncio.Queue() + + def put_notification_in_queue(): + connection.poll() + data = connection.notifies.pop(0) + q.put_nowait(data) + + loop.add_reader(connection, put_notification_in_queue) + + while 1: + notified = await q.get() + print("Notification received: " + str(notified)) + # TODO we need to some nother celery workers to take on in these tasks + # Because at the moment our only celery worker is stuck doing 'await q.get()' + handle_notification.delay(notified.channel, notified.payload) + + +@shared_task(name='geotrek.common.listen_to_notifications') +def listen_to_notifications(): + notifications_connection = connections.create_connection('default') + notifications_connection.connect() + curs = notifications_connection.cursor() + curs.execute("LISTEN testchannel;") + # TODO Close this connection properly + asyncio.run(read_async(notifications_connection.connection))