From 9aa3b4c22cc08b967c739dccc74bfcc646144b2a Mon Sep 17 00:00:00 2001
From: Leon Luttenberger
Date: Wed, 29 May 2024 09:57:26 -0500
Subject: [PATCH 1/2] chore: Format airflow-dags example module

---
 modules/examples/airflow-dags/app.py          |  1 +
 modules/examples/airflow-dags/coverage.ini    |  3 -
 .../examples/airflow-dags/dags/evaluation.py  |  7 +-
 .../examples/airflow-dags/dags/mlops_dag.py   | 12 ++--
 .../airflow-dags/dags/preprocessing.py        | 31 +++------
 modules/examples/airflow-dags/dags/train.py   |  2 +-
 modules/examples/airflow-dags/pyproject.toml  | 66 ++++++++++---------
 modules/examples/airflow-dags/setup.cfg       | 27 --------
 modules/examples/airflow-dags/stack.py        | 38 +++--------
 9 files changed, 63 insertions(+), 124 deletions(-)
 delete mode 100644 modules/examples/airflow-dags/coverage.ini
 delete mode 100644 modules/examples/airflow-dags/setup.cfg

diff --git a/modules/examples/airflow-dags/app.py b/modules/examples/airflow-dags/app.py
index adcdcd57..cf1bc2fe 100644
--- a/modules/examples/airflow-dags/app.py
+++ b/modules/examples/airflow-dags/app.py
@@ -5,6 +5,7 @@
 
 import aws_cdk
 from aws_cdk import App, CfnOutput
+
 from stack import DagResources
 
 project_name = os.getenv("SEEDFARMER_PROJECT_NAME", "")
diff --git a/modules/examples/airflow-dags/coverage.ini b/modules/examples/airflow-dags/coverage.ini
deleted file mode 100644
index c3878739..00000000
--- a/modules/examples/airflow-dags/coverage.ini
+++ /dev/null
@@ -1,3 +0,0 @@
-[run]
-omit =
-    tests/*
\ No newline at end of file
diff --git a/modules/examples/airflow-dags/dags/evaluation.py b/modules/examples/airflow-dags/dags/evaluation.py
index 5cee0f63..6a21b90a 100644
--- a/modules/examples/airflow-dags/dags/evaluation.py
+++ b/modules/examples/airflow-dags/dags/evaluation.py
@@ -3,9 +3,8 @@
 import tarfile
 
 import pandas as pd
-
 from sklearn.externals import joblib
-from sklearn.metrics import classification_report, roc_auc_score, accuracy_score
+from sklearn.metrics import accuracy_score, classification_report, roc_auc_score
 
 if __name__ == "__main__":
     model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
@@ -30,9 +29,7 @@
 
     print("Classification report:\n{}".format(report_dict))
 
-    evaluation_output_path = os.path.join(
-        "/opt/ml/processing/evaluation", "evaluation.json"
-    )
+    evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
 
     print("Saving classification report to {}".format(evaluation_output_path))
     with open(evaluation_output_path, "w") as f:
diff --git a/modules/examples/airflow-dags/dags/mlops_dag.py b/modules/examples/airflow-dags/dags/mlops_dag.py
index a7867ae7..04786be3 100644
--- a/modules/examples/airflow-dags/dags/mlops_dag.py
+++ b/modules/examples/airflow-dags/dags/mlops_dag.py
@@ -12,9 +12,9 @@
 import boto3
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from config import MLOPS_DATA_BUCKET, SAGEMAKER_EXECUTION_ROLE, DAG_EXECUTION_ROLE
-from sagemaker.processing import ProcessingInput, ProcessingOutput
+from config import DAG_EXECUTION_ROLE, MLOPS_DATA_BUCKET, SAGEMAKER_EXECUTION_ROLE
 from sagemaker.inputs import TrainingInput
+from sagemaker.processing import ProcessingInput, ProcessingOutput
 from sagemaker.session import Session
 from sagemaker.sklearn.estimator import SKLearn
 from sagemaker.sklearn.processing import SKLearnProcessor
@@ -26,9 +26,7 @@
 }
 
 dag = DAG("SciKitLearn_MLOps", default_args=default_args, schedule_interval=None)
-pre_processing_input = (
-    f"s3://sagemaker-sample-data-{os.environ['AWS_REGION']}/processing/census"
-)
+pre_processing_input = f"s3://sagemaker-sample-data-{os.environ['AWS_REGION']}/processing/census" test_data_s3_path = f"s3://{MLOPS_DATA_BUCKET}/processing/test" train_data_s3_path = f"s3://{MLOPS_DATA_BUCKET}/processing/train" model_path = f"s3://{MLOPS_DATA_BUCKET}/train/models/" @@ -134,9 +132,7 @@ def evaluation(model_path): # type: ignore[no-untyped-def] code=os.path.join(os.path.dirname(__file__), "evaluation.py"), inputs=[ ProcessingInput(source=model_path, destination="/opt/ml/processing/model"), - ProcessingInput( - source=test_data_s3_path, destination="/opt/ml/processing/test" - ), + ProcessingInput(source=test_data_s3_path, destination="/opt/ml/processing/test"), ], outputs=[ ProcessingOutput( diff --git a/modules/examples/airflow-dags/dags/preprocessing.py b/modules/examples/airflow-dags/dags/preprocessing.py index 292eb844..d5242413 100644 --- a/modules/examples/airflow-dags/dags/preprocessing.py +++ b/modules/examples/airflow-dags/dags/preprocessing.py @@ -2,17 +2,16 @@ import os import warnings -import pandas as pd import numpy as np +import pandas as pd +from sklearn.compose import make_column_transformer +from sklearn.exceptions import DataConversionWarning from sklearn.model_selection import train_test_split from sklearn.preprocessing import ( - StandardScaler, - OneHotEncoder, KBinsDiscretizer, + OneHotEncoder, + StandardScaler, ) -from sklearn.compose import make_column_transformer - -from sklearn.exceptions import DataConversionWarning warnings.filterwarnings(action="ignore", category=DataConversionWarning) @@ -90,27 +89,17 @@ def print_shape(df): # type: ignore[no-untyped-def] print("Train data shape after preprocessing: {}".format(train_features.shape)) print("Test data shape after preprocessing: {}".format(test_features.shape)) - train_features_output_path = os.path.join( - "/opt/ml/processing/train", "train_features.csv" - ) - train_labels_output_path = os.path.join( - "/opt/ml/processing/train", "train_labels.csv" - ) + train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv") + train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv") - test_features_output_path = os.path.join( - "/opt/ml/processing/test", "test_features.csv" - ) + test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv") test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv") print("Saving training features to {}".format(train_features_output_path)) - pd.DataFrame(train_features).to_csv( - train_features_output_path, header=False, index=False - ) + pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False) print("Saving test features to {}".format(test_features_output_path)) - pd.DataFrame(test_features).to_csv( - test_features_output_path, header=False, index=False - ) + pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False) print("Saving training labels to {}".format(train_labels_output_path)) y_train.to_csv(train_labels_output_path, header=False, index=False) diff --git a/modules/examples/airflow-dags/dags/train.py b/modules/examples/airflow-dags/dags/train.py index 2870a9c1..a34cbd23 100644 --- a/modules/examples/airflow-dags/dags/train.py +++ b/modules/examples/airflow-dags/dags/train.py @@ -1,8 +1,8 @@ import os import pandas as pd -from sklearn.linear_model import LogisticRegression from sklearn.externals import joblib +from sklearn.linear_model import LogisticRegression if __name__ == "__main__": 
print("Starting training") diff --git a/modules/examples/airflow-dags/pyproject.toml b/modules/examples/airflow-dags/pyproject.toml index d21eb12f..03dee6eb 100644 --- a/modules/examples/airflow-dags/pyproject.toml +++ b/modules/examples/airflow-dags/pyproject.toml @@ -1,36 +1,42 @@ -[tool.black] +[tool.ruff] +exclude = [ + ".eggs", + ".git", + ".hg", + ".mypy_cache", + ".ruff_cache", + ".tox", + ".venv", + ".env", + "_build", + "buck-out", + "build", + "dist", + "codeseeder.out", +] line-length = 120 -target-version = ["py36", "py37", "py38"] -exclude = ''' -/( - \.eggs - | \.git - | \.hg - | \.mypy_cache - | \.tox - | \.venv - | \.env - | _build - | buck-out - | build - | dist - | codeseeder.out -)/ -''' +target-version = "py38" -[tool.isort] -multi_line_output = 3 -include_trailing_comma = true -force_grid_wrap = 0 -use_parentheses = true -ensure_newline_before_comments = true -line_length = 120 -src_paths = ["example_dags"] -py_version = 36 -skip_gitignore = false +[tool.ruff.lint] +select = ["F", "I", "E", "W"] +fixable = ["ALL"] + +[tool.mypy] +python_version = "3.8" +strict = true +ignore_missing_imports = true +disallow_untyped_decorators = false +exclude = "codeseeder.out/|example_dags/" +warn_unused_ignores = false [tool.pytest.ini_options] -addopts = "-v --cov=. --cov-report term --cov-config=coverage.ini" +addopts = "-v --cov=. --cov-report term" pythonpath = [ "." -] \ No newline at end of file +] + +[tool.coverage.run] +omit = ["tests/*"] + +[tool.coverage.report] +fail_under = 80 diff --git a/modules/examples/airflow-dags/setup.cfg b/modules/examples/airflow-dags/setup.cfg deleted file mode 100644 index 6f1278ad..00000000 --- a/modules/examples/airflow-dags/setup.cfg +++ /dev/null @@ -1,27 +0,0 @@ -[metadata] -license_files = - LICENSE - NOTICE - VERSION - -[flake8] -max-line-length = 120 -extend-ignore = E203, W503 -exclude = - .git, - __pycache__, - docs/source/conf.py, - old, - build, - dist, - .venv, - codeseeder.out, - bundle - -[mypy] -python_version = 3.7 -strict = True -ignore_missing_imports = True -allow_untyped_decorators = True -exclude = - codeseeder.out/|example_dags/ diff --git a/modules/examples/airflow-dags/stack.py b/modules/examples/airflow-dags/stack.py index 13e99dfa..a766ed7d 100755 --- a/modules/examples/airflow-dags/stack.py +++ b/modules/examples/airflow-dags/stack.py @@ -5,10 +5,10 @@ from typing import Any, Optional, cast import aws_cdk.aws_iam as aws_iam -import cdk_nag import aws_cdk.aws_s3 as aws_s3 -from aws_cdk import Aspects, Stack, Tags, RemovalPolicy, Aws -from cdk_nag import NagSuppressions, NagPackSuppression +import cdk_nag +from aws_cdk import Aspects, Aws, RemovalPolicy, Stack, Tags +from cdk_nag import NagPackSuppression, NagSuppressions from constructs import Construct, IConstruct _logger: logging.Logger = logging.getLogger(__name__) @@ -39,9 +39,7 @@ def __init__( description="This stack deploys Example DAGs resources for MLOps", **kwargs, ) - Tags.of(scope=cast(IConstruct, self)).add( - key="Deployment", value=f"mlops-{deployment_name}" - ) + Tags.of(scope=cast(IConstruct, self)).add(key="Deployment", value=f"mlops-{deployment_name}") dep_mod = f"{project_name}-{deployment_name}-{module_name}" account: str = Aws.ACCOUNT_ID region: str = Aws.REGION @@ -73,11 +71,7 @@ def __init__( ) managed_policies = ( - [ - aws_iam.ManagedPolicy.from_managed_policy_arn( - self, "bucket-policy", bucket_policy_arn - ) - ] + [aws_iam.ManagedPolicy.from_managed_policy_arn(self, "bucket-policy", bucket_policy_arn)] if bucket_policy_arn else [] ) @@ 
@@ -103,27 +97,15 @@ def __init__(
             path="/",
         )
 
-        dag_role.add_managed_policy(
-            aws_iam.ManagedPolicy.from_aws_managed_policy_name(
-                "AmazonSageMakerFullAccess"
-            )
-        )
-        dag_role.add_managed_policy(
-            aws_iam.ManagedPolicy.from_aws_managed_policy_name(
-                "CloudWatchLogsFullAccess"
-            )
-        )
+        dag_role.add_managed_policy(aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess"))
+        dag_role.add_managed_policy(aws_iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchLogsFullAccess"))
 
         # Define the IAM role
         sagemaker_execution_role = aws_iam.Role(
             self,
             "SageMakerExecutionRole",
             assumed_by=aws_iam.ServicePrincipal("sagemaker.amazonaws.com"),
-            managed_policies=[
-                aws_iam.ManagedPolicy.from_aws_managed_policy_name(
-                    "AmazonSageMakerFullAccess"
-                )
-            ],
+            managed_policies=[aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")],
             path="/",
             role_name=f"SageMakerExecutionRole-{self.stack_name}",
         )
@@ -140,9 +122,7 @@ def __init__(
         )
 
         dag_role.add_to_policy(
-            aws_iam.PolicyStatement(
-                actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn]
-            )
+            aws_iam.PolicyStatement(actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn])
         )
 
         self.dag_role = dag_role

From 633dc69c95ca354a8203b5c1241ad25cb82b1804 Mon Sep 17 00:00:00 2001
From: Leon Luttenberger
Date: Wed, 29 May 2024 10:25:19 -0500
Subject: [PATCH 2/2] fix: omit DAG files from coverage

---
 modules/examples/airflow-dags/pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/examples/airflow-dags/pyproject.toml b/modules/examples/airflow-dags/pyproject.toml
index 03dee6eb..ccb3cf25 100644
--- a/modules/examples/airflow-dags/pyproject.toml
+++ b/modules/examples/airflow-dags/pyproject.toml
@@ -36,7 +36,7 @@ pythonpath = [
 ]
 
 [tool.coverage.run]
-omit = ["tests/*"]
+omit = ["tests/*", "dags/*"]
 
 [tool.coverage.report]
 fail_under = 80