Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute Queries in RQ #4413

Merged
merged 28 commits into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ed925d5
enforce hard limits on non-responsive work horses by workers
Nov 13, 2019
859fe2a
move differences from Worker to helper methods to help make the speci…
Nov 13, 2019
d120100
move HardLimitingWorker to redash/tasks
Nov 14, 2019
86b9075
Merge branch 'master' into hard-time-limit
Nov 14, 2019
1fa6abf
move schedule.py to /tasks
Nov 16, 2019
1251b9b
explain the motivation for HardLimitingWorker
Nov 16, 2019
4ae624b
pleasing CodeClimate
Nov 17, 2019
9cfd453
pleasing CodeClimate
Nov 17, 2019
9c855cb
port query execution to RQ
Nov 18, 2019
0c8f0b4
get rid of argsrepr
Nov 18, 2019
768f0f6
avoid star imports
Nov 26, 2019
916557d
Merge branch 'master' into execute-query-in-rq
Nov 28, 2019
5983541
Merge branch 'hard-time-limit' into execute-query-in-rq
Nov 28, 2019
edd656f
allow queries to be cancelled in RQ
Nov 28, 2019
1d8af9c
return QueryExecutionErrors as job results
Nov 28, 2019
a96ee82
fix TestTaskEnqueue and QueryExecutorTests
Dec 2, 2019
37a74ea
remove Celery monitoring
Dec 4, 2019
faf5166
get rid of QueryTask and use RQ jobs directly (with a job serializer)
Dec 4, 2019
66f3db9
Revert "remove Celery monitoring"
Dec 4, 2019
045cb96
reduce occurences of the word 'task'
Dec 4, 2019
d486cbb
use Worker, Queue and Job instead of spreading names that share behav…
Dec 4, 2019
8b1a471
remove locks for failed jobs as well
Dec 5, 2019
75bbbf8
did I not commit that colon? oh my
Dec 5, 2019
0637080
push the redis connection to RQ's stack on every request to avoid ver…
Dec 5, 2019
f086b81
use a connection context for tests
Dec 9, 2019
fd3417b
black it up
Dec 30, 2019
d71eff5
Merge branch 'master' into execute-query-in-rq
Dec 30, 2019
33af0b3
run RQ on all queues when running in Cypress
Dec 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .circleci/docker-compose.cypress.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ services:
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "default periodic schemas"
celery_worker:
build: ../
command: celery_worker
Expand Down
12 changes: 11 additions & 1 deletion redash/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ def __init__(self, *args, **kwargs):


def create_app():
from . import authentication, extensions, handlers, limiter, mail, migrate, security
from . import (
authentication,
extensions,
handlers,
limiter,
mail,
migrate,
security,
tasks,
)
from .handlers.webpack import configure_webpack
from .metrics import request as request_metrics
from .models import db, users
Expand All @@ -47,5 +56,6 @@ def create_app():
configure_webpack(app)
extensions.init_app(app)
users.init_app(app)
tasks.init_app(app)

return app
9 changes: 5 additions & 4 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

from click import argument
from flask.cli import AppGroup
from rq import Connection, Worker
from rq import Connection
from sqlalchemy.orm import configure_mappers

from redash import rq_redis_connection
from redash.schedule import (
from redash.tasks import Worker
from redash.tasks.schedule import (
rq_scheduler,
schedule_periodic_jobs,
periodic_job_definitions,
Expand All @@ -34,10 +35,10 @@ def worker(queues):
configure_mappers()

if not queues:
queues = ["periodic", "emails", "default", "schemas"]
queues = ["scheduled_queries", "queries", "periodic", "emails", "default", "schemas"]

with Connection(rq_redis_connection):
w = Worker(queues, log_job_description=False)
w = Worker(queues, log_job_description=False, job_monitoring_interval=5)
w.work()


Expand Down
12 changes: 7 additions & 5 deletions redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
require_permission,
require_any_of_permission,
view_only,
view_only,
)
from redash.tasks import QueryTask
from redash.tasks import Job
from redash.tasks.queries import enqueue_query
from redash.utils import (
collect_parameters_from_request,
Expand All @@ -35,6 +36,7 @@
serialize_query_result,
serialize_query_result_to_dsv,
serialize_query_result_to_xlsx,
serialize_job,
)


Expand Down Expand Up @@ -119,7 +121,7 @@ def run_query(query, parameters, data_source, query_id, max_age=0):
"Query ID": query_id,
},
)
return {"job": job.to_dict()}
return serialize_job(job)


def get_download_filename(query_result, query, filetype):
Expand Down Expand Up @@ -441,12 +443,12 @@ def get(self, job_id, query_id=None):
"""
Retrieve info about a running query job.
"""
job = QueryTask(job_id=job_id)
return {"job": job.to_dict()}
job = Job.fetch(job_id)
return serialize_job(job)

def delete(self, job_id):
"""
Cancel a query job in progress.
"""
job = QueryTask(job_id=job_id)
job = Job.fetch(job_id)
job.cancel()
6 changes: 3 additions & 3 deletions redash/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def get_queues_status():
},
**{
queue.name: {"size": len(queue)}
for queue in Queue.all(connection=rq_redis_connection)
for queue in Queue.all()
},
}

Expand Down Expand Up @@ -166,7 +166,7 @@ def rq_queues():
"started": fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()),
"queued": len(q.job_ids),
}
for q in Queue.all(connection=rq_redis_connection)
for q in Queue.all()
}


Expand All @@ -189,7 +189,7 @@ def rq_workers():
"failed_jobs": w.failed_job_count,
"total_working_time": w.total_working_time,
}
for w in Worker.all(connection=rq_redis_connection)
for w in Worker.all()
]


Expand Down
38 changes: 38 additions & 0 deletions redash/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from funcy import project

from flask_login import current_user
from rq.job import JobStatus
from rq.timeouts import JobTimeoutException

from redash import models
from redash.permissions import has_access, view_only
Expand Down Expand Up @@ -263,3 +265,39 @@ def serialize_dashboard(obj, with_widgets=False, user=None, with_favorite_state=
d["is_favorite"] = models.Favorite.is_favorite(current_user.id, obj)

return d


def serialize_job(job):
# TODO: this is mapping to the old Job class statuses. Need to update the client side and remove this
STATUSES = {
JobStatus.QUEUED: 1,
JobStatus.STARTED: 2,
JobStatus.FINISHED: 3,
JobStatus.FAILED: 4,
}

job_status = job.get_status()
if job.is_started:
updated_at = job.started_at or 0
else:
updated_at = 0

status = STATUSES[job_status]

if isinstance(job.result, Exception):
error = str(job.result)
status = 4
elif job.is_cancelled:
error = "Query execution cancelled."
else:
error = ""

return {
"job": {
"id": job.id,
"updated_at": updated_at,
"status": status,
"error": error,
"query_result_id": job.result if job.is_finished and not error else None,
}
}
12 changes: 11 additions & 1 deletion redash/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
purge_failed_jobs,
)
from .queries import (
QueryTask,
enqueue_query,
execute_query,
refresh_queries,
Expand All @@ -16,3 +15,14 @@
)
from .alerts import check_alerts_for_query
from .failure_report import send_aggregated_errors
from .worker import Worker, Queue, Job
from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions

from redash import rq_redis_connection
from rq.connections import push_connection, pop_connection


def init_app(app):
app.before_request(lambda: push_connection(rq_redis_connection))
app.teardown_request(lambda _: pop_connection())

2 changes: 1 addition & 1 deletion redash/tasks/queries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
cleanup_query_results,
empty_schedules,
)
from .execution import QueryTask, execute_query, enqueue_query
from .execution import execute_query, enqueue_query
Loading