From c4181953a1603ec555aeee800e173d13fb14bea8 Mon Sep 17 00:00:00 2001 From: erikh360 Date: Tue, 14 Jan 2025 15:41:52 +0200 Subject: [PATCH] Handle turn rate limits --- .../update_turn_contacts_queue.py | 58 ++++++++++++++++--- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/scripts/migrate_to_turn/update_turn_contacts_queue.py b/scripts/migrate_to_turn/update_turn_contacts_queue.py index 02bea725..d29d1e44 100644 --- a/scripts/migrate_to_turn/update_turn_contacts_queue.py +++ b/scripts/migrate_to_turn/update_turn_contacts_queue.py @@ -1,34 +1,73 @@ import asyncio import csv +import json import os import sys +import time +from datetime import datetime, timedelta from urllib.parse import urljoin import aiohttp -from scripts.migrate_to_rapidpro.retry_requests import request - WORKER_COUNT = 3 TURN_URL = "https://whatsapp-praekelt-cloud.turn.io" -async def update_turn_contact_details(session, row, target): - wa_id = row.pop("wa_id") - +async def update_turn_contact_details(session, wa_id, data, target): url = urljoin(TURN_URL, f"/v1/contacts/{wa_id}/profile") headers = { "Authorization": f"Bearer {os.environ['TURN_TOKEN']}", "content-type": "application/json", "Accept": "application/vnd.v1+json", } - await request(session, url, "PATCH", headers, row, target) + status, reset_time = await request(session, url, "PATCH", headers, data, target) + + if status == 429: + sleep_until(reset_time) + await update_turn_contact_details(session, wa_id, data, target) + + +def sleep_until(reset_time): + target = datetime.fromtimestamp(int(str(reset_time).split(".")[0])) + delta = target - datetime.now() + if delta > timedelta(0): + time.sleep(delta.total_seconds()) + return True + + +async def request(session, url, method, headers, data, target): + func = getattr(session, method.lower()) + async with func(url, headers=headers, json=data) as response: + response_body = await response.text() + + if response.status == 429: + return response.status, response.headers["x-ratelimit-reset"] + + request_data = { + "request": { + "url": response.request_info.url.human_repr(), + "method": response.request_info.method, + "headers": dict(response.request_info.headers), + "json": data, + }, + "response": { + "status": response.status, + "headers": dict(response.headers), + "body": response_body, + }, + } + + target.write(json.dumps(request_data)) + target.write("\n") + + return response.status, None async def worker(name, queue): while True: - session, row, target = await queue.get() - await update_turn_contact_details(session, row, target) + session, wa_id, data, target = await queue.get() + await update_turn_contact_details(session, wa_id, data, target) queue.task_done() @@ -43,7 +82,8 @@ async def main(filename, target): tasks.append(task) for row in reader: - update = (session, row, target) + wa_id = row.pop("wa_id") + update = (session, wa_id, row, target) await queue.put(update) await queue.join()