Skip to content

Commit

Permalink
unrevert(feat(eventuser)): Migrate IssuesByTagProcessor away from Eve…
Browse files Browse the repository at this point in the history
…ntUser (#60071)

## Objective:
Creates the `for_tags` method on the EventUser dataclass and migrates
`get_eventuser_callback` to use the dataclass.

Unreverts #59672
  • Loading branch information
NisanthanNanthakumar authored Nov 16, 2023
1 parent 8c9174c commit cc7731f
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 136 deletions.
1 change: 1 addition & 0 deletions src/sentry/analytics/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .cron_monitor_created import * # noqa: F401,F403
from .eventuser_endpoint_request import * # noqa: F401,F403
from .eventuser_equality_check import * # noqa: F401,F403
from .eventuser_snuba_query import * # noqa: F401,F403
from .first_cron_checkin_sent import * # noqa: F401,F403
from .first_event_sent import * # noqa: F401,F403
from .first_feedback_sent import * # noqa: F401,F403
Expand Down
16 changes: 16 additions & 0 deletions src/sentry/analytics/events/eventuser_snuba_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sentry import analytics


# Analytics event describing a single EventUser lookup made through Snuba.
# It is recorded under the type string below (see the `analytics.record`
# call in `EventUser.for_projects`, src/sentry/utils/eventuser.py).
class EventUserSnubaQuery(analytics.Event):
# Type string used both to register the event and to record it.
type = "eventuser_snuba.query"

attributes = (
# IDs of the projects the lookup was scoped to (not marked required).
analytics.Attribute("project_ids", type=list),
# Printed form of the Snuba query that was executed (not marked required).
analytics.Attribute("query"),
# Number of rows in the Snuba response.
analytics.Attribute("count_rows_returned", required=True, type=int),
# Rows returned minus the EventUsers kept after de-duplication.
analytics.Attribute("count_rows_filtered", required=True, type=int),
# Elapsed time of the lookup in milliseconds (not marked required).
analytics.Attribute("query_time_ms", type=int),
)


# Make the event type known to the analytics backend at import time.
analytics.register(EventUserSnubaQuery)
11 changes: 9 additions & 2 deletions src/sentry/data_export/processors/issues_by_tag.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from sentry import tagstore
from sentry.models.eventuser import EventUser
from sentry.models.eventuser import EventUser as EventUser_model
from sentry.models.group import Group, get_group_with_redirect
from sentry.models.project import Project
from sentry.utils.eventuser import EventUser

from ..base import ExportError

Expand Down Expand Up @@ -97,7 +98,13 @@ def serialize_row(item, key):
}
if key == "user":
euser = item._eventuser
result["id"] = euser.ident if euser else ""
result["id"] = (
euser.user_ident
if euser and isinstance(euser, EventUser)
else euser.ident
if euser and isinstance(euser, EventUser_model)
else ""
)
result["email"] = euser.email if euser else ""
result["username"] = euser.username if euser else ""
result["ip_address"] = euser.ip_address if euser else ""
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/search/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_user_tag(projects: Sequence[Project], key: str, value: str) -> str:
# TODO(dcramer): do something with case of multiple matches
try:
if features.has("organizations:eventuser-from-snuba", projects[0].organization):
euser = EventUser.for_projects(projects, {key: value})[0]
euser = EventUser.for_projects(projects, {key: [value]})[0]
else:
lookup = EventUser_model.attr_from_keyword(key)
euser = EventUser_model.objects.filter(
Expand Down
235 changes: 128 additions & 107 deletions src/sentry/utils/eventuser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, List, Mapping, Optional, Tuple
from datetime import datetime
from typing import Any, Dict, List, Mapping, Optional

from snuba_sdk import (
BooleanCondition,
Expand All @@ -13,14 +14,15 @@
Direction,
Entity,
Function,
Limit,
Op,
OrderBy,
Query,
Request,
)

from sentry import analytics, features
from sentry.eventstore.models import Event
from sentry.models.eventuser import EventUser as EventUser_model
from sentry.models.project import Project
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.utils.avatar import get_gravatar_url
Expand All @@ -35,7 +37,7 @@
{
("user_id"): "id",
("user_name"): "username",
("email"): "email",
("user_email"): "email",
("ip_address_v4", "ip_address_v6"): "ip",
}
)
Expand Down Expand Up @@ -82,12 +84,18 @@ def get_display_name(self):

@classmethod
def for_projects(
self, projects: List[Project], keyword_filters: Mapping[str, Any]
self,
projects: List[Project],
keyword_filters: Mapping[str, List[Any]],
filter_boolean=BooleanOp.AND,
return_all=False,
) -> List[EventUser]:
"""
Fetch the EventUser with a Snuba query that exists within a list of projects
and valid `keyword_filters`. The `keyword_filter` keys are in `KEYWORD_MAP`.
"""
start_time = time.time()

oldest_project = min(projects, key=lambda item: item.date_added)

where_conditions = [
Expand All @@ -96,59 +104,108 @@ def for_projects(
Condition(Column("timestamp"), Op.GTE, oldest_project.date_added),
]

keyword_where_conditions = []
for keyword, value in keyword_filters.items():
if not isinstance(value, list):
raise ValueError(f"{keyword} filter must be a list of values")

snuba_column = SNUBA_KEYWORD_MAP.get_key(keyword)
if isinstance(snuba_column, tuple):
where_conditions.append(
BooleanCondition(
BooleanOp.OR,
[
Condition(
Column(column),
Op.EQ,
value
if SNUBA_COLUMN_COALASCE.get(column, None) is None
else Function(
SNUBA_COLUMN_COALASCE.get(column), parameters=[value]
),
)
for column in snuba_column
],
for filter_value in value:
keyword_where_conditions.append(
BooleanCondition(
BooleanOp.OR,
[
Condition(
Column(column),
Op.IN,
value
if SNUBA_COLUMN_COALASCE.get(column, None) is None
else Function(
SNUBA_COLUMN_COALASCE.get(column), parameters=[filter_value]
),
)
for column in snuba_column
],
)
)
else:
keyword_where_conditions.append(Condition(Column(snuba_column), Op.IN, value))

if len(keyword_where_conditions) > 1:
where_conditions.append(
BooleanCondition(
filter_boolean,
keyword_where_conditions,
)
)

else:
where_conditions.append(Condition(Column(snuba_column), Op.EQ, value))
if len(keyword_where_conditions) == 1:
where_conditions.extend(
keyword_where_conditions,
)

columns = [
Column("project_id"),
Column("ip_address_v6"),
Column("ip_address_v4"),
Column("user_id"),
Column("user_name"),
Column("user_email"),
]

query = Query(
match=Entity(EntityKey.Events.value),
select=[
Column("project_id"),
Column("group_id"),
Column("ip_address_v6"),
Column("ip_address_v4"),
Column("event_id"),
Column("user_id"),
Column("user"),
Column("user_name"),
Column("user_email"),
*columns,
Function("max", [Column("timestamp")], "latest_timestamp"),
],
where=where_conditions,
limit=Limit(1),
orderby=[OrderBy(Column("timestamp"), Direction.DESC)],
groupby=[*columns],
orderby=[OrderBy(Column("latest_timestamp"), Direction.DESC)],
)

if not return_all:
query.set_limit(1)

request = Request(
dataset=Dataset.Events.value,
app_id=REFERRER,
query=query,
tenant_ids={"referrer": REFERRER, "organization_id": projects[0].organization.id},
)
data_results = raw_snql_query(request, referrer=REFERRER)["data"]
results = [EventUser.from_snuba(result) for result in data_results]

results = self._find_unique(data_results)
end_time = time.time()
analytics.record(
"eventuser_snuba.query",
project_ids=[p.id for p in projects],
query=query.print(),
count_rows_returned=len(data_results),
count_rows_filtered=len(data_results) - len(results),
query_time_ms=int((end_time - start_time) * 1000),
)

return results

@staticmethod
def _find_unique(data_results: List[dict[str, Any]]):
    """
    Build EventUser objects from the Snuba rows and drop duplicates.

    Two rows are duplicates when their EventUser objects share the same
    ``tag_value``; only the first such EventUser (in row order) is kept.
    The returned list preserves the order of first appearance.
    """
    candidates = [EventUser.from_snuba(row) for row in data_results]

    # dicts keep insertion order, and setdefault never overwrites an existing
    # key, so each tag_value maps to the first EventUser that carried it.
    first_by_tag_value = {}
    for candidate in candidates:
        first_by_tag_value.setdefault(candidate.tag_value, candidate)

    return list(first_by_tag_value.values())

@staticmethod
def from_snuba(result: Mapping[str, Any]) -> EventUser:
"""
Expand All @@ -159,11 +216,47 @@ def from_snuba(result: Mapping[str, Any]) -> EventUser:
project_id=result.get("project_id"),
email=result.get("user_email"),
username=result.get("user_name"),
name=result.get("user_name"),
name=None,
ip_address=result.get("ip_address_v4") or result.get("ip_address_v6"),
user_ident=result.get("user_id"),
)

@classmethod
def for_tags(cls, project_id: int, values):
    """
    Find the EventUser matching each ``"<keyword>:<value>"`` tag value.

    When the ``organizations:eventuser-from-snuba`` feature is disabled for
    the project's organization, the lookup is delegated to the legacy
    ``EventUser_model.for_tags``. Otherwise the tag values are grouped by
    keyword and resolved with a single ``EventUser.for_projects`` call that
    ORs the keyword filters together and returns every match.

    :param project_id: ID of the project to search in.
    :param values: iterable of tag value strings such as ``"id:1"``.
    :return: a dictionary of ``{tag_value: event_user}`` containing only the
        tag values for which a matching EventUser was found.
    """
    projects = Project.objects.filter(id=project_id)

    # NOTE(review): ``projects[0]`` raises IndexError when no project matches
    # ``project_id`` -- confirm callers guarantee the project exists.
    if not features.has("organizations:eventuser-from-snuba", projects[0].organization):
        return EventUser_model.for_tags(project_id, values)

    # Group the raw values by keyword: {"id": ["1", "2"], "email": [...]}.
    keyword_filters: Dict[str, Any] = {}
    for tag_value in values:
        parts = tag_value.split(":", 1)
        # A string without ":" yields one part, which then serves as both
        # keyword and value (same outcome as taking [0] and [-1] separately).
        keyword_filters.setdefault(parts[0], []).append(parts[-1])

    # Without any filter the Snuba query below would be unfiltered and
    # unlimited, and every row would be discarded anyway: skip it.
    if not keyword_filters:
        return {}

    eventusers = EventUser.for_projects(
        projects, keyword_filters, filter_boolean=BooleanOp.OR, return_all=True
    )

    result = {}
    for keyword, filter_values in keyword_filters.items():
        column = KEYWORD_MAP.get_key(keyword)
        for filter_value in filter_values:
            # First EventUser (in query order) whose attribute equals the value.
            matching_euser = next(
                (
                    euser
                    for euser in eventusers
                    if getattr(euser, column, None) == filter_value
                ),
                None,
            )
            if matching_euser:
                result[f"{keyword}:{filter_value}"] = matching_euser

    return result

@property
def tag_value(self):
"""
Expand All @@ -189,75 +282,3 @@ def serialize(self):
"ipAddress": self.ip_address,
"avatarUrl": get_gravatar_url(self.email, size=32),
}


def find_eventuser_with_snuba(event: Event):
    """
    Fetch the Snuba row holding the user fields recorded for ``event``.

    The search is limited to the time window that ``_start_and_end_dates``
    derives from ``event.datetime``. Returns the first row of the response's
    ``data`` list, or an empty dict (after logging) when that list is empty.
    """
    window_start, window_end = _start_and_end_dates(event.datetime)

    snuba_query = _generate_entity_dataset_query(
        event.project_id, event.group_id, event.event_id, window_start, window_end
    )
    snuba_request = Request(
        dataset=Dataset.Events.value,
        app_id=REFERRER,
        query=snuba_query,
        tenant_ids={"referrer": REFERRER, "organization_id": event.project.organization.id},
    )
    rows = raw_snql_query(snuba_request, referrer=REFERRER)["data"]

    if len(rows) != 0:
        return rows[0]

    # Nothing came back: leave a trace identifying the event, then signal
    # "no data" with an empty mapping.
    logger.info(
        "Errors dataset query to find EventUser did not return any results.",
        extra={
            "event_id": event.event_id,
            "project_id": event.project_id,
            "group_id": event.group_id,
        },
    )
    return {}


def _generate_entity_dataset_query(
    project_id: Optional[int],
    group_id: Optional[int],
    event_id: str,
    start_date: datetime,
    end_date: datetime,
) -> Query:
    """
    Build the Events-entity query that selects the user columns of one event.

    The query always filters on ``event_id`` and on
    ``start_date <= timestamp < end_date``. Filters on ``project_id`` and
    ``group_id`` are added only when the corresponding argument is truthy.
    """
    filters = [
        Condition(Column("event_id"), Op.EQ, event_id),
        Condition(Column("timestamp"), Op.GTE, start_date),
        Condition(Column("timestamp"), Op.LT, end_date),
    ]

    # Optional scoping filters: project first, then group.
    for column_name, scope_id in (("project_id", project_id), ("group_id", group_id)):
        if scope_id:
            filters.append(Condition(Column(column_name), Op.EQ, scope_id))

    selected_names = (
        "project_id",
        "group_id",
        "ip_address_v6",
        "ip_address_v4",
        "event_id",
        "user_id",
        "user",
        "user_name",
        "user_email",
    )

    return Query(
        match=Entity(EntityKey.Events.value),
        select=[Column(name) for name in selected_names],
        where=filters,
    )


def _start_and_end_dates(time: datetime) -> Tuple[datetime, datetime]:
    """Return ``(start, end)`` of the 10-minute window centred on ``time``."""
    half_window = timedelta(minutes=5)
    window_start = time - half_window
    window_end = time + half_window
    return window_start, window_end
Loading

0 comments on commit cc7731f

Please sign in to comment.