fix(query-performance): Make clickhouse_lag celery metric cheap (#13397)
* Limit what we measure lag on

* fix(query-performance): make clickhouse_lag faster for other tables
macobo authored Dec 19, 2022
1 parent 26c5246 commit a86c122
Showing 7 changed files with 69 additions and 40 deletions.
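For context: the clickhouse_lag Celery task reports, for each table in CLICKHOUSE_TABLES, how far ingestion is behind by looking at the newest Kafka _timestamp. The snippet below is only a hedged sketch of that idea (the query shape, metric reporting, and import path are assumptions, not code from this diff); it illustrates why a max(_timestamp) projection and a minmax skip index on _timestamp keep the measurement cheap once materialized: ClickHouse can answer from the small projection/index data instead of scanning every part of the table.

# Illustrative sketch only -- the real task lives in posthog/celery.py and its
# exact query and metric wiring may differ from what is assumed here.
from posthog.client import sync_execute  # assumed import path

CLICKHOUSE_TABLES = ["events", "person", "person_distinct_id2", "session_recording_events"]

def clickhouse_lag_sketch():
    for table in CLICKHOUSE_TABLES:
        # max(_timestamp) can be served from the fast_max_kafka_timestamp_* projection
        # (or narrowed by the kafka_timestamp_minmax_* skip index) once materialized,
        # so this no longer requires reading the whole _timestamp column.
        rows = sync_execute(f"SELECT dateDiff('second', max(_timestamp), now()) FROM {table}")
        lag_seconds = rows[0][0]
        print(f"clickhouse_{table}_table_lag_seconds={lag_seconds}")  # the real task emits a gauge
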
@@ -1,3 +1,5 @@
from functools import cached_property

import structlog
from django.conf import settings

@@ -31,35 +33,40 @@ def get_table_definition(self) -> str:
)
return result[0][0] if len(result) > 0 else ""

operations = [
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD PROJECTION fast_max_kafka_timestamp (SELECT max(_timestamp))",
rollback=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP PROJECTION fast_max_kafka_timestamp",
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE PROJECTION fast_max_kafka_timestamp",
rollback=None,
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD INDEX kafka_timestamp_minmax _timestamp TYPE minmax GRANULARITY 3",
rollback=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP INDEX kafka_timestamp_minmax",
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE sharded_events ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE INDEX kafka_timestamp_minmax",
rollback=None,
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE events_dead_letter_queue ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD INDEX kafka_timestamp_minmax_dlq _timestamp TYPE minmax GRANULARITY 3",
rollback=f"ALTER TABLE events_dead_letter_queue ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP INDEX kafka_timestamp_minmax_dlq",
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE events_dead_letter_queue ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE INDEX kafka_timestamp_minmax_dlq",
rollback=None,
),
]
@cached_property
def operations(self):
operations = []

for table in ["sharded_events", "person", "person_distinct_id2", "sharded_session_recording_events"]:
operations.extend(
[
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD PROJECTION fast_max_kafka_timestamp_{table} (SELECT max(_timestamp))",
rollback=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP PROJECTION fast_max_kafka_timestamp_{table}",
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE PROJECTION fast_max_kafka_timestamp_{table}",
rollback=None,
),
]
)

for table in ["sharded_events", "events_dead_letter_queue"]:
operations.extend(
[
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' ADD INDEX kafka_timestamp_minmax_{table} _timestamp TYPE minmax GRANULARITY 3",
rollback=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' DROP INDEX kafka_timestamp_minmax_{table}",
),
AsyncMigrationOperationSQL(
database=AnalyticsDBMS.CLICKHOUSE,
sql=f"ALTER TABLE {table} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' MATERIALIZE INDEX kafka_timestamp_minmax_{table}",
rollback=None,
),
]
)

return operations
2 changes: 1 addition & 1 deletion posthog/celery.py
@@ -293,7 +293,7 @@ def pg_plugin_server_query_timing():
pass


CLICKHOUSE_TABLES = ["events", "person", "person_distinct_id", "person_distinct_id2", "session_recording_events"]
CLICKHOUSE_TABLES = ["events", "person", "person_distinct_id2", "session_recording_events"]


@app.task(ignore_result=True)
3 changes: 2 additions & 1 deletion posthog/clickhouse/dead_letter_queue.py
@@ -1,3 +1,4 @@
from posthog.clickhouse.indexes import index_by_kafka_timestamp
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine, ttl_period
from posthog.clickhouse.table_engines import ReplacingMergeTree
from posthog.kafka_client.topics import KAFKA_DEAD_LETTER_QUEUE
@@ -43,7 +44,7 @@
cluster=CLICKHOUSE_CLUSTER,
extra_fields=f"""
{KAFKA_COLUMNS}
, INDEX kafka_timestamp_minmax_dlq _timestamp TYPE minmax GRANULARITY 3
, {index_by_kafka_timestamp(DEAD_LETTER_QUEUE_TABLE)}
""",
engine=DEAD_LETTER_QUEUE_TABLE_ENGINE(),
ttl_period=ttl_period("_timestamp", 4), # 4 weeks
8 changes: 8 additions & 0 deletions posthog/clickhouse/indexes.py
@@ -0,0 +1,8 @@
# Speeds up selecting max(_timestamp)
def projection_for_max_kafka_timestamp(table: str):
return f"PROJECTION fast_max_kafka_timestamp_{table} (SELECT max(_timestamp))"


# Speeds up filtering by _timestamp columns
def index_by_kafka_timestamp(table: str):
return f"INDEX kafka_timestamp_minmax_{table} _timestamp TYPE minmax GRANULARITY 3"
7 changes: 4 additions & 3 deletions posthog/models/event/sql.py
@@ -1,6 +1,7 @@
from django.conf import settings

from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
from posthog.clickhouse.indexes import index_by_kafka_timestamp, projection_for_max_kafka_timestamp
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine, trim_quotes_expr
from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
@@ -79,9 +80,9 @@
engine=EVENTS_DATA_TABLE_ENGINE(),
extra_fields=KAFKA_COLUMNS,
materialized_columns=EVENTS_TABLE_MATERIALIZED_COLUMNS,
indexes="""
, PROJECTION fast_max_kafka_timestamp (SELECT max(_timestamp))
, INDEX kafka_timestamp_minmax _timestamp TYPE minmax GRANULARITY 3
indexes=f"""
, {projection_for_max_kafka_timestamp(EVENTS_DATA_TABLE())}
, {index_by_kafka_timestamp(EVENTS_DATA_TABLE())}
""",
sample_by="SAMPLE BY cityHash64(distinct_id)",
storage_policy=STORAGE_POLICY(),
12 changes: 10 additions & 2 deletions posthog/models/person/sql.py
@@ -1,4 +1,5 @@
from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
from posthog.clickhouse.indexes import index_by_kafka_timestamp
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine
from posthog.clickhouse.table_engines import CollapsingMergeTree, ReplacingMergeTree
from posthog.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_DISTINCT_ID, KAFKA_PERSON_UNIQUE_ID
@@ -39,7 +40,10 @@
table_name=PERSONS_TABLE,
cluster=CLICKHOUSE_CLUSTER,
engine=PERSONS_TABLE_ENGINE(),
extra_fields=KAFKA_COLUMNS,
extra_fields=f"""
{KAFKA_COLUMNS}
, {index_by_kafka_timestamp(PERSONS_TABLE)}
""",
storage_policy=STORAGE_POLICY(),
)

@@ -182,7 +186,11 @@
table_name=PERSON_DISTINCT_ID2_TABLE,
cluster=CLICKHOUSE_CLUSTER,
engine=PERSON_DISTINCT_ID2_TABLE_ENGINE(),
extra_fields=KAFKA_COLUMNS + "\n, _partition UInt64",
extra_fields=f"""
{KAFKA_COLUMNS}
, _partition UInt64
, {index_by_kafka_timestamp(PERSON_DISTINCT_ID2_TABLE)}
""",
)

KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL = lambda: PERSON_DISTINCT_ID2_TABLE_BASE_SQL.format(
6 changes: 5 additions & 1 deletion posthog/models/session_recording_event/sql.py
@@ -1,5 +1,6 @@
from django.conf import settings

from posthog.clickhouse.indexes import index_by_kafka_timestamp
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, kafka_engine, ttl_period
from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme
from posthog.kafka_client.topics import KAFKA_SESSION_RECORDING_EVENTS
@@ -86,7 +87,10 @@
table_name=SESSION_RECORDING_EVENTS_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
materialized_columns=SESSION_RECORDING_EVENTS_MATERIALIZED_COLUMNS,
extra_fields=KAFKA_COLUMNS,
extra_fields=f"""
{KAFKA_COLUMNS}
, {index_by_kafka_timestamp(SESSION_RECORDING_EVENTS_DATA_TABLE())}
""",
engine=SESSION_RECORDING_EVENTS_DATA_TABLE_ENGINE(),
ttl_period=ttl_period(),
)
