Skip to content

Commit

Permalink
Merge branch 'main' into bug-2025-workflow-execution
Browse files Browse the repository at this point in the history
  • Loading branch information
rajeshj11 authored Oct 16, 2024
2 parents 3625e51 + ac9cc2c commit d63dbc1
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 4 deletions.
23 changes: 21 additions & 2 deletions keep/contextmanager/contextmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
self.logger, self, tenant_id, workflow_id, workflow_execution_id
)
self.workflow_id = workflow_id
self.workflow_execution_id = workflow_execution_id
self.tenant_id = tenant_id
self.steps_context = {}
self.steps_context_size = 0
Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
self.dependencies = set()
self.workflow_execution_id = None
self._api_key = None
self.__loggers = {}

@property
def api_key(self):
Expand All @@ -73,9 +75,26 @@ def api_key(self):
def set_execution_context(self, workflow_execution_id):
self.workflow_execution_id = workflow_execution_id
self.logger_adapter.workflow_execution_id = workflow_execution_id
for logger in self.__loggers.values():
logger.workflow_execution_id = workflow_execution_id

def get_logger(self, name=None):
if not name:
return self.logger_adapter

def get_logger(self):
return self.logger_adapter
if name in self.__loggers:
return self.__loggers[name]

logger = logging.getLogger(name)
logger_adapter = WorkflowLoggerAdapter(
logger,
self,
self.tenant_id,
self.workflow_id,
self.workflow_execution_id,
)
self.__loggers[name] = logger_adapter
return logger_adapter

def set_event_context(self, event):
self.event_context = event
Expand Down
8 changes: 7 additions & 1 deletion keep/providers/base/base_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ def __init__(
self.webhook_markdown = webhook_markdown
self.provider_description = provider_description
self.context_manager = context_manager
self.logger = context_manager.get_logger()
self.logger = context_manager.get_logger(self.provider_id)
self.logger.setLevel(
os.environ.get(
"KEEP_{}_PROVIDER_LOG_LEVEL".format(self.provider_id.upper()),
os.environ.get("LOG_LEVEL", "INFO"),
)
)
self.validate_config()
self.logger.debug(
"Base provider initalized", extra={"provider": self.__class__.__name__}
Expand Down
234 changes: 233 additions & 1 deletion tests/test_workflow_execution.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
from unittest.mock import patch

import pytest
import pytz

from keep.api.core.db import get_last_workflow_execution_by_workflow_id
from keep.api.core.dependencies import SINGLE_TENANT_UUID
from keep.api.logging import WorkflowLoggerAdapter
from keep.api.models.alert import AlertDto, AlertStatus, IncidentDto
from keep.api.models.db.workflow import Workflow
from keep.api.models.db.workflow import Workflow, WorkflowExecutionLog
from keep.workflowmanager.workflowmanager import WorkflowManager
from tests.fixtures.client import client, test_app # noqa

Expand Down Expand Up @@ -44,6 +49,33 @@
"""


workflow_definition_with_two_providers = """workflow:
id: susu-and-sons
description: Just to test the logs of 2 providers
triggers:
- type: alert
filters:
- key: name
value: "server-is-hamburger"
steps:
- name: keep_step
provider:
type: keep
with:
filters:
- key: status
value: open
actions:
- name: console_action
provider:
type: console
with:
message: |
"Tier 1 Alert: {{ alert.name }} - {{ alert.description }}
Alert details: {{ alert }}"
"""


@pytest.fixture(scope="module")
def workflow_manager():
"""
Expand Down Expand Up @@ -77,6 +109,25 @@ def setup_workflow(db_session):
db_session.commit()


@pytest.fixture
def setup_workflow_with_two_providers(db_session):
"""
Fixture to set up a workflow in the database before each test.
It creates a Workflow object with the predefined workflow definition and adds it to the database.
"""
workflow = Workflow(
id="susu-and-sons",
name="susu-and-sons",
tenant_id=SINGLE_TENANT_UUID,
description="some stuff for unit testing",
created_by="[email protected]",
interval=0,
workflow_raw=workflow_definition_with_two_providers,
)
db_session.add(workflow)
db_session.commit()


@pytest.mark.parametrize(
"test_app, test_case, alert_statuses, expected_tier, db_session",
[
Expand Down Expand Up @@ -794,3 +845,184 @@ def wait_workflow_execution(workflow_id):
assert workflow_execution_deleted.results["mock-action"] == [
'"deleted incident: incident"\n'
]



logs_counter = {}

def count_logs(instance, original_method):
log_levels = logging.getLevelNamesMapping()
def wrapper(*args, **kwargs):
level_name = original_method.__name__.upper()
max_level = instance.getEffectiveLevel()
current_level = log_levels[level_name]
if current_level >= max_level:
logs_counter.setdefault(instance.workflow_execution_id, defaultdict(int))
logs_counter[instance.workflow_execution_id]["all"] += 1
logs_counter[instance.workflow_execution_id][level_name] += 1

return original_method(*args, **kwargs)

return wrapper

def fake_workflow_adapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id):
adapter = WorkflowLoggerAdapter(logger, context_manager, tenant_id, workflow_id, workflow_execution_id)

adapter.info = count_logs(adapter, adapter.info)
adapter.debug = count_logs(adapter, adapter.debug)
adapter.warning = count_logs(adapter, adapter.warning)
adapter.error = count_logs(adapter, adapter.error)
adapter.critical = count_logs(adapter, adapter.critical)
return adapter


@pytest.mark.parametrize(
"test_app, test_case, alert_statuses, expected_tier, db_session",
[
({"AUTH_TYPE": "NOAUTH"}, "No action", [[0, "firing"]], None, None),
],
indirect=["test_app", "db_session"],
)
def test_workflow_execution_logs(
db_session,
test_app,
create_alert,
setup_workflow_with_two_providers,
workflow_manager,
test_case,
alert_statuses,
expected_tier,
):
with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter',
side_effect=fake_workflow_adapter),\
patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True):
base_time = datetime.now(tz=pytz.utc)

# Create alerts with specified statuses and timestamps
alert_statuses.reverse()
for time_diff, status in alert_statuses:
alert_status = (
AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
)
create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))

time.sleep(1)
# Create the current alert
current_alert = AlertDto(
id="grafana-1",
source=["grafana"],
name="server-is-hamburger",
status=AlertStatus.FIRING,
severity="critical",
fingerprint="fp1",
)

# Insert the current alert into the workflow manager
workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])

# Wait for the workflow execution to complete
workflow_execution = None
count = 0
status = None
while workflow_execution is None and count < 30 and status != "success":
workflow_execution = get_last_workflow_execution_by_workflow_id(
SINGLE_TENANT_UUID, "susu-and-sons"
)
if workflow_execution is not None:
status = workflow_execution.status
time.sleep(1)
count += 1

# Check if the workflow execution was successful
assert workflow_execution is not None
assert workflow_execution.status == "success"

logs = (
db_session.query(WorkflowExecutionLog)
.filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution.id)
.all()
)

assert len(logs) == logs_counter[workflow_execution.id]["all"]


@pytest.mark.parametrize(
"test_app, test_case, alert_statuses, expected_tier, db_session",
[
({"AUTH_TYPE": "NOAUTH"}, "No action", [[0, "firing"]], None, None),
],
indirect=["test_app", "db_session"],
)
def test_workflow_execution_logs_log_level_debug_console_provider(
db_session,
test_app,
create_alert,
setup_workflow_with_two_providers,
workflow_manager,
test_case,
alert_statuses,
expected_tier,
monkeypatch,
):

logs_counts = {}
logs_level_counts = {}
for level in ["INFO", "DEBUG"]:
monkeypatch.setenv("KEEP_CONSOLE_PROVIDER_LOG_LEVEL", level)
with patch('keep.contextmanager.contextmanager.WorkflowLoggerAdapter',
side_effect=fake_workflow_adapter), \
patch('keep.api.logging.RUNNING_IN_CLOUD_RUN', value=True):
base_time = datetime.now(tz=pytz.utc)

# Create alerts with specified statuses and timestamps
alert_statuses.reverse()
for time_diff, status in alert_statuses:
alert_status = (
AlertStatus.FIRING if status == "firing" else AlertStatus.RESOLVED
)
create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))

time.sleep(1)
# Create the current alert
current_alert = AlertDto(
id="grafana-1-{}".format(level),
source=["grafana"],
name="server-is-hamburger",
status=AlertStatus.FIRING,
severity="critical",
fingerprint="fp1-{}".format(level),
)

# Insert the current alert into the workflow manager
workflow_manager.insert_events(SINGLE_TENANT_UUID, [current_alert])

# Wait for the workflow execution to complete
workflow_execution = None
count = 0
status = None
time.sleep(1)
while workflow_execution is None and count < 30 and status != "success":
workflow_execution = get_last_workflow_execution_by_workflow_id(
SINGLE_TENANT_UUID, "susu-and-sons"
)
if workflow_execution is not None:
status = workflow_execution.status
time.sleep(1)
count += 1

# Check if the workflow execution was successful
assert workflow_execution is not None
assert workflow_execution.status == "success"

logs_counts[workflow_execution.id] = logs_counter[workflow_execution.id]["all"]
logs_level_counts[level] = logs_counter[workflow_execution.id]["all"]

for workflow_execution_id in logs_counts:
logs = (
db_session.query(WorkflowExecutionLog)
.filter(WorkflowExecutionLog.workflow_execution_id == workflow_execution_id)
.all()
)
assert logs_counts[workflow_execution_id] == len(logs)

assert logs_level_counts["DEBUG"] > logs_level_counts["INFO"]

0 comments on commit d63dbc1

Please sign in to comment.