Skip to content

Commit

Permalink
chore: Format airflow-dags example module (#99)
Browse files Browse the repository at this point in the history
* chore: Format airflow-dags example module

* fix coverage
  • Loading branch information
LeonLuttenberger authored May 29, 2024
1 parent 89e277a commit ac4c715
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 124 deletions.
1 change: 1 addition & 0 deletions modules/examples/airflow-dags/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import aws_cdk
from aws_cdk import App, CfnOutput

from stack import DagResources

project_name = os.getenv("SEEDFARMER_PROJECT_NAME", "")
Expand Down
3 changes: 0 additions & 3 deletions modules/examples/airflow-dags/coverage.ini

This file was deleted.

7 changes: 2 additions & 5 deletions modules/examples/airflow-dags/dags/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import tarfile

import pandas as pd

from sklearn.externals import joblib
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score

if __name__ == "__main__":
model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
Expand All @@ -30,9 +29,7 @@

print("Classification report:\n{}".format(report_dict))

evaluation_output_path = os.path.join(
"/opt/ml/processing/evaluation", "evaluation.json"
)
evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
print("Saving classification report to {}".format(evaluation_output_path))

with open(evaluation_output_path, "w") as f:
Expand Down
12 changes: 4 additions & 8 deletions modules/examples/airflow-dags/dags/mlops_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import boto3
from airflow import DAG
from airflow.operators.python import PythonOperator
from config import MLOPS_DATA_BUCKET, SAGEMAKER_EXECUTION_ROLE, DAG_EXECUTION_ROLE
from sagemaker.processing import ProcessingInput, ProcessingOutput
from config import DAG_EXECUTION_ROLE, MLOPS_DATA_BUCKET, SAGEMAKER_EXECUTION_ROLE
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.session import Session
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.processing import SKLearnProcessor
Expand All @@ -26,9 +26,7 @@
}
dag = DAG("SciKitLearn_MLOps", default_args=default_args, schedule_interval=None)

pre_processing_input = (
f"s3://sagemaker-sample-data-{os.environ['AWS_REGION']}/processing/census"
)
pre_processing_input = f"s3://sagemaker-sample-data-{os.environ['AWS_REGION']}/processing/census"
test_data_s3_path = f"s3://{MLOPS_DATA_BUCKET}/processing/test"
train_data_s3_path = f"s3://{MLOPS_DATA_BUCKET}/processing/train"
model_path = f"s3://{MLOPS_DATA_BUCKET}/train/models/"
Expand Down Expand Up @@ -134,9 +132,7 @@ def evaluation(model_path): # type: ignore[no-untyped-def]
code=os.path.join(os.path.dirname(__file__), "evaluation.py"),
inputs=[
ProcessingInput(source=model_path, destination="/opt/ml/processing/model"),
ProcessingInput(
source=test_data_s3_path, destination="/opt/ml/processing/test"
),
ProcessingInput(source=test_data_s3_path, destination="/opt/ml/processing/test"),
],
outputs=[
ProcessingOutput(
Expand Down
31 changes: 10 additions & 21 deletions modules/examples/airflow-dags/dags/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
import os
import warnings

import pandas as pd
import numpy as np
import pandas as pd
from sklearn.compose import make_column_transformer
from sklearn.exceptions import DataConversionWarning
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
StandardScaler,
OneHotEncoder,
KBinsDiscretizer,
OneHotEncoder,
StandardScaler,
)
from sklearn.compose import make_column_transformer

from sklearn.exceptions import DataConversionWarning

warnings.filterwarnings(action="ignore", category=DataConversionWarning)

Expand Down Expand Up @@ -90,27 +89,17 @@ def print_shape(df): # type: ignore[no-untyped-def]
print("Train data shape after preprocessing: {}".format(train_features.shape))
print("Test data shape after preprocessing: {}".format(test_features.shape))

train_features_output_path = os.path.join(
"/opt/ml/processing/train", "train_features.csv"
)
train_labels_output_path = os.path.join(
"/opt/ml/processing/train", "train_labels.csv"
)
train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv")
train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")

test_features_output_path = os.path.join(
"/opt/ml/processing/test", "test_features.csv"
)
test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv")
test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv")

print("Saving training features to {}".format(train_features_output_path))
pd.DataFrame(train_features).to_csv(
train_features_output_path, header=False, index=False
)
pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)

print("Saving test features to {}".format(test_features_output_path))
pd.DataFrame(test_features).to_csv(
test_features_output_path, header=False, index=False
)
pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)

print("Saving training labels to {}".format(train_labels_output_path))
y_train.to_csv(train_labels_output_path, header=False, index=False)
Expand Down
2 changes: 1 addition & 1 deletion modules/examples/airflow-dags/dags/train.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.externals import joblib
from sklearn.linear_model import LogisticRegression

if __name__ == "__main__":
print("Starting training")
Expand Down
66 changes: 36 additions & 30 deletions modules/examples/airflow-dags/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
[tool.black]
[tool.ruff]
exclude = [
".eggs",
".git",
".hg",
".mypy_cache",
".ruff_cache",
".tox",
".venv",
".env",
"_build",
"buck-out",
"build",
"dist",
"codeseeder.out",
]
line-length = 120
target-version = ["py36", "py37", "py38"]
exclude = '''
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| \.env
| _build
| buck-out
| build
| dist
| codeseeder.out
)/
'''
target-version = "py38"

[tool.isort]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
ensure_newline_before_comments = true
line_length = 120
src_paths = ["example_dags"]
py_version = 36
skip_gitignore = false
[tool.ruff.lint]
select = ["F", "I", "E", "W"]
fixable = ["ALL"]

[tool.mypy]
python_version = "3.8"
strict = true
ignore_missing_imports = true
disallow_untyped_decorators = false
exclude = "codeseeder.out/|example_dags/"
warn_unused_ignores = false

[tool.pytest.ini_options]
addopts = "-v --cov=. --cov-report term --cov-config=coverage.ini"
addopts = "-v --cov=. --cov-report term"
pythonpath = [
"."
]
]

[tool.coverage.run]
omit = ["tests/*", "dags/*"]

[tool.coverage.report]
fail_under = 80
27 changes: 0 additions & 27 deletions modules/examples/airflow-dags/setup.cfg

This file was deleted.

38 changes: 9 additions & 29 deletions modules/examples/airflow-dags/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from typing import Any, Optional, cast

import aws_cdk.aws_iam as aws_iam
import cdk_nag
import aws_cdk.aws_s3 as aws_s3
from aws_cdk import Aspects, Stack, Tags, RemovalPolicy, Aws
from cdk_nag import NagSuppressions, NagPackSuppression
import cdk_nag
from aws_cdk import Aspects, Aws, RemovalPolicy, Stack, Tags
from cdk_nag import NagPackSuppression, NagSuppressions
from constructs import Construct, IConstruct

_logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -39,9 +39,7 @@ def __init__(
description="This stack deploys Example DAGs resources for MLOps",
**kwargs,
)
Tags.of(scope=cast(IConstruct, self)).add(
key="Deployment", value=f"mlops-{deployment_name}"
)
Tags.of(scope=cast(IConstruct, self)).add(key="Deployment", value=f"mlops-{deployment_name}")
dep_mod = f"{project_name}-{deployment_name}-{module_name}"
account: str = Aws.ACCOUNT_ID
region: str = Aws.REGION
Expand Down Expand Up @@ -73,11 +71,7 @@ def __init__(
)

managed_policies = (
[
aws_iam.ManagedPolicy.from_managed_policy_arn(
self, "bucket-policy", bucket_policy_arn
)
]
[aws_iam.ManagedPolicy.from_managed_policy_arn(self, "bucket-policy", bucket_policy_arn)]
if bucket_policy_arn
else []
)
Expand All @@ -103,27 +97,15 @@ def __init__(
path="/",
)

dag_role.add_managed_policy(
aws_iam.ManagedPolicy.from_aws_managed_policy_name(
"AmazonSageMakerFullAccess"
)
)
dag_role.add_managed_policy(
aws_iam.ManagedPolicy.from_aws_managed_policy_name(
"CloudWatchLogsFullAccess"
)
)
dag_role.add_managed_policy(aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess"))
dag_role.add_managed_policy(aws_iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchLogsFullAccess"))

# Define the IAM role
sagemaker_execution_role = aws_iam.Role(
self,
"SageMakerExecutionRole",
assumed_by=aws_iam.ServicePrincipal("sagemaker.amazonaws.com"),
managed_policies=[
aws_iam.ManagedPolicy.from_aws_managed_policy_name(
"AmazonSageMakerFullAccess"
)
],
managed_policies=[aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")],
path="/",
role_name=f"SageMakerExecutionRole-{self.stack_name}",
)
Expand All @@ -140,9 +122,7 @@ def __init__(
)

dag_role.add_to_policy(
aws_iam.PolicyStatement(
actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn]
)
aws_iam.PolicyStatement(actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn])
)

self.dag_role = dag_role
Expand Down

0 comments on commit ac4c715

Please sign in to comment.