feat(replay): add viewed-by endpoint + abstract kafka utils for repla…
aliu39 and cmanallen authored Apr 9, 2024
1 parent d38e564 commit c5c60d1
Showing 12 changed files with 533 additions and 44 deletions.
6 changes: 6 additions & 0 deletions src/sentry/api/urls.py
@@ -163,6 +163,7 @@
ProjectReplayRecordingSegmentIndexEndpoint,
)
from sentry.replays.endpoints.project_replay_video_details import ProjectReplayVideoDetailsEndpoint
from sentry.replays.endpoints.project_replay_viewed_by import ProjectReplayViewedByEndpoint
from sentry.rules.history.endpoints.project_rule_group_history import (
ProjectRuleGroupHistoryIndexEndpoint,
)
@@ -2400,6 +2401,11 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
ProjectReplayDetailsEndpoint.as_view(),
name="sentry-api-0-project-replay-details",
),
re_path(
r"^(?P<organization_slug>[^/]+)/(?P<project_slug>[^\/]+)/replays/(?P<replay_id>[\w-]+)/viewed-by/$",
ProjectReplayViewedByEndpoint.as_view(),
name="sentry-api-0-project-replay-viewed-by",
),
re_path(
r"^(?P<organization_slug>[^/]+)/(?P<project_slug>[^\/]+)/replays/(?P<replay_id>[\w-]+)/accessibility-issues/$",
ProjectReplayAccessibilityIssuesEndpoint.as_view(),
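Together these routes expose GET and POST at /api/0/projects/{organization_slug}/{project_slug}/replays/{replay_id}/viewed-by/. A minimal client sketch of the round trip (the host, slugs, replay id, and token below are placeholders, not values from this commit):

import requests

# Placeholder values for illustration only.
URL = (
    "https://sentry.example.com/api/0/projects/my-org/my-project"
    "/replays/3d5bf803-5acd-4f6a-b1d5-ec3459a8e4d8/viewed-by/"
)
HEADERS = {"Authorization": "Bearer <auth-token>"}

# POST records that the requesting user viewed the replay (204 No Content).
assert requests.post(URL, headers=HEADERS).status_code == 204

# GET lists every user who has viewed the replay.
for user in requests.get(URL, headers=HEADERS).json()["data"]["viewed_by"]:
    print(user["id"], user["email"])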
44 changes: 44 additions & 0 deletions src/sentry/apidocs/examples/replay_examples.py
@@ -198,3 +198,47 @@ class ReplayExamples:
response_only=True,
)
]

GET_REPLAY_VIEWED_BY = [
OpenApiExample(
"Get list of users who have viewed a replay",
value={
"data": {
"viewed_by": [
{
"id": "884411",
"name": "[email protected]",
"username": "d93522a35cb64c13991104bd73d44519",
"email": "[email protected]",
"avatarUrl": "https://gravatar.com/avatar/d93522a35cb64c13991104bd73d44519d93522a35cb64c13991104bd73d44519?s=32&d=mm",
"isActive": True,
"hasPasswordAuth": False,
"isManaged": False,
"dateJoined": "2022-07-25T23:36:29.593212Z",
"lastLogin": "2024-03-14T18:11:28.740309Z",
"has2fa": True,
"lastActive": "2024-03-15T22:22:06.925934Z",
"isSuperuser": True,
"isStaff": False,
"experiments": {},
"emails": [
{
"id": "2231333",
"email": "[email protected]",
"is_verified": True,
}
],
"avatar": {
"avatarType": "upload",
"avatarUuid": "499dcd0764da42a589654a2224086e67",
"avatarUrl": "https://sentry.io/avatar/499dcd0764da42a589654a2224086e67/",
},
"type": "user",
}
],
}
},
status_codes=[200],
response_only=True,
)
]
147 changes: 147 additions & 0 deletions src/sentry/replays/endpoints/project_replay_viewed_by.py
@@ -0,0 +1,147 @@
import uuid
from typing import Any, TypedDict

from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.response import Response

from sentry import features
from sentry.api.api_owners import ApiOwner
from sentry.api.api_publish_status import ApiPublishStatus
from sentry.api.base import region_silo_endpoint
from sentry.api.bases.project import ProjectEndpoint
from sentry.apidocs.constants import RESPONSE_BAD_REQUEST, RESPONSE_FORBIDDEN, RESPONSE_NOT_FOUND
from sentry.apidocs.examples.replay_examples import ReplayExamples
from sentry.apidocs.parameters import GlobalParams, ReplayParams
from sentry.apidocs.utils import inline_sentry_response_serializer
from sentry.models.project import Project
from sentry.replays.query import query_replay_viewed_by_ids
from sentry.replays.usecases.events import publish_replay_event, viewed_event
from sentry.services.hybrid_cloud.user.serial import serialize_generic_user
from sentry.services.hybrid_cloud.user.service import user_service


class ReplayViewedByResponsePayload(TypedDict):
viewed_by: list[dict[str, Any]]


class ReplayViewedByResponse(TypedDict):
data: ReplayViewedByResponsePayload


@region_silo_endpoint
@extend_schema(tags=["Replays"])
class ProjectReplayViewedByEndpoint(ProjectEndpoint):
owner = ApiOwner.REPLAY
publish_status = {"GET": ApiPublishStatus.PUBLIC, "POST": ApiPublishStatus.PRIVATE}

@extend_schema(
        operation_id="Get list of users who have viewed a replay",
parameters=[
GlobalParams.ORG_SLUG,
GlobalParams.PROJECT_SLUG,
ReplayParams.REPLAY_ID,
],
responses={
200: inline_sentry_response_serializer("GetReplayViewedBy", ReplayViewedByResponse),
400: RESPONSE_BAD_REQUEST,
403: RESPONSE_FORBIDDEN,
404: RESPONSE_NOT_FOUND,
},
examples=ReplayExamples.GET_REPLAY_VIEWED_BY,
)
def get(self, request: Request, project: Project, replay_id: str) -> Response:
"""Return a list of users who have viewed a replay."""
if not features.has(
"organizations:session-replay", project.organization, actor=request.user
):
return Response(status=404)

try:
uuid.UUID(replay_id)
except ValueError:
return Response(status=404)

# query for user ids who viewed the replay
filter_params = self.get_filter_params(request, project, date_filter_optional=False)

# If no rows were found then the replay does not exist and a 404 is returned.
viewed_by_ids_response: list[dict[str, Any]] = query_replay_viewed_by_ids(
project_id=project.id,
replay_id=replay_id,
start=filter_params["start"],
end=filter_params["end"],
request_user_id=request.user.id,
organization=project.organization,
)
if not viewed_by_ids_response:
return Response(status=404)

viewed_by_ids = viewed_by_ids_response[0]["viewed_by_ids"]
if viewed_by_ids == []:
return Response({"data": {"viewed_by": []}}, status=200)

        # Note: in the rare error case where Snuba returns non-existent user ids, this function filters them out.
serialized_users = user_service.serialize_many(
filter=dict(user_ids=viewed_by_ids),
as_user=serialize_generic_user(request.user),
)
serialized_users = [_normalize_user(user) for user in serialized_users]

return Response({"data": {"viewed_by": serialized_users}}, status=200)

def post(self, request: Request, project: Project, replay_id: str) -> Response:
"""Create a replay-viewed event."""
if not features.has(
"organizations:session-replay", project.organization, actor=request.user
):
return Response(status=404)

try:
replay_id = str(uuid.UUID(replay_id))
except ValueError:
return Response(status=404)

message = viewed_event(project.id, replay_id, request.user.id)
publish_replay_event(message, is_async=False)

return Response(status=204)


def _normalize_user(user: dict[str, Any]) -> dict[str, Any]:
"""Return a normalized user dictionary.
The viewed-by resource is expected to return a subset of the user_service's
response output.
"""
return {
"avatar": {
"avatarType": user["avatar"]["avatarType"],
"avatarUuid": user["avatar"]["avatarUuid"],
"avatarUrl": user["avatar"]["avatarUrl"],
},
"avatarUrl": user["avatarUrl"],
"dateJoined": user["dateJoined"],
"email": user["email"],
"emails": [
{
"id": email["id"],
"email": email["email"],
"is_verified": email["is_verified"],
}
for email in user["emails"]
],
"experiments": user["experiments"],
"has2fa": user["has2fa"],
"hasPasswordAuth": user["hasPasswordAuth"],
"id": user["id"],
"isActive": user["isActive"],
"isManaged": user["isManaged"],
"isStaff": user["isStaff"],
"isSuperuser": user["isSuperuser"],
"lastActive": user["lastActive"],
"lastLogin": user["lastLogin"],
"name": user["name"],
"type": "user",
"username": user["username"],
}
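A quick runnable check of _normalize_user's contract: it projects the user_service serializer output down to exactly the documented subset, dropping anything else (the "permissions" key below is a hypothetical extra, included only to show that unlisted keys are stripped):

user_from_service = {
    "avatar": {"avatarType": "upload", "avatarUuid": None, "avatarUrl": None},
    "avatarUrl": None,
    "dateJoined": "2022-07-25T23:36:29.593212Z",
    "email": "[email protected]",
    "emails": [{"id": "2231333", "email": "[email protected]", "is_verified": True}],
    "experiments": {},
    "has2fa": True,
    "hasPasswordAuth": False,
    "id": "884411",
    "isActive": True,
    "isManaged": False,
    "isStaff": False,
    "isSuperuser": True,
    "lastActive": "2024-03-15T22:22:06.925934Z",
    "lastLogin": "2024-03-14T18:11:28.740309Z",
    "name": "[email protected]",
    "username": "d93522a35cb64c13991104bd73d44519",
    "permissions": ["broadcasts.admin"],  # hypothetical extra key
}

normalized = _normalize_user(user_from_service)
assert "permissions" not in normalized
assert normalized["type"] == "user"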
42 changes: 30 additions & 12 deletions src/sentry/replays/lib/kafka.py
@@ -2,22 +2,40 @@
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.pubsub import KafkaPublisher

-replay_publisher: KafkaPublisher | None = None
+# We keep a synchronous and asynchronous singleton because a shared singleton could lead
+# to synchronous publishing when asynchronous publishing was desired and vice-versa.
+sync_publisher: KafkaPublisher | None = None
+async_publisher: KafkaPublisher | None = None


-def initialize_replays_publisher(is_async=False) -> KafkaPublisher:
-    global replay_publisher
+def initialize_replays_publisher(is_async: bool = False) -> KafkaPublisher:
+    if is_async:
+        global async_publisher

-    if replay_publisher is None:
-        config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
-        replay_publisher = KafkaPublisher(
-            get_kafka_producer_cluster_options(config["cluster"]),
-            asynchronous=is_async,
-        )
+        if async_publisher is None:
+            async_publisher = _init_replay_publisher(is_async=True)

-    return replay_publisher
+        return async_publisher
+    else:
+        global sync_publisher
+
+        if sync_publisher is None:
+            sync_publisher = _init_replay_publisher(is_async=False)
+
+        return sync_publisher
+
+
+def _init_replay_publisher(is_async: bool) -> KafkaPublisher:
+    config = get_topic_definition(Topic.INGEST_REPLAY_EVENTS)
+    return KafkaPublisher(
+        get_kafka_producer_cluster_options(config["cluster"]),
+        asynchronous=is_async,
+    )


def clear_replay_publisher() -> None:
-    global replay_publisher
-    replay_publisher = None
+    global sync_publisher
+    global async_publisher
+
+    sync_publisher = None
+    async_publisher = None
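With the split, each mode memoizes its own producer and callers pick the mode explicitly. A short usage sketch (the first call in each mode constructs a real producer, so this assumes a configured Kafka cluster):

from sentry.replays.lib.kafka import clear_replay_publisher, initialize_replays_publisher

sync = initialize_replays_publisher(is_async=False)
async_ = initialize_replays_publisher(is_async=True)

# Distinct singletons: an async caller can no longer be handed a producer
# that was lazily constructed for synchronous publishing, and vice-versa.
assert sync is not async_
assert initialize_replays_publisher(is_async=False) is sync

# Tests can reset both singletons between cases.
clear_replay_publisher()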
2 changes: 1 addition & 1 deletion src/sentry/replays/post_process.py
@@ -102,7 +102,7 @@ def _strip_dashes(field: str) -> str:


def generate_normalized_output(
-    response: list[dict[str, Any]],
+    response: list[dict[str, Any]]
) -> Generator[ReplayDetailsResponse, None, None]:
    """For each payload in the response strip "agg_" prefixes."""
    for item in response:
39 changes: 38 additions & 1 deletion src/sentry/replays/query.py
@@ -102,6 +102,34 @@ def query_replay_instance(
)["data"]


def query_replay_viewed_by_ids(
project_id: int | list[int],
replay_id: str,
start: datetime,
end: datetime,
request_user_id: int | None,
organization: Organization | None = None,
) -> list[dict[str, Any]]:
"""Query unique user ids who viewed a given replay."""
if isinstance(project_id, list):
project_ids = project_id
else:
project_ids = [project_id]

return execute_query(
query=make_full_aggregation_query(
fields=["viewed_by_ids"],
replay_ids=[replay_id],
project_ids=project_ids,
period_start=start,
period_end=end,
request_user_id=request_user_id,
),
tenant_id={"organization_id": organization.id} if organization else {},
referrer="replays.query.viewed_by_query",
)["data"]


def query_replays_count(
project_ids: list[int],
start: datetime,
@@ -559,7 +587,8 @@ def _empty_uuids_lambda():
    "info_ids": ["info_ids"],
    "count_warnings": ["count_warnings"],
    "count_infos": ["count_infos"],
-    "has_viewed": ["has_viewed"],
+    "viewed_by_ids": ["viewed_by_ids"],
+    "has_viewed": ["viewed_by_ids"],
}


@@ -707,6 +736,14 @@ def _empty_uuids_lambda():
parameters=[Column("count_info_events")],
alias="count_infos",
),
"viewed_by_ids": Function(
"groupUniqArrayIf",
parameters=[
Column("viewed_by_id"),
Function("greater", parameters=[Column("viewed_by_id"), 0]),
],
alias="viewed_by_ids",
),
}


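The new viewed_by_ids field is computed with ClickHouse's groupUniqArrayIf. For intuition, a pure-Python model of what that aggregation yields per replay (the greater-than-zero condition skips rows that carry no viewer id, presumably the column default):

def group_uniq_array_if(viewed_by_ids: list[int]) -> set[int]:
    # Collect distinct ids across a replay's rows, ignoring the 0 sentinel.
    return {user_id for user_id in viewed_by_ids if user_id > 0}

assert group_uniq_array_if([0, 884411, 0, 12, 884411]) == {12, 884411}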
35 changes: 6 additions & 29 deletions src/sentry/replays/tasks.py
@@ -1,17 +1,16 @@
from __future__ import annotations

import concurrent.futures as cf
-import time
-import uuid
-from typing import Any

from sentry.replays.lib.kafka import initialize_replays_publisher
from sentry.replays.lib.storage import filestore, storage
from sentry.replays.models import ReplayRecordingSegment
+from sentry.replays.usecases.events import archive_event
from sentry.replays.usecases.reader import fetch_segments_metadata
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
-from sentry.utils import json, metrics
+from sentry.utils import metrics
from sentry.utils.pubsub import KafkaPublisher


@@ -77,30 +76,8 @@ def delete_replay_recording(project_id: int, replay_id: str) -> None:


def archive_replay(publisher: KafkaPublisher, project_id: int, replay_id: str) -> None:
    """Archive a Replay instance. The Replay is not deleted."""
-    replay_payload: dict[str, Any] = {
-        "type": "replay_event",
-        "replay_id": replay_id,
-        "event_id": uuid.uuid4().hex,
-        "segment_id": None,
-        "trace_ids": [],
-        "error_ids": [],
-        "urls": [],
-        "timestamp": time.time(),
-        "is_archived": True,
-        "platform": "",
-    }
+    message = archive_event(project_id, replay_id)

-    publisher.publish(
-        "ingest-replay-events",
-        json.dumps(
-            {
-                "type": "replay_event",
-                "start_time": int(time.time()),
-                "replay_id": replay_id,
-                "project_id": project_id,
-                "segment_id": None,
-                "retention_days": 30,
-                "payload": list(bytes(json.dumps(replay_payload).encode())),
-            }
-        ),
-    )
+    # We publish manually here because we sometimes provide a managed Kafka
+    # publisher interface which has its own setup and teardown behavior.
+    publisher.publish("ingest-replay-events", message)
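The deleted inline code shows roughly what the new archive_event helper must assemble. A hedged reconstruction for reference (the real implementation lives in sentry.replays.usecases.events and is not part of this diff, so details may differ):

import time
import uuid

from sentry.utils import json


def archive_event(project_id: int, replay_id: str) -> str:
    """Sketch: wrap an is_archived replay event in the ingest message envelope."""
    replay_payload = {
        "type": "replay_event",
        "replay_id": replay_id,
        "event_id": uuid.uuid4().hex,
        "segment_id": None,
        "trace_ids": [],
        "error_ids": [],
        "urls": [],
        "timestamp": time.time(),
        "is_archived": True,
        "platform": "",
    }
    return json.dumps(
        {
            "type": "replay_event",
            "start_time": int(time.time()),
            "replay_id": replay_id,
            "project_id": project_id,
            "segment_id": None,
            "retention_days": 30,
            "payload": list(bytes(json.dumps(replay_payload).encode())),
        }
    )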