Skip to content

Commit

Permalink
use stats_logger timing
Browse files Browse the repository at this point in the history
  • Loading branch information
timifasubaa committed Sep 11, 2018
1 parent 03edce5 commit 2989409
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
29 changes: 18 additions & 11 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ def session_scope(nullpool):
@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
def get_sql_results(
ctask, query_id, rendered_query, return_results=True, store_results=False,
user_name=None):
user_name=None, start_time=None):
"""Executes the sql query returns the results."""
with session_scope(not ctask.request.called_directly) as session:

try:
return execute_sql(
ctask, query_id, rendered_query, return_results, store_results, user_name,
session=session)
session=session, start_time=start_time)
except Exception as e:
logging.exception(e)
stats_logger.incr('error_sqllab_unhandled')
Expand All @@ -97,12 +97,14 @@ def get_sql_results(

def execute_sql(
ctask, query_id, rendered_query, return_results=True, store_results=False,
user_name=None, session=None,
user_name=None, session=None, start_time=None,
):
"""Executes the sql query returns the results."""
time_at_worker = utils.now_as_float()
if store_results:
# only asynchronous queries
stats_logger.timing(
'sqllab.query.time_pending', utils.now_as_float() - start_time)
query = get_query(query_id, session)
query.time_at_worker = time_at_worker
payload = dict(query_id=query_id)

database = query.database
Expand Down Expand Up @@ -158,6 +160,7 @@ def handle_error(msg):

query.executed_sql = executed_sql
query.status = QueryStatus.RUNNING
query.start_running_time = utils.now_as_float()
session.merge(query)
session.commit()
logging.info("Set query to 'running'")
Expand All @@ -172,13 +175,15 @@ def handle_error(msg):
cursor = conn.cursor()
logging.info('Running query: \n{}'.format(executed_sql))
logging.info(query.executed_sql)
query.time_at_db = utils.now_as_float()
query_start_time = utils.now_as_float()
db_engine_spec.execute(cursor, query.executed_sql, async_=True)
logging.info('Handling cursor')
db_engine_spec.handle_cursor(cursor, query, session)
logging.info('Fetching data: {}'.format(query.to_dict()))
data = db_engine_spec.fetch_data(cursor, query.limit)
query.time_at_db_result = utils.now_as_float()
stats_logger.timing(
'sqllab.query.time_executing_query',
utils.now_as_float() - query_start_time)
except SoftTimeLimitExceeded as e:
logging.exception(e)
if conn is not None:
Expand All @@ -201,7 +206,7 @@ def handle_error(msg):
if query.status == utils.QueryStatus.STOPPED:
return handle_error('The query has been stopped')

cdf = dataframe.SupersetDataFrame(data, cursor.description, db_engine_spec)
cdf = dataframe.SupersetDataFrame(data, cursor_description, db_engine_spec)

query.rows = cdf.size
query.progress = 100
Expand All @@ -214,6 +219,7 @@ def handle_error(msg):
schema=database.force_ctas_schema,
show_cols=False,
latest_partition=False))
query.end_time = utils.now_as_float()
session.merge(query)
session.flush()

Expand All @@ -226,16 +232,17 @@ def handle_error(msg):
if store_results:
key = '{}'.format(uuid.uuid4())
logging.info('Storing results in results backend, key: {}'.format(key))
query.time_at_results_backend_write = utils.now_as_float()
write_to_results_backend_start = utils.now_as_float()
json_payload = json.dumps(
payload, default=utils.json_iso_dttm_ser, ignore_nan=True)
cache_timeout = database.cache_timeout
if cache_timeout is None:
cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0)
results_backend.set(key, utils.zlib_compress(json_payload), cache_timeout)
query.results_key = key
query.time_after_results_backend_write = utils.now_as_float()

stats_logger.timing(
'sqllab.query.results_backend_write',
utils.now_as_float() - write_to_results_backend_start)
session.merge(query)
session.commit()

Expand Down
8 changes: 7 additions & 1 deletion superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2334,7 +2334,12 @@ def results(self, key):
if not results_backend:
return json_error_response("Results backend isn't configured")

read_from_results_backend_start = utils.now_as_float()
blob = results_backend.get(key)
stats_logger.timing(
'sqllab.query.results_backend_read',
utils.now_as_float() - read_from_results_backend_start,
)
if not blob:
return json_error_response(
'Data could not be retrieved. '
Expand Down Expand Up @@ -2446,7 +2451,8 @@ def sql_json(self):
rendered_query,
return_results=False,
store_results=not query.select_as_cta,
user_name=g.user.username)
user_name=g.user.username,
start_time=utils.now_as_float())
except Exception as e:
logging.exception(e)
msg = (
Expand Down

0 comments on commit 2989409

Please sign in to comment.