Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform cleanup on job timeouts #4681

Merged
merged 5 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions redash/query_runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

from redash import settings
from redash.utils import json_loads
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)

__all__ = [
"BaseQueryRunner",
"BaseHTTPQueryRunner",
"InterruptException",
"JobTimeoutException",
"BaseSQLQueryRunner",
"TYPE_DATETIME",
"TYPE_BOOLEAN",
Expand Down
10 changes: 2 additions & 8 deletions redash/query_runner/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,10 @@ def run_query(self, query, user):
}
json_data = json_dumps(data, ignore_nan=True)
error = None
except (KeyboardInterrupt, InterruptException):
except Exception:
if cursor.query_id:
cursor.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as ex:
if cursor.query_id:
cursor.cancel()
error = str(ex)
json_data = None
raise

return json_data, error

Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/axibase_tsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,9 @@ def run_query(self, query, user):
except SQLException as e:
json_data = None
error = e.content
except (KeyboardInterrupt, InterruptException):
except (KeyboardInterrupt, InterruptException, JobTimeoutException):
sql.cancel_query(query_id)
error = "Query cancelled by user."
json_data = None
raise

return json_data, error

Expand Down
3 changes: 0 additions & 3 deletions redash/query_runner/azure_kusto.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ def run_query(self, query, user):
error = err.args[1][0]["error"]["@message"]
except (IndexError, KeyError):
error = err.args[1]
except KeyboardInterrupt:
json_data = None
error = "Query cancelled by user."

return json_data, error

Expand Down
3 changes: 0 additions & 3 deletions redash/query_runner/big_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,6 @@ def run_query(self, query, user):
error = json_loads(e.content)["error"]["message"]
else:
error = e.content
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None

return json_data, error

Expand Down
77 changes: 36 additions & 41 deletions redash/query_runner/cass.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,47 +93,42 @@ def get_schema(self, get_stats=False):

def run_query(self, query, user):
connection = None
try:
if self.configuration.get("username", "") and self.configuration.get(
"password", ""
):
auth_provider = PlainTextAuthProvider(
username="{}".format(self.configuration.get("username", "")),
password="{}".format(self.configuration.get("password", "")),
)
connection = Cluster(
[self.configuration.get("host", "")],
auth_provider=auth_provider,
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
else:
connection = Cluster(
[self.configuration.get("host", "")],
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
session = connection.connect()
session.set_keyspace(self.configuration["keyspace"])
session.default_timeout = self.configuration.get("timeout", 10)
logger.debug("Cassandra running query: %s", query)
result = session.execute(query)

column_names = result.column_names

columns = self.fetch_columns([(c, "string") for c in column_names])

rows = [dict(zip(column_names, row)) for row in result]

data = {"columns": columns, "rows": rows}
json_data = json_dumps(data, cls=CassandraJSONEncoder)

error = None
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None

return json_data, error

if self.configuration.get("username", "") and self.configuration.get(
"password", ""
):
auth_provider = PlainTextAuthProvider(
username="{}".format(self.configuration.get("username", "")),
password="{}".format(self.configuration.get("password", "")),
)
connection = Cluster(
[self.configuration.get("host", "")],
auth_provider=auth_provider,
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
else:
connection = Cluster(
[self.configuration.get("host", "")],
port=self.configuration.get("port", ""),
protocol_version=self.configuration.get("protocol", 3),
)
session = connection.connect()
session.set_keyspace(self.configuration["keyspace"])
session.default_timeout = self.configuration.get("timeout", 10)
logger.debug("Cassandra running query: %s", query)
result = session.execute(query)

column_names = result.column_names

columns = self.fetch_columns([(c, "string") for c in column_names])

rows = [dict(zip(column_names, row)) for row in result]

data = {"columns": columns, "rows": rows}
json_data = json_dumps(data, cls=CassandraJSONEncoder)

return json_data, None


class ScyllaDB(Cassandra):
Expand Down
11 changes: 4 additions & 7 deletions redash/query_runner/couchbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ def call_service(self, query, user):
raise Exception("Couchbase connection error")

def run_query(self, query, user):
try:
result = self.call_service(query, user)
result = self.call_service(query, user)

rows, columns = parse_results(result.json()["results"])
data = {"columns": columns, "rows": rows}
rows, columns = parse_results(result.json()["results"])
data = {"columns": columns, "rows": rows}

return json_dumps(data), None
except KeyboardInterrupt:
return None, "Query cancelled by user."
return json_dumps(data), None

@classmethod
def name(cls):
Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/db2.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ def run_query(self, query, user):
except ibm_db_dbi.DatabaseError as e:
error = str(e)
json_data = None
except (KeyboardInterrupt, InterruptException):
except (KeyboardInterrupt, InterruptException, JobTimeoutException):
connection.cancel()
error = "Query cancelled by user."
json_data = None
raise
finally:
connection.close()

Expand Down
19 changes: 8 additions & 11 deletions redash/query_runner/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,17 @@ def configuration_schema(cls):
def run_query(self, query, user):
drill_url = os.path.join(self.configuration["url"], "query.json")

try:
payload = {"queryType": "SQL", "query": query}
payload = {"queryType": "SQL", "query": query}

response, error = self.get_response(
drill_url, http_method="post", json=payload
)
if error is not None:
return None, error
response, error = self.get_response(
drill_url, http_method="post", json=payload
)
if error is not None:
return None, error

results = parse_response(response.json())
results = parse_response(response.json())

return json_dumps(results), None
except KeyboardInterrupt:
return None, "Query cancelled by user."
return json_dumps(results), None

def get_schema(self, get_stats=False):

Expand Down
8 changes: 2 additions & 6 deletions redash/query_runner/dynamodb_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,10 @@ def run_query(self, query, user):
e.lineno, e.column, e.line
)
json_data = None
except (SyntaxError, RuntimeError) as e:
error = str(e)
json_data = None
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
if engine and engine.connection:
engine.connection.cancel()
error = "Query cancelled by user."
json_data = None
raise

return json_data, error

Expand Down
8 changes: 2 additions & 6 deletions redash/query_runner/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,6 @@ def run_query(self, query, user):
raise Exception("Advanced queries are not supported")

json_data = json_dumps({"columns": result_columns, "rows": result_rows})
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
Expand Down Expand Up @@ -497,10 +494,9 @@ def run_query(self, query, user):
)

json_data = json_dumps({"columns": result_columns, "rows": result_rows})
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
logger.exception(e)
error = "Query cancelled by user."
json_data = None
raise
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/hive_ds.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,10 @@ def run_query(self, query, user):
data = {"columns": columns, "rows": rows}
json_data = json_dumps(data)
error = None
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
if connection:
connection.cancel()
error = "Query cancelled by user."
json_data = None
raise
except DatabaseError as e:
try:
error = e.args[0].status.errorMessage
Expand Down
5 changes: 2 additions & 3 deletions redash/query_runner/impala_ds.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,9 @@ def run_query(self, query, user):
except RPCError as e:
json_data = None
error = "Metastore Error [%s]" % str(e)
except KeyboardInterrupt:
except (KeyboardInterrupt, JobTimeoutException):
connection.cancel()
error = "Query cancelled by user."
json_data = None
raise
finally:
if connection:
connection.close()
Expand Down
59 changes: 28 additions & 31 deletions redash/query_runner/jql.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,44 +166,41 @@ def __init__(self, configuration):
def run_query(self, query, user):
jql_url = "{}/rest/api/2/search".format(self.configuration["url"])

try:
query = json_loads(query)
query_type = query.pop("queryType", "select")
field_mapping = FieldMapping(query.pop("fieldMapping", {}))

if query_type == "count":
query["maxResults"] = 1
query["fields"] = ""
else:
query["maxResults"] = query.get("maxResults", 1000)
query = json_loads(query)
query_type = query.pop("queryType", "select")
field_mapping = FieldMapping(query.pop("fieldMapping", {}))

response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error
if query_type == "count":
query["maxResults"] = 1
query["fields"] = ""
else:
query["maxResults"] = query.get("maxResults", 1000)

data = response.json()
response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error

if query_type == "count":
results = parse_count(data)
else:
results = parse_issues(data, field_mapping)
index = data["startAt"] + data["maxResults"]
data = response.json()

while data["total"] > index:
query["startAt"] = index
response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error
if query_type == "count":
results = parse_count(data)
else:
results = parse_issues(data, field_mapping)
index = data["startAt"] + data["maxResults"]

while data["total"] > index:
query["startAt"] = index
response, error = self.get_response(jql_url, params=query)
if error is not None:
return None, error

data = response.json()
index = data["startAt"] + data["maxResults"]
data = response.json()
index = data["startAt"] + data["maxResults"]

addl_results = parse_issues(data, field_mapping)
results.merge(addl_results)
addl_results = parse_issues(data, field_mapping)
results.merge(addl_results)

return results.to_json(), None
except KeyboardInterrupt:
return None, "Query cancelled by user."
return results.to_json(), None


register(JiraJQL)
Loading