chore: Format airflow-dags example module #99

Merged · 2 commits · May 29, 2024
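In substance, the PR swaps the module's black and isort configuration for ruff, re-sorts imports across the example DAG files, reflows statements to the 120-character line limit, and folds the coverage settings from the deleted coverage.ini into pyproject.toml.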
1 change: 1 addition & 0 deletions modules/examples/airflow-dags/app.py
@@ -5,6 +5,7 @@

 import aws_cdk
 from aws_cdk import App, CfnOutput
+
 from stack import DagResources

 project_name = os.getenv("SEEDFARMER_PROJECT_NAME", "")
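The only change in app.py is the blank line that ruff's import-sorting rules insert between the third-party aws_cdk imports and the first-party stack import: isort-style grouping keeps standard-library, third-party, and local imports in separate blocks.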
3 changes: 0 additions & 3 deletions modules/examples/airflow-dags/coverage.ini

This file was deleted.
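Coverage configuration now lives in the [tool.coverage.run] and [tool.coverage.report] tables of pyproject.toml (last file below), which is why the pytest addopts line there drops --cov-config=coverage.ini.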

7 changes: 2 additions & 5 deletions modules/examples/airflow-dags/dags/evaluation.py
@@ -3,9 +3,8 @@
 import tarfile

 import pandas as pd
-
 from sklearn.externals import joblib
-from sklearn.metrics import classification_report, roc_auc_score, accuracy_score
+from sklearn.metrics import accuracy_score, classification_report, roc_auc_score

 if __name__ == "__main__":
     model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
@@ -30,9 +29,7 @@

     print("Classification report:\n{}".format(report_dict))

-    evaluation_output_path = os.path.join(
-        "/opt/ml/processing/evaluation", "evaluation.json"
-    )
+    evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
     print("Saving classification report to {}".format(evaluation_output_path))

     with open(evaluation_output_path, "w") as f:
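For orientation, this script follows the usual shape of a SageMaker processing-job evaluation step: untar the trained model, score the held-out test set, and write the metrics as JSON. A minimal runnable sketch of that flow; the model artifact name and the test-set file names are assumptions (the latter matching the preprocessing outputs further down), not lines from this diff:

import json
import os
import tarfile

import pandas as pd
from sklearn.externals import joblib  # legacy path, removed in scikit-learn >= 0.23
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score

if __name__ == "__main__":
    # SageMaker mounts the model artifact from S3 into the container here.
    model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    model = joblib.load("model.joblib")  # assumed artifact name

    # Assumed file names under the mounted test channel.
    X_test = pd.read_csv("/opt/ml/processing/test/test_features.csv", header=None)
    y_test = pd.read_csv("/opt/ml/processing/test/test_labels.csv", header=None)

    predictions = model.predict(X_test)
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict["accuracy"] = accuracy_score(y_test, predictions)
    report_dict["roc_auc"] = roc_auc_score(y_test, predictions)
    print("Classification report:\n{}".format(report_dict))

    evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
    with open(evaluation_output_path, "w") as f:
        f.write(json.dumps(report_dict))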
12 changes: 4 additions & 8 deletions modules/examples/airflow-dags/dags/mlops_dag.py
@@ -12,9 +12,9 @@
 import boto3
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from config import MLOPS_DATA_BUCKET, SAGEMAKER_EXECUTION_ROLE, DAG_EXECUTION_ROLE
-from sagemaker.processing import ProcessingInput, ProcessingOutput
+from config import DAG_EXECUTION_ROLE, MLOPS_DATA_BUCKET, SAGEMAKER_EXECUTION_ROLE
 from sagemaker.inputs import TrainingInput
+from sagemaker.processing import ProcessingInput, ProcessingOutput
 from sagemaker.session import Session
 from sagemaker.sklearn.estimator import SKLearn
 from sagemaker.sklearn.processing import SKLearnProcessor
@@ -26,9 +26,7 @@
 }
 dag = DAG("SciKitLearn_MLOps", default_args=default_args, schedule_interval=None)

-pre_processing_input = (
-    f"s3://sagemaker-sample-data-{os.environ['AWS_REGION']}/processing/census"
-)
+pre_processing_input = f"s3://sagemaker-sample-data-{os.environ['AWS_REGION']}/processing/census"
 test_data_s3_path = f"s3://{MLOPS_DATA_BUCKET}/processing/test"
 train_data_s3_path = f"s3://{MLOPS_DATA_BUCKET}/processing/train"
 model_path = f"s3://{MLOPS_DATA_BUCKET}/train/models/"
@@ -134,9 +132,7 @@ def evaluation(model_path): # type: ignore[no-untyped-def]
         code=os.path.join(os.path.dirname(__file__), "evaluation.py"),
         inputs=[
             ProcessingInput(source=model_path, destination="/opt/ml/processing/model"),
-            ProcessingInput(
-                source=test_data_s3_path, destination="/opt/ml/processing/test"
-            ),
+            ProcessingInput(source=test_data_s3_path, destination="/opt/ml/processing/test"),
         ],
         outputs=[
             ProcessingOutput(
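The hunk above sits inside the evaluation callable; the surrounding DAG (not shown here) binds such callables to Airflow tasks. A minimal sketch of that wiring pattern; the default_args, the stand-in callable body, and the bucket name are assumptions, not taken from this diff:

from airflow import DAG
from airflow.operators.python import PythonOperator


def evaluation(model_path):  # stand-in for the real callable in mlops_dag.py
    print("Would run an SKLearnProcessor job against {}".format(model_path))


default_args = {"owner": "airflow"}  # assumed; the real dict sits above this hunk
model_path = "s3://example-mlops-bucket/train/models/"  # hypothetical bucket

with DAG("SciKitLearn_MLOps", default_args=default_args, schedule_interval=None) as dag:
    # op_kwargs feeds the S3 locations computed at import time into the callable.
    evaluate_model = PythonOperator(
        task_id="evaluation",
        python_callable=evaluation,
        op_kwargs={"model_path": model_path},
    )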
31 changes: 10 additions & 21 deletions modules/examples/airflow-dags/dags/preprocessing.py
@@ -2,17 +2,16 @@
 import os
 import warnings

-import pandas as pd
 import numpy as np
+import pandas as pd
+from sklearn.compose import make_column_transformer
+from sklearn.exceptions import DataConversionWarning
 from sklearn.model_selection import train_test_split
 from sklearn.preprocessing import (
-    StandardScaler,
-    OneHotEncoder,
     KBinsDiscretizer,
+    OneHotEncoder,
+    StandardScaler,
 )
-from sklearn.compose import make_column_transformer
-
-from sklearn.exceptions import DataConversionWarning

 warnings.filterwarnings(action="ignore", category=DataConversionWarning)

@@ -90,27 +89,17 @@ def print_shape(df): # type: ignore[no-untyped-def]
 print("Train data shape after preprocessing: {}".format(train_features.shape))
 print("Test data shape after preprocessing: {}".format(test_features.shape))

-train_features_output_path = os.path.join(
-    "/opt/ml/processing/train", "train_features.csv"
-)
-train_labels_output_path = os.path.join(
-    "/opt/ml/processing/train", "train_labels.csv"
-)
+train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv")
+train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")

-test_features_output_path = os.path.join(
-    "/opt/ml/processing/test", "test_features.csv"
-)
+test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv")
 test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv")

 print("Saving training features to {}".format(train_features_output_path))
-pd.DataFrame(train_features).to_csv(
-    train_features_output_path, header=False, index=False
-)
+pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)

 print("Saving test features to {}".format(test_features_output_path))
-pd.DataFrame(test_features).to_csv(
-    test_features_output_path, header=False, index=False
-)
+pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)

 print("Saving training labels to {}".format(train_labels_output_path))
 y_train.to_csv(train_labels_output_path, header=False, index=False)
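The import block at the top of this file sketches the preprocessing recipe: a column transformer that scales numeric columns, bins them, and one-hot encodes categoricals. A small self-contained illustration of that pattern; the column names, toy data, and bin count are invented for the example, not taken from the module:

import pandas as pd
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import KBinsDiscretizer, OneHotEncoder, StandardScaler

# Toy frame standing in for the census data the DAG actually processes.
df = pd.DataFrame(
    {
        "age": [25, 40, 31, 58],
        "capital_gain": [0.0, 1500.0, 0.0, 300.0],
        "education": ["HS-grad", "Bachelors", "Masters", "HS-grad"],
    }
)

preprocess = make_column_transformer(
    (StandardScaler(), ["capital_gain"]),  # zero-mean, unit-variance scaling
    (KBinsDiscretizer(n_bins=4), ["age"]),  # bucketize; one-hot encoded by default
    (OneHotEncoder(), ["education"]),  # categorical -> indicator columns
)

features = preprocess.fit_transform(df)
print(features.shape)  # (4, total output columns); may be a scipy sparse matrix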
2 changes: 1 addition & 1 deletion modules/examples/airflow-dags/dags/train.py
@@ -1,8 +1,8 @@
 import os

 import pandas as pd
-from sklearn.linear_model import LogisticRegression
 from sklearn.externals import joblib
+from sklearn.linear_model import LogisticRegression

 if __name__ == "__main__":
     print("Starting training")
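train.py presumably mirrors the other side of the pipeline: load the preprocessed features, fit the LogisticRegression, and persist it with joblib so SageMaker can package it into model.tar.gz. A sketch under those assumptions; the channel layout and file names follow SageMaker conventions and the preprocessing outputs above, and the sklearn.externals import only works on scikit-learn < 0.23 (newer code would import joblib directly):

import os

import pandas as pd
from sklearn.externals import joblib  # legacy import kept to match the module
from sklearn.linear_model import LogisticRegression

if __name__ == "__main__":
    print("Starting training")

    # SM_CHANNEL_TRAIN points at the training channel mounted by SageMaker.
    train_dir = os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train")
    X_train = pd.read_csv(os.path.join(train_dir, "train_features.csv"), header=None)
    y_train = pd.read_csv(os.path.join(train_dir, "train_labels.csv"), header=None)

    model = LogisticRegression(solver="liblinear")
    model.fit(X_train, y_train.values.ravel())

    # Everything under SM_MODEL_DIR is tarred into model.tar.gz after the job.
    model_dir = os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    joblib.dump(model, os.path.join(model_dir, "model.joblib"))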
66 changes: 36 additions & 30 deletions modules/examples/airflow-dags/pyproject.toml
@@ -1,36 +1,42 @@
-[tool.black]
+[tool.ruff]
+exclude = [
+    ".eggs",
+    ".git",
+    ".hg",
+    ".mypy_cache",
+    ".ruff_cache",
+    ".tox",
+    ".venv",
+    ".env",
+    "_build",
+    "buck-out",
+    "build",
+    "dist",
+    "codeseeder.out",
+]
 line-length = 120
-target-version = ["py36", "py37", "py38"]
-exclude = '''
-/(
-    \.eggs
-  | \.git
-  | \.hg
-  | \.mypy_cache
-  | \.tox
-  | \.venv
-  | \.env
-  | _build
-  | buck-out
-  | build
-  | dist
-  | codeseeder.out
-)/
-'''
+target-version = "py38"

-[tool.isort]
-multi_line_output = 3
-include_trailing_comma = true
-force_grid_wrap = 0
-use_parentheses = true
-ensure_newline_before_comments = true
-line_length = 120
-src_paths = ["example_dags"]
-py_version = 36
-skip_gitignore = false
+[tool.ruff.lint]
+select = ["F", "I", "E", "W"]
+fixable = ["ALL"]

 [tool.mypy]
 python_version = "3.8"
 strict = true
 ignore_missing_imports = true
 disallow_untyped_decorators = false
 exclude = "codeseeder.out/|example_dags/"
 warn_unused_ignores = false

 [tool.pytest.ini_options]
-addopts = "-v --cov=. --cov-report term --cov-config=coverage.ini"
+addopts = "-v --cov=. --cov-report term"
 pythonpath = [
     "."
-]
+]
+
+[tool.coverage.run]
+omit = ["tests/*", "dags/*"]
+
+[tool.coverage.report]
+fail_under = 80
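This is the heart of the PR: one ruff configuration replaces the separate black and isort tables. select = ["F", "I", "E", "W"] enables the pyflakes, isort-compatible import-sorting, and pycodestyle rules, and line-length = 120 is what let the multi-line os.path.join and ProcessingInput calls above collapse onto single lines. As a before/after illustration of what the I rules do on a ruff check --fix run (module names are illustrative):

# Before: flagged by ruff's I001 (unsorted/ungrouped imports).
import pandas as pd
from stack import DagResources
import numpy as np

# After ruff check --fix: alphabetized third-party block, then a blank
# line, then the first-party import.
import numpy as np
import pandas as pd

from stack import DagResources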
27 changes: 0 additions & 27 deletions modules/examples/airflow-dags/setup.cfg

This file was deleted.
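The deleted contents are not shown in this view; most likely they were per-tool lint settings that the ruff table in pyproject.toml now covers.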

38 changes: 9 additions & 29 deletions modules/examples/airflow-dags/stack.py
@@ -5,10 +5,10 @@
 from typing import Any, Optional, cast

 import aws_cdk.aws_iam as aws_iam
-import cdk_nag
 import aws_cdk.aws_s3 as aws_s3
-from aws_cdk import Aspects, Stack, Tags, RemovalPolicy, Aws
-from cdk_nag import NagSuppressions, NagPackSuppression
+import cdk_nag
+from aws_cdk import Aspects, Aws, RemovalPolicy, Stack, Tags
+from cdk_nag import NagPackSuppression, NagSuppressions
 from constructs import Construct, IConstruct

 _logger: logging.Logger = logging.getLogger(__name__)
@@ -39,9 +39,7 @@ def __init__(
             description="This stack deploys Example DAGs resources for MLOps",
             **kwargs,
         )
-        Tags.of(scope=cast(IConstruct, self)).add(
-            key="Deployment", value=f"mlops-{deployment_name}"
-        )
+        Tags.of(scope=cast(IConstruct, self)).add(key="Deployment", value=f"mlops-{deployment_name}")
         dep_mod = f"{project_name}-{deployment_name}-{module_name}"
         account: str = Aws.ACCOUNT_ID
         region: str = Aws.REGION
@@ -73,11 +71,7 @@ def __init__(
         )

         managed_policies = (
-            [
-                aws_iam.ManagedPolicy.from_managed_policy_arn(
-                    self, "bucket-policy", bucket_policy_arn
-                )
-            ]
+            [aws_iam.ManagedPolicy.from_managed_policy_arn(self, "bucket-policy", bucket_policy_arn)]
             if bucket_policy_arn
             else []
         )
@@ -103,27 +97,15 @@ def __init__(
             path="/",
         )

-        dag_role.add_managed_policy(
-            aws_iam.ManagedPolicy.from_aws_managed_policy_name(
-                "AmazonSageMakerFullAccess"
-            )
-        )
-        dag_role.add_managed_policy(
-            aws_iam.ManagedPolicy.from_aws_managed_policy_name(
-                "CloudWatchLogsFullAccess"
-            )
-        )
+        dag_role.add_managed_policy(aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess"))
+        dag_role.add_managed_policy(aws_iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchLogsFullAccess"))

         # Define the IAM role
         sagemaker_execution_role = aws_iam.Role(
             self,
             "SageMakerExecutionRole",
             assumed_by=aws_iam.ServicePrincipal("sagemaker.amazonaws.com"),
-            managed_policies=[
-                aws_iam.ManagedPolicy.from_aws_managed_policy_name(
-                    "AmazonSageMakerFullAccess"
-                )
-            ],
+            managed_policies=[aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")],
             path="/",
             role_name=f"SageMakerExecutionRole-{self.stack_name}",
         )
@@ -140,9 +122,7 @@ def __init__(
         )

         dag_role.add_to_policy(
-            aws_iam.PolicyStatement(
-                actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn]
-            )
+            aws_iam.PolicyStatement(actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn])
         )

         self.dag_role = dag_role
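One note on the final hunk: the collapsed PolicyStatement grants the DAG role iam:PassRole scoped to the SageMaker execution role only, which is what allows Airflow tasks to hand that role to the SageMaker jobs they launch without a blanket PassRole grant.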