Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add celery tasks to listen to postgres NOTIFY #4287

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions geotrek/common/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import importlib

from os.path import join
import sys
from celery import Task, shared_task, current_task
from django.contrib.auth.models import User
from django.conf import settings
from django.db import connections
from django.utils.translation import gettext_lazy as _


Expand Down Expand Up @@ -123,3 +125,39 @@
'report': parser.report(output_format='html').replace('$celery_id', current_task.request.id),
'name': current_task.name
}


@shared_task(name='geotrek.common.handle_notification')
def handle_notification(channel, payload):
print("Do something long......")
print(f"I have treated {channel} {payload}")

Check warning on line 133 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L132-L133

Added lines #L132 - L133 were not covered by tests


async def read_async(connection):

loop = asyncio.get_running_loop()
q = asyncio.Queue()

Check warning on line 139 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L138-L139

Added lines #L138 - L139 were not covered by tests

def put_notification_in_queue():
connection.poll()
data = connection.notifies.pop(0)
q.put_nowait(data)

Check warning on line 144 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L141-L144

Added lines #L141 - L144 were not covered by tests

loop.add_reader(connection, put_notification_in_queue)

Check warning on line 146 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L146

Added line #L146 was not covered by tests

while 1:
notified = await q.get()
print("Notification received: " + str(notified))

Check warning on line 150 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L149-L150

Added lines #L149 - L150 were not covered by tests
# TODO we need to some nother celery workers to take on in these tasks
# Because at the moment our only celery worker is stuck doing 'await q.get()'
handle_notification.delay(notified.channel, notified.payload)

Check warning on line 153 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L153

Added line #L153 was not covered by tests


@shared_task(name='geotrek.common.listen_to_notifications')
def listen_to_notifications():
notifications_connection = connections.create_connection('default')
notifications_connection.connect()
curs = notifications_connection.cursor()
curs.execute("LISTEN testchannel;")

Check warning on line 161 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L158-L161

Added lines #L158 - L161 were not covered by tests
# TODO Close this connection properly
asyncio.run(read_async(notifications_connection.connection))

Check warning on line 163 in geotrek/common/tasks.py

View check run for this annotation

Codecov / codecov/patch

geotrek/common/tasks.py#L163

Added line #L163 was not covered by tests
Loading