Skip to content

Commit

Permalink
Druid support via SQLAlchemy (apache#4163)
Browse files Browse the repository at this point in the history
* Use druiddb

* Remove auto formatting

* Show prequeries

* Fix subtle bug with lists

* Move arguments to query object

* Fix druid run_query
  • Loading branch information
betodealmeida authored and mistercrunch committed Jan 5, 2018
1 parent 065dcc1 commit 5ebab98
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 30 deletions.
5 changes: 4 additions & 1 deletion superset/connectors/druid/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,10 @@ def run_query( # noqa / druid
orderby=None,
extras=None, # noqa
columns=None, phase=2, client=None, form_data=None,
order_desc=True):
order_desc=True,
prequeries=None,
is_prequery=False,
):
"""Runs a query against Druid and returns a dataframe.
"""
# TODO refactor into using a TBD Query object
Expand Down
105 changes: 76 additions & 29 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ def get_query_str(self, query_obj):
)
logging.info(sql)
sql = sqlparse.format(sql, reindent=True)
if query_obj['is_prequery']:
query_obj['prequeries'].append(sql)
return sql

def get_sqla_table(self):
Expand Down Expand Up @@ -405,7 +407,10 @@ def get_sqla_query( # sqla
extras=None,
columns=None,
form_data=None,
order_desc=True):
order_desc=True,
prequeries=None,
is_prequery=False,
):
"""Querying any sqla table from this common interface"""
template_kwargs = {
'from_dttm': from_dttm,
Expand Down Expand Up @@ -564,37 +569,73 @@ def get_sqla_query( # sqla

if is_timeseries and \
timeseries_limit and groupby and not time_groupby_inline:
# some sql dialects require for order by expressions
# to also be in the select clause -- others, e.g. vertica,
# require a unique inner alias
inner_main_metric_expr = main_metric_expr.label('mme_inner__')
inner_select_exprs += [inner_main_metric_expr]
subq = select(inner_select_exprs)
subq = subq.select_from(tbl)
inner_time_filter = dttm_col.get_time_filter(
inner_from_dttm or from_dttm,
inner_to_dttm or to_dttm,
)
subq = subq.where(and_(*(where_clause_and + [inner_time_filter])))
subq = subq.group_by(*inner_groupby_exprs)

ob = inner_main_metric_expr
if timeseries_limit_metric:
timeseries_limit_metric = metrics_dict.get(timeseries_limit_metric)
ob = timeseries_limit_metric.sqla_col
direction = desc if order_desc else asc
subq = subq.order_by(direction(ob))
subq = subq.limit(timeseries_limit)

on_clause = []
for i, gb in enumerate(groupby):
on_clause.append(
groupby_exprs[i] == column(gb + '__'))

tbl = tbl.join(subq.alias(), and_(*on_clause))
if self.database.db_engine_spec.inner_joins:
# some sql dialects require for order by expressions
# to also be in the select clause -- others, e.g. vertica,
# require a unique inner alias
inner_main_metric_expr = main_metric_expr.label('mme_inner__')
inner_select_exprs += [inner_main_metric_expr]
subq = select(inner_select_exprs)
subq = subq.select_from(tbl)
inner_time_filter = dttm_col.get_time_filter(
inner_from_dttm or from_dttm,
inner_to_dttm or to_dttm,
)
subq = subq.where(and_(*(where_clause_and + [inner_time_filter])))
subq = subq.group_by(*inner_groupby_exprs)

ob = inner_main_metric_expr
if timeseries_limit_metric:
timeseries_limit_metric = metrics_dict.get(timeseries_limit_metric)
ob = timeseries_limit_metric.sqla_col
direction = desc if order_desc else asc
subq = subq.order_by(direction(ob))
subq = subq.limit(timeseries_limit)

on_clause = []
for i, gb in enumerate(groupby):
on_clause.append(
groupby_exprs[i] == column(gb + '__'))

tbl = tbl.join(subq.alias(), and_(*on_clause))
else:
# run subquery to get top groups
subquery_obj = {
'prequeries': prequeries,
'is_prequery': True,
'is_timeseries': False,
'row_limit': timeseries_limit,
'groupby': groupby,
'metrics': metrics,
'granularity': granularity,
'from_dttm': inner_from_dttm or from_dttm,
'to_dttm': inner_to_dttm or to_dttm,
'filter': filter,
'orderby': orderby,
'extras': extras,
'columns': columns,
'form_data': form_data,
'order_desc': True,
}
result = self.query(subquery_obj)
dimensions = [c for c in result.df.columns if c not in metrics]
top_groups = self._get_top_groups(result.df, dimensions)
qry = qry.where(top_groups)

return qry.select_from(tbl)

def _get_top_groups(self, df, dimensions):
cols = {col.column_name: col for col in self.columns}
groups = []
for unused, row in df.iterrows():
group = []
for dimension in dimensions:
col_obj = cols.get(dimension)
group.append(col_obj.sqla_col == row[dimension])
groups.append(and_(*group))

return or_(*groups)

def query(self, query_obj):
qry_start_dttm = datetime.now()
sql = self.get_query_str(query_obj)
Expand All @@ -609,6 +650,12 @@ def query(self, query_obj):
error_message = (
self.database.db_engine_spec.extract_error_message(e))

# if this is a main query with prequeries, combine them together
if not query_obj['is_prequery']:
query_obj['prequeries'].append(sql)
sql = ';\n\n'.join(query_obj['prequeries'])
sql += ';'

return QueryResult(
status=status,
df=df,
Expand Down
2 changes: 2 additions & 0 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class BaseEngineSpec(object):
time_groupby_inline = False
limit_method = LimitMethod.FETCH_MANY
time_secondary_columns = False
inner_joins = True

@classmethod
def fetch_data(cls, cursor, limit):
Expand Down Expand Up @@ -1229,6 +1230,7 @@ class DruidEngineSpec(BaseEngineSpec):
"""Engine spec for Druid.io"""
engine = 'druid'
limit_method = LimitMethod.FETCH_MANY
inner_joins = False


engines = {
Expand Down
6 changes: 6 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,12 @@ def get_query_string_response(self, viz_obj):
query = viz_obj.datasource.get_query_str(query_obj)
except Exception as e:
return json_error_response(e)

if query_obj['prequeries']:
query_obj['prequeries'].append(query)
query = ';\n\n'.join(query_obj['prequeries'])
query += ';'

return Response(
json.dumps({
'query': query,
Expand Down
2 changes: 2 additions & 0 deletions superset/viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ def query_obj(self):
'timeseries_limit_metric': timeseries_limit_metric,
'form_data': form_data,
'order_desc': order_desc,
'prequeries': [],
'is_prequery': False,
}
return d

Expand Down

0 comments on commit 5ebab98

Please sign in to comment.