Skip to content

Commit

Permalink
Merge pull request #651 from praekeltfoundation/migrate-to-turn-scripts
Browse files Browse the repository at this point in the history
Handle turn rate limits
  • Loading branch information
erikh360 authored Jan 14, 2025
2 parents 8a85de2 + c418195 commit e0caede
Showing 1 changed file with 49 additions and 9 deletions.
58 changes: 49 additions & 9 deletions scripts/migrate_to_turn/update_turn_contacts_queue.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,73 @@
import asyncio
import csv
import json
import os
import sys
import time
from datetime import datetime, timedelta
from urllib.parse import urljoin

import aiohttp

from scripts.migrate_to_rapidpro.retry_requests import request

WORKER_COUNT = 3

TURN_URL = "https://whatsapp-praekelt-cloud.turn.io"


async def update_turn_contact_details(session, row, target):
wa_id = row.pop("wa_id")

async def update_turn_contact_details(session, wa_id, data, target):
url = urljoin(TURN_URL, f"/v1/contacts/{wa_id}/profile")
headers = {
"Authorization": f"Bearer {os.environ['TURN_TOKEN']}",
"content-type": "application/json",
"Accept": "application/vnd.v1+json",
}
await request(session, url, "PATCH", headers, row, target)
status, reset_time = await request(session, url, "PATCH", headers, data, target)

if status == 429:
sleep_until(reset_time)
await update_turn_contact_details(session, wa_id, data, target)


def sleep_until(reset_time):
target = datetime.fromtimestamp(int(str(reset_time).split(".")[0]))
delta = target - datetime.now()
if delta > timedelta(0):
time.sleep(delta.total_seconds())
return True


async def request(session, url, method, headers, data, target):
func = getattr(session, method.lower())
async with func(url, headers=headers, json=data) as response:
response_body = await response.text()

if response.status == 429:
return response.status, response.headers["x-ratelimit-reset"]

request_data = {
"request": {
"url": response.request_info.url.human_repr(),
"method": response.request_info.method,
"headers": dict(response.request_info.headers),
"json": data,
},
"response": {
"status": response.status,
"headers": dict(response.headers),
"body": response_body,
},
}

target.write(json.dumps(request_data))
target.write("\n")

return response.status, None


async def worker(name, queue):
while True:
session, row, target = await queue.get()
await update_turn_contact_details(session, row, target)
session, wa_id, data, target = await queue.get()
await update_turn_contact_details(session, wa_id, data, target)
queue.task_done()


Expand All @@ -43,7 +82,8 @@ async def main(filename, target):
tasks.append(task)

for row in reader:
update = (session, row, target)
wa_id = row.pop("wa_id")
update = (session, wa_id, row, target)
await queue.put(update)

await queue.join()
Expand Down

0 comments on commit e0caede

Please sign in to comment.