Skip to content

Commit

Permalink
Execute Queries in RQ (#4413)
Browse files Browse the repository at this point in the history
* enforce hard limits on non-responsive work horses by workers

* move differences from Worker to helper methods to help make the specialization clearer

* move HardLimitingWorker to redash/tasks

* move schedule.py to /tasks

* explain the motivation for HardLimitingWorker

* pleasing CodeClimate

* pleasing CodeClimate

* port query execution to RQ

* get rid of argsrepr

* avoid star imports

* allow queries to be cancelled in RQ

* return QueryExecutionErrors as job results

* fix TestTaskEnqueue and QueryExecutorTests

* remove Celery monitoring

* get rid of QueryTask and use RQ jobs directly (with a job serializer)

* Revert "remove Celery monitoring"

This reverts commit 37a74ea.

* reduce occurences of the word 'task'

* use Worker, Queue and Job instead of spreading names that share behavior details

* remove locks for failed jobs as well

* did I not commit that colon? oh my

* push the redis connection to RQ's stack on every request to avoid verbose connection setting

* use a connection context for tests

* black it up

* run RQ on all queues when running in Cypress
  • Loading branch information
Omer Lachish authored Dec 30, 2019
1 parent ff34ded commit 329e859
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 283 deletions.
1 change: 0 additions & 1 deletion .circleci/docker-compose.cypress.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ services:
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "default periodic schemas"
celery_worker:
build: ../
command: celery_worker
Expand Down
12 changes: 11 additions & 1 deletion redash/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ def __init__(self, *args, **kwargs):


def create_app():
from . import authentication, extensions, handlers, limiter, mail, migrate, security
from . import (
authentication,
extensions,
handlers,
limiter,
mail,
migrate,
security,
tasks,
)
from .handlers.webpack import configure_webpack
from .metrics import request as request_metrics
from .models import db, users
Expand All @@ -47,5 +56,6 @@ def create_app():
configure_webpack(app)
extensions.init_app(app)
users.init_app(app)
tasks.init_app(app)

return app
9 changes: 5 additions & 4 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

from click import argument
from flask.cli import AppGroup
from rq import Connection, Worker
from rq import Connection
from sqlalchemy.orm import configure_mappers

from redash import rq_redis_connection
from redash.schedule import (
from redash.tasks import Worker
from redash.tasks.schedule import (
rq_scheduler,
schedule_periodic_jobs,
periodic_job_definitions,
Expand All @@ -34,10 +35,10 @@ def worker(queues):
configure_mappers()

if not queues:
queues = ["periodic", "emails", "default", "schemas"]
queues = ["scheduled_queries", "queries", "periodic", "emails", "default", "schemas"]

with Connection(rq_redis_connection):
w = Worker(queues, log_job_description=False)
w = Worker(queues, log_job_description=False, job_monitoring_interval=5)
w.work()


Expand Down
12 changes: 7 additions & 5 deletions redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
require_permission,
require_any_of_permission,
view_only,
view_only,
)
from redash.tasks import QueryTask
from redash.tasks import Job
from redash.tasks.queries import enqueue_query
from redash.utils import (
collect_parameters_from_request,
Expand All @@ -35,6 +36,7 @@
serialize_query_result,
serialize_query_result_to_dsv,
serialize_query_result_to_xlsx,
serialize_job,
)


Expand Down Expand Up @@ -119,7 +121,7 @@ def run_query(query, parameters, data_source, query_id, max_age=0):
"Query ID": query_id,
},
)
return {"job": job.to_dict()}
return serialize_job(job)


def get_download_filename(query_result, query, filetype):
Expand Down Expand Up @@ -441,12 +443,12 @@ def get(self, job_id, query_id=None):
"""
Retrieve info about a running query job.
"""
job = QueryTask(job_id=job_id)
return {"job": job.to_dict()}
job = Job.fetch(job_id)
return serialize_job(job)

def delete(self, job_id):
"""
Cancel a query job in progress.
"""
job = QueryTask(job_id=job_id)
job = Job.fetch(job_id)
job.cancel()
6 changes: 3 additions & 3 deletions redash/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def get_queues_status():
},
**{
queue.name: {"size": len(queue)}
for queue in Queue.all(connection=rq_redis_connection)
for queue in Queue.all()
},
}

Expand Down Expand Up @@ -166,7 +166,7 @@ def rq_queues():
"started": fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()),
"queued": len(q.job_ids),
}
for q in Queue.all(connection=rq_redis_connection)
for q in Queue.all()
}


Expand All @@ -189,7 +189,7 @@ def rq_workers():
"failed_jobs": w.failed_job_count,
"total_working_time": w.total_working_time,
}
for w in Worker.all(connection=rq_redis_connection)
for w in Worker.all()
]


Expand Down
38 changes: 38 additions & 0 deletions redash/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from funcy import project

from flask_login import current_user
from rq.job import JobStatus
from rq.timeouts import JobTimeoutException

from redash import models
from redash.permissions import has_access, view_only
Expand Down Expand Up @@ -263,3 +265,39 @@ def serialize_dashboard(obj, with_widgets=False, user=None, with_favorite_state=
d["is_favorite"] = models.Favorite.is_favorite(current_user.id, obj)

return d


def serialize_job(job):
# TODO: this is mapping to the old Job class statuses. Need to update the client side and remove this
STATUSES = {
JobStatus.QUEUED: 1,
JobStatus.STARTED: 2,
JobStatus.FINISHED: 3,
JobStatus.FAILED: 4,
}

job_status = job.get_status()
if job.is_started:
updated_at = job.started_at or 0
else:
updated_at = 0

status = STATUSES[job_status]

if isinstance(job.result, Exception):
error = str(job.result)
status = 4
elif job.is_cancelled:
error = "Query execution cancelled."
else:
error = ""

return {
"job": {
"id": job.id,
"updated_at": updated_at,
"status": status,
"error": error,
"query_result_id": job.result if job.is_finished and not error else None,
}
}
12 changes: 11 additions & 1 deletion redash/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
purge_failed_jobs,
)
from .queries import (
QueryTask,
enqueue_query,
execute_query,
refresh_queries,
Expand All @@ -16,3 +15,14 @@
)
from .alerts import check_alerts_for_query
from .failure_report import send_aggregated_errors
from .worker import Worker, Queue, Job
from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions

from redash import rq_redis_connection
from rq.connections import push_connection, pop_connection


def init_app(app):
app.before_request(lambda: push_connection(rq_redis_connection))
app.teardown_request(lambda _: pop_connection())

2 changes: 1 addition & 1 deletion redash/tasks/queries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
cleanup_query_results,
empty_schedules,
)
from .execution import QueryTask, execute_query, enqueue_query
from .execution import execute_query, enqueue_query
Loading

0 comments on commit 329e859

Please sign in to comment.