From 2f34ebded410059d08bb2985b245d0df49f40d40 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Thu, 4 Jan 2018 16:09:35 -0800 Subject: [PATCH 1/6] Use druiddb --- superset/connectors/sqla/models.py | 92 +++++++++++++++++++++--------- superset/db_engine_specs.py | 5 +- 2 files changed, 68 insertions(+), 29 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index e0952288ffdcb..60780fe712efe 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -528,37 +528,73 @@ def get_sqla_query( # sqla if is_timeseries and \ timeseries_limit and groupby and not time_groupby_inline: - # some sql dialects require for order by expressions - # to also be in the select clause -- others, e.g. vertica, - # require a unique inner alias - inner_main_metric_expr = main_metric_expr.label('mme_inner__') - inner_select_exprs += [inner_main_metric_expr] - subq = select(inner_select_exprs) - subq = subq.select_from(tbl) - inner_time_filter = dttm_col.get_time_filter( - inner_from_dttm or from_dttm, - inner_to_dttm or to_dttm, - ) - subq = subq.where(and_(*(where_clause_and + [inner_time_filter]))) - subq = subq.group_by(*inner_groupby_exprs) - - ob = inner_main_metric_expr - if timeseries_limit_metric: - timeseries_limit_metric = metrics_dict.get(timeseries_limit_metric) - ob = timeseries_limit_metric.sqla_col - direction = desc if order_desc else asc - subq = subq.order_by(direction(ob)) - subq = subq.limit(timeseries_limit) - - on_clause = [] - for i, gb in enumerate(groupby): - on_clause.append( - groupby_exprs[i] == column(gb + '__')) - - tbl = tbl.join(subq.alias(), and_(*on_clause)) + if self.database.db_engine_spec.inner_joins: + # some sql dialects require for order by expressions + # to also be in the select clause -- others, e.g. vertica, + # require a unique inner alias + inner_main_metric_expr = main_metric_expr.label('mme_inner__') + inner_select_exprs += [inner_main_metric_expr] + subq = select(inner_select_exprs) + subq = subq.select_from(tbl) + inner_time_filter = dttm_col.get_time_filter( + inner_from_dttm or from_dttm, + inner_to_dttm or to_dttm, + ) + subq = subq.where( + and_(*(where_clause_and + [inner_time_filter]))) + subq = subq.group_by(*inner_groupby_exprs) + + ob = inner_main_metric_expr + if timeseries_limit_metric: + timeseries_limit_metric = metrics_dict.get( + timeseries_limit_metric) + ob = timeseries_limit_metric.sqla_col + direction = desc if order_desc else asc + subq = subq.order_by(direction(ob)) + subq = subq.limit(timeseries_limit) + + on_clause = [] + for i, gb in enumerate(groupby): + on_clause.append( + groupby_exprs[i] == column(gb + '__')) + + tbl = tbl.join(subq.alias(), and_(*on_clause)) + else: + # run subquery to get top groups + subquery_obj = { + 'is_timeseries': False, + 'row_limit': timeseries_limit, + 'groupby': groupby, + 'metrics': metrics, + 'granularity': granularity, + 'from_dttm': inner_from_dttm, + 'to_dttm': inner_to_dttm, + 'filter': filter, + 'orderby': orderby, + 'extras': extras, + 'columns': columns, + 'form_data': form_data, + 'order_desc': True, + } + result = self.query(subquery_obj) + dimensions = [c for c in result.df.columns if c not in metrics] + top_groups = self._get_top_groups(result.df, dimensions) + qry = qry.where(top_groups) return qry.select_from(tbl) + def _get_top_groups(self, df, dimensions): + cols = {col.column_name: col for col in self.columns} + groups = [] + for _, row in df.iterrows(): + group = [] + for dimension in dimensions: + col_obj = cols.get(dimension) + group.append(col_obj.sqla_col == row[dimension]) + groups.append(and_(*group)) + + return or_(*groups) + def query(self, query_obj): qry_start_dttm = datetime.now() sql = self.get_query_str(query_obj) diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 158d9d6cfc9cc..f6d3e592d4131 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -62,6 +62,7 @@ class BaseEngineSpec(object): time_groupby_inline = False limit_method = LimitMethod.FETCH_MANY time_secondary_columns = False + inner_joins = True @classmethod def fetch_data(cls, cursor, limit): @@ -823,7 +824,8 @@ def get_column_names(filepath): [s + ' STRING ' for s in column_names]) s3 = boto3.client('s3') - location = os.path.join('s3a://', bucket_path, upload_prefix, table_name) + location = os.path.join('s3a://', bucket_path, + upload_prefix, table_name) s3.upload_file( upload_path, 'airbnb-superset', os.path.join(upload_prefix, table_name, filename)) @@ -1221,6 +1223,7 @@ class DruidEngineSpec(BaseEngineSpec): """Engine spec for Druid.io""" engine = 'druid' limit_method = LimitMethod.FETCH_MANY + inner_joins = False engines = { From 6a99e8a769dd0659960785c845566abecd95e061 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Thu, 4 Jan 2018 16:13:20 -0800 Subject: [PATCH 2/6] Remove auto formatting --- superset/connectors/sqla/models.py | 6 ++---- superset/db_engine_specs.py | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index 60780fe712efe..1b9414548a612 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -540,14 +540,12 @@ def get_sqla_query( # sqla inner_from_dttm or from_dttm, inner_to_dttm or to_dttm, ) - subq = subq.where( - and_(*(where_clause_and + [inner_time_filter]))) + subq = subq.where(and_(*(where_clause_and + [inner_time_filter]))) subq = subq.group_by(*inner_groupby_exprs) ob = inner_main_metric_expr if timeseries_limit_metric: - timeseries_limit_metric = metrics_dict.get( - timeseries_limit_metric) + timeseries_limit_metric = metrics_dict.get(timeseries_limit_metric) ob = timeseries_limit_metric.sqla_col direction = desc if order_desc else asc subq = subq.order_by(direction(ob)) diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index f6d3e592d4131..5284a1b580b51 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -824,8 +824,7 @@ def get_column_names(filepath): [s + ' STRING ' for s in column_names]) s3 = boto3.client('s3') - location = os.path.join('s3a://', bucket_path, - upload_prefix, table_name) + location = os.path.join('s3a://', bucket_path, upload_prefix, table_name) s3.upload_file( upload_path, 'airbnb-superset', os.path.join(upload_prefix, table_name, filename)) From bd096ac4513e5bcdd67826c00a132aa70d0ad3f4 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Fri, 5 Jan 2018 11:08:12 -0800 Subject: [PATCH 3/6] Show prequeries --- superset/connectors/sqla/models.py | 32 +++++++++++++++++++++--------- superset/views/core.py | 8 +++++++- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index 1b9414548a612..d03dd5e93fbc7 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -323,9 +323,9 @@ def get_template_processor(self, **kwargs): return get_template_processor( table=self, database=self.database, **kwargs) - def get_query_str(self, query_obj): + def get_query_str(self, query_obj, prequeries=None, is_prequery=False): engine = self.database.get_sqla_engine() - qry = self.get_sqla_query(**query_obj) + qry = self.get_sqla_query(prequeries=prequeries, is_prequery=is_prequery, **query_obj) sql = six.text_type( qry.compile( engine, @@ -334,6 +334,8 @@ def get_query_str(self, query_obj): ) logging.info(sql) sql = sqlparse.format(sql, reindent=True) + if is_prequery and prequeries is not None: + prequeries.append(sql) return sql def get_sqla_table(self): @@ -369,7 +371,10 @@ def get_sqla_query( # sqla extras=None, columns=None, form_data=None, - order_desc=True): + order_desc=True, + prequeries=None, + is_prequery=False, + ): """Querying any sqla table from this common interface""" template_kwargs = { 'from_dttm': from_dttm, @@ -565,8 +570,8 @@ def get_sqla_query( # sqla 'groupby': groupby, 'metrics': metrics, 'granularity': granularity, - 'from_dttm': inner_from_dttm, - 'to_dttm': inner_to_dttm, + 'from_dttm': inner_from_dttm or from_dttm, + 'to_dttm': inner_to_dttm or to_dttm, 'filter': filter, 'orderby': orderby, 'extras': extras, @@ -574,7 +579,7 @@ def get_sqla_query( # sqla 'form_data': form_data, 'order_desc': True, } - result = self.query(subquery_obj) + result = self.query(subquery_obj, prequeries, is_prequery=True) dimensions = [c for c in result.df.columns if c not in metrics] top_groups = self._get_top_groups(result.df, dimensions) qry = qry.where(top_groups) @@ -584,7 +589,7 @@ def get_sqla_query( # sqla def _get_top_groups(self, df, dimensions): cols = {col.column_name: col for col in self.columns} groups = [] - for _, row in df.iterrows(): + for unused, row in df.iterrows(): group = [] for dimension in dimensions: col_obj = cols.get(dimension) @@ -593,9 +598,13 @@ def _get_top_groups(self, df, dimensions): return or_(*groups) - def query(self, query_obj): + def query(self, query_obj, prequeries=None, is_prequery=False): qry_start_dttm = datetime.now() - sql = self.get_query_str(query_obj) + + # run query storing any prequeries for 2-phase backends + prequeries = prequeries or [] + sql = self.get_query_str(query_obj, prequeries, is_prequery) + status = QueryStatus.SUCCESS error_message = None df = None @@ -607,6 +616,11 @@ def query(self, query_obj): error_message = ( self.database.db_engine_spec.extract_error_message(e)) + # if this is a main query with prequeries, combine them together + if not is_prequery and prequeries: + prequeries.append(sql) + sql = ';\n\n'.join(prequeries) + ';' + return QueryResult( status=status, df=df, diff --git a/superset/views/core.py b/superset/views/core.py index 00254b4ca27ba..dc71d8a798f2a 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -986,9 +986,15 @@ def slice(self, slice_id): def get_query_string_response(self, viz_obj): try: query_obj = viz_obj.query_obj() - query = viz_obj.datasource.get_query_str(query_obj) + prequeries = [] + query = viz_obj.datasource.get_query_str(query_obj, prequeries) except Exception as e: return json_error_response(e) + + if prequeries: + prequeries.append(query) + query = ';\n\n'.join(prequeries) + ';' + return Response( json.dumps({ 'query': query, From 25a6d3fb2fdb19bf6735f5d7add3a67b776c5079 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Fri, 5 Jan 2018 11:39:43 -0800 Subject: [PATCH 4/6] Fix subtle bug with lists --- superset/connectors/sqla/models.py | 6 ++++-- superset/views/core.py | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index d03dd5e93fbc7..3a1b3103c9ae2 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -602,7 +602,8 @@ def query(self, query_obj, prequeries=None, is_prequery=False): qry_start_dttm = datetime.now() # run query storing any prequeries for 2-phase backends - prequeries = prequeries or [] + if prequeries is None: + prequeries = [] sql = self.get_query_str(query_obj, prequeries, is_prequery) status = QueryStatus.SUCCESS @@ -619,7 +620,8 @@ def query(self, query_obj, prequeries=None, is_prequery=False): # if this is a main query with prequeries, combine them together if not is_prequery and prequeries: prequeries.append(sql) - sql = ';\n\n'.join(prequeries) + ';' + sql = ';\n\n'.join(prequeries) + sql += ';' return QueryResult( status=status, diff --git a/superset/views/core.py b/superset/views/core.py index dc71d8a798f2a..655004867e070 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -993,7 +993,8 @@ def get_query_string_response(self, viz_obj): if prequeries: prequeries.append(query) - query = ';\n\n'.join(prequeries) + ';' + query = ';\n\n'.join(prequeries) + query += ';' return Response( json.dumps({ From d898c1ca7572b31d22bdf4f8201e0a9bc342720d Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Fri, 5 Jan 2018 12:44:44 -0800 Subject: [PATCH 5/6] Move arguments to query object --- superset/connectors/sqla/models.py | 27 ++++++++++++--------------- superset/views/core.py | 9 ++++----- superset/viz.py | 2 ++ 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index 3a1b3103c9ae2..15b8cbbe7918c 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -323,9 +323,9 @@ def get_template_processor(self, **kwargs): return get_template_processor( table=self, database=self.database, **kwargs) - def get_query_str(self, query_obj, prequeries=None, is_prequery=False): + def get_query_str(self, query_obj): engine = self.database.get_sqla_engine() - qry = self.get_sqla_query(prequeries=prequeries, is_prequery=is_prequery, **query_obj) + qry = self.get_sqla_query(**query_obj) sql = six.text_type( qry.compile( engine, @@ -334,8 +334,8 @@ def get_query_str(self, query_obj, prequeries=None, is_prequery=False): ) logging.info(sql) sql = sqlparse.format(sql, reindent=True) - if is_prequery and prequeries is not None: - prequeries.append(sql) + if query_obj['is_prequery']: + query_obj['prequeries'].append(sql) return sql def get_sqla_table(self): @@ -565,6 +565,8 @@ def get_sqla_query( # sqla else: # run subquery to get top groups subquery_obj = { + 'prequeries': prequeries, + 'is_prequery': True, 'is_timeseries': False, 'row_limit': timeseries_limit, 'groupby': groupby, @@ -579,7 +581,7 @@ def get_sqla_query( # sqla 'form_data': form_data, 'order_desc': True, } - result = self.query(subquery_obj, prequeries, is_prequery=True) + result = self.query(subquery_obj) dimensions = [c for c in result.df.columns if c not in metrics] top_groups = self._get_top_groups(result.df, dimensions) qry = qry.where(top_groups) @@ -598,14 +600,9 @@ def _get_top_groups(self, df, dimensions): return or_(*groups) - def query(self, query_obj, prequeries=None, is_prequery=False): + def query(self, query_obj): qry_start_dttm = datetime.now() - - # run query storing any prequeries for 2-phase backends - if prequeries is None: - prequeries = [] - sql = self.get_query_str(query_obj, prequeries, is_prequery) - + sql = self.get_query_str(query_obj) status = QueryStatus.SUCCESS error_message = None df = None @@ -618,9 +615,9 @@ def query(self, query_obj, prequeries=None, is_prequery=False): self.database.db_engine_spec.extract_error_message(e)) # if this is a main query with prequeries, combine them together - if not is_prequery and prequeries: - prequeries.append(sql) - sql = ';\n\n'.join(prequeries) + if not query_obj['is_prequery']: + query_obj['prequeries'].append(sql) + sql = ';\n\n'.join(query_obj['prequeries']) sql += ';' return QueryResult( diff --git a/superset/views/core.py b/superset/views/core.py index 655004867e070..30d647d112fd4 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -986,14 +986,13 @@ def slice(self, slice_id): def get_query_string_response(self, viz_obj): try: query_obj = viz_obj.query_obj() - prequeries = [] - query = viz_obj.datasource.get_query_str(query_obj, prequeries) + query = viz_obj.datasource.get_query_str(query_obj) except Exception as e: return json_error_response(e) - if prequeries: - prequeries.append(query) - query = ';\n\n'.join(prequeries) + if query_obj['prequeries']: + query_obj['prequeries'].append(query) + query = ';\n\n'.join(query_obj['prequeries']) query += ';' return Response( diff --git a/superset/viz.py b/superset/viz.py index 6551577de15c4..2b7df0e900fa1 100644 --- a/superset/viz.py +++ b/superset/viz.py @@ -200,6 +200,8 @@ def query_obj(self): 'timeseries_limit_metric': timeseries_limit_metric, 'form_data': form_data, 'order_desc': order_desc, + 'prequeries': [], + 'is_prequery': False, } return d From 3f77fb4226476f02dffa94e8fb92873d28d795c2 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Fri, 5 Jan 2018 13:11:00 -0800 Subject: [PATCH 6/6] Fix druid run_query --- superset/connectors/druid/models.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index 0b150686c63b0..bc826f15a0d44 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -1015,7 +1015,10 @@ def run_query( # noqa / druid orderby=None, extras=None, # noqa columns=None, phase=2, client=None, form_data=None, - order_desc=True): + order_desc=True, + prequeries=None, + is_prequery=False, + ): """Runs a query against Druid and returns a dataframe. """ # TODO refactor into using a TBD Query object