Skip to content

Commit

Permalink
Allowing for druid post aggregations (#418)
Browse files Browse the repository at this point in the history
* Druid post aggregations

* Fixing tests
  • Loading branch information
mistercrunch committed May 2, 2016
1 parent 0ca3f5e commit 26d2736
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 20 deletions.
55 changes: 44 additions & 11 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pydruid.client import PyDruid
from flask.ext.appbuilder.models.decorators import renders
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Postaggregator
from six import string_types
from sqlalchemy import (
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date,
Expand All @@ -44,6 +45,17 @@
QueryResult = namedtuple('namedtuple', ['df', 'query', 'duration'])


class JavascriptPostAggregator(Postaggregator):
def __init__(self, name, field_names, function):
self.post_aggregator = {
'type': 'javascript',
'fieldNames': field_names,
'name': name,
'function': function,
}
self.name = name


class AuditMixinNullable(AuditMixin):

"""Altering the AuditMixin to use nullable fields
Expand Down Expand Up @@ -319,6 +331,10 @@ def filterable_column_names(self):
def dttm_cols(self):
return []

@property
def url(self):
return '/{}/edit/{}'.format(self.baselink, self.id)


class Database(Model, AuditMixinNullable):

Expand Down Expand Up @@ -467,10 +483,6 @@ def __repr__(self):
def description_markeddown(self):
return utils.markdown(self.description)

@property
def url(self):
return '/tablemodelview/edit/{}'.format(self.id)

@property
def link(self):
return '<a href="{self.url}">{self.table_name}</a>'.format(**locals())
Expand Down Expand Up @@ -896,7 +908,7 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):

type = "druid"

baselink = "datasourcemodelview"
baselink = "druiddatasourcemodelview"

__tablename__ = 'datasources'
id = Column(Integer, primary_key=True)
Expand Down Expand Up @@ -930,10 +942,6 @@ def perm(self):
"[{obj.cluster_name}].[{obj.datasource_name}]"
"(id:{obj.id})").format(obj=self)

@property
def url(self):
return '/datasourcemodelview/edit/{}'.format(self.id)

@property
def link(self):
return (
Expand Down Expand Up @@ -1047,9 +1055,34 @@ def query( # druid
to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ"))

query_str = ""
metrics_dict = {m.metric_name: m for m in self.metrics}
all_metrics = []
post_aggs = {}
for metric_name in metrics:
metric = metrics_dict[metric_name]
if metric.metric_type != 'postagg':
all_metrics.append(metric_name)
else:
conf = metric.json_obj
fields = conf.get('fields', [])
all_metrics += [
f.get('fieldName') for f in fields
if f.get('type') == 'fieldAccess']
all_metrics += conf.get('fieldNames', [])
if conf.get('type') == 'javascript':
post_aggs[metric_name] = JavascriptPostAggregator(
name=conf.get('name'),
field_names=conf.get('fieldNames'),
function=conf.get('function'))
else:
post_aggs[metric_name] = Postaggregator(
conf.get('fn', "/"),
conf.get('fields', []),
conf.get('name', ''))
aggregations = {
m.metric_name: m.json_obj
for m in self.metrics if m.metric_name in metrics
for m in self.metrics
if m.metric_name in all_metrics
}
granularity = granularity or "all"
if granularity != "all":
Expand All @@ -1067,6 +1100,7 @@ def query( # druid
dimensions=groupby,
aggregations=aggregations,
granularity=granularity,
post_aggregations=post_aggs,
intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
)
filters = None
Expand Down Expand Up @@ -1171,7 +1205,6 @@ def query( # druid
cols += ['timestamp']
cols += [col for col in groupby if col in df.columns]
cols += [col for col in metrics if col in df.columns]
cols += [col for col in df.columns if col not in cols]
df = df[cols]
return QueryResult(
df=df,
Expand Down
19 changes: 11 additions & 8 deletions caravel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,20 @@ class DruidMetricInlineView(CompactCRUDMixin, CaravelModelView): # noqa
datamodel = SQLAInterface(models.DruidMetric)
list_columns = ['metric_name', 'verbose_name', 'metric_type']
edit_columns = [
'metric_name', 'description', 'verbose_name', 'metric_type',
'datasource', 'json']
add_columns = [
'metric_name', 'verbose_name', 'metric_type', 'datasource', 'json']
'metric_name', 'description', 'verbose_name', 'metric_type', 'json',
'datasource']
add_columns = edit_columns
page_size = 500
validators_columns = {
'json': [validate_json],
}
description_columns = {
'metric_type': utils.markdown(
"use `postagg` as the metric type if you are defining a "
"[Druid Post Aggregation]"
"(http://druid.io/docs/latest/querying/post-aggregations.html)",
True),
}
appbuilder.add_view_no_menu(DruidMetricInlineView)


Expand Down Expand Up @@ -390,10 +396,7 @@ class LogModelView(CaravelModelView):
class DruidDatasourceModelView(CaravelModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.DruidDatasource)
list_columns = [
'datasource_link', 'cluster', 'owner',
'creator', 'created_on',
'changed_by_', 'changed_on',
'offset']
'datasource_link', 'cluster', 'changed_by_', 'modified', 'offset']
related_views = [DruidColumnInlineView, DruidMetricInlineView]
edit_columns = [
'datasource_name', 'cluster', 'description', 'owner',
Expand Down
2 changes: 1 addition & 1 deletion tests/core_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def test_client(self, PyDruid):
df = pd.DataFrame(nres)
instance.export_pandas.return_value = df
instance.query_dict = {}
resp = self.client.get('/caravel/explore/druid/1/?viz_type=table&granularity=one+day&druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&include_search=false&metrics=count&flt_col_0=dim1&flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&action=&datasource_name=test_datasource&datasource_id=1&datasource_type=druid&previous_viz_type=table&json=true&force=true')
resp = self.client.get('/caravel/explore/druid/1/?viz_type=table&granularity=one+day&druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&include_search=false&metrics=count&groupby=name&flt_col_0=dim1&flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&action=&datasource_name=test_datasource&datasource_id=1&datasource_type=druid&previous_viz_type=table&json=true&force=true')
print('-'*300)
print(resp.data.decode('utf-8'))
assert "Canada" in resp.data.decode('utf-8')
Expand Down

0 comments on commit 26d2736

Please sign in to comment.