From 03be3cfce1e0b56e8d57d967ec2ffaef2f7c38b7 Mon Sep 17 00:00:00 2001
From: Brian Wolfe
Date: Sun, 16 Jul 2017 14:42:42 -0700
Subject: [PATCH 1/3] [druid] Allow custom druid postaggregators

Also, fix the post-aggregation for approxHistogram quantiles so that it
adds the dependent field, which can then show up in graphs and tables.

In general, postAggregators add significant power, so we should support
including custom postAggregators. Plywood has standard postAggregators
here, and a customAggregator escape hatch that allows you to define
custom postAggregators.

This commit adds a similar capability for Superset, along with an
additional field/fields/fieldName breakdown of the typical naming for
dependent aggregations. This should make it significantly easier to
develop dashboards that need approxHistogram or custom
postAggregations.
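
For illustration, a metric of the kind this enables might look like the
hedged sketch below; the 'quantile_p95' and 'a_histogram' names are
hypothetical and mirror the test fixtures added in this patch:

    import json

    from superset.connectors.druid.models import DruidMetric

    # A Superset metric whose JSON payload is a Druid post-aggregation.
    # 'fieldName' names the approxHistogramFold aggregation it depends
    # on; _metrics_and_post_aggs() walks field/fields/fieldName so that
    # the dependent aggregation is pulled into the query automatically.
    quantile_p95 = DruidMetric(
        metric_name='quantile_p95',
        metric_type='postagg',
        json=json.dumps({
            'type': 'quantile',
            'probability': 0.95,
            'name': 'p95',
            'fieldName': 'a_histogram'}))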
- """ - # TODO refactor into using a TBD Query object - client = client or self.cluster.get_pydruid_client() - if not is_timeseries: - granularity = 'all' - inner_from_dttm = inner_from_dttm or from_dttm - inner_to_dttm = inner_to_dttm or to_dttm - - # add tzinfo to native datetime with config - from_dttm = from_dttm.replace(tzinfo=DRUID_TZ) - to_dttm = to_dttm.replace(tzinfo=DRUID_TZ) - timezone = from_dttm.tzname() - - query_str = "" - metrics_dict = {m.metric_name: m for m in self.metrics} + @staticmethod + def _metrics_and_post_aggs(metrics, metrics_dict): all_metrics = [] post_aggs = {} - columns_dict = {c.column_name: c for c in self.columns} - def recursive_get_fields(_conf): - _fields = _conf.get('fields', []) + print(_conf) + _type = _conf.get('type') + _field = _conf.get('field', None) + _fields = _conf.get('fields', None) + field_names = [] - for _f in _fields: - _type = _f.get('type') - if _type in ['fieldAccess', 'hyperUniqueCardinality']: - field_names.append(_f.get('fieldName')) - elif _type == 'arithmetic': + if _type in ['fieldAccess', 'hyperUniqueCardinality', + 'quantile', 'quantiles']: + field_names.append(_conf.get('fieldName', '')) + + if _field is not None: + field_names += recursive_get_fields(_field) + + if _fields is not None: + for _f in _fields: field_names += recursive_get_fields(_f) + return list(set(field_names)) for metric_name in metrics: @@ -799,11 +777,54 @@ def recursive_get_fields(_conf): post_aggs[metric_name] = HyperUniqueCardinality( mconf.get('name') ) - else: + elif mconf.get('type') == 'arithmetic': post_aggs[metric_name] = Postaggregator( mconf.get('fn', "/"), mconf.get('fields', []), mconf.get('name', '')) + else: + post_aggs[metric_name] = Postaggregator( + None, + None, + mconf.get('name', '')) + post_aggs[metric_name].post_aggregator = mconf + return all_metrics, post_aggs + + def run_query( # noqa / druid + self, + groupby, metrics, + granularity, + from_dttm, to_dttm, + filter=None, # noqa + is_timeseries=True, + timeseries_limit=None, + timeseries_limit_metric=None, + row_limit=None, + inner_from_dttm=None, inner_to_dttm=None, + orderby=None, + extras=None, # noqa + select=None, # noqa + columns=None, phase=2, client=None, form_data=None): + """Runs a query against Druid and returns a dataframe. 
+ """ + # TODO refactor into using a TBD Query object + client = client or self.cluster.get_pydruid_client() + if not is_timeseries: + granularity = 'all' + inner_from_dttm = inner_from_dttm or from_dttm + inner_to_dttm = inner_to_dttm or to_dttm + + # add tzinfo to native datetime with config + from_dttm = from_dttm.replace(tzinfo=DRUID_TZ) + to_dttm = to_dttm.replace(tzinfo=DRUID_TZ) + timezone = from_dttm.tzname() + + query_str = "" + metrics_dict = {m.metric_name: m for m in self.metrics} + + columns_dict = {c.column_name: c for c in self.columns} + + all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict) aggregations = OrderedDict() for m in self.metrics: diff --git a/tests/druid_tests.py b/tests/druid_tests.py index d7b93dee0638e..220abb78613a5 100644 --- a/tests/druid_tests.py +++ b/tests/druid_tests.py @@ -11,8 +11,8 @@ from mock import Mock, patch from superset import db, sm, security -from superset.connectors.druid.models import DruidCluster, DruidDatasource -from superset.connectors.druid.models import PyDruid +from superset.connectors.druid.models import DruidMetric, DruidCluster, DruidDatasource +from superset.connectors.druid.models import PyDruid, Quantile, Postaggregator from .base_tests import SupersetTestCase @@ -38,7 +38,7 @@ "metric1": { "type": "longSum", "name": "metric1", - "fieldName": "metric1"} + "fieldName": "metric1"}, }, "size": 300000, "numRows": 5000000 @@ -318,6 +318,71 @@ def test_sync_druid_perm(self, PyDruid): permission=permission, view_menu=view_menu).first() assert pv is not None + def test_metrics_and_post_aggs(self): + metrics_dict = { + 'unused_count': DruidMetric( + metric_name='unused_count', + verbose_name='COUNT(*)', + metric_type='count', + json=json.dumps({'type': 'count', 'name': 'unused_count'})), + 'some_sum': DruidMetric( + metric_name='some_sum', + verbose_name='SUM(*)', + metric_type='sum', + json=json.dumps({'type': 'sum', 'name': 'sum'})), + 'a_histogram': DruidMetric( + metric_name='a_histogram', + verbose_name='APPROXIMATE_HISTOGRAM(*)', + metric_type='approxHistogramFold', + json=json.dumps({'type': 'approxHistogramFold', 'name': 'a_histogram'})), + 'aCustomMetric': DruidMetric( + metric_name='aCustomMetric', + verbose_name='MY_AWESOME_METRIC(*)', + metric_type='aCustomType', + json=json.dumps({'type': 'customMetric', 'name': 'aCustomMetric'})), + 'quantile_p95': DruidMetric( + metric_name='quantile_p95', + verbose_name='P95(*)', + metric_type='postagg', + json=json.dumps({ + 'type': 'quantile', + 'probability': 0.95, + 'name': 'p95', + 'fieldName': 'a_histogram'})), + 'aCustomPostAgg': DruidMetric( + metric_name='aCustomPostAgg', + verbose_name='CUSTOM_POST_AGG(*)', + metric_type='postagg', + json=json.dumps({ + 'type': 'customPostAgg', + 'name': 'aCustomPostAgg', + 'field': { + 'type': 'fieldAccess', + 'fieldName': 'aCustomMetric'}})), + } + + metrics = ['some_sum'] + all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs( + metrics, metrics_dict) + + assert all_metrics == ['some_sum'] + assert post_aggs == {} + + metrics = ['quantile_p95'] + all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs( + metrics, metrics_dict) + + result_postaggs = set(['quantile_p95']) + assert all_metrics == ['a_histogram'] + assert set(post_aggs.keys()) == result_postaggs + + metrics = ['aCustomPostAgg'] + all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs( + metrics, metrics_dict) + + result_postaggs = set(['aCustomPostAgg']) + assert all_metrics == ['aCustomMetric'] + assert set(post_aggs.keys()) 
 
 
 if __name__ == '__main__':

From 96cfbfe74559346a5fcffb275c23a8e248c138d5 Mon Sep 17 00:00:00 2001
From: Brian Wolfe
Date: Tue, 18 Jul 2017 11:40:29 -0700
Subject: [PATCH 2/3] [druid] Minor style cleanup in tests file.

---
 tests/druid_tests.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/tests/druid_tests.py b/tests/druid_tests.py
index 220abb78613a5..637afe984ce02 100644
--- a/tests/druid_tests.py
+++ b/tests/druid_tests.py
@@ -319,6 +319,12 @@ def test_sync_druid_perm(self, PyDruid):
         assert pv is not None
 
     def test_metrics_and_post_aggs(self):
+        """
+        Test generation of metrics and post-aggregations from an initial list
+        of superset metrics (which may include the results of either). This
+        primarily tests that specifying a post-aggregator metric will also
+        require the raw aggregation of the associated druid metric column.
+        """
         metrics_dict = {
             'unused_count': DruidMetric(
                 metric_name='unused_count',
@@ -363,14 +369,14 @@ def test_metrics_and_post_aggs(self):
 
         metrics = ['some_sum']
         all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
-                metrics, metrics_dict)
+            metrics, metrics_dict)
 
         assert all_metrics == ['some_sum']
         assert post_aggs == {}
 
         metrics = ['quantile_p95']
         all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
-                metrics, metrics_dict)
+            metrics, metrics_dict)
 
         result_postaggs = set(['quantile_p95'])
         assert all_metrics == ['a_histogram']
@@ -378,7 +384,7 @@ def test_metrics_and_post_aggs(self):
 
         metrics = ['aCustomPostAgg']
         all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
-                metrics, metrics_dict)
+            metrics, metrics_dict)
 
         result_postaggs = set(['aCustomPostAgg'])
         assert all_metrics == ['aCustomMetric']

From 9a52e9831c49b19061406381b43c8e7c32a23272 Mon Sep 17 00:00:00 2001
From: Brian Wolfe
Date: Thu, 27 Jul 2017 21:29:39 -0700
Subject: [PATCH 3/3] [druid] Apply code review suggestions

* break out CustomPostAggregator into a separate class. This just
  cleans up the creation of the postaggregator a little bit.
* minor style issues.
* move the function around so the git diff is more readable
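
As a hedged sketch (not part of the diff below), the new class keeps
the custom case symmetrical with the built-in post-aggregators; the
'p95' payload here is hypothetical, reusing the quantile JSON from the
tests in PATCH 1/3:

    from superset.connectors.druid.models import CustomPostAggregator

    # The raw metric JSON becomes the post_aggregator payload that
    # pydruid serializes into the Druid query unmodified.
    agg = CustomPostAggregator(
        'p95',
        {'type': 'quantile', 'probability': 0.95, 'name': 'p95',
         'fieldName': 'a_histogram'})
    assert agg.post_aggregator['type'] == 'quantile'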
---
 superset/connectors/druid/models.py | 80 +++++++++++++++--------------
 1 file changed, 42 insertions(+), 38 deletions(-)

diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 495a20fd0a51d..cc85a92b7fae4 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -50,6 +50,13 @@ def __init__(self, name, field_names, function):
         self.name = name
 
 
+class CustomPostAggregator(Postaggregator):
+    """A way to allow users to specify completely custom PostAggregators"""
+    def __init__(self, name, post_aggregator):
+        self.name = name
+        self.post_aggregator = post_aggregator
+
+
 class DruidCluster(Model, AuditMixinNullable):
 
     """ORM object referencing the Druid clusters"""
@@ -690,54 +697,25 @@ def granularity(period_name, timezone=None, origin=None):
                 period_name).total_seconds() * 1000
         return granularity
 
-    def values_for_column(self,
-                          column_name,
-                          limit=10000):
-        """Retrieve some values for the given column"""
-        # TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
-        if self.fetch_values_from:
-            from_dttm = utils.parse_human_datetime(self.fetch_values_from)
-        else:
-            from_dttm = datetime(1970, 1, 1)
-
-        qry = dict(
-            datasource=self.datasource_name,
-            granularity="all",
-            intervals=from_dttm.isoformat() + '/' + datetime.now().isoformat(),
-            aggregations=dict(count=count("count")),
-            dimension=column_name,
-            metric="count",
-            threshold=limit,
-        )
-
-        client = self.cluster.get_pydruid_client()
-        client.topn(**qry)
-        df = client.export_pandas()
-        return [row[column_name] for row in df.to_records(index=False)]
-
-    def get_query_str(self, query_obj, phase=1, client=None):
-        return self.run_query(client=client, phase=phase, **query_obj)
-
     @staticmethod
     def _metrics_and_post_aggs(metrics, metrics_dict):
         all_metrics = []
         post_aggs = {}
 
         def recursive_get_fields(_conf):
-            print(_conf)
             _type = _conf.get('type')
-            _field = _conf.get('field', None)
-            _fields = _conf.get('fields', None)
+            _field = _conf.get('field')
+            _fields = _conf.get('fields')
 
             field_names = []
             if _type in ['fieldAccess', 'hyperUniqueCardinality',
                          'quantile', 'quantiles']:
                 field_names.append(_conf.get('fieldName', ''))
 
-            if _field is not None:
+            if _field:
                 field_names += recursive_get_fields(_field)
 
-            if _fields is not None:
+            if _fields:
                 for _f in _fields:
                     field_names += recursive_get_fields(_f)
 
@@ -783,13 +761,39 @@ def recursive_get_fields(_conf):
                     mconf.get('fields', []),
                     mconf.get('name', ''))
             else:
-                post_aggs[metric_name] = Postaggregator(
-                    None,
-                    None,
-                    mconf.get('name', ''))
-                post_aggs[metric_name].post_aggregator = mconf
+                post_aggs[metric_name] = CustomPostAggregator(
+                    mconf.get('name', ''),
+                    mconf)
         return all_metrics, post_aggs
 
+    def values_for_column(self,
+                          column_name,
+                          limit=10000):
+        """Retrieve some values for the given column"""
+        # TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
+        if self.fetch_values_from:
+            from_dttm = utils.parse_human_datetime(self.fetch_values_from)
+        else:
+            from_dttm = datetime(1970, 1, 1)
+
+        qry = dict(
+            datasource=self.datasource_name,
+            granularity="all",
+            intervals=from_dttm.isoformat() + '/' + datetime.now().isoformat(),
+            aggregations=dict(count=count("count")),
+            dimension=column_name,
+            metric="count",
+            threshold=limit,
+        )
+
+        client = self.cluster.get_pydruid_client()
+        client.topn(**qry)
+        df = client.export_pandas()
+        return [row[column_name] for row in df.to_records(index=False)]
+
+    def get_query_str(self, query_obj, phase=1, client=None):
+        return self.run_query(client=client, phase=phase, **query_obj)
+
     def run_query(  # noqa / druid
             self,
             groupby, metrics,