Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(experiments): Initial data warehouse Trend support #26356

Merged
merged 33 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
57ca26b
Support for data warehouse experiments, v3
danielbachhuber Nov 22, 2024
3fb6f4e
First pass at incorporating data warehouse tests
danielbachhuber Nov 22, 2024
e4ff1e9
Fix type issues
danielbachhuber Nov 22, 2024
4cd89e4
Add a failing test case for the left join
danielbachhuber Nov 22, 2024
3f9f2de
First pass at lazy joining the events table
danielbachhuber Nov 25, 2024
e4566be
Update UI snapshots for `chromium` (2)
github-actions[bot] Nov 25, 2024
33d0149
Property filters always use the column name
danielbachhuber Nov 25, 2024
dcd47d4
Clean up the subselect
danielbachhuber Nov 25, 2024
31606f1
Try an ASOF LEFT JOIN
danielbachhuber Nov 25, 2024
927a3c2
Update UI snapshots for `chromium` (1)
github-actions[bot] Nov 21, 2024
23cf381
Update UI snapshots for `chromium` (1)
github-actions[bot] Nov 21, 2024
7d7b3e8
Temporarily disable
danielbachhuber Nov 25, 2024
fddffa8
Drop `context`
danielbachhuber Nov 26, 2024
a35d509
Restore `to_printed_hogql()`
danielbachhuber Nov 26, 2024
f7c042e
A note
danielbachhuber Nov 26, 2024
22d7652
Add a select to the table argument
danielbachhuber Nov 26, 2024
b3fbff1
Add a scenario for out of bounds entry
danielbachhuber Nov 26, 2024
2e737af
Merge branch 'master' into experiments/data-warehouse-support-v3
danielbachhuber Nov 26, 2024
15edbc2
Update UI snapshots for `chromium` (2)
github-actions[bot] Nov 26, 2024
0df2168
No longer need the column name
danielbachhuber Nov 26, 2024
bf607d1
Remove more extraneous code
danielbachhuber Nov 26, 2024
dff03ea
Use potentially dynamic `distinct_id` and `timestamp` field names
danielbachhuber Nov 26, 2024
7f273c6
Add a test for an invalid table name
danielbachhuber Nov 26, 2024
79fc1a2
Add a helpful note
danielbachhuber Nov 26, 2024
310b03f
Clearer naming
danielbachhuber Nov 26, 2024
8330c53
Don't apply to exposure queries quite yet
danielbachhuber Nov 26, 2024
3163cb6
Fix type issues
danielbachhuber Nov 26, 2024
27416ac
Remove comments
danielbachhuber Nov 26, 2024
6b3b721
Avoid unnecessary variable definition
danielbachhuber Nov 26, 2024
a07bb2a
Explain why we need a custom database instance
danielbachhuber Nov 26, 2024
f131df9
Make sure we're only evaluating `$feature_flag_called`
danielbachhuber Nov 26, 2024
c6d70fc
Add another exposure that gets ignored
danielbachhuber Nov 26, 2024
e435442
Explain `ASOF LEFT JOIN`
danielbachhuber Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ee/clickhouse/views/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def update(self, instance: Experiment, validated_data: dict, *args: Any, **kwarg
# if (
# not instance.filters.get("events")
# and not instance.filters.get("actions")
# and not instance.filters.get("data_warehouse")
# and validated_data.get("start_date")
# and not validated_data.get("filters")
# ):
Expand Down
8 changes: 7 additions & 1 deletion frontend/src/scenes/experiments/ExperimentView/Goal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ export function MetricDisplayOld({ filters }: { filters?: FilterType }): JSX.Ele

return (
<>
{([...(filters?.events || []), ...(filters?.actions || [])] as ActionFilter[])
{(
[
...(filters?.events || []),
...(filters?.actions || []),
...(filters?.data_warehouse || []),
] as ActionFilter[]
)
.sort((a, b) => (a.order || 0) - (b.order || 0))
.map((event: ActionFilter, idx: number) => (
<div key={idx} className="mb-2">
Expand Down
19 changes: 14 additions & 5 deletions frontend/src/scenes/experiments/experimentLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ export const experimentLogic = kea<experimentLogicType>([
setExperiment: async ({ experiment }) => {
const experimentEntitiesChanged =
(experiment.filters?.events && experiment.filters.events.length > 0) ||
(experiment.filters?.actions && experiment.filters.actions.length > 0)
(experiment.filters?.actions && experiment.filters.actions.length > 0) ||
(experiment.filters?.data_warehouse && experiment.filters.data_warehouse.length > 0)

if (!experiment.filters || Object.keys(experiment.filters).length === 0) {
return
Expand All @@ -668,7 +669,9 @@ export const experimentLogic = kea<experimentLogicType>([

if (name === 'filters') {
const experimentEntitiesChanged =
(value?.events && value.events.length > 0) || (value?.actions && value.actions.length > 0)
(value?.events && value.events.length > 0) ||
(value?.actions && value.actions.length > 0) ||
(value?.data_warehouse && value.data_warehouse.length > 0)

if (!value || Object.keys(value).length === 0) {
return
Expand All @@ -686,7 +689,8 @@ export const experimentLogic = kea<experimentLogicType>([

const experimentEntitiesChanged =
(experiment.filters?.events && experiment.filters.events.length > 0) ||
(experiment.filters?.actions && experiment.filters.actions.length > 0)
(experiment.filters?.actions && experiment.filters.actions.length > 0) ||
(experiment.filters?.data_warehouse && experiment.filters.data_warehouse.length > 0)

if (!experiment.filters || Object.keys(experiment.filters).length === 0) {
return
Expand All @@ -700,7 +704,8 @@ export const experimentLogic = kea<experimentLogicType>([
const experiment = values.experiment
const experimentEntitiesChanged =
(experiment.filters?.events && experiment.filters.events.length > 0) ||
(experiment.filters?.actions && experiment.filters.actions.length > 0)
(experiment.filters?.actions && experiment.filters.actions.length > 0) ||
(experiment.filters?.data_warehouse && experiment.filters.data_warehouse.length > 0)

if (!experiment.filters || Object.keys(experiment.filters).length === 0) {
return
Expand Down Expand Up @@ -1046,7 +1051,11 @@ export const experimentLogic = kea<experimentLogicType>([
if (!filters) {
return undefined
}
entities = [...(filters?.events || []), ...(filters?.actions || [])] as ActionFilterType[]
entities = [
...(filters?.events || []),
...(filters?.actions || []),
...(filters?.data_warehouse || []),
] as ActionFilterType[]
}

// Find out if we're using count per actor math aggregates averages per user
Expand Down
11 changes: 8 additions & 3 deletions posthog/hogql/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.models import Table, FunctionCallTable, SavedQuery
from posthog.hogql.database.database import create_hogql_database
from posthog.hogql.database.database import Database, create_hogql_database
from posthog.hogql.database.s3_table import S3Table
from posthog.hogql.errors import ImpossibleASTError, InternalHogQLError, QueryError, ResolutionError
from posthog.hogql.escape_sql import (
Expand Down Expand Up @@ -66,13 +66,18 @@ def team_id_guard_for_table(table_type: Union[ast.TableType, ast.TableAliasType]
)


def to_printed_hogql(query: ast.Expr, team: Team, modifiers: Optional[HogQLQueryModifiers] = None) -> str:
def to_printed_hogql(
    query: ast.Expr, team: Team, modifiers: Optional[HogQLQueryModifiers] = None, database: Optional["Database"] = None
) -> str:
    """Prints the HogQL query without mutating the node.

    Args:
        query: HogQL AST expression to render; it is cloned first so the caller's node is untouched.
        team: Team whose pk scopes the printing context.
        modifiers: Optional modifiers merged with the team's defaults via create_default_modifiers_for_team.
        database: Optional pre-built Database instance (e.g. one carrying custom joins) passed
            straight through to the HogQLContext; when None the context falls back to its default
            database resolution — TODO confirm against HogQLContext.

    Returns:
        The query rendered as a pretty-printed HogQL string.
    """
    return print_ast(
        clone_expr(query),  # clone so printing never mutates the caller's AST
        dialect="hogql",
        context=HogQLContext(
            team_id=team.pk,
            enable_select_queries=True,  # allow printing full SELECT queries, not just expressions
            modifiers=create_default_modifiers_for_team(team, modifiers),
            database=database,
        ),
        pretty=True,
    )
Expand Down
124 changes: 106 additions & 18 deletions posthog/hogql_queries/experiments/experiment_trends_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from django.conf import settings
from posthog.constants import ExperimentNoResultsErrorKeys
from posthog.hogql import ast
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.database import create_hogql_database
from posthog.hogql.database.models import LazyJoin
from posthog.hogql_queries.experiments import CONTROL_VARIANT_KEY
from posthog.hogql_queries.experiments.trends_statistics import (
are_results_significant,
Expand All @@ -19,6 +22,8 @@
BreakdownFilter,
CachedExperimentTrendsQueryResponse,
ChartDisplayType,
DataWarehouseNode,
DataWarehousePropertyFilter,
EventPropertyFilter,
EventsNode,
ExperimentSignificanceCode,
Expand All @@ -27,11 +32,12 @@
ExperimentVariantTrendsBaseStats,
InsightDateRange,
PropertyMathType,
PropertyOperator,
TrendsFilter,
TrendsQuery,
TrendsQueryResponse,
)
from typing import Any, Optional
from typing import Any, Optional, cast
import threading


Expand Down Expand Up @@ -89,12 +95,18 @@ def _get_insight_date_range(self) -> InsightDateRange:
explicitDate=True,
)

def _get_breakdown_filter(self) -> BreakdownFilter:
def _get_event_breakdown_filter(self) -> BreakdownFilter:
    """Breakdown filter for event-based series, keyed on the experiment's breakdown key."""
    breakdown_key = self.breakdown_key
    return BreakdownFilter(breakdown=breakdown_key, breakdown_type="event")

def _get_data_warehouse_breakdown_filter(self) -> BreakdownFilter:
    """Breakdown filter for data warehouse series; the key is read off the joined events table."""
    breakdown_property = f"events.properties.{self.breakdown_key}"
    return BreakdownFilter(breakdown=breakdown_property, breakdown_type="data_warehouse")

def _prepare_count_query(self) -> TrendsQuery:
"""
This method takes the raw trend query and adapts it
Expand All @@ -118,15 +130,32 @@ def _prepare_count_query(self) -> TrendsQuery:

prepared_count_query.trendsFilter = TrendsFilter(display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE)
prepared_count_query.dateRange = self._get_insight_date_range()
prepared_count_query.breakdownFilter = self._get_breakdown_filter()
prepared_count_query.properties = [
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
type="event",
)
]
if self._is_data_warehouse_query(prepared_count_query):
prepared_count_query.breakdownFilter = self._get_data_warehouse_breakdown_filter()
prepared_count_query.properties = [
DataWarehousePropertyFilter(
key="events.event",
value="$feature_flag_called",
operator=PropertyOperator.EXACT,
type="data_warehouse",
),
DataWarehousePropertyFilter(
key=f"events.properties.{self.breakdown_key}",
value=self.variants,
operator=PropertyOperator.EXACT,
type="data_warehouse",
),
]
else:
prepared_count_query.breakdownFilter = self._get_event_breakdown_filter()
prepared_count_query.properties = [
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator=PropertyOperator.EXACT,
type="event",
)
]

return prepared_count_query

Expand All @@ -152,7 +181,7 @@ def _prepare_exposure_query(self) -> TrendsQuery:

if hasattr(count_event, "event"):
prepared_exposure_query.dateRange = self._get_insight_date_range()
prepared_exposure_query.breakdownFilter = self._get_breakdown_filter()
prepared_exposure_query.breakdownFilter = self._get_event_breakdown_filter()
prepared_exposure_query.trendsFilter = TrendsFilter(
display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE
)
Expand All @@ -166,7 +195,7 @@ def _prepare_exposure_query(self) -> TrendsQuery:
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
operator=PropertyOperator.EXACT,
type="event",
)
]
Expand All @@ -177,13 +206,13 @@ def _prepare_exposure_query(self) -> TrendsQuery:
elif self.query.exposure_query:
prepared_exposure_query = TrendsQuery(**self.query.exposure_query.model_dump())
prepared_exposure_query.dateRange = self._get_insight_date_range()
prepared_exposure_query.breakdownFilter = self._get_breakdown_filter()
prepared_exposure_query.trendsFilter = TrendsFilter(display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE)
prepared_exposure_query.breakdownFilter = self._get_event_breakdown_filter()
prepared_exposure_query.properties = [
EventPropertyFilter(
key=self.breakdown_key,
value=self.variants,
operator="exact",
operator=PropertyOperator.EXACT,
type="event",
)
]
Expand All @@ -206,13 +235,13 @@ def _prepare_exposure_query(self) -> TrendsQuery:
EventPropertyFilter(
key="$feature_flag_response",
value=self.variants,
operator="exact",
operator=PropertyOperator.EXACT,
type="event",
),
EventPropertyFilter(
key="$feature_flag",
value=[self.feature_flag.key],
operator="exact",
operator=PropertyOperator.EXACT,
type="event",
),
],
Expand All @@ -226,7 +255,63 @@ def calculate(self) -> ExperimentTrendsQueryResponse:

def run(query_runner: TrendsQueryRunner, result_key: str, is_parallel: bool):
try:
result = query_runner.calculate()
database = create_hogql_database(team_id=self.team.pk)
Copy link
Contributor

@jurajmajerik jurajmajerik Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to confirm my understanding: the purpose of this is to create a Database instance, which is an object that lets you run HogQL queries across various data sources, like person-events or data warehouse tables.

We do this because we need to build a custom context with our own Database, including our own table where we define the join in a custom way. Then, we pass this custom context to the query runner.

Please check if the above is correct and leave a comment in the code :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do this because we need to build a custom context with our own Database, including our own table where we define the join in a custom way. Then, we pass this custom context to the query runner.

This is correct :) Left a comment in a07bb2a

if self._is_data_warehouse_query(query_runner.query):
series_node = cast(DataWarehouseNode, query_runner.query.series[0])
table = database.get_table(series_node.table_name)
table.fields["events"] = LazyJoin(
from_field=[series_node.distinct_id_field],
join_table=database.get_table("events"),
join_function=lambda join_to_add, context, node: (
ast.JoinExpr(
table=ast.SelectQuery(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently joining over all events, but we should join only over the $feature_flag_called events. We probably need a WHERE clause here.

In the future we should also support custom exposure events here, but for now feel free to hardcode $feature_flag_called and leave a comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, handled with f131df9

select=[
ast.Alias(alias=name, expr=ast.Field(chain=["events", *chain]))
for name, chain in {
**join_to_add.fields_accessed,
"timestamp": ["timestamp"],
"distinct_id": ["distinct_id"],
"properties": ["properties"],
}.items()
],
select_from=ast.JoinExpr(table=ast.Field(chain=["events"])),
),
join_type="ASOF LEFT JOIN",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to have a comment here explaining what we're doing, especially because some of how the ASOF JOIN works is only explicit in the Clickhouse docs:

        # ASOF JOIN finds the most recent matching event that occurred at or before each data warehouse timestamp.
        # 
        # Why this matters:
#         # When a user performs an action (recorded in the data warehouse), we want to know which
#         # experiment variant they were assigned to at that moment. The most recent $feature_flag_called
#         # event at or before their action represents their active variant assignment.
        #
        # Example:
        #   Data Warehouse: timestamp=2024-01-03 12:00, distinct_id=user1
        #   Events: 
#         #     2024-01-02 09:00: (user1, variant='control')   <- Most recent event at or before the action; joined
#         #     2024-01-03 15:00: (user1, variant='test')      <- After the action's timestamp; ignored
        #
        # This ensures we capture the correct causal relationship: which experiment variant
        # was the user assigned to when they performed the action?

(feel free to adjust)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, added the ASOF LEFT JOIN comment in e435442

alias=join_to_add.to_table,
constraint=ast.JoinConstraint(
expr=ast.And(
exprs=[
ast.CompareOperation(
left=ast.Field(
chain=[
join_to_add.from_table,
series_node.distinct_id_field,
]
),
op=ast.CompareOperationOp.Eq,
right=ast.Field(chain=[join_to_add.to_table, "distinct_id"]),
),
ast.CompareOperation(
left=ast.Field(
chain=[
join_to_add.from_table,
series_node.timestamp_field,
]
),
op=ast.CompareOperationOp.GtEq,
right=ast.Field(chain=[join_to_add.to_table, "timestamp"]),
),
]
),
constraint_type="ON",
),
)
),
)

context = HogQLContext(team_id=self.team.pk, database=database)

result = query_runner.calculate(context=context)
shared_results[result_key] = result
except Exception as e:
errors.append(e)
Expand Down Expand Up @@ -362,5 +447,8 @@ def _validate_event_variants(self, count_result: TrendsQueryResponse):
if has_errors:
raise ValidationError(detail=json.dumps(errors))

def _is_data_warehouse_query(self, query: TrendsQuery) -> bool:
    """Whether the query's first (and deciding) series is a DataWarehouseNode."""
    first_series = query.series[0]
    return isinstance(first_series, DataWarehouseNode)

def to_query(self) -> ast.SelectQuery:
    # Intentionally unsupported: this runner delegates execution to TrendsQueryRunner
    # rather than producing a single HogQL SELECT of its own.
    raise ValueError(f"Cannot convert source query of type {self.query.count_query.kind} to query")
Loading
Loading