Skip to content

Commit

Permalink
reduce FP style in refresh_queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Lachish committed Feb 26, 2020
1 parent fea7814 commit ef2eb39
Showing 1 changed file with 50 additions and 59 deletions.
109 changes: 50 additions & 59 deletions redash/tasks/queries/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,82 +27,73 @@ def empty_schedules():
logger.info("Deleted %d schedules.", len(queries))


def _skip_unrefreshable_queries(queries):
for query in queries:
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logger.info("Disabled refresh queries.")
continue
elif query.org.is_disabled:
logger.debug("Skipping refresh of %s because org is disabled.", query.id)
continue
elif query.data_source is None:
logger.debug(
"Skipping refresh of %s because the datasource is none.", query.id
)
continue
elif query.data_source.paused:
logger.debug(
"Skipping refresh of %s because datasource - %s is paused (%s).",
query.id,
query.data_source.name,
query.data_source.pause_reason,
def _should_refresh_query(query):
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logger.info("Disabled refresh queries.")
return False
elif query.org.is_disabled:
logger.debug("Skipping refresh of %s because org is disabled.", query.id)
return False
elif query.data_source is None:
logger.debug("Skipping refresh of %s because the datasource is none.", query.id)
return False
elif query.data_source.paused:
logger.debug(
"Skipping refresh of %s because datasource - %s is paused (%s).",
query.id,
query.data_source.name,
query.data_source.pause_reason,
)
return False
else:
return True


def _apply_default_parameters(query):
parameters = {p["name"]: p.get("value") for p in query.parameters}
if any(parameters):
try:
return query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(
query.id, str(e)
)
continue
track_failure(query, error)
raise
except QueryDetachedFromDataSourceError as e:
error = (
"Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource."
).format(query.id, e.query_id)
track_failure(query, error)
raise
else:
return query.query_text

yield query

def refresh_queries():
logger.info("Refreshing queries...")
enqueued = []
for query in models.Query.outdated_queries():
if not _should_refresh_query(query):
continue

def _apply_default_parameters(queries):
for query in queries:
parameters = {p["name"]: p.get("value") for p in query.parameters}
if any(parameters):
try:
query.query_text = query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(
query.id, str(e)
)
track_failure(query, error)
continue
except QueryDetachedFromDataSourceError as e:
error = (
"Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource."
).format(query.id, e.query_id)
track_failure(query, error)
continue

yield query


def _enqueue_queries(queries):
for query in queries:
try:
enqueue_query(
query.query_text,
_apply_default_parameters(query),
query.data_source,
query.user_id,
scheduled_query=query,
metadata={"Query ID": query.id, "Username": "Scheduled"},
)

yield query
enqueued.append(query)
except Exception as e:
logging.info("Could not enqueue query %d due to %s", query.id, repr(e))


def refresh_queries():
logger.info("Refreshing queries...")

outdated = models.Query.outdated_queries()
refreshable = _skip_unrefreshable_queries(outdated)
queries = _apply_default_parameters(refreshable)
enqueued = list(_enqueue_queries(queries))

status = {
"outdated_queries_count": len(enqueued),
"last_refresh_at": time.time(),
"query_ids": json_dumps([q.id for q in enqueued])
"query_ids": json_dumps([q.id for q in enqueued]),
}

redis_connection.hmset("redash:status", status)
Expand Down

0 comments on commit ef2eb39

Please sign in to comment.