Skip to content

Commit

Permalink
ES healthcheck: skip message composition task when cluster is green (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
AetherUnbound authored Mar 12, 2024
1 parent d7d4208 commit db6ab8a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
17 changes: 9 additions & 8 deletions catalog/dags/elasticsearch_cluster/healthcheck_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
from datetime import datetime
from textwrap import dedent, indent
from typing import Literal

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
Expand All @@ -39,6 +40,7 @@
EXPECTED_NODE_COUNT = 6
EXPECTED_DATA_NODE_COUNT = 3
EXPECTED_MASTER_NODE_COUNT = 3
MessageType = Literal["alert", "notification"]


def _format_response_body(response_body: dict) -> str:
Expand All @@ -59,7 +61,7 @@ def _format_response_body(response_body: dict) -> str:
"""


def _compose_red_status(env: Environment, response_body: dict):
def _compose_red_status(env: Environment, response_body: dict) -> str:
message = f"""
Elasticsearch {env} cluster status is **red**.
Expand All @@ -70,7 +72,7 @@ def _compose_red_status(env: Environment, response_body: dict):
return message


def _compose_unexpected_node_count(env: Environment, response_body: dict):
def _compose_unexpected_node_count(env: Environment, response_body: dict) -> str:
node_count = response_body["number_of_nodes"]
data_node_count = response_body["number_of_data_nodes"]
master_node_count = node_count - data_node_count
Expand All @@ -91,7 +93,7 @@ def _compose_unexpected_node_count(env: Environment, response_body: dict):
return message


def _compose_yellow_cluster_health(env: Environment, response_body: dict):
def _compose_yellow_cluster_health(env: Environment, response_body: dict) -> str:
message = f"""
Elasticsearch {env} cluster health is **yellow**.
Expand All @@ -104,7 +106,7 @@ def _compose_yellow_cluster_health(env: Environment, response_body: dict):


@task
def ping_healthcheck(env: str, es_host: str):
def ping_healthcheck(env: str, es_host: str) -> dict:
es_conn: Elasticsearch = ElasticsearchPythonHook(hosts=[es_host]).get_conn

response = es_conn.cluster.health()
Expand All @@ -115,7 +117,7 @@ def ping_healthcheck(env: str, es_host: str):
@task
def compose_notification(
env: Environment, response_body: dict, is_data_refresh_running: bool
):
) -> tuple[MessageType, str]:
status = response_body["status"]

if status == "red":
Expand All @@ -133,12 +135,11 @@ def compose_notification(

return "notification", _compose_yellow_cluster_health(env, response_body)

logger.info(f"Cluster health was green; {json.dumps(response_body)}")
return None, None
raise AirflowSkipException(f"Cluster health is green; {json.dumps(response_body)}")


@task
def notify(env: str, message_type_and_string: tuple[str, str]):
def notify(env: str, message_type_and_string: tuple[MessageType, str]):
message_type, message = message_type_and_string

if message_type == "alert":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ def _missing_node_keys(master_nodes: int, data_nodes: int):
),
id="yellow-status-all-nodes-present",
),
pytest.param(
None,
None,
_make_response_body(status="green"),
id="green-status",
marks=pytest.mark.raises(exception=AirflowSkipException),
),
),
)
def test_compose_notification(
Expand Down

0 comments on commit db6ab8a

Please sign in to comment.