Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(aci): pass WorkflowJob into process_workflows #82096

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/sentry/tasks/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,10 +1002,8 @@ def process_workflow_engine(job: PostProcessJob) -> None:
# If the flag is enabled, use the code below
from sentry.workflow_engine.processors.workflow import process_workflows

evt = job["event"]

with sentry_sdk.start_span(op="tasks.post_process_group.workflow_engine.process_workflow"):
process_workflows(evt)
process_workflows(job)


def process_rules(job: PostProcessJob) -> None:
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/handlers/action/notification.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, Detector
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler
Expand All @@ -8,7 +8,7 @@
class NotificationActionHandler(ActionHandler):
@staticmethod
def execute(
evt: GroupEvent,
job: PostProcessJob,
action: Action,
detector: Detector,
) -> None:
Expand Down
4 changes: 1 addition & 3 deletions src/sentry/workflow_engine/handlers/condition/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
__all__ = [
"GroupEventConditionHandler",
]
__all__ = ["GroupEventConditionHandler"]

from .group_event import GroupEventConditionHandler
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any

from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler
Expand All @@ -22,8 +22,8 @@ def get_nested_value(data: Any, path: str, default: Any = None) -> Any | None:


@condition_handler_registry.register(Condition.GROUP_EVENT_ATTR_COMPARISON)
class GroupEventConditionHandler(DataConditionHandler[GroupEvent]):
class GroupEventConditionHandler(DataConditionHandler[PostProcessJob]):
@staticmethod
def evaluate_value(data: GroupEvent, comparison: Any, data_filter: str) -> bool:
def evaluate_value(data: PostProcessJob, comparison: Any, data_filter: str) -> bool:
event_value = get_nested_value(data, data_filter)
return event_value == comparison
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/models/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.eventstore.models import GroupEvent
from sentry.notifications.models.notificationaction import ActionTarget
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.registry import action_handler_registry
from sentry.workflow_engine.types import ActionHandler

Expand Down Expand Up @@ -59,7 +59,7 @@ def get_handler(self) -> ActionHandler:
action_type = Action.Type(self.type)
return action_handler_registry.get(action_type)

def trigger(self, evt: GroupEvent, detector: Detector) -> None:
def trigger(self, job: PostProcessJob, detector: Detector) -> None:
# get the handler for the action type
handler = self.get_handler()
handler.execute(evt, self, detector)
handler.execute(job, self, detector)
2 changes: 1 addition & 1 deletion src/sentry/workflow_engine/models/data_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def evaluate_value(self, value: T) -> DataConditionResult:
op: Callable | None = None

try:
# Use a custom hanler
# Use a custom handler
condition_handler = self.get_condition_handler()
except NoRegistrationExistsError:
# If it's not a custom handler, use the default operators
Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.eventstore.models import GroupEvent
from sentry.models.owner_base import OwnerModel
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group

from .json_config import JSONConfigBase
Expand Down Expand Up @@ -53,15 +53,15 @@ class Meta:
)
]

def evaluate_trigger_conditions(self, evt: GroupEvent) -> bool:
def evaluate_trigger_conditions(self, job: PostProcessJob) -> bool:
"""
Evaluate the conditions for the workflow trigger and return if the evaluation was successful.
If there aren't any workflow trigger conditions, the workflow is considered triggered.
"""
if self.when_condition_group is None:
return True

evaluation, _ = evaluate_condition_group(self.when_condition_group, evt)
evaluation, _ = evaluate_condition_group(self.when_condition_group, job)
return evaluation


Expand Down
6 changes: 3 additions & 3 deletions src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, DataConditionGroup, Workflow
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group


def evaluate_workflow_action_filters(
workflows: set[Workflow], evt: GroupEvent
workflows: set[Workflow], job: PostProcessJob
) -> BaseQuerySet[Action]:
filtered_action_groups: set[DataConditionGroup] = set()

Expand All @@ -17,7 +17,7 @@ def evaluate_workflow_action_filters(
).distinct()

for action_condition in action_conditions:
evaluation, result = evaluate_condition_group(action_condition, evt)
evaluation, result = evaluate_condition_group(action_condition, job)

if evaluation:
filtered_action_groups.add(action_condition)
Expand Down
10 changes: 6 additions & 4 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import logging

from sentry.eventstore.models import GroupEvent
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.types import DetectorGroupKey
Expand All @@ -14,11 +14,13 @@


# TODO - cache these by evt.group_id? :thinking:
def get_detector_by_event(evt: GroupEvent) -> Detector:
issue_occurrence = evt.occurrence
def get_detector_by_event(job: PostProcessJob) -> Detector:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: get_detector_by_job?

issue_occurrence = job["event"].occurrence
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be nice to pull this off event = job["event"] since you do that twice


if issue_occurrence is None:
detector = Detector.objects.get(project_id=evt.project_id, type=ErrorGroupType.slug)
detector = Detector.objects.get(
project_id=job["event"].project_id, type=ErrorGroupType.slug
)
else:
detector = Detector.objects.get(id=issue_occurrence.evidence_data.get("detector_id", None))

Expand Down
20 changes: 11 additions & 9 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import sentry_sdk

from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.utils import metrics
from sentry.workflow_engine.models import Detector, Workflow
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
Expand All @@ -11,39 +11,41 @@
logger = logging.getLogger(__name__)


def evaluate_workflow_triggers(workflows: set[Workflow], evt: GroupEvent) -> set[Workflow]:
def evaluate_workflow_triggers(workflows: set[Workflow], job: PostProcessJob) -> set[Workflow]:
triggered_workflows: set[Workflow] = set()

for workflow in workflows:
if workflow.evaluate_trigger_conditions(evt):
if workflow.evaluate_trigger_conditions(job):
triggered_workflows.add(workflow)

return triggered_workflows


def process_workflows(evt: GroupEvent) -> set[Workflow]:
def process_workflows(job: PostProcessJob) -> set[Workflow]:
"""
This method will get the detector based on the event, and then gather the associated workflows.
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
the workflow will be added to a unique list of triggered workflows.

Finally, each of the triggered workflows will have their actions evaluated and executed.

**kwargs are added only for issue alerts, as there is a EventState object that is evaluated in conditions.
"""
# Check to see if the GroupEvent has an issue occurrence
try:
detector = get_detector_by_event(evt)
detector = get_detector_by_event(job)
except Detector.DoesNotExist:
metrics.incr("workflow_engine.process_workflows.error")
logger.exception("Detector not found for event", extra={"event_id": evt.event_id})
logger.exception("Detector not found for event", extra={"event_id": job["event"].event_id})
return set()

# Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
triggered_workflows = evaluate_workflow_triggers(workflows, evt)
actions = evaluate_workflow_action_filters(triggered_workflows, evt)
triggered_workflows = evaluate_workflow_triggers(workflows, job)
actions = evaluate_workflow_action_filters(triggered_workflows, job)

with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
for action in actions:
action.trigger(evt, detector)
action.trigger(job, detector)

return triggered_workflows
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from sentry.types.group import PriorityLevel

if TYPE_CHECKING:
from sentry.eventstore.models import GroupEvent
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models import Action, Detector

T = TypeVar("T")
Expand All @@ -30,7 +30,7 @@ class DetectorPriorityLevel(IntEnum):

class ActionHandler:
@staticmethod
def execute(group_event: GroupEvent, action: Action, detector: Detector) -> None:
def execute(job: PostProcessJob, action: Action, detector: Detector) -> None:
raise NotImplementedError


Expand Down
8 changes: 5 additions & 3 deletions tests/sentry/workflow_engine/models/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry.tasks.post_process import PostProcessJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest


Expand All @@ -8,21 +9,22 @@ def setUp(self):
)
self.data_condition = self.data_condition_group.conditions.first()
self.group, self.event, self.group_event = self.create_group_event()
self.job = PostProcessJob({"event": self.group_event})

def test_evaluate_trigger_conditions__condition_new_event__True(self):
evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True

def test_evaluate_trigger_conditions__condition_new_event__False(self):
# Update event to have been seen before
self.group_event.group.times_seen = 5

evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is False

def test_evaluate_trigger_conditions__no_conditions(self):
self.workflow.when_condition_group = None
self.workflow.save()

evaluation = self.workflow.evaluate_trigger_conditions(self.group_event)
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True
10 changes: 6 additions & 4 deletions tests/sentry/workflow_engine/processors/test_action.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry.tasks.post_process import PostProcessJob
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
Expand All @@ -17,21 +18,22 @@ def setUp(self):
self.group, self.event, self.group_event = self.create_group_event(
occurrence=self.build_occurrence_data(evidence_data={"detector_id": self.detector.id})
)
self.job = PostProcessJob({"event": self.group_event})

def test_basic__no_filter(self):
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert set(triggered_actions) == {self.action}

def test_basic__with_filter__passes(self):
self.create_data_condition(
condition_group=self.action_group,
type=Condition.GROUP_EVENT_ATTR_COMPARISON,
condition="group.times_seen",
condition="event.group.times_seen",
comparison=1,
condition_result=True,
)

triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert set(triggered_actions) == {self.action}

def test_basic__with_filter__filtered(self):
Expand All @@ -43,5 +45,5 @@ def test_basic__with_filter__filtered(self):
comparison=self.detector.id + 1,
)

triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.group_event)
triggered_actions = evaluate_workflow_action_filters({self.workflow}, self.job)
assert not triggered_actions
Loading
Loading