
fix(query-performance): Make clickhouse_lag celery metric cheap #13397

Merged: 2 commits, Dec 19, 2022
@@ -1,3 +1,5 @@
+from functools import cached_property
+
 import structlog
 from django.conf import settings
 
@@ -31,35 +33,40 @@ def get_table_definition(self) -> str:
         )
         return result[0][0] if len(result) > 0 else ""
 
-    operations = [
-        AsyncMigrationOperationSQL(
-            database=AnalyticsDBMS.CLICKHOUSE,
-            sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD PROJECTION fast_max_kafka_timestamp (SELECT max(_timestamp))",
-            rollback=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP PROJECTION fast_max_kafka_timestamp",
-        ),
-        AsyncMigrationOperationSQL(
-            database=AnalyticsDBMS.CLICKHOUSE,
-            sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE PROJECTION fast_max_kafka_timestamp",
-            rollback=None,
-        ),
-        AsyncMigrationOperationSQL(
-            database=AnalyticsDBMS.CLICKHOUSE,
-            sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD INDEX kafka_timestamp_minmax _timestamp TYPE minmax GRANULARITY 3",
-            rollback=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP INDEX kafka_timestamp_minmax",
-        ),
-        AsyncMigrationOperationSQL(
-            database=AnalyticsDBMS.CLICKHOUSE,
-            sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE INDEX kafka_timestamp_minmax",
-            rollback=None,
-        ),
-        AsyncMigrationOperationSQL(
-            database=AnalyticsDBMS.CLICKHOUSE,
-            sql=f"ALTER TABLE events_dead_letter_queue ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD INDEX kafka_timestamp_minmax_dlq _timestamp TYPE minmax GRANULARITY 3",
-            rollback=f"ALTER TABLE events_dead_letter_queue ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP INDEX kafka_timestamp_minmax_dlq",
-        ),
-        AsyncMigrationOperationSQL(
-            database=AnalyticsDBMS.CLICKHOUSE,
-            sql=f"ALTER TABLE events_dead_letter_queue ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE INDEX kafka_timestamp_minmax_dlq",
-            rollback=None,
-        ),
-    ]
+    @cached_property
+    def operations(self):
+        operations = []
+
+        for table in ["sharded_events", "person", "person_distinct_id2", "sharded_session_recording_events"]:
+            operations.extend(
+                [
+                    AsyncMigrationOperationSQL(
+                        database=AnalyticsDBMS.CLICKHOUSE,
+                        sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD PROJECTION fast_max_kafka_timestamp_{table} (SELECT max(_timestamp))",
+                        rollback=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP PROJECTION fast_max_kafka_timestamp_{table}",
+                    ),
+                    AsyncMigrationOperationSQL(
+                        database=AnalyticsDBMS.CLICKHOUSE,
+                        sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE PROJECTION fast_max_kafka_timestamp_{table}",
+                        rollback=None,
+                    ),
+                ]
+            )
+
+        for table in ["sharded_events", "events_dead_letter_queue"]:
+            operations.extend(
+                [
+                    AsyncMigrationOperationSQL(
+                        database=AnalyticsDBMS.CLICKHOUSE,
+                        sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD INDEX kafka_timestamp_minmax_{table} _timestamp TYPE minmax GRANULARITY 3",
+                        rollback=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP INDEX kafka_timestamp_minmax_{table}",
+                    ),
+                    AsyncMigrationOperationSQL(
+                        database=AnalyticsDBMS.CLICKHOUSE,
+                        sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE INDEX kafka_timestamp_minmax_{table}",
+                        rollback=None,
+                    ),
+                ]
+            )
+
+        return operations
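
Why these operations make the lag metric cheap: the projection pre-aggregates `max(_timestamp)` per data part, so the metric query reads a handful of tiny projection parts instead of scanning the column, and the minmax skip index lets ClickHouse prune whole granule ranges when filtering on `_timestamp`. A minimal sketch of the query pattern this speeds up (the `sync_execute` import path is an assumption):

```python
from posthog.client import sync_execute  # assumed import path

# With the projection materialized, this reads pre-aggregated projection
# parts rather than scanning _timestamp across every row.
max_ts = sync_execute("SELECT max(_timestamp) FROM sharded_events")[0][0]

# With the minmax index materialized, granules whose _timestamp bounds
# fall outside the filter are skipped entirely.
recent = sync_execute(
    "SELECT count() FROM events_dead_letter_queue WHERE _timestamp > now() - 3600"
)[0][0]
```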
posthog/celery.py (2 changes: 1 addition & 1 deletion)

@@ -293,7 +293,7 @@ def pg_plugin_server_query_timing():
         pass
 
 
-CLICKHOUSE_TABLES = ["events", "person", "person_distinct_id", "person_distinct_id2", "session_recording_events"]
+CLICKHOUSE_TABLES = ["events", "person", "person_distinct_id2", "session_recording_events"]
 
 
 @app.task(ignore_result=True)
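The `clickhouse_lag` task body that consumes `CLICKHOUSE_TABLES` is collapsed in this diff. For orientation, a hedged sketch of the shape of such a task (the query, metric name, and `statsd` usage here are assumptions, not the file's actual contents):

```python
from posthog.client import sync_execute  # assumed import path
from statshog.defaults.django import statsd  # statsd wrapper used in PostHog

@app.task(ignore_result=True)
def clickhouse_lag():
    # With the projections above materialized, max(_timestamp) is cheap
    # to compute for every table in CLICKHOUSE_TABLES.
    for table in CLICKHOUSE_TABLES:
        # DateTime minus DateTime yields seconds in ClickHouse.
        lag = sync_execute(f"SELECT now() - max(_timestamp) FROM {table}")[0][0]
        statsd.gauge(f"posthog_celery_clickhouse_lag_{table}", lag)
```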
posthog/clickhouse/dead_letter_queue.py (3 changes: 2 additions & 1 deletion)

@@ -1,3 +1,4 @@
+from posthog.clickhouse.indexes import index_by_kafka_timestamp
 from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine, ttl_period
 from posthog.clickhouse.table_engines import ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_DEAD_LETTER_QUEUE
@@ -43,7 +44,7 @@
     cluster=CLICKHOUSE_CLUSTER,
     extra_fields=f"""
     {KAFKA_COLUMNS}
-    , INDEX kafka_timestamp_minmax_dlq _timestamp TYPE minmax GRANULARITY 3
+    , {index_by_kafka_timestamp(DEAD_LETTER_QUEUE_TABLE)}
     """,
     engine=DEAD_LETTER_QUEUE_TABLE_ENGINE(),
     ttl_period=ttl_period("_timestamp", 4),  # 4 weeks
posthog/clickhouse/indexes.py (8 changes: 8 additions & 0 deletions)

@@ -0,0 +1,8 @@
+# Speeds up selecting max(_timestamp)
+def projection_for_max_kafka_timestamp(table: str):
+    return f"PROJECTION fast_max_kafka_timestamp_{table} (SELECT max(_timestamp))"
+
+
+# Speeds up filtering by _timestamp columns
+def index_by_kafka_timestamp(table: str):
+    return f"INDEX kafka_timestamp_minmax_{table} _timestamp TYPE minmax GRANULARITY 3"
posthog/models/event/sql.py (7 changes: 4 additions & 3 deletions)

@@ -1,6 +1,7 @@
 from django.conf import settings
 
 from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
+from posthog.clickhouse.indexes import index_by_kafka_timestamp, projection_for_max_kafka_timestamp
 from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine, trim_quotes_expr
 from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme
 from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
@@ -79,9 +80,9 @@
     engine=EVENTS_DATA_TABLE_ENGINE(),
     extra_fields=KAFKA_COLUMNS,
     materialized_columns=EVENTS_TABLE_MATERIALIZED_COLUMNS,
-    indexes="""
-    , PROJECTION fast_max_kafka_timestamp (SELECT max(_timestamp))
-    , INDEX kafka_timestamp_minmax _timestamp TYPE minmax GRANULARITY 3
+    indexes=f"""
+    , {projection_for_max_kafka_timestamp(EVENTS_DATA_TABLE())}
+    , {index_by_kafka_timestamp(EVENTS_DATA_TABLE())}
     """,
     sample_by="SAMPLE BY cityHash64(distinct_id)",
     storage_policy=STORAGE_POLICY(),
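Note the convergence between the two code paths: fresh installs get the projection and index from the CREATE TABLE template here, while existing installs get identically named objects from the async migration above; the same pattern repeats for the person, person_distinct_id2, and session recording tables below. A hedged sketch of where the fragment lands in the generated DDL (columns and engine elided, not the real template):

```python
# Assuming EVENTS_DATA_TABLE() returns "sharded_events" (inferred from
# the migration above), the `indexes` kwarg renders inside the column
# list of the generated statement, roughly:
#
# CREATE TABLE sharded_events ON CLUSTER '...'
# (
#     uuid UUID,
#     ...
#     , PROJECTION fast_max_kafka_timestamp_sharded_events (SELECT max(_timestamp))
#     , INDEX kafka_timestamp_minmax_sharded_events _timestamp TYPE minmax GRANULARITY 3
# )
# ENGINE = ...
```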
posthog/models/person/sql.py (12 changes: 10 additions & 2 deletions)

@@ -1,4 +1,5 @@
 from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
+from posthog.clickhouse.indexes import index_by_kafka_timestamp
 from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine
 from posthog.clickhouse.table_engines import CollapsingMergeTree, ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_DISTINCT_ID, KAFKA_PERSON_UNIQUE_ID
@@ -39,7 +40,10 @@
     table_name=PERSONS_TABLE,
     cluster=CLICKHOUSE_CLUSTER,
     engine=PERSONS_TABLE_ENGINE(),
-    extra_fields=KAFKA_COLUMNS,
+    extra_fields=f"""
+    {KAFKA_COLUMNS}
+    , {index_by_kafka_timestamp(PERSONS_TABLE)}
+    """,
     storage_policy=STORAGE_POLICY(),
 )
 
@@ -182,7 +186,11 @@
     table_name=PERSON_DISTINCT_ID2_TABLE,
     cluster=CLICKHOUSE_CLUSTER,
     engine=PERSON_DISTINCT_ID2_TABLE_ENGINE(),
-    extra_fields=KAFKA_COLUMNS + "\n, _partition UInt64",
+    extra_fields=f"""
+    {KAFKA_COLUMNS}
+    , _partition UInt64
+    , {index_by_kafka_timestamp(PERSON_DISTINCT_ID2_TABLE)}
+    """,
 )
 
 KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL = lambda: PERSON_DISTINCT_ID2_TABLE_BASE_SQL.format(
posthog/models/session_recording_event/sql.py (6 changes: 5 additions & 1 deletion)

@@ -1,5 +1,6 @@
 from django.conf import settings
 
+from posthog.clickhouse.indexes import index_by_kafka_timestamp
 from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine, ttl_period
 from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme
 from posthog.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS
@@ -86,7 +87,10 @@
     table_name=SESSION_RECORDING_EVENTS_DATA_TABLE(),
     cluster=settings.CLICKHOUSE_CLUSTER,
     materialized_columns=SESSION_RECORDING_EVENTS_MATERIALIZED_COLUMNS,
-    extra_fields=KAFKA_COLUMNS,
+    extra_fields=f"""
+    {KAFKA_COLUMNS}
+    , {index_by_kafka_timestamp(SESSION_RECORDING_EVENTS_DATA_TABLE())}
+    """,
     engine=SESSION_RECORDING_EVENTS_DATA_TABLE_ENGINE(),
     ttl_period=ttl_period(),
 )
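
After the migration has run, one way to sanity-check the new objects (a hedged sketch; these ClickHouse system-table queries are not part of the PR, and the `sync_execute` import path is an assumption):

```python
from posthog.client import sync_execute  # assumed import path

# Skip indexes created by the table DDL or the async migration.
indexes = sync_execute(
    """
    SELECT table, name, type
    FROM system.data_skipping_indices
    WHERE name LIKE 'kafka_timestamp_minmax%'
    """
)

# Materialized max(_timestamp) projection parts per table.
projections = sync_execute(
    """
    SELECT table, name, sum(rows) AS rows
    FROM system.projection_parts
    WHERE name LIKE 'fast_max_kafka_timestamp%'
    GROUP BY table, name
    """
)
```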