From 6160a3fdffdcebe618191462633414c0dff7de30 Mon Sep 17 00:00:00 2001 From: Bogdan Date: Mon, 13 Mar 2017 13:54:38 -0700 Subject: [PATCH] Implement stop query functionality. (#2387) * Implement stop query functionality. * Address comments --- superset/assets/javascripts/SqlLab/actions.js | 18 ++++++++++++++++++ .../SqlLab/components/SqlEditor.jsx | 2 +- superset/db_engine_specs.py | 19 ++++++++++++++++++- superset/db_engines/presto.py | 19 +++++++++++++++++++ superset/sql_lab.py | 7 +++++++ superset/utils.py | 2 +- superset/views/core.py | 14 ++++++++++++++ 7 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 superset/db_engines/presto.py diff --git a/superset/assets/javascripts/SqlLab/actions.js b/superset/assets/javascripts/SqlLab/actions.js index d7d20f49d3e1c..c6d48445fa890 100644 --- a/superset/assets/javascripts/SqlLab/actions.js +++ b/superset/assets/javascripts/SqlLab/actions.js @@ -142,6 +142,24 @@ export function runQuery(query) { }; } +export function postStopQuery(query) { + return function (dispatch) { + const stopQueryUrl = '/superset/stop_query/'; + const stopQueryRequestData = { client_id: query.id }; + $.ajax({ + type: 'POST', + dataType: 'json', + url: stopQueryUrl, + data: stopQueryRequestData, + success() { + if (!query.runAsync) { + dispatch(stopQuery(query)); + } + }, + }); + }; +} + export function setDatabases(databases) { return { type: SET_DATABASES, databases }; } diff --git a/superset/assets/javascripts/SqlLab/components/SqlEditor.jsx b/superset/assets/javascripts/SqlLab/components/SqlEditor.jsx index aacb21681257f..cc72b7774d021 100644 --- a/superset/assets/javascripts/SqlLab/components/SqlEditor.jsx +++ b/superset/assets/javascripts/SqlLab/components/SqlEditor.jsx @@ -83,7 +83,7 @@ class SqlEditor extends React.PureComponent { this.props.actions.setActiveSouthPaneTab('Results'); } stopQuery() { - this.props.actions.stopQuery(this.props.latestQuery); + this.props.actions.postStopQuery(this.props.latestQuery); } createTableAs() { this.startQuery(true, true); diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 6b9efc3bcd421..533ae74e927c3 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -29,6 +29,7 @@ from sqlalchemy import select from sqlalchemy.sql import text from superset.utils import SupersetTemplateException +from superset.utils import QueryStatus from flask_babel import lazy_gettext as _ Grain = namedtuple('Grain', 'name label function') @@ -272,6 +273,12 @@ class PrestoEngineSpec(BaseEngineSpec): "date_add('day', 1, CAST({col} AS TIMESTAMP))))"), ) + @classmethod + def patch(cls): + from pyhive import presto + from superset.db_engines import presto as patched_presto + presto.Cursor.cancel = patched_presto.cancel + @classmethod def sql_preprocessor(cls, sql): return sql.replace('%', '%%') @@ -342,6 +349,12 @@ def handle_cursor(cls, cursor, query, session): while polled: # Update the object and wait for the kill signal. stats = polled.get('stats', {}) + + query = session.query(type(query)).filter_by(id=query.id).one() + if query.status == QueryStatus.STOPPED: + cursor.cancel() + break + if stats: completed_splits = float(stats.get('completedSplits')) total_splits = float(stats.get('totalSplits')) @@ -566,13 +579,17 @@ def progress(cls, logs): def handle_cursor(cls, cursor, query, session): """Updates progress information""" from pyhive import hive - print("PATCHED TCLIService {}".format(hive.TCLIService.__file__)) unfinished_states = ( hive.ttypes.TOperationState.INITIALIZED_STATE, hive.ttypes.TOperationState.RUNNING_STATE, ) polled = cursor.poll() while polled.operationState in unfinished_states: + query = session.query(type(query)).filter_by(id=query.id) + if query.status == QueryStatus.STOPPED: + cursor.cancel() + break + resp = cursor.fetch_logs() if resp and resp.log: progress = cls.progress(resp.log) diff --git a/superset/db_engines/presto.py b/superset/db_engines/presto.py new file mode 100644 index 0000000000000..e6e84b1aa1b7a --- /dev/null +++ b/superset/db_engines/presto.py @@ -0,0 +1,19 @@ +from pyhive import presto + + +# TODO(bogdan): Remove this when new pyhive release will be available. +def cancel(self): + if self._state == self._STATE_NONE: + raise presto.ProgrammingError("No query yet") + if self._nextUri is None: + assert self._state == self._STATE_FINISHED, \ + "Should be finished if nextUri is None" + return + + response = presto.requests.delete(self._nextUri) + if response.status_code != presto.requests.codes.no_content: + fmt = "Unexpected status code after cancel {}\n{}" + raise presto.OperationalError( + fmt.format(response.status_code, response.content)) + self._state = self._STATE_FINISHED + return diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 41a6b2e357c75..5ea3f14ac3804 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -132,6 +132,13 @@ def handle_error(msg): conn.commit() conn.close() + if query.status == utils.QueryStatus.STOPPED: + return json.dumps({ + 'query_id': query.id, + 'status': query.status, + 'query': query.to_dict(), + }, default=utils.json_iso_dttm_ser) + column_names = ( [col[0] for col in cursor.description] if cursor.description else []) column_names = dedup(column_names) diff --git a/superset/utils.py b/superset/utils.py index 9d7f50d646974..b2a724cf8dc38 100644 --- a/superset/utils.py +++ b/superset/utils.py @@ -442,7 +442,7 @@ class QueryStatus(object): """Enum-type class for query statuses""" - CANCELLED = 'cancelled' + STOPPED = 'stopped' FAILED = 'failed' PENDING = 'pending' RUNNING = 'running' diff --git a/superset/views/core.py b/superset/views/core.py index bc19519106664..eaca53b2d413d 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -1960,6 +1960,20 @@ def results(self, key): return json_success( json.dumps(payload_json, default=utils.json_iso_dttm_ser)) + @has_access_api + @expose("/stop_query/", methods=['POST']) + @log_this + def stop_query(self): + client_id = request.form.get('client_id') + query = db.session.query(models.Query).filter_by( + client_id=client_id).one() + if query.user_id != g.user.id: + return json_error_response( + "Only original author can stop the query.") + query.status = utils.QueryStatus.STOPPED + db.session.commit() + return Response(201) + @has_access_api @expose("/sql_json/", methods=['POST', 'GET']) @log_this