Skip to content

Commit

Permalink
Merge pull request #265 from OP-TED/feature/TED-652
Browse files Browse the repository at this point in the history
Feature/ted 652
  • Loading branch information
CaptainOfHacks authored Sep 8, 2022
2 parents 833bf00 + f66dee6 commit b99499b
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 4 deletions.
64 changes: 64 additions & 0 deletions dags/notice_batch_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from itertools import chain, islice

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
from dags.notice_batch_worker import NOTICE_BATCH_KEY
from ted_sws import config
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "notice_batch_orchestrator"

NOTICE_STATUS_KEY = "notice_status"
NOTICE_BATCH_SIZE_KEY = "notice_batch_size"
NOTICE_BATCH_SIZE_DEFAULT = 5000
NOTICE_STATUS_VALUE_DEFAULT = "RAW"


def chunks(iterable, chunk_size: int):
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, chunk_size - 1))


@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master', 'notice_batch_orchestrator'])
def notice_batch_orchestrator():
@task
@event_log(TechnicalEventMessage(
message="generate_notice_batches_and_trigger_worker",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
))
)
def generate_notice_batches_and_trigger_worker():
context = get_current_context()
dag_conf = context["dag_run"].conf

notice_status = dag_conf[
NOTICE_STATUS_KEY] if NOTICE_STATUS_KEY in dag_conf.keys() else NOTICE_STATUS_VALUE_DEFAULT
notice_batch_size = dag_conf[
NOTICE_BATCH_SIZE_KEY] if NOTICE_BATCH_SIZE_KEY in dag_conf.keys() else NOTICE_BATCH_SIZE_DEFAULT

mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice_batch_counter = 0
for notice_batch in chunks(notice_repository.get_notice_by_status(notice_status=NoticeStatus[notice_status]),
chunk_size=notice_batch_size):
TriggerDagRunOperator(
task_id=f'trigger_notice_batch_worker_dag_{notice_batch_counter}',
trigger_dag_id="notice_batch_worker",
conf={NOTICE_BATCH_KEY: list(notice_batch)}
).execute(context=context)
notice_batch_counter += 1

generate_notice_batches_and_trigger_worker()


dag = notice_batch_orchestrator()
56 changes: 56 additions & 0 deletions dags/notice_batch_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from ted_sws.event_manager.services.log import log_error
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice

DAG_NAME = "notice_batch_worker"

NOTICE_BATCH_KEY = "notice_batch"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
max_active_runs=256,
max_active_tasks=256,
tags=['master', 'notice_batch_worker'])
def notice_batch_worker():
@task
@event_log(TechnicalEventMessage(
message="process_notice_batch",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
))
)
def process_notice_batch():
context = get_current_context()
dag_conf = context["dag_run"].conf

if NOTICE_BATCH_KEY not in dag_conf.keys():
raise Exception(f"Config key [{NOTICE_BATCH_KEY}] is not present in dag context")

notice_ids_batch = dag_conf[NOTICE_BATCH_KEY]
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)

for notice_id in notice_ids_batch:
notice = notice_repository.get(reference=notice_id)
notice = index_notice(notice=notice)
try:
normalised_notice = normalise_notice(notice=notice)
notice_repository.update(notice=normalised_notice)
except Exception as e:
log_error(message=str(e))

process_notice_batch()


dag = notice_batch_worker()
68 changes: 68 additions & 0 deletions ted_sws/event_manager/services/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from ted_sws.event_manager.model.event_message import EventMessage, TechnicalEventMessage, NoticeEventMessage, \
MappingSuiteEventMessage
from ted_sws.event_manager.services.logger_from_context import get_logger


def log_info(message: str, name: str = None):
get_logger(name=name).info(event_message=EventMessage(message=message))


def log_error(message: str, name: str = None):
get_logger(name=name).error(event_message=EventMessage(message=message))


def log_debug(message: str, name: str = None):
get_logger(name=name).debug(event_message=EventMessage(message=message))


def log_warning(message: str, name: str = None):
get_logger(name=name).warning(event_message=EventMessage(message=message))


def log_technical_info(message: str, name: str = None):
get_logger(name=name).info(event_message=TechnicalEventMessage(message=message))


def log_technical_error(message: str, name: str = None):
get_logger(name=name).error(event_message=TechnicalEventMessage(message=message))


def log_technical_debug(message: str, name: str = None):
get_logger(name=name).debug(event_message=TechnicalEventMessage(message=message))


def log_technical_warning(message: str, name: str = None):
get_logger(name=name).warning(event_message=TechnicalEventMessage(message=message))


def log_notice_info(message: str, name: str = None):
get_logger(name=name).info(event_message=NoticeEventMessage(message=message))


def log_notice_error(message: str, name: str = None):
get_logger(name=name).error(event_message=NoticeEventMessage(message=message))


def log_notice_debug(message: str, name: str = None):
get_logger(name=name).debug(event_message=NoticeEventMessage(message=message))


def log_notice_warning(message: str, name: str = None):
get_logger(name=name).warning(event_message=NoticeEventMessage(message=message))


def log_mapping_suite_info(message: str, name: str = None):
get_logger(name=name).info(event_message=MappingSuiteEventMessage(message=message))


def log_mapping_suite_error(message: str, name: str = None):
get_logger(name=name).error(event_message=MappingSuiteEventMessage(message=message))


def log_mapping_suite_debug(message: str, name: str = None):
get_logger(name=name).debug(event_message=MappingSuiteEventMessage(message=message))


def log_mapping_suite_warning(message: str, name: str = None):
get_logger(name=name).warning(event_message=MappingSuiteEventMessage(message=message))

15 changes: 14 additions & 1 deletion ted_sws/event_manager/services/logger_from_context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Any, MutableMapping, Union

from ted_sws.event_manager.adapters.event_handler_config import NULLLoggerConfig, ConsoleLoggerConfig
from ted_sws.event_manager.adapters.event_handler_config import NULLLoggerConfig, ConsoleLoggerConfig, \
DAGLoggerConfig, CLILoggerConfig
from ted_sws.event_manager.adapters.event_logger import EventLogger
from ted_sws.event_manager.adapters.log import EVENT_LOGGER_CONTEXT_KEY
from ted_sws.event_manager.adapters.log import is_env_logging_enabled
Expand All @@ -13,6 +14,18 @@
"""


def get_logger(name: str = None) -> EventLogger:
return get_env_logger(EventLogger(DAGLoggerConfig(
name=DAGLoggerConfig.init_logger_name(name)
)))


def get_cli_logger(name: str = None) -> EventLogger:
return get_env_logger(EventLogger(CLILoggerConfig(
name=CLILoggerConfig.init_logger_name(name)
)), is_cli=True)


def get_env_logger(logger: EventLogger, is_cli: bool = False) -> EventLogger:
"""
This method returns the event logger, based on environment:
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/event_manager/services/test_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from ted_sws.event_manager.services.log import log_debug, log_error, log_info, log_warning, log_technical_debug, \
log_technical_error, log_technical_info, log_technical_warning, log_notice_warning, log_notice_error, \
log_notice_debug, log_notice_info, log_mapping_suite_debug, log_mapping_suite_info, log_mapping_suite_error, \
log_mapping_suite_warning


def test_log_info():
log_info("TEST_INFO_MESSAGE")


def test_log_error():
log_error("TEST_ERROR_MESSAGE")


def test_log_debug():
log_debug("TEST_DEBUG_MESSAGE")


def test_log_warning():
log_warning("TEST_WARNING_MESSAGE")


def test_log_technical_info():
log_technical_info("TEST_TECHNICAL_INFO_MESSAGE")


def test_log_technical_error():
log_technical_error("TEST_TECHNICAL_ERROR_MESSAGE")


def test_log_technical_debug():
log_technical_debug("TEST_TECHNICAL_DEBUG_MESSAGE")


def test_log_technical_warning():
log_technical_warning("TEST_TECHNICAL_WARNING_MESSAGE")


def test_log_notice_info():
log_notice_info("TEST_NOTICE_INFO_MESSAGE")


def test_log_notice_error():
log_notice_error("TEST_NOTICE_ERROR_MESSAGE")


def test_log_notice_debug():
log_notice_debug("TEST_NOTICE_DEBUG_MESSAGE")


def test_log_notice_warning():
log_notice_warning("TEST_NOTICE_WARNING_MESSAGE")


def test_log_mapping_suite_info():
log_mapping_suite_info("TEST_MAPPING_SUITE_INFO_MESSAGE")


def test_log_mapping_suite_error():
log_mapping_suite_error("TEST_MAPPING_SUITE_ERROR_MESSAGE")


def test_log_mapping_suite_debug():
log_mapping_suite_debug("TEST_MAPPING_SUITE_DEBUG_MESSAGE")


def test_log_mapping_suite_warning():
log_mapping_suite_warning("TEST_MAPPING_SUITE_WARNING_MESSAGE")
23 changes: 20 additions & 3 deletions tests/unit/event_manager/services/test_logger_from_context.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import os
from unittest import mock

import pytest

from ted_sws.event_manager.adapters.event_handler_config import DAGLoggerConfig
from ted_sws import RUN_ENV_NAME, RUN_ENV_VAL
from ted_sws.event_manager.adapters.event_handler_config import DAGLoggerConfig, NULLLoggerConfig
from ted_sws.event_manager.adapters.event_log_decorator import EVENT_LOGGER_CONTEXT_KEY
from ted_sws.event_manager.adapters.event_logger import EventLogger
from ted_sws.event_manager.adapters.log import ConfigHandlerType
from ted_sws.event_manager.model.event_message import EventMessage, EventMessageProcessType, EventMessageMetadata
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
handle_event_message_metadata_context, handle_event_message_metadata_dag_context, \
get_task_id_from_dag_context, get_dag_id_from_dag_context, get_dag_run_id_from_dag_context
handle_event_message_metadata_context, handle_event_message_metadata_dag_context, get_logger, get_cli_logger, \
get_task_id_from_dag_context, get_dag_id_from_dag_context, get_dag_run_id_from_dag_context, get_env_logger


def test_get_logger():
assert isinstance(get_logger(), EventLogger)


def test_get_cli_logger():
assert isinstance(get_cli_logger(), EventLogger)


def test_get_logger_from_dag_context():
Expand Down Expand Up @@ -64,3 +76,8 @@ def test_handle_event_message_metadata_context():
assert metadata.process_context
assert metadata.process_context['context_var']


@mock.patch.dict(os.environ, {RUN_ENV_NAME: RUN_ENV_VAL})
def test_get_env_logger(monkeypatch):
logger = get_env_logger(EventLogger(NULLLoggerConfig()))
assert isinstance(logger, EventLogger)

0 comments on commit b99499b

Please sign in to comment.