Skip to content

Commit

Permalink
avoid using exists+fetch and use exceptions instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Lachish committed Feb 27, 2020
1 parent 57b1d4f commit 24ca387
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
17 changes: 8 additions & 9 deletions redash/tasks/queries/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from rq import get_current_job
from rq.job import JobStatus
from rq.timeouts import JobTimeoutException
from rq.exceptions import NoSuchJobError

from redash import models, redis_connection, settings
from redash.query_runner import InterruptException
Expand Down Expand Up @@ -43,24 +44,22 @@ def enqueue_query(
job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
if job_id:
logger.info("[%s] Found existing job: %s", query_hash, job_id)
job_exists = Job.exists(job_id)
job_complete = None

if job_exists:
try:
job = Job.fetch(job_id)
job_exists = True
status = job.get_status()
job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED]

if job_complete:
logger.info(
"[%s] job found is complete (%s), removing lock",
query_hash,
status,
)
else:
logger.info("[%s] job found has expired, removing lock", query_hash)
message = "job found is complete (%s)" % status
except NoSuchJobError:
message = "job found has expired"
job_exists = False

if job_complete or not job_exists:
logger.info("[%s] %s, removing lock", query_hash, message)
redis_connection.delete(_job_lock_id(query_hash, data_source.id))
job = None

Expand Down
13 changes: 7 additions & 6 deletions tests/tasks/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from mock import patch, Mock

from rq import Connection
from rq.exceptions import NoSuchJobError

from tests import BaseTestCase
from redash import redis_connection, rq_redis_connection, models
Expand Down Expand Up @@ -33,11 +34,10 @@ def create_job(*args, **kwargs):
return Job(connection=rq_redis_connection)


@patch("redash.tasks.queries.execution.Job.exists", return_value=True)
@patch("redash.tasks.queries.execution.Job.fetch", side_effect=fetch_job)
@patch("redash.tasks.queries.execution.Queue.enqueue", side_effect=create_job)
class TestEnqueueTask(BaseTestCase):
def test_multiple_enqueue_of_same_query(self, enqueue, _, __):
def test_multiple_enqueue_of_same_query(self, enqueue, _):
query = self.factory.create_query()

with Connection(rq_redis_connection):
Expand Down Expand Up @@ -68,7 +68,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _, __):

self.assertEqual(1, enqueue.call_count)

def test_multiple_enqueue_of_expired_job(self, enqueue, _, exists):
def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):
query = self.factory.create_query()

with Connection(rq_redis_connection):
Expand All @@ -81,7 +81,8 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, _, exists):
{"Username": "Arik", "Query ID": query.id},
)

exists.return_value = False
# "expire" the previous job
fetch_job.side_effect = NoSuchJobError

enqueue_query(
query.query_text,
Expand All @@ -95,7 +96,7 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, _, exists):
self.assertEqual(2, enqueue.call_count)

@patch("redash.settings.dynamic_settings.query_time_limit", return_value=60)
def test_limits_query_time(self, _, enqueue, __, ___):
def test_limits_query_time(self, _, enqueue, __):
query = self.factory.create_query()

with Connection(rq_redis_connection):
Expand All @@ -111,7 +112,7 @@ def test_limits_query_time(self, _, enqueue, __, ___):
_, kwargs = enqueue.call_args
self.assertEqual(60, kwargs.get("job_timeout"))

def test_multiple_enqueue_of_different_query(self, enqueue, _, __):
def test_multiple_enqueue_of_different_query(self, enqueue, _):
query = self.factory.create_query()

with Connection(rq_redis_connection):
Expand Down

0 comments on commit 24ca387

Please sign in to comment.