Refactor queue_task to take a collection_id
stchris committed Jun 25, 2024
1 parent 394fda5 commit e35b318
Showing 2 changed files with 18 additions and 20 deletions.
16 changes: 8 additions & 8 deletions servicelayer/taskqueue.py
@@ -757,22 +757,22 @@ def get_rabbitmq_connection():
     raise RuntimeError("Could not connect to RabbitMQ")


-def get_task_count(collection, redis_conn) -> int:
+def get_task_count(collection_id, redis_conn) -> int:
     """Get the total task count for a given dataset."""
     status = Dataset.get_active_dataset_status(conn=redis_conn)
     try:
-        collection = status["datasets"][dataset_from_collection(collection)]
+        collection = status["datasets"][str(collection_id)]
         total = collection["finished"] + collection["running"] + collection["pending"]
     except KeyError:
         total = 0
     return total


-def get_priority(collection, redis_conn) -> int:
+def get_priority(collection_id, redis_conn) -> int:
     """
     Priority buckets for tasks based on the total (pending + running) task count.
     """
-    total_task_count = get_task_count(collection, redis_conn)
+    total_task_count = get_task_count(collection_id, redis_conn)
     if total_task_count < 500:
         return randrange(7, 9)
     elif total_task_count < 10000:
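For readers skimming the diff: `get_task_count` now looks up the dataset status under the stringified collection id and sums three counters. A minimal illustration of the status shape it consumes, with invented counter values, mirroring the fixtures patched into the tests below:

# Hypothetical return value of Dataset.get_active_dataset_status, as
# consumed by get_task_count above. The counter values are made up.
status = {
    "datasets": {
        "1": {"finished": 10, "running": 2, "pending": 5},
    },
}
entry = status["datasets"]["1"]
total = entry["finished"] + entry["running"] + entry["pending"]
assert total == 17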
@@ -788,12 +788,12 @@ def dataset_from_collection(collection):


 def queue_task(
-    rmq_conn, redis_conn, collection, stage, job_id=None, context=None, **payload
+    rmq_conn, redis_conn, collection_id, stage, job_id=None, context=None, **payload
 ):
     task_id = uuid.uuid4().hex
-    priority = get_priority(collection, redis_conn)
+    priority = get_priority(collection_id, redis_conn)
     body = {
-        "collection_id": dataset_from_collection(collection),
+        "collection_id": collection_id,
         "job_id": job_id or uuid.uuid4().hex,
         "task_id": task_id,
         "operation": stage,
@@ -812,7 +812,7 @@ def queue_task(
                 delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE, priority=priority
             ),
         )
-        dataset = Dataset(conn=redis_conn, name=dataset_from_collection(collection))
+        dataset = Dataset(conn=redis_conn, name=str(collection_id))
         dataset.add_task(task_id, stage)
     except (pika.exceptions.UnroutableError, pika.exceptions.AMQPConnectionError):
         log.exception("Error while queuing task")
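The bucketing that `get_priority` implements is only partly visible in the hunk above. Combining it with the test assertions below (a total of 9999 tasks yields a priority in (4, 5, 6), 10001 yields one in (1, 2, 3)) gives the following sketch; the `randrange` bounds for the middle and bottom buckets are inferred from those assertions, not taken verbatim from the source:

from random import randrange

def get_priority_sketch(total_task_count: int) -> int:
    # Busier datasets get lower priorities so that small datasets
    # are not starved by large ones.
    if total_task_count < 500:
        return randrange(7, 9)   # 7 or 8, as in the diff above
    elif total_task_count < 10000:
        return randrange(4, 7)   # 4, 5, or 6 -- inferred from the tests
    return randrange(1, 4)       # 1, 2, or 3 -- inferred from the tests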
22 changes: 10 additions & 12 deletions tests/test_taskqueue.py
@@ -21,8 +21,6 @@

 from servicelayer.taskqueue import get_priority, get_task_count, queue_task

-from unittest.mock import Mock
-

 class CountingWorker(Worker):
     def dispatch_task(self, task: Task) -> Task:
@@ -194,15 +192,15 @@ def test_get_priority_bucket():
     redis = get_fakeredis()
     rmq = get_rabbitmq_connection()
     flush_queues(rmq, redis, ["index"])
-    collection = Mock(id=1)
+    collection_id = 1

-    assert get_task_count(collection, redis) == 0
-    assert get_priority(collection, redis) in (7, 8)
+    assert get_task_count(collection_id, redis) == 0
+    assert get_priority(collection_id, redis) in (7, 8)

-    queue_task(rmq, redis, collection, "index")
+    queue_task(rmq, redis, collection_id, "index")

-    assert get_task_count(collection, redis) == 1
-    assert get_priority(collection, redis) in (7, 8)
+    assert get_task_count(collection_id, redis) == 1
+    assert get_priority(collection_id, redis) in (7, 8)

     with patch.object(
         Dataset,
@@ -230,8 +228,8 @@ def test_get_priority_bucket():
             },
         },
     ):
-        assert get_task_count(collection, redis) == 9999
-        assert get_priority(collection, redis) in (4, 5, 6)
+        assert get_task_count(collection_id, redis) == 9999
+        assert get_priority(collection_id, redis) in (4, 5, 6)

     with patch.object(
         Dataset,
@@ -259,5 +257,5 @@ def test_get_priority_bucket():
             },
         },
     ):
-        assert get_task_count(collection, redis) == 10001
-        assert get_priority(collection, redis) in (1, 2, 3)
+        assert get_task_count(collection_id, redis) == 10001
+        assert get_priority(collection_id, redis) in (1, 2, 3)
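Seen from the call site, the refactor removes the need to hand `queue_task` a collection object (the tests previously built one with `Mock(id=1)`); a plain integer id is enough. A minimal usage sketch, assuming `get_fakeredis` lives in `servicelayer.cache` as the test code suggests:

from servicelayer.cache import get_fakeredis  # import path assumed
from servicelayer.taskqueue import get_rabbitmq_connection, queue_task

rmq = get_rabbitmq_connection()
redis = get_fakeredis()

# Before this commit: queue_task(rmq, redis, collection, "index") with an object.
# After it, the numeric collection id is passed directly:
queue_task(rmq, redis, 1, "index")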
