feat(issues-platform): Start writing occurrences to platform (#45712)
Use system and project options set in
#45709

Fixes #43975

---------

Co-authored-by: Dan Fuller <[email protected]>
udameli and wedamija authored Mar 23, 2023
1 parent c844d7e commit 74e838a
Showing 37 changed files with 394 additions and 11 deletions.
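The write path added here is double-gated: a system-wide option and a per-project option must both be enabled before an occurrence is produced to Kafka, and a second pair of options controls whether the platform also creates the group. A minimal sketch of the gating, assuming the option names registered in this PR (the helper itself is illustrative, not part of the diff):

    from sentry import options

    def occurrence_gates(project):
        # Gate 1: write occurrences to the issues platform (dual write).
        send = bool(
            options.get("performance.issues.send_to_issues_platform", False)
            and project.get_option("sentry:performance_issue_send_to_issues_platform", False)
        )
        # Gate 2: also create/update the group through the platform.
        create = bool(
            options.get("performance.issues.create_issues_through_platform", False)
            and project.get_option("sentry:performance_issue_create_issue_through_platform", False)
        )
        return send, create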
4 changes: 4 additions & 0 deletions bin/load-mocks
@@ -1280,6 +1280,10 @@ if __name__ == "__main__":
load_performance_issues=options.load_performance_issues,
slow=options.slow,
)

from sentry.issues.producer import get_occurrence_producer

get_occurrence_producer().close()
except Exception:
# Avoid reporting any issues recursively back into Sentry
import sys
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -3088,6 +3088,7 @@ def build_cdc_postgres_init_db_volume(settings):
SENTRY_PERFORMANCE_ISSUES_REDUCE_NOISE = False

SENTRY_ISSUE_PLATFORM_RATE_LIMITER_OPTIONS = {}
SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT = 10000

SENTRY_REGION = os.environ.get("SENTRY_REGION", None)
SENTRY_REGION_CONFIG: Iterable[Region] = ()
45 changes: 42 additions & 3 deletions src/sentry/event_manager.py
@@ -6,6 +6,7 @@
import random
import re
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from hashlib import md5
@@ -78,6 +79,9 @@
from sentry.grouping.result import CalculatedHashes
from sentry.ingest.inbound_filters import FilterStatKeys
from sentry.issues.grouptype import GroupCategory, reduce_noise
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import produce_occurrence_to_kafka
from sentry.issues.utils import can_create_group, write_occurrence_to_platform
from sentry.killswitches import killswitch_matches_context
from sentry.lang.native.utils import STORE_CRASH_REPORTS_ALL, convert_crashreport_count
from sentry.locks import locks
@@ -2304,7 +2308,6 @@ def _message_from_metadata(meta: Mapping[str, str]) -> str:

@metrics.wraps("save_event.save_aggregate_performance")
def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects: ProjectsMapping) -> None:

MAX_GROUPS = (
10 # safety check in case we are passed too many. constant will live somewhere else tbd
)
@@ -2317,7 +2320,6 @@ def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects: ProjectsMapping) -> None:
# Granular, per-project option
per_project_rate = project.get_option("sentry:performance_issue_creation_rate", 1.0)
if per_project_rate > random.random():

kwargs = _create_kwargs(job)
kwargs["culprit"] = job["culprit"]
kwargs["data"] = materialize_metadata(
@@ -2327,7 +2329,14 @@ def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects: ProjectsMapping) -> None:
)
kwargs["data"]["last_received"] = job["received_timestamp"]

-            performance_problems = job["performance_problems"]
+            all_performance_problems = job["performance_problems"]
+
+            # Filter out performance problems that will later be sent to the issues platform
+            performance_problems = [
+                problem
+                for problem in all_performance_problems
+                if not can_create_group(problem, project)
+            ]
for problem in performance_problems:
problem.fingerprint = md5(problem.fingerprint.encode("utf-8")).hexdigest()

@@ -2455,6 +2464,35 @@ def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects: ProjectsMapping) -> None:
EventPerformanceProblem(event, performance_problems_by_hash[problem_hash]).save()


@metrics.wraps("save_event.send_occurrence_to_platform")
def _send_occurrence_to_platform(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
for job in jobs:
event = job["event"]
project = event.project
event_id = event.event_id

performance_problems = job["performance_problems"]
for problem in performance_problems:
if write_occurrence_to_platform(problem, project):
occurrence = IssueOccurrence(
id=uuid.uuid4().hex,
resource_id=None,
project_id=project.id,
event_id=event_id,
fingerprint=[problem.fingerprint],
type=problem.type,
issue_title=problem.title,
subtitle=problem.desc,
culprit=event.transaction,
evidence_data=problem.evidence_data,
evidence_display=problem.evidence_display,
detection_time=event.datetime,
level="info",
)

produce_occurrence_to_kafka(occurrence)


@metrics.wraps("event_manager.save_transaction_events")
def save_transaction_events(jobs: Sequence[Job], projects: ProjectsMapping) -> Sequence[Job]:
with metrics.timer("event_manager.save_transactions.collect_organization_ids"):
@@ -2493,6 +2531,7 @@ def save_transaction_events(jobs: Sequence[Job], projects: ProjectsMapping) -> Sequence[Job]:
_nodestore_save_many(jobs)
_eventstream_insert_many(jobs)
_track_outcome_accepted_many(jobs)
_send_occurrence_to_platform(jobs, projects)
return jobs


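During the rollout each transaction's performance problems are effectively split between two paths: _save_aggregate_performance filters out problems for which can_create_group is true, while _send_occurrence_to_platform independently checks write_occurrence_to_platform before producing. A sketch of the resulting split, using names from this diff (not actual code):

    # How one job's problems flow during the dual-write rollout.
    legacy = [p for p in job["performance_problems"] if not can_create_group(p, project)]
    platform = [p for p in job["performance_problems"] if write_occurrence_to_platform(p, project)]
    # With only the "send" options enabled, an N+1 problem lands in both lists
    # (dual write); once the "create" options are enabled too, it leaves the
    # legacy list and the platform owns group creation.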
32 changes: 27 additions & 5 deletions src/sentry/issues/ingest.py
@@ -27,6 +27,8 @@
from sentry.ratelimits.sliding_windows import Quota, RedisSlidingWindowRateLimiter, RequestedQuota
from sentry.utils import metrics, redis

from .utils import can_create_group

issue_rate_limiter = RedisSlidingWindowRateLimiter(
**settings.SENTRY_ISSUE_PLATFORM_RATE_LIMITER_OPTIONS
)
@@ -39,7 +41,11 @@
def save_issue_occurrence(
occurrence_data: IssueOccurrenceData, event: Event
) -> Tuple[IssueOccurrence, Optional[GroupInfo]]:
-    process_occurrence_data(occurrence_data)
+    # Do not hash fingerprints for performance issues because they're already
+    # hashed in save_aggregate_performance
+    # This check should be removed once perf issues are created through the platform
+    if can_create_group(occurrence_data, event.project):
+        process_occurrence_data(occurrence_data)
# Convert occurrence data to `IssueOccurrence`
occurrence = IssueOccurrence.from_dict(occurrence_data)
if occurrence.event_id != event.event_id:
@@ -57,6 +63,9 @@ def save_issue_occurrence(
group_info = save_issue_from_occurrence(occurrence, event, release)
if group_info:
send_issue_occurrence_to_eventstream(event, occurrence, group_info)

if not can_create_group(occurrence_data, event.project):
return occurrence, group_info
environment = event.get_environment()
_get_or_create_group_environment(environment, release, [group_info])
_increment_release_associated_counts(
@@ -154,7 +163,15 @@ def save_issue_from_occurrence(
.select_related("group")
.first()
)

# This forces an early return to skip extra processing steps
# for performance issues because they are already created/updated in save_transaction_events
return_group_info_early = not can_create_group(occurrence, project)

if not existing_grouphash:
if return_group_info_early:
return None

cluster_key = settings.SENTRY_ISSUE_PLATFORM_RATE_LIMITER_OPTIONS.get("cluster", "default")
client = redis.redis_clusters.get(cluster_key)
if not should_create_group(occurrence.type, client, new_grouphash, project):
@@ -203,9 +220,12 @@ def save_issue_from_occurrence(
return None

is_new = False
-        # Note: This updates the message of the issue based on the event. Not sure what we want to
-        # store there yet, so we may need to revisit that.
-        is_regression = _process_existing_aggregate(group, event, issue_kwargs, release)
+        is_regression = False
+
+        if not return_group_info_early:
+            # Note: This updates the message of the issue based on the event. Not sure what we want to
+            # store there yet, so we may need to revisit that.
+            is_regression = _process_existing_aggregate(group, event, issue_kwargs, release)
group_info = GroupInfo(group=group, is_new=is_new, is_regression=is_regression)

return group_info
@@ -217,14 +237,16 @@ def send_issue_occurrence_to_eventstream(
group_event = event.for_group(group_info.group)
group_event.occurrence = occurrence

skip_consume = not can_create_group(occurrence, event.project)

eventstream.insert(
event=group_event,
is_new=group_info.is_new,
is_regression=group_info.is_regression,
is_new_group_environment=group_info.is_new_group_environment,
primary_hash=occurrence.fingerprint[0],
received_timestamp=group_event.data.get("received") or group_event.datetime,
-        skip_consume=False,
+        skip_consume=skip_consume,
group_states=[
{
"id": group_info.group.id,
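For orientation, the occurrence produced by event_manager is later consumed and fed back through save_issue_occurrence above. A hypothetical consumer-side call might look like this (the payload shape is assumed to mirror IssueOccurrence.to_dict()):

    from sentry.issues.ingest import save_issue_occurrence

    # occurrence_data: IssueOccurrenceData dict deserialized from the Kafka message;
    # event: the already-stored transaction event it references.
    occurrence, group_info = save_issue_occurrence(occurrence_data, event)
    if group_info is None:
        # Expected for performance problems while can_create_group() is False:
        # save_issue_from_occurrence() returns early and no group is created here.
        pass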
71 changes: 71 additions & 0 deletions src/sentry/issues/producer.py
@@ -0,0 +1,71 @@
from __future__ import annotations

import logging
from atexit import register
from collections import deque
from concurrent import futures
from concurrent.futures import Future
from typing import Deque

from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from arroyo.types import BrokerValue
from django.conf import settings

from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options

occurrence_producer = None


def get_occurrence_producer() -> KafkaProducer:
global occurrence_producer
if occurrence_producer is None:
cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_INGEST_OCCURRENCES]["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
occurrence_producer = KafkaProducer(
build_kafka_configuration(default_config=producer_config)
)
return occurrence_producer


def produce_occurrence_to_kafka(occurrence: IssueOccurrence) -> None:
if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
# If we're not running Kafka then we're just in dev. Skip producing here and just log for
# debugging.
logging.info(
"Attempted to produce occurrence to Kafka, but Kafka isn't running",
extra={"occurrence": occurrence},
)
return
payload = KafkaPayload(None, json.dumps(occurrence.to_dict()).encode("utf-8"), [])
occurrence_producer = get_occurrence_producer()
future = occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload)

track_occurrence_producer_futures(future)


occurrence_producer_futures: Deque[Future[BrokerValue[KafkaPayload]]] = deque()


def track_occurrence_producer_futures(future: Future[BrokerValue[KafkaPayload]]) -> None:
global occurrence_producer_futures
occurrence_producer_futures.append(future)
if len(occurrence_producer_futures) >= settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT:
try:
future = occurrence_producer_futures.popleft()
future.result()
except IndexError:
return


def handle_occurrence_producer() -> None:
futures.wait(occurrence_producer_futures)
if occurrence_producer:
occurrence_producer.close()


register(handle_occurrence_producer)
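Design note: the bounded deque acts as simple backpressure. Once SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT in-flight produce futures accumulate, the oldest one is awaited, which also surfaces any delivery error, and the atexit hook drains whatever remains on shutdown. From the caller's side usage is fire-and-forget; a sketch, assuming an IssueOccurrence built as in event_manager.py:

    from sentry.issues.producer import produce_occurrence_to_kafka

    # Returns immediately; delivery is awaited when the futures deque hits its
    # limit, or by the registered atexit handler on shutdown.
    produce_occurrence_to_kafka(occurrence)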
45 changes: 45 additions & 0 deletions src/sentry/issues/utils.py
@@ -0,0 +1,45 @@
from typing import Union

from sentry import options
from sentry.issues.grouptype import (
GroupCategory,
PerformanceNPlusOneGroupType,
get_group_type_by_type_id,
)
from sentry.issues.issue_occurrence import IssueOccurrence, IssueOccurrenceData
from sentry.models import Project
from sentry.models.group import Group
from sentry.utils.performance_issues.performance_problem import PerformanceProblem


def can_create_group(
entity: Union[IssueOccurrence, IssueOccurrenceData, PerformanceProblem, Group], project: Project
) -> bool:
if isinstance(entity, dict):
group_type = get_group_type_by_type_id(entity["type"])
elif isinstance(entity, Group):
group_type = entity.issue_type
else:
group_type = entity.type
return bool(
group_type.category != GroupCategory.PERFORMANCE.value
or (
# create N+1 db query issues first
group_type.type_id == PerformanceNPlusOneGroupType.type_id
# system-wide option
and options.get("performance.issues.create_issues_through_platform", False)
# more-granular per-project option
and project.get_option("sentry:performance_issue_create_issue_through_platform", False)
)
)


def write_occurrence_to_platform(performance_problem: PerformanceProblem, project: Project) -> bool:
return bool(
# handle only N+1 db query detector first
performance_problem.type.type_id == PerformanceNPlusOneGroupType.type_id
# system-wide option
and options.get("performance.issues.send_to_issues_platform", False)
# more-granular per-project option
and project.get_option("sentry:performance_issue_send_to_issues_platform", False)
)
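To exercise both gates locally, all four options have to be flipped. A hypothetical snippet for a sentry shell or test setup:

    from sentry import options

    # System-wide gates.
    options.set("performance.issues.send_to_issues_platform", True)
    options.set("performance.issues.create_issues_through_platform", True)

    # Per-project gates for a given `project`.
    project.update_option("sentry:performance_issue_send_to_issues_platform", True)
    project.update_option("sentry:performance_issue_create_issue_through_platform", True)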
9 changes: 8 additions & 1 deletion src/sentry/options/defaults.py
@@ -648,9 +648,16 @@
register("performance.issues.render_blocking_assets.fcp_ratio_threshold", default=0.33)
register("performance.issues.render_blocking_assets.size_threshold", default=1000000)

-# System-wide option for performance issue creation through issues platform
+# System-wide option for sending occurrences to the issues platform
register("performance.issues.send_to_issues_platform", default=False, flags=FLAG_MODIFIABLE_BOOL)

# System-wide option for performance issue creation through issues platform
register(
"performance.issues.create_issues_through_platform",
default=False,
flags=FLAG_MODIFIABLE_BOOL,
)

# Dynamic Sampling system wide options
# Killswitch to disable new dynamic sampling behavior specifically new dynamic sampling biases
register("dynamic-sampling:enabled-biases", default=True)
6 changes: 5 additions & 1 deletion src/sentry/projectoptions/defaults.py
@@ -101,9 +101,13 @@
# Can be used to turn off a projects detection for users if there is a project-specific issue.
register(key="sentry:performance_issue_creation_rate", default=1.0)

# Whether performance problems are written to the issues platform as occurrences. Defaults to False;
# system flags and options determine if an organization sends perf problems to the platform.
# Can be used to turn off writing occurrences for users if there is a project-specific issue.
register(key="sentry:performance_issue_send_to_issues_platform", default=False)

# Whether performance issues are created through the issues platform for a project. Defaults to False;
# system flags and options determine if an organization creates issues through the platform.
# Can be used to turn off issue creation for users if there is a project-specific issue.
-register(key="sentry:performance_issue_send_to_issues_platform", default=False)
+register(key="sentry:performance_issue_create_issue_through_platform", default=False)

DEFAULT_PROJECT_PERFORMANCE_DETECTION_SETTINGS = {
"n_plus_one_db_detection_rate": 1.0,
2 changes: 2 additions & 0 deletions src/sentry/tasks/post_process.py
@@ -544,8 +544,10 @@ def post_process_group(
def run_post_process_job(job: PostProcessJob):
group_event = job["event"]
issue_category = group_event.group.issue_category

if not group_event.group.issue_type.allow_post_process_group(group_event.group.organization):
return

if issue_category not in GROUP_CATEGORY_POST_PROCESS_PIPELINE:
# pipeline for generic issues
pipeline = GENERIC_POST_PROCESS_PIPELINE
2 changes: 2 additions & 0 deletions src/sentry/testutils/factories.py
@@ -696,6 +696,8 @@ def inject_performance_problems(jobs, _):
parent_span_ids=None,
cause_span_ids=None,
offender_span_ids=None,
evidence_data={},
evidence_display=[],
)
)

@@ -114,6 +114,8 @@ def _store_performance_problem(self) -> None:
cause_span_ids=cause_span_ids,
parent_span_ids=None,
offender_span_ids=offender_span_ids,
evidence_data={},
evidence_display=[],
)

self._reset_variables()
@@ -80,6 +80,8 @@ def _store_performance_problem(self) -> None:
cause_span_ids=[],
parent_span_ids=None,
offender_span_ids=offender_span_ids,
evidence_data={},
evidence_display=[],
)

self._reset_variables()
@@ -58,6 +58,8 @@ def on_complete(self):
type=self.group_type,
cause_span_ids=[],
offender_span_ids=[span["span_id"] for span in span_list if "span_id" in span],
evidence_data={},
evidence_display=[],
)

def is_creation_allowed_for_project(self, project: Project) -> bool:
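The detector and factory changes above stub the two new evidence fields with empty values so that every PerformanceProblem satisfies what IssueOccurrence expects. A sketch of a populated problem; the field values are invented, and IssueEvidence is assumed to take name/value/important as defined in sentry.issues.issue_occurrence:

    from sentry.issues.grouptype import PerformanceNPlusOneGroupType
    from sentry.issues.issue_occurrence import IssueEvidence
    from sentry.utils.performance_issues.performance_problem import PerformanceProblem

    problem = PerformanceProblem(
        fingerprint="1-1006-5e0b1f800ce9c7e9f8aeeb4b3b8b6e3d",  # invented example
        op="db",
        desc="SELECT * FROM books WHERE author_id = %s",
        type=PerformanceNPlusOneGroupType,
        parent_span_ids=["8dd7a5869a4f4583"],
        cause_span_ids=[],
        offender_span_ids=["b33b57a98b6a4e2c", "a77e5d9c12f04b71"],
        evidence_data={"op": "db", "parent_span_ids": ["8dd7a5869a4f4583"]},
        evidence_display=[
            IssueEvidence(name="Offending Spans", value="db - SELECT ...", important=True)
        ],
    )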