Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds startup task to cancel previously running tasks #242

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
8 changes: 8 additions & 0 deletions matcher/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MatcherStatus(Enum):
DEPLOYED = "Deployed"
DEPLOYMENT_ERROR = "Deployment Error"
QUEUED = "Queued"
CANCELLED = "Cancelled"


class MatcherError(Exception):
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(
self.assignments_output = assignments_output
self.alternates_output = alternates_output
self.logger = logger
self.match_group = None

def set_assignments(self, assignments):
self.logger.info("Writing assignments to file")
Expand All @@ -91,6 +93,9 @@ def set_status(self, status, message, additional_status_info={}):
)
)

def validate_group(self, group_id):
    """Accept a match group id and perform no validation.

    This implementation is a deliberate no-op: it ignores ``group_id`` and
    returns ``None``. The matcher's run path calls ``validate_group`` on its
    datasource before the status is switched to running, so this stub lets
    that call succeed for datasources that have nothing to check.
    NOTE(review): presumably service-backed datasources override this with a
    real check -- confirm against the subclass.
    """
    return None


class Matcher:
"""Main class that coordinates an Encoder and a Solver."""
Expand Down Expand Up @@ -144,6 +149,9 @@ def run(self):
The config note's status field will be set to reflect completion or errors.
"""
try:
self.logger.info("Validating Match Group")
self.datasource.validate_group(self.datasource.match_group)

self.set_status(MatcherStatus.RUNNING)

self.logger.debug("Start encoding")
Expand Down
23 changes: 17 additions & 6 deletions matcher/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import os

import flask


import redis
from celery import Celery

os.environ.setdefault(
Expand Down Expand Up @@ -80,10 +79,22 @@ def create_celery(app):
Initializes a celery application using Flask App
"""

celery = Celery(
app.import_name,
include=["matcher.service.celery_tasks"],
)
celery = Celery(app.import_name, include=["matcher.service.celery_tasks"])
celery.config_from_envvar("CELERY_CONFIG_MODULE")
celery.conf.update(app.config)

return celery


def create_redis(app):
    """Build the redis connection pool described by the Flask app's config.

    Reads ``REDIS_ADDR``, ``REDIS_PORT`` and ``REDIS_DB`` from ``app.config``
    and returns a ``redis.ConnectionPool`` created with those values and
    ``decode_responses=True``.
    """
    settings = app.config
    return redis.ConnectionPool(
        host=settings["REDIS_ADDR"],
        port=settings["REDIS_PORT"],
        db=settings["REDIS_DB"],
        decode_responses=True,
    )
37 changes: 37 additions & 0 deletions matcher/service/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging

import redis
from openreview import openreview
from requests.exceptions import ConnectionError
from urllib3.exceptions import ConnectTimeoutError, RequestError

Expand Down Expand Up @@ -105,3 +107,38 @@ def run_deployment(
raise self.retry(
exc=exc, countdown=300 * (self.request.retries + 1), max_retries=1
)


@celery.task(name="cancel_stale_notes", track_started=True, bind=True)
def cancel_stale_notes(
self, openreview_baseurl, openreview_username, openreview_password
):
print("Cancelling Stale Notes")
from matcher.service.server import redis_pool

redis_conn = redis.Redis(connection_pool=redis_pool)
config_notes = redis_conn.hgetall(name="config_notes")
openreview_client = openreview.Client(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carlosmondra To avoid using the super user to edit the notes, could we have a dedicated "matching" user or token? I can modify the invitation so that the matching user is a writer of all the configuration notes.

baseurl=openreview_baseurl,
username=openreview_username,
password=openreview_password,
)
for note_id, status in config_notes.items():
if status in ["Running", "Deploying", "Queued"]:
config_note = openreview_client.get_note(note_id)
redis_conn.hset(
name="config_notes",
key=note_id,
value=MatcherStatus.CANCELLED.value,
)
config_note.content["status"] = MatcherStatus.CANCELLED.value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a description to the error message? For example: "Matching run was cancelled, please try again".

config_note.content[
"error_message"
] = "Matching run was cancelled, please try again or contact support."
openreview_client.post_note(config_note)
print(
"Config Note {} status set to: {}".format(
config_note.id, config_note.content["status"]
)
)
redis_conn.close()
5 changes: 5 additions & 0 deletions matcher/service/config/celery_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
Queue(
"failure", routing_key="matcher.service.celery_tasks.set_error_status"
),
Queue(
"startup",
routing_key="matcher.service.celery_tasks.cancel_stale_notes",
),
Queue("default"),
)
task_ignore_result = False
broker_url = "redis://localhost:6379/10"
Expand Down
5 changes: 5 additions & 0 deletions matcher/service/config/default.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
LOG_FILE='default.log'
OPENREVIEW_BASEURL='http://localhost:3000'
OPENREVIEW_USERNAME='OpenReview.net'
OPENREVIEW_PASSWORD=''
REDIS_ADDR='localhost'
REDIS_PORT=6379
REDIS_DB=10
9 changes: 9 additions & 0 deletions matcher/service/openreview_interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import re
import openreview
import logging

import redis
from tqdm import tqdm
from matcher.encoder import EncoderError
from matcher.core import MatcherError, MatcherStatus
Expand Down Expand Up @@ -303,6 +305,12 @@ def weight_by_type(self):

def set_status(self, status, message="", additional_status_info={}):
"""Set the status of the config note"""
from matcher.service.server import redis_pool

redis_conn = redis.Redis(connection_pool=redis_pool)
redis_conn.hset(
name="config_notes", key=self.config_note.id, value=status.value
)
self.config_note.content["status"] = status.value
self.config_note.content["error_message"] = message
for key, value in additional_status_info.items():
Expand All @@ -315,6 +323,7 @@ def set_status(self, status, message="", additional_status_info={}):
self.config_note.id, self.config_note.content["status"]
)
)
redis_conn.close()

def set_assignments(self, assignments_by_forum):
"""Helper function for posting assignments returned by the Encoder"""
Expand Down
41 changes: 34 additions & 7 deletions matcher/service/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import flask
import openreview
import redis
from flask_cors import CORS

from .openreview_interface import ConfigNoteInterface
Expand Down Expand Up @@ -51,36 +52,45 @@ def match():
token=token, baseurl=flask.current_app.config["OPENREVIEW_BASEURL"]
)

from matcher.service.server import redis_pool

redis_conn = redis.Redis(connection_pool=redis_pool)

interface = ConfigNoteInterface(
client=openreview_client,
config_note_id=config_note_id,
logger=flask.current_app.logger,
)

interface.validate_group(interface.match_group)
openreview_client.impersonate(interface.venue_id)

if interface.config_note.content["status"] == "Running":
config_note_status = redis_conn.hget(
name="config_notes", key=config_note_id
)
if not config_note_status:
config_note_status = interface.config_note.content["status"]

if config_note_status == "Running":
raise MatcherStatusException("Matcher is already running")
if interface.config_note.content["status"] == "Complete":
if config_note_status == "Complete":
raise MatcherStatusException(
"Match configured by {} is already complete".format(
config_note_id
)
)
if interface.config_note.content["status"] == "Deploying":
if config_note_status == "Deploying":
raise MatcherStatusException(
"Match configured by {} is being deployed".format(
config_note_id
)
)
if interface.config_note.content["status"] == "Deployed":
if config_note_status == "Deployed":
raise MatcherStatusException(
"Match configured by {} is already deployed".format(
config_note_id
)
)
if interface.config_note.content["status"] == "Queued":
if config_note_status == "Queued":
raise MatcherStatusException(
"Match configured by {} is already in queue.".format(
config_note_id
Expand Down Expand Up @@ -178,13 +188,30 @@ def deploy():
token=token, baseurl=flask.current_app.config["OPENREVIEW_BASEURL"]
)

from matcher.service.server import redis_pool

redis_conn = redis.Redis(connection_pool=redis_pool)

interface = ConfigNoteInterface(
client=openreview_client,
config_note_id=config_note_id,
logger=flask.current_app.logger,
)

if interface.config_note.content["status"] not in [
config_note_status = redis_conn.hget(
name="config_notes", key=config_note_id
)
if not config_note_status:
config_note_status = interface.config_note.content["status"]

if config_note_status == "Deploying":
raise MatcherStatusException(
"Match configured by {} is being deployed".format(
config_note_id
)
)

if config_note_status not in [
"Complete",
"Deployment Error",
]:
Expand Down
16 changes: 15 additions & 1 deletion matcher/service/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
from matcher.service import create_app, create_celery
from celery.signals import worker_ready

from matcher.service import create_app, create_celery, create_redis

app = create_app()
celery_app = create_celery(app)
redis_pool = create_redis(app)


@worker_ready.connect
def at_start(sender, **kwargs):
    """Queue the ``cancel_stale_notes`` task when a worker reports it is ready.

    Connected to Celery's ``worker_ready`` signal. The OpenReview base URL and
    credentials are read from the sending app's configuration and forwarded to
    the task as keyword arguments.

    Args:
        sender: Object that emitted the signal; only ``sender.app`` is used.
        **kwargs: Additional signal arguments; ignored.

    Raises:
        KeyError: If one of the ``OPENREVIEW_*`` settings is missing from the
            app configuration.
    """
    worker_app = sender.app
    conf = worker_app.conf
    # Resolve the settings first so a missing key fails before any broker
    # connection is opened.
    task_kwargs = {
        "openreview_baseurl": conf["OPENREVIEW_BASEURL"],
        "openreview_username": conf["OPENREVIEW_USERNAME"],
        "openreview_password": conf["OPENREVIEW_PASSWORD"],
    }
    with worker_app.connection() as conn:
        # Publish on the connection opened here; previously it was opened and
        # closed without ever being handed to send_task.
        worker_app.send_task(
            "cancel_stale_notes", kwargs=task_kwargs, connection=conn
        )
5 changes: 4 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def openreview_context():
"SUPERUSER_LASTNAME": "User",
"SUPERUSER_TILDE_ID": "~Super_User1",
"SUPERUSER_EMAIL": "[email protected]",
"REDIS_ADDR": "localhost",
"REDIS_PORT": 6379,
"REDIS_DB": 10,
}
)

Expand Down Expand Up @@ -218,7 +221,7 @@ def celery_includes():
@pytest.fixture(scope="session")
def celery_worker_parameters():
return {
"queues": ("default", "matching", "deployment", "failure"),
"queues": ("default", "matching", "deployment", "failure", "startup"),
"perform_ping_check": False,
"concurrency": 4,
}
Expand Down
Loading