[druid] Allow custom druid postaggregators #3146

Merged

Changes from 2 commits
101 changes: 61 additions & 40 deletions superset/connectors/druid/models.py
@@ -718,51 +718,29 @@ def values_for_column(self,
def get_query_str(self, query_obj, phase=1, client=None):
return self.run_query(client=client, phase=phase, **query_obj)

def run_query( # noqa / druid
self,
groupby, metrics,
granularity,
from_dttm, to_dttm,
filter=None, # noqa
is_timeseries=True,
timeseries_limit=None,
timeseries_limit_metric=None,
row_limit=None,
inner_from_dttm=None, inner_to_dttm=None,
orderby=None,
extras=None, # noqa
select=None, # noqa
columns=None, phase=2, client=None, form_data=None):
"""Runs a query against Druid and returns a dataframe.
"""
# TODO refactor into using a TBD Query object
client = client or self.cluster.get_pydruid_client()
if not is_timeseries:
granularity = 'all'
inner_from_dttm = inner_from_dttm or from_dttm
inner_to_dttm = inner_to_dttm or to_dttm

# add tzinfo to native datetime with config
from_dttm = from_dttm.replace(tzinfo=DRUID_TZ)
to_dttm = to_dttm.replace(tzinfo=DRUID_TZ)
timezone = from_dttm.tzname()

query_str = ""
metrics_dict = {m.metric_name: m for m in self.metrics}
@staticmethod
def _metrics_and_post_aggs(metrics, metrics_dict):
all_metrics = []
post_aggs = {}

columns_dict = {c.column_name: c for c in self.columns}

def recursive_get_fields(_conf):
_fields = _conf.get('fields', [])
print(_conf)
_type = _conf.get('type')
_field = _conf.get('field', None)
Member: ", None" is implicit; that's the default behavior.

_fields = _conf.get('fields', None)

field_names = []
for _f in _fields:
_type = _f.get('type')
if _type in ['fieldAccess', 'hyperUniqueCardinality']:
field_names.append(_f.get('fieldName'))
elif _type == 'arithmetic':
if _type in ['fieldAccess', 'hyperUniqueCardinality',
'quantile', 'quantiles']:
field_names.append(_conf.get('fieldName', ''))

if _field is not None:
Member: "if _field:" is the pythonesque convention.

Contributor Author: I suppose. I prefer to be explicit when I care about None specifically vs. when I want to reject all falsy values (https://docs.python.org/2.4/lib/truth.html). I'll change the PR.
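For illustration, a minimal sketch of the distinction being drawn here (hypothetical values, not from this PR): an empty container is falsy but is not None, so the two checks diverge.

# An empty list is falsy but is not None.
conf = {'fields': []}
_fields = conf.get('fields', None)

if _fields is not None:
    print('explicit check: the key is present, even though the list is empty')

if _fields:
    print('truthiness check: never reached for an empty list')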

field_names += recursive_get_fields(_field)

if _fields is not None:
Member: why not use the same if block as the one right above this one?

Contributor Author: _field vs. _fields.

Contributor Author: this is mostly a matter of supporting newer and older postAggregators that have different conventions for storing their dependent fields; see the sketch just below.
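To make that concrete, a sketch of the two spec shapes the recursion handles (illustrative JSON, modeled on the test fixtures added in this PR):

# Newer style: a single dependent spec nested under "field".
custom_post_agg = {
    'type': 'customPostAgg',
    'name': 'aCustomPostAgg',
    'field': {'type': 'fieldAccess', 'fieldName': 'aCustomMetric'},
}

# Older arithmetic style: dependents listed under "fields".
arithmetic_post_agg = {
    'type': 'arithmetic',
    'name': 'a_div_b',
    'fn': '/',
    'fields': [
        {'type': 'fieldAccess', 'fieldName': 'some_sum'},
        {'type': 'fieldAccess', 'fieldName': 'unused_count'},
    ],
}

# recursive_get_fields collects the underlying aggregation names from
# either shape: ['aCustomMetric'] for the first spec and, order aside,
# ['some_sum', 'unused_count'] for the second.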

for _f in _fields:
field_names += recursive_get_fields(_f)

return list(set(field_names))

for metric_name in metrics:
@@ -799,11 +777,54 @@ def recursive_get_fields(_conf):
post_aggs[metric_name] = HyperUniqueCardinality(
mconf.get('name')
)
else:
elif mconf.get('type') == 'arithmetic':
post_aggs[metric_name] = Postaggregator(
mconf.get('fn', "/"),
mconf.get('fields', []),
mconf.get('name', ''))
else:
post_aggs[metric_name] = Postaggregator(
None,
Member: it would help readability to use keyword args here.

Contributor Author: fixed by making a CustomPostAggregator class that encompasses this logic. Should make more sense that way. (A sketch follows after the next thread.)

None,
mconf.get('name', ''))
post_aggs[metric_name].post_aggregator = mconf
return all_metrics, post_aggs
Member: I'm never a big fan of returning tuples, but I think it's OK in this case.

Contributor Author: yeah, for an internal API, I feel like it's kinda defensible.
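A sketch of the CustomPostAggregator class the author describes, assuming pydruid's Postaggregator base; the class that actually landed may differ in detail:

from pydruid.utils.postaggregator import Postaggregator

class CustomPostAggregator(Postaggregator):
    """Pass a user-supplied post-aggregation spec to Druid verbatim,
    rather than forcing it through the arithmetic-style
    (fn, fields, name) constructor."""
    def __init__(self, name, post_aggregator):
        self.name = name
        self.post_aggregator = post_aggregator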

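And on the tuple-return question, one named alternative (a sketch, not part of the PR):

import collections

# A named result keeps tuple unpacking working at existing call sites
# while making attribute access self-documenting.
MetricsAndPostAggs = collections.namedtuple(
    'MetricsAndPostAggs', ['all_metrics', 'post_aggs'])

# all_metrics, post_aggs = MetricsAndPostAggs([], {})  # unpacks as before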

def run_query( # noqa / druid
Member: I'm not sure why git is getting the diff wrong here; things are not lining up, which makes it hard to see what has changed in this section. Maybe moving _metrics_and_post_aggs higher or lower in the class definition would help git diff it right?

Contributor Author: moved it up next to the other static methods.

self,
groupby, metrics,
granularity,
from_dttm, to_dttm,
filter=None, # noqa
is_timeseries=True,
timeseries_limit=None,
timeseries_limit_metric=None,
row_limit=None,
inner_from_dttm=None, inner_to_dttm=None,
orderby=None,
extras=None, # noqa
select=None, # noqa
columns=None, phase=2, client=None, form_data=None):
"""Runs a query against Druid and returns a dataframe.
"""
# TODO refactor into using a TBD Query object
client = client or self.cluster.get_pydruid_client()
if not is_timeseries:
granularity = 'all'
inner_from_dttm = inner_from_dttm or from_dttm
inner_to_dttm = inner_to_dttm or to_dttm

# add tzinfo to native datetime with config
from_dttm = from_dttm.replace(tzinfo=DRUID_TZ)
to_dttm = to_dttm.replace(tzinfo=DRUID_TZ)
timezone = from_dttm.tzname()

query_str = ""
metrics_dict = {m.metric_name: m for m in self.metrics}

columns_dict = {c.column_name: c for c in self.columns}

all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict)

aggregations = OrderedDict()
for m in self.metrics:
77 changes: 74 additions & 3 deletions tests/druid_tests.py
@@ -11,8 +11,8 @@
from mock import Mock, patch

from superset import db, sm, security
from superset.connectors.druid.models import DruidCluster, DruidDatasource
from superset.connectors.druid.models import PyDruid
from superset.connectors.druid.models import DruidMetric, DruidCluster, DruidDatasource
from superset.connectors.druid.models import PyDruid, Quantile, Postaggregator

from .base_tests import SupersetTestCase

@@ -38,7 +38,7 @@
"metric1": {
"type": "longSum",
"name": "metric1",
"fieldName": "metric1"}
"fieldName": "metric1"},
},
"size": 300000,
"numRows": 5000000
@@ -318,6 +318,77 @@ def test_sync_druid_perm(self, PyDruid):
permission=permission, view_menu=view_menu).first()
assert pv is not None

def test_metrics_and_post_aggs(self):
"""
Test generation of metrics and post-aggregations from an initial list
of superset metrics (which may include the results of either). This
primarily tests that specifying a post-aggregator metric will also
require the raw aggregation of the associated druid metric column.
"""
metrics_dict = {
'unused_count': DruidMetric(
metric_name='unused_count',
verbose_name='COUNT(*)',
metric_type='count',
json=json.dumps({'type': 'count', 'name': 'unused_count'})),
'some_sum': DruidMetric(
metric_name='some_sum',
verbose_name='SUM(*)',
metric_type='sum',
json=json.dumps({'type': 'sum', 'name': 'sum'})),
'a_histogram': DruidMetric(
metric_name='a_histogram',
verbose_name='APPROXIMATE_HISTOGRAM(*)',
metric_type='approxHistogramFold',
json=json.dumps({'type': 'approxHistogramFold', 'name': 'a_histogram'})),
'aCustomMetric': DruidMetric(
metric_name='aCustomMetric',
verbose_name='MY_AWESOME_METRIC(*)',
metric_type='aCustomType',
json=json.dumps({'type': 'customMetric', 'name': 'aCustomMetric'})),
'quantile_p95': DruidMetric(
metric_name='quantile_p95',
verbose_name='P95(*)',
metric_type='postagg',
json=json.dumps({
'type': 'quantile',
'probability': 0.95,
'name': 'p95',
'fieldName': 'a_histogram'})),
'aCustomPostAgg': DruidMetric(
metric_name='aCustomPostAgg',
verbose_name='CUSTOM_POST_AGG(*)',
metric_type='postagg',
json=json.dumps({
'type': 'customPostAgg',
'name': 'aCustomPostAgg',
'field': {
'type': 'fieldAccess',
'fieldName': 'aCustomMetric'}})),
}

metrics = ['some_sum']
all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
metrics, metrics_dict)

assert all_metrics == ['some_sum']
assert post_aggs == {}

metrics = ['quantile_p95']
all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
metrics, metrics_dict)

result_postaggs = set(['quantile_p95'])
assert all_metrics == ['a_histogram']
assert set(post_aggs.keys()) == result_postaggs

metrics = ['aCustomPostAgg']
all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
metrics, metrics_dict)

result_postaggs = set(['aCustomPostAgg'])
assert all_metrics == ['aCustomMetric']
assert set(post_aggs.keys()) == result_postaggs
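One property worth noting (an illustrative sketch, not part of the committed test): because the fallback branch assigns the raw metric JSON to post_aggs[metric_name].post_aggregator, a custom spec reaches pydruid verbatim:

# The custom post-aggregator's spec is the metric's JSON, untouched.
spec = json.loads(metrics_dict['aCustomPostAgg'].json)
assert post_aggs['aCustomPostAgg'].post_aggregator == spec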


if __name__ == '__main__':