From 9fe8d7313696f7f6fb11a5b4317acb5ae6c85118 Mon Sep 17 00:00:00 2001
From: Fabian
Date: Thu, 17 Aug 2017 21:31:58 -0400
Subject: [PATCH] Feature/Fix: Get a full time series for your filter instead of a topN for each point in time

---
 superset/connectors/druid/models.py | 86 ++++++++++++++++++++---------
 1 file changed, 59 insertions(+), 27 deletions(-)

diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 4aadf06e4d19a..cc40b83f1f1ee 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -798,6 +798,28 @@ def values_for_column(self,
     def get_query_str(self, query_obj, phase=1, client=None):
         return self.run_query(client=client, phase=phase, **query_obj)
 
+    def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
+        ret = dim_filter
+        if df is not None and not df.empty:
+            new_filters = []
+            for unused, row in df.iterrows():
+                fields = []
+                for dim in dimensions:
+                    f = Dimension(dim) == row[dim]
+                    fields.append(f)
+                if len(fields) > 1:
+                    term = Filter(type="and", fields=fields)
+                    new_filters.append(term)
+                elif fields:
+                    new_filters.append(fields[0])
+            if new_filters:
+                ff = Filter(type="or", fields=new_filters)
+                if not dim_filter:
+                    ret = ff
+                else:
+                    ret = Filter(type="and", fields=[ff, dim_filter])
+        return ret
+
     def run_query(  # noqa / druid
             self,
             groupby, metrics,
@@ -834,7 +856,9 @@ def run_query(  # noqa / druid
 
         columns_dict = {c.column_name: c for c in self.columns}
 
-        all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict)
+        all_metrics, post_aggs = self._metrics_and_post_aggs(
+            metrics,
+            metrics_dict)
 
         aggregations = OrderedDict()
         for m in self.metrics:
@@ -884,15 +908,41 @@ def run_query(  # noqa / druid
         if having_filters:
             qry['having'] = having_filters
         order_direction = "descending" if order_desc else "ascending"
-        orig_filters = filters
         if len(groupby) == 0 and not having_filters:
             del qry['dimensions']
             client.timeseries(**qry)
         if not having_filters and len(groupby) == 1 and order_desc:
+            dim = list(qry.get('dimensions'))[0]
+            if timeseries_limit_metric:
+                order_by = timeseries_limit_metric
+            else:
+                order_by = list(qry['aggregations'].keys())[0]
+            # Limit on the number of timeseries, doing a two-phase query
+            pre_qry = deepcopy(qry)
+            pre_qry['granularity'] = "all"
+            pre_qry['threshold'] = min(row_limit,
+                                       timeseries_limit or row_limit)
+            pre_qry['metric'] = order_by
+            pre_qry['dimension'] = dim
+            del pre_qry['dimensions']
+            client.topn(**pre_qry)
+            query_str += "// Two phase query\n// Phase 1\n"
+            query_str += json.dumps(
+                client.query_builder.last_query.query_dict, indent=2)
+            query_str += "\n"
+            if phase == 1:
+                return query_str
+            query_str += (
+                "//\nPhase 2 (built based on phase one's results)\n")
+            df = client.export_pandas()
+            qry['filter'] = self._add_filter_from_pre_query_data(
+                df,
+                qry['dimensions'],
+                filters)
             qry['threshold'] = timeseries_limit or 1000
             if row_limit and granularity == 'all':
                 qry['threshold'] = row_limit
-            qry['dimension'] = list(qry.get('dimensions'))[0]
+            qry['dimension'] = dim
             del qry['dimensions']
             qry['metric'] = list(qry['aggregations'].keys())[0]
             client.topn(**qry)
@@ -908,7 +958,7 @@ def run_query(  # noqa / druid
                 pre_qry['granularity'] = "all"
                 pre_qry['limit_spec'] = {
                     "type": "default",
-                    "limit": timeseries_limit,
+                    "limit": min(timeseries_limit, row_limit),
                     'intervals': (
                         inner_from_dttm.isoformat() + '/' +
                         inner_to_dttm.isoformat()),
@@ -927,29 +977,10 @@ def run_query(  # noqa / druid
                 query_str += (
                     "//\nPhase 2 (built based on phase one's results)\n")
                 df = client.export_pandas()
-                if df is not None and not df.empty:
-                    dims = qry['dimensions']
-                    filters = []
-                    for unused, row in df.iterrows():
-                        fields = []
-                        for dim in dims:
-                            f = Dimension(dim) == row[dim]
-                            fields.append(f)
-                        if len(fields) > 1:
-                            filt = Filter(type="and", fields=fields)
-                            filters.append(filt)
-                        elif fields:
-                            filters.append(fields[0])
-
-                    if filters:
-                        ff = Filter(type="or", fields=filters)
-                        if not orig_filters:
-                            qry['filter'] = ff
-                        else:
-                            qry['filter'] = Filter(type="and", fields=[
-                                ff,
-                                orig_filters])
-                qry['limit_spec'] = None
+                qry['filter'] = self._add_filter_from_pre_query_data(
+                    df,
+                    qry['dimensions'], filters)
+                qry['limit_spec'] = None
                 if row_limit:
                     qry['limit_spec'] = {
                         "type": "default",
@@ -1111,5 +1142,6 @@ def query_datasources_by_name(
             .all()
         )
 
+
 sa.event.listen(DruidDatasource, 'after_insert', set_perm)
 sa.event.listen(DruidDatasource, 'after_update', set_perm)
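
Reviewer note: the core of this change is _add_filter_from_pre_query_data, which turns
the phase-one (granularity "all") result rows into a Druid filter for phase two: one
selector per dimension per row, AND'ed within a row, OR'ed across rows, and finally
AND'ed with the user's original filter. Below is a minimal standalone sketch of that
logic; the Dimension/Filter classes are simplified stand-ins for pydruid's, and the
dict shapes they print are illustrative assumptions, not pydruid's exact output.

    # Standalone sketch (illustrative): rebuild the helper's filter logic on a
    # fake phase-one result. Dimension/Filter are simplified stand-ins for
    # pydruid.utils.filters; the dict shapes are assumptions for display only.
    import pandas as pd


    class Filter(object):
        """Stand-in: stores a Druid-like filter spec as a plain dict."""

        def __init__(self, type='selector', **kwargs):
            self.spec = dict(type=type, **kwargs)

        def __repr__(self):
            return repr(self.spec)


    class Dimension(object):
        """Stand-in: `Dimension(d) == value` yields a selector filter."""

        def __init__(self, name):
            self.name = name

        def __eq__(self, value):
            return Filter(type='selector', dimension=self.name, value=value)


    def add_filter_from_pre_query_data(df, dimensions, dim_filter):
        # Mirrors the new helper: AND selectors within a row, OR across
        # rows, then AND the result with the original filter if one exists.
        ret = dim_filter
        if df is not None and not df.empty:
            new_filters = []
            for _, row in df.iterrows():
                fields = [Dimension(dim) == row[dim] for dim in dimensions]
                if len(fields) > 1:
                    new_filters.append(Filter(type='and', fields=fields))
                elif fields:
                    new_filters.append(fields[0])
            if new_filters:
                ff = Filter(type='or', fields=new_filters)
                ret = ff if not dim_filter else Filter(
                    type='and', fields=[ff, dim_filter])
        return ret


    # Phase one picked the top two countries overall; phase two fetches the
    # full time series for just those two values.
    phase_one_df = pd.DataFrame({'country': ['US', 'FR']})
    user_filter = Filter(type='selector', dimension='device', value='mobile')
    print(add_filter_from_pre_query_data(phase_one_df, ['country'], user_filter))
    # Prints an 'and' of ('US' OR 'FR' selectors) with the original device filter.

Constraining phase two to the phase-one winners is what yields one complete time
series per surviving dimension value, instead of an independent topN computed at
each timestamp.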