Skip to content

Commit

Permalink
Updating tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed May 2, 2024
1 parent b729d7c commit 3942029
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 237 deletions.
2 changes: 1 addition & 1 deletion mula/scheduler/queues/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .boefje import BoefjePriorityQueue
from .errors import InvalidPrioritizedItemError, NotAllowedError, QueueEmptyError, QueueFullError
from .errors import InvalidItemError, ItemNotFoundError, NotAllowedError, QueueEmptyError, QueueFullError
from .normalizer import NormalizerPriorityQueue
from .pq import PriorityQueue
4 changes: 2 additions & 2 deletions mula/scheduler/queues/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ class NotAllowedError(Exception):
pass


class InvalidPrioritizedItemError(ValueError):
class InvalidItemError(ValueError):
pass


class QueueFullError(Full):
pass


class PrioritizedItemNotFoundError(Exception):
class ItemNotFoundError(Exception):
pass
31 changes: 14 additions & 17 deletions mula/scheduler/queues/pq.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@

from scheduler import models, storage

from .errors import (
InvalidPrioritizedItemError,
NotAllowedError,
PrioritizedItemNotFoundError,
QueueEmptyError,
QueueFullError,
)
from .errors import InvalidItemError, ItemNotFoundError, NotAllowedError, QueueEmptyError, QueueFullError


def with_lock(method):
Expand Down Expand Up @@ -54,6 +48,9 @@ class PriorityQueue(abc.ABC):
pq_store:
A PriorityQueueStore instance that will be used to store the items
in a persistent way.
lock:
A threading.Lock instance that will be used to lock the queue
operations.
"""

def __init__(
Expand Down Expand Up @@ -134,20 +131,20 @@ def push(self, task: models.Task) -> models.Task | None:
Raises:
NotAllowedError: If the item is not allowed to be pushed.
InvalidPrioritizedItemError: If the item is not valid.
InvalidItemError: If the item is not valid.
QueueFullError: If the queue is full.
PrioritizedItemNotFoundError: If the item is not found on the queue.
ItemNotFoundError: If the item is not found on the queue.
"""
if not isinstance(task, models.Task):
raise InvalidPrioritizedItemError("The item is not a PrioritizedItem")
raise InvalidItemError("The item is not a PrioritizedItem")

if not self._is_valid_item(task.data):
raise InvalidPrioritizedItemError(f"PrioritizedItem must be of type {self.item_type}")
raise InvalidItemError(f"PrioritizedItem must be of type {self.item_type}")

if not task.priority:
raise InvalidPrioritizedItemError("PrioritizedItem must have a priority")
raise InvalidItemError("PrioritizedItem must have a priority")

if self.full() and task.priority > 1:
raise QueueFullError(f"Queue {self.pq_id} is full.")
Expand All @@ -156,7 +153,7 @@ def push(self, task: models.Task) -> models.Task | None:
# that item by the implementation of the queue. We don't do this by
# the item itself or its hash because this might have been changed
# and we might need to update that.
item_on_queue = self.get_p_item_by_identifier(task)
item_on_queue = self.get_item_by_identifier(task)

# Item on queue and data changed
item_changed = item_on_queue and task.data != item_on_queue.data
Expand Down Expand Up @@ -199,14 +196,14 @@ def push(self, task: models.Task) -> models.Task | None:
if not item_on_queue:
identifier = self.create_hash(task)
task.hash = identifier
task.status = models.TaskStatus.QUEUED.value
task.status = models.TaskStatus.QUEUED
item_db = self.pq_store.push(self.pq_id, task)
else:
self.pq_store.update(self.pq_id, task)
item_db = self.get_p_item_by_identifier(task)
item_db = self.get_item_by_identifier(task)

if not item_db:
raise PrioritizedItemNotFoundError(f"Item {task} not found in datastore {self.pq_id}")
raise ItemNotFoundError(f"Item {task} not found in datastore {self.pq_id}")

return item_db

Expand Down Expand Up @@ -287,7 +284,7 @@ def is_item_on_queue_by_hash(self, item_hash: str) -> bool:
return item is not None

@with_lock
def get_p_item_by_identifier(self, task: models.Task) -> models.Task | None:
def get_item_by_identifier(self, task: models.Task) -> models.Task | None:
"""Get an item from the queue by its identifier.
Args:
Expand Down
124 changes: 69 additions & 55 deletions mula/scheduler/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,32 @@ class Scheduler(abc.ABC):
ctx:
Application context of shared data (e.g. configuration, external
services connections).
enabled:
Whether the scheduler is enabled or not.
scheduler_id:
The id of the scheduler.
queue:
A queues.PriorityQueue instance
threads:
A dict of ThreadRunner instances, used for running processes
concurrently.
stop_event: A threading.Event object used for communicating a stop
event across threads.
deadline_ranker:
A ranker that calculates the deadline of a task.
callback:
A callback function to call when the scheduler is stopped.
scheduler_id:
The id of the scheduler.
enabled:
Whether the scheduler is enabled or not.
last_activity:
The last activity of the scheduler.
listeners:
A dict of connector.Listener instances, used for listening to
external events.
lock:
A threading.Lock instance used for locking
stop_event_threads:
A threading.Event object used for communicating a stop
event across threads.
threads:
A dict of ThreadRunner instances, used for running processes
concurrently.
executor:
A concurrent.futures.ThreadPoolExecutor instance used for
running tasks concurrently.
"""

def __init__(
Expand All @@ -59,6 +71,8 @@ def __init__(
The id of the scheduler.
queue:
A queues.PriorityQueue instance
callback:
A callback function to call when the scheduler is stopped.
max_tries:
The maximum number of retries for an item to be pushed to
the queue.
Expand Down Expand Up @@ -118,36 +132,36 @@ def run_in_thread(

self.threads.append(t)

def push_items_to_queue(self, p_items: list[models.Task]) -> None:
def push_items_to_queue(self, items: list[models.Task]) -> None:
"""Push multiple PrioritizedItems to the queue.
Args:
p_items: The list PrioritzedItem to add to the queue.
items: The list of PrioritizedItems to add to the queue.
"""
count = 0
for p_item in p_items:
for item in items:
try:
self.push_item_to_queue(p_item)
self.push_item_to_queue(item)
except (
queues.errors.NotAllowedError,
queues.errors.QueueFullError,
queues.errors.InvalidPrioritizedItemError,
queues.errors.InvalidItemError,
):
self.logger.debug(
"Unable to push item %s to queue %s",
p_item.id,
item.id,
self.queue.pq_id,
p_item_id=p_item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
continue
except Exception as exc:
self.logger.error(
"Unable to push item %s to queue %s",
p_item.id,
item.id,
self.queue.pq_id,
p_item_id=p_item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
Expand All @@ -157,14 +171,14 @@ def push_items_to_queue(self, p_items: list[models.Task]) -> None:

def push_item_to_queue_with_timeout(
self,
p_item: models.Task,
item: models.Task,
max_tries: int = 5,
timeout: int = 1,
) -> None:
"""Push an item to the queue, with a timeout.
Args:
p_item: The item to push to the queue.
item: The item to push to the queue.
timeout: The timeout in seconds.
max_tries: The maximum number of tries. Set to -1 for infinite tries.
Expand All @@ -185,32 +199,32 @@ def push_item_to_queue_with_timeout(
if tries >= max_tries and max_tries != -1:
raise queues.errors.QueueFullError()

self.push_item_to_queue(p_item)
self.push_item_to_queue(item)

def push_item_to_queue(self, p_item: models.Task) -> None:
def push_item_to_queue(self, item: models.Task) -> None:
"""Push a PrioritizedItem to the queue.
Args:
p_item: The PrioritizedItem to push to the queue.
item: The PrioritizedItem to push to the queue.
"""
if not self.is_enabled():
self.logger.warning(
"Scheduler is disabled, not pushing item to queue %s",
self.queue.pq_id,
p_item_id=p_item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
raise queues.errors.NotAllowedError("Scheduler is disabled")

try:
p_item.status = models.TaskStatus.QUEUED.value
p_item = self.queue.push(p_item)
item.status = models.TaskStatus.QUEUED.value
item = self.queue.push(item)
except queues.errors.NotAllowedError as exc:
self.logger.warning(
"Not allowed to push to queue %s",
self.queue.pq_id,
p_item_id=p_item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
Expand All @@ -219,17 +233,17 @@ def push_item_to_queue(self, p_item: models.Task) -> None:
self.logger.warning(
"Queue %s is full, not pushing new items",
self.queue.pq_id,
p_item_id=p_item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
queue_qsize=self.queue.qsize(),
scheduler_id=self.scheduler_id,
)
raise exc
except queues.errors.InvalidPrioritizedItemError as exc:
except queues.errors.InvalidItemError as exc:
self.logger.warning(
"Invalid prioritized item %s",
p_item.id,
p_item_id=p_item.id,
item.id,
item_id=item.id,
queue_id=self.queue.pq_id,
queue_qsize=self.queue.qsize(),
scheduler_id=self.scheduler_id,
Expand All @@ -238,29 +252,29 @@ def push_item_to_queue(self, p_item: models.Task) -> None:

self.logger.debug(
"Pushed item %s to queue %s with priority %s ",
p_item.id,
item.id,
self.queue.pq_id,
p_item.priority,
p_item_id=p_item.id,
item.priority,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)

self.post_push(p_item)
self.post_push(item)

def post_push(self, p_item: models.Task) -> None:
def post_push(self, item: models.Task) -> None:
When a boefje task is added to the queue, we persist a task to the
datastore with the status QUEUED.
Args:
p_item: The prioritized item from the priority queue.
item: The prioritized item from the priority queue.
"""
self.last_activity = datetime.now(timezone.utc)

# Create TaskSchema
#
# Do we have a schema for this task?
schema_db = self.ctx.datastores.schema_store.get_schema_by_hash(p_item.hash)
schema_db = self.ctx.datastores.schema_store.get_schema_by_hash(item.hash)
if schema_db is None:
schema_db = self.ctx.datastores.schema_store.create_schema(
models.TaskSchema(
Expand All @@ -271,8 +285,8 @@ def post_push(self, p_item: models.Task) -> None:
)
)

p_item.schema_id = schema_db.id
self.ctx.datastores.task_store.update_task(p_item)
item.schema_id = schema_db.id
self.ctx.datastores.task_store.update_task(item)

def pop_item_from_queue(self, filters: storage.filters.FilterRequest | None = None) -> models.Task | None:
"""Pop an item from the queue.
Expand All @@ -294,42 +308,42 @@ def pop_item_from_queue(self, filters: storage.filters.FilterRequest | None = No
raise queues.errors.NotAllowedError("Scheduler is disabled")

try:
p_item = self.queue.pop(filters)
item = self.queue.pop(filters)
except queues.QueueEmptyError as exc:
raise exc

if p_item is not None:
if item is not None:
self.logger.debug(
"Popped item %s from queue %s with priority %s",
p_item.id,
item.id,
self.queue.pq_id,
p_item.priority,
p_item_id=p_item.id,
item.priority,
item_id=item.id,
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)

self.post_pop(p_item)
self.post_pop(item)

return p_item
return item

def post_pop(self, p_item: models.Task) -> None:
def post_pop(self, item: models.Task) -> None:
When a boefje task is removed from the queue, we persist a task to the
datastore with the status RUNNING.
Args:
p_item: The prioritized item from the priority queue.
item: The prioritized item from the priority queue.
"""
# Update task
task = self.ctx.datastores.task_store.get_task(str(p_item.id))
task = self.ctx.datastores.task_store.get_task(str(item.id))
if task is None:
self.logger.warning(
"PrioritizedItem %s popped from %s, task %s not found in datastore, could not update task status",
p_item.id,
item.id,
self.queue.pq_id,
p_item.data.get("id"),
p_item_id=p_item.id,
task_id=p_item.data.get("id"),
item.data.get("id"),
item_id=item.id,
task_id=item.data.get("id"),
queue_id=self.queue.pq_id,
scheduler_id=self.scheduler_id,
)
Expand All @@ -355,7 +369,7 @@ def _calculate_deadline(task: models.Task):
"Unable to calculate deadline for schedule %s. Disabling schedule",
schema.id,
schema_id=schema.id,
task_hash=task.p_item.hash,
task_hash=task.item.hash,
scheduler_id=self.scheduler_id,
)
schema.enabled = False
Expand Down
Loading

0 comments on commit 3942029

Please sign in to comment.