Skip to content

Commit

Permalink
Merge pull request #325 from OP-TED/feature/TED-874
Browse files Browse the repository at this point in the history
optimize Airflow imports
  • Loading branch information
CaptainOfHacks authored Oct 27, 2022
2 parents d703488 + 82b9dfe commit 6d310df
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 52 deletions.
27 changes: 16 additions & 11 deletions dags/notice_validation_workflow.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
from datetime import timedelta, datetime

from airflow.decorators import dag, task
from pymongo import MongoClient
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from ted_sws.event_manager.adapters.event_log_decorator import event_log

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.supra_notice_repository import DailySupraNoticeRepository
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
validate_notice_availability_in_cellar
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice, \
summary_validation_for_daily_supra_notice

DAG_NAME = "notice_daily_validation_workflow"
NOTICE_PUBLICATION_DATE_DAG_CONF_KEY = "notice_publication_date"
Expand All @@ -41,6 +33,9 @@ def notice_daily_validation_workflow():
))
)
def validate_fetched_notices():
from ted_sws import config
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice

publication_date = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
validate_and_update_daily_supra_notice(notice_publication_day=publication_date,
Expand All @@ -55,6 +50,10 @@ def validate_fetched_notices():
))
)
def summarize_validation_for_daily_supra_notice():
from ted_sws import config
from ted_sws.supra_notice_manager.services.supra_notice_validator import \
summary_validation_for_daily_supra_notice

publication_date = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
summary_validation_for_daily_supra_notice(notice_publication_day=publication_date,
Expand All @@ -69,6 +68,12 @@ def summarize_validation_for_daily_supra_notice():
))
)
def validate_availability_of_notice_in_cellar():
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.supra_notice_repository import DailySupraNoticeRepository
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
validate_notice_availability_in_cellar

notice_publication_day = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
repo = DailySupraNoticeRepository(mongodb_client=mongodb_client)
Expand Down
7 changes: 3 additions & 4 deletions dags/pipelines/notice_batch_processor_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from typing import List

from pymongo import MongoClient

from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_entities_by_cet_uri

CET_URIS = ["http://www.w3.org/ns/org#Organization"]


Expand All @@ -15,6 +11,9 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M
:param mongodb_client:
:return:
"""
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_entities_by_cet_uri

notices = []
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for notice_id in notice_ids:
Expand Down
19 changes: 8 additions & 11 deletions dags/pipelines/notice_fetcher_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from datetime import datetime, timedelta
from typing import List

from pymongo import MongoClient

from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \
create_and_store_in_mongo_db_daily_supra_notice


def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
from pymongo import MongoClient
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \
create_and_store_in_mongo_db_daily_supra_notice

date_wild_card = date_wild_card if date_wild_card else (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*")
notice_publication_date = datetime.strptime(date_wild_card, "%Y%m%d*").date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
Expand All @@ -23,4 +21,3 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
notice_fetched_date=notice_publication_date)

return notice_ids

34 changes: 20 additions & 14 deletions dags/pipelines/notice_processor_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,15 @@
from pymongo import MongoClient

from dags.pipelines.pipeline_protocols import NoticePipelineOutput
from ted_sws import config
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker
from ted_sws.notice_packager.services.notice_packager import package_notice
from ted_sws.notice_publisher.services.notice_publisher import publish_notice
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_with_shacl_suite
from ted_sws.notice_validator.services.sparql_test_suite_runner import validate_notice_with_sparql_suite
from ted_sws.notice_validator.services.validation_summary_runner import validation_summary_report_notice
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice


def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient) -> NoticePipelineOutput:
"""
"""
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice

indexed_notice = index_notice(notice=notice)
normalised_notice = normalise_notice(notice=indexed_notice)

Expand All @@ -31,6 +20,13 @@ def notice_transformation_pipeline(notice: Notice, mongodb_client: MongoClient)
"""
"""

from ted_sws import config
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB

mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
if not result:
Expand All @@ -48,6 +44,12 @@ def notice_validation_pipeline(notice: Notice, mongodb_client: MongoClient) -> N
"""
"""
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_with_shacl_suite
from ted_sws.notice_validator.services.sparql_test_suite_runner import validate_notice_with_sparql_suite
from ted_sws.notice_validator.services.validation_summary_runner import validation_summary_report_notice
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB

mapping_suite_id = notice.distilled_rdf_manifestation.mapping_suite_id
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
Expand All @@ -62,6 +64,8 @@ def notice_package_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
"""
"""
from ted_sws.notice_packager.services.notice_packager import package_notice

# TODO: Implement notice package eligiblity
notice.set_is_eligible_for_packaging(eligibility=True)
packaged_notice = package_notice(notice=notice)
Expand All @@ -72,6 +76,8 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
"""
"""
from ted_sws.notice_publisher.services.notice_publisher import publish_notice

notice.set_is_eligible_for_publishing(eligibility=True)
result = publish_notice(notice=notice)
if result:
Expand Down
11 changes: 5 additions & 6 deletions dags/pipelines/notice_selectors_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
from datetime import datetime
from typing import List

from pymongo import MongoClient

from ted_sws import config
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository, NOTICE_TED_ID

NOTICE_STATUS = "status"
FORM_NUMBER = "normalised_metadata.form_number"
Expand All @@ -16,6 +10,7 @@
def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
start_date: str = None, end_date: str = None,
xsd_version: str = None) -> dict:
from datetime import datetime
mongodb_filter = {NOTICE_STATUS: notice_status}
if form_number:
mongodb_filter[FORM_NUMBER] = form_number
Expand All @@ -31,6 +26,10 @@ def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_number: str = None,
start_date: str = None, end_date: str = None,
xsd_version: str = None) -> List[str]:
from pymongo import MongoClient
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository, NOTICE_TED_ID

mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice_ids = []
Expand Down
6 changes: 2 additions & 4 deletions dags/selector_raw_notices_process_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from airflow.decorators import dag, task
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.operators.DagBatchPipelineOperator import TriggerNoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.operators.DagBatchPipelineOperator import TriggerNoticeBatchPipelineOperator, NOTICE_IDS_KEY
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
Expand Down Expand Up @@ -32,9 +31,8 @@ def selector_raw_notices_process_orchestrator():
def select_all_raw_notices():
start_date = get_dag_param(key=START_DATE_DAG_PARAM)
end_date = get_dag_param(key=END_DATE_DAG_PARAM)
xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM)
notice_ids = notice_ids_selector_by_status(notice_statuses=[NoticeStatus.RAW], start_date=start_date,
end_date=end_date, xsd_version=xsd_version)
end_date=end_date)
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
Expand Down
3 changes: 1 addition & 2 deletions dags/selector_repackage_process_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_process_workflow import NOTICE_PACKAGE_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
Expand Down

0 comments on commit 6d310df

Please sign in to comment.