Feature/Fix: times_series limit for Druid #3434

Merged: 1 commit, Sep 18, 2017
86 changes: 59 additions & 27 deletions superset/connectors/druid/models.py
@@ -1,4 +1,4 @@
# pylint: disable=invalid-unary-operand-type
# pylint: disable=invalid-unary-operand-type
from collections import OrderedDict
import json
import logging
@@ -798,6 +798,28 @@ def values_for_column(self,
def get_query_str(self, query_obj, phase=1, client=None):
return self.run_query(client=client, phase=phase, **query_obj)

def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
ret = dim_filter
if df is not None and not df.empty:
new_filters = []
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
f = Dimension(dim) == row[dim]
fields.append(f)
if len(fields) > 1:
term = Filter(type="and", fields=fields)
new_filters.append(term)
elif fields:
new_filters.append(fields[0])
if new_filters:
ff = Filter(type="or", fields=new_filters)
if not dim_filter:
ret = ff
else:
ret = Filter(type="and", fields=[ff, dim_filter])
return ret

def run_query( # noqa / druid
self,
groupby, metrics,
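Aside: a minimal sketch of what the new _add_filter_from_pre_query_data helper constructs, assuming pydruid's Dimension/Filter API as imported by this module; the DataFrame contents and the extra 'platform' filter are made up for illustration.

import pandas as pd
from pydruid.utils.filters import Dimension, Filter

# Hypothetical phase-one result: the top two (country, device) pairs.
df = pd.DataFrame({'country': ['US', 'FR'], 'device': ['ios', 'android']})
dimensions = ['country', 'device']

# One AND filter per pre-query row...
row_filters = []
for _, row in df.iterrows():
    fields = [Dimension(dim) == row[dim] for dim in dimensions]
    row_filters.append(Filter(type='and', fields=fields) if len(fields) > 1 else fields[0])

# ...ORed together, then ANDed with whatever filter the query already had.
top_series = Filter(type='or', fields=row_filters)
existing = Dimension('platform') == 'web'   # made-up pre-existing filter
final_filter = Filter(type='and', fields=[top_series, existing])

With no pre-existing filter the helper returns the OR filter alone, and with an empty or missing DataFrame it returns the original filter unchanged.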
@@ -834,7 +856,9 @@ def run_query( # noqa / druid

columns_dict = {c.column_name: c for c in self.columns}

all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict)
all_metrics, post_aggs = self._metrics_and_post_aggs(
metrics,
metrics_dict)

aggregations = OrderedDict()
for m in self.metrics:
@@ -884,15 +908,41 @@ def run_query( # noqa / druid
if having_filters:
qry['having'] = having_filters
order_direction = "descending" if order_desc else "ascending"
orig_filters = filters
if len(groupby) == 0 and not having_filters:
del qry['dimensions']
client.timeseries(**qry)
if not having_filters and len(groupby) == 1 and order_desc:
dim = list(qry.get('dimensions'))[0]
if timeseries_limit_metric:
order_by = timeseries_limit_metric
else:
order_by = list(qry['aggregations'].keys())[0]
# Limit on the number of timeseries, doing a two-phase query
pre_qry = deepcopy(qry)
pre_qry['granularity'] = "all"
pre_qry['threshold'] = min(row_limit,
timeseries_limit or row_limit)
pre_qry['metric'] = order_by
pre_qry['dimension'] = dim
del pre_qry['dimensions']
client.topn(**pre_qry)
query_str += "// Two phase query\n// Phase 1\n"
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
query_str += "\n"
if phase == 1:
return query_str
query_str += (
"//\nPhase 2 (built based on phase one's results)\n")
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'], filters)
qry['threshold'] = timeseries_limit or 1000
if row_limit and granularity == 'all':
qry['threshold'] = row_limit
qry['dimension'] = list(qry.get('dimensions'))[0]
qry['dimension'] = dim
del qry['dimensions']
qry['metric'] = list(qry['aggregations'].keys())[0]
client.topn(**qry)
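Aside: the single-dimension, order-desc branch above now runs a two-phase topN: phase one uses granularity "all" to rank whole series by the ordering metric, phase two re-runs the real query restricted to the winners. A self-contained sketch of the idea, mirroring the pydruid calls the diff already uses; the broker URL, datasource, column, and metric names are made up.

from pydruid.client import PyDruid
from pydruid.utils.filters import Dimension, Filter

client = PyDruid('http://localhost:8082', 'druid/v2')   # hypothetical broker
aggs = {'count': {'type': 'longSum', 'fieldName': 'count'}}

# Phase 1: granularity 'all' collapses time, so the topN ranks whole series.
client.topn(
    datasource='wikipedia', granularity='all',
    intervals='2017-09-01/2017-09-18',
    dimension='country', metric='count',
    threshold=5, aggregations=aggs)
df = client.export_pandas()   # the five biggest countries overall

# Phase 2: the real per-hour query, restricted to those five countries.
top_filter = Filter(type='or',
                    fields=[Dimension('country') == c for c in df['country']])
client.topn(
    datasource='wikipedia', granularity='hour',
    intervals='2017-09-01/2017-09-18',
    dimension='country', metric='count',
    threshold=5, aggregations=aggs, filter=top_filter)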
@@ -908,7 +958,7 @@ def run_query( # noqa / druid
pre_qry['granularity'] = "all"
pre_qry['limit_spec'] = {
"type": "default",
"limit": timeseries_limit,
"limit": min(timeseries_limit, row_limit),
'intervals': (
inner_from_dttm.isoformat() + '/' +
inner_to_dttm.isoformat()),
@@ -927,29 +977,10 @@ def run_query( # noqa / druid
query_str += (
"//\nPhase 2 (built based on phase one's results)\n")
df = client.export_pandas()
if df is not None and not df.empty:
dims = qry['dimensions']
filters = []
for unused, row in df.iterrows():
fields = []
for dim in dims:
f = Dimension(dim) == row[dim]
fields.append(f)
if len(fields) > 1:
filt = Filter(type="and", fields=fields)
filters.append(filt)
elif fields:
filters.append(fields[0])

if filters:
ff = Filter(type="or", fields=filters)
if not orig_filters:
qry['filter'] = ff
else:
qry['filter'] = Filter(type="and", fields=[
ff,
orig_filters])
qry['limit_spec'] = None
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'], filters)
qry['limit_spec'] = None
if row_limit:
qry['limit_spec'] = {
"type": "default",
@@ -1111,5 +1142,6 @@ def query_datasources_by_name(
.all()
)


sa.event.listen(DruidDatasource, 'after_insert', set_perm)
sa.event.listen(DruidDatasource, 'after_update', set_perm)