Skip to content

Commit

Permalink
WIP - debug issues with derived offset metric filters
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Nov 14, 2024
1 parent df48dd8 commit 87076d9
Show file tree
Hide file tree
Showing 4 changed files with 512 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -700,24 +700,27 @@ metric:
alias: bookings_2_weeks_ago
---
metric:
name: "bookings_offset_once"
name: bookings_offset_once
description: bookings metric offset once.
type: derived
type_params:
expr: 2 * bookings
metrics:
- name: bookings
offset_window: 5 days
filter: "{{ Entity('listing') }} IS NOT NULL"
---
metric:
name: "bookings_offset_twice"
# TODO: rename this metric in a later commit
name: bookings_offset_twice
description: bookings metric offset twice.
type: derived
type_params:
expr: 2 * bookings_offset_once
metrics:
- name: bookings_offset_once
offset_window: 2 days
filter: "{{ Dimension('booking__is_instant') }}"
---
metric:
name: bookings_at_start_of_month
Expand Down
70 changes: 42 additions & 28 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,52 +584,66 @@ def _build_derived_metric_output_node(

parent_nodes: List[DataflowPlanNode] = []

# This is the filter that's defined for the metric in the configs.
metric_definition_filter_specs = filter_spec_factory.create_from_where_filter_intersection(
# These are the filters defined on the outer metric (the derived metric).
# how did we get the specs in here in the first place?
outer_metric_definition_filter_specs = filter_spec_factory.create_from_where_filter_intersection(
filter_location=WhereFilterLocation.for_metric(metric_spec.reference),
filter_intersection=metric.filter,
)

outer_metric_filter_spec_set = WhereFilterSpecSet(
metric_level_filter_specs=tuple(outer_metric_definition_filter_specs),
)
for metric_input_spec in metric_input_specs:
where_filter_spec_set = WhereFilterSpecSet(
metric_level_filter_specs=tuple(metric_definition_filter_specs),
)

# These are the filters that's defined as part of the input metric.
where_filter_spec_set = where_filter_spec_set.merge(metric_input_spec.filter_spec_set)
# Add the filters defined on each input metric.
input_metric_where_filter_spec_set = outer_metric_filter_spec_set.merge(metric_input_spec.filter_spec_set)

# If metric is offset, we'll apply where constraint after offset to avoid removing values
# unexpectedly. Time constraint will be applied by INNER JOINing to time spine.
# We may consider encapsulating this in pushdown state later, but as of this moment pushdown
# is about post-join to pre-join for dimension access, and relies on the builder to collect
# predicates from query and metric specs and make them available at measure level.
if not metric_spec.has_time_offset:
where_filter_spec_set = where_filter_spec_set.merge(metric_spec.filter_spec_set)
input_metric_where_filter_spec_set = input_metric_where_filter_spec_set.merge(
metric_spec.filter_spec_set
)
metric_pushdown_state = (
predicate_pushdown_state
if not metric_spec.has_time_offset
else PredicatePushdownState.with_pushdown_disabled()
)

parent_nodes.append(
self._build_any_metric_output_node(
BuildAnyMetricOutputNodeParameterSet(
metric_spec=MetricSpec(
element_name=metric_input_spec.element_name,
filter_spec_set=where_filter_spec_set,
alias=metric_input_spec.alias,
offset_window=metric_input_spec.offset_window,
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=(
queried_linkable_specs if not metric_spec.has_time_offset else required_linkable_specs
),
filter_spec_factory=filter_spec_factory,
predicate_pushdown_state=metric_pushdown_state,
for_group_by_source_node=False,
)
# Need to include any outer metric filters when building the input metric, EXCEPT metric_time filters which should be saved for the outer metric
# How to apply those metric_time filters to the outer metric? Will metric_time be available at the top-level? Do we require it for offset metrics?
# I think so. but why couldn't we do that for join_to_timespine metrics? I think there was a reason. Go back to read that logic. Likely will copy it.
# Ok so separate metric_time from other filters (for the outer metric filters and query-level filters). I think we might need to take those filters
# and apply them to the offset column. Is that possible? Will it get messed up if there are custom grains? A lot to work out here!!!
input_metric_parent_node = self._build_any_metric_output_node(
BuildAnyMetricOutputNodeParameterSet(
metric_spec=MetricSpec(
element_name=metric_input_spec.element_name,
filter_spec_set=input_metric_where_filter_spec_set,
alias=metric_input_spec.alias,
offset_window=metric_input_spec.offset_window,
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=(
# If there is an offset, we need to include filter specs when we aggregating the input metric
# so that they will be available to the outer metric.
# Is that right?? That seems wrong! This might be a bug!
# Write a test for this case. what is it?
# A derived metric with tiered filters of different specs. One on the input metric, one on the outer metric.
# That's the simple case. Then write another test with approximately the same metric, but with an offset on that input metric.
# Do the specs get filtered appropriately at the input metric level before aggregation? If not, is it possible to fix that?
queried_linkable_specs
# Logic below was wrong!
# if not metric_spec.has_time_offset
# else required_linkable_specs
),
filter_spec_factory=filter_spec_factory,
predicate_pushdown_state=metric_pushdown_state,
for_group_by_source_node=False,
)
)
parent_nodes.append(input_metric_parent_node)

parent_node = (
parent_nodes[0]
Expand Down
Loading

0 comments on commit 87076d9

Please sign in to comment.