Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Refactor airflow-dags module to use Pydantic #127

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- remove explicit module manifest account/region mappings from `fmops-qna-rag`
- changed ECR encryption to KMS_MANAGED
- changed encryption for each bucket to KMS_MANAGED
- refactor `airflow-dags` module to use Pydantic

## v1.2.0

Expand Down
52 changes: 30 additions & 22 deletions modules/examples/airflow-dags/app.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,38 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import os

import aws_cdk
from aws_cdk import App, CfnOutput
import cdk_nag
from aws_cdk import App
from pydantic import ValidationError

from settings import ApplicationSettings
from stack import DagResources

project_name = os.getenv("SEEDFARMER_PROJECT_NAME", "")
deployment_name = os.getenv("SEEDFARMER_DEPLOYMENT_NAME", "")
module_name = os.getenv("SEEDFARMER_MODULE_NAME", "")
app_prefix = f"{project_name}-{deployment_name}-{module_name}"

mwaa_exec_role = os.getenv("SEEDFARMER_PARAMETER_MWAA_EXEC_ROLE_ARN", "")
bucket_policy_arn = os.getenv("SEEDFARMER_PARAMETER_BUCKET_POLICY_ARN")
permission_boundary_arn = os.getenv("SEEDFARMER_PERMISSION_BOUNDARY_ARN")

app = App()

try:
app_settings = ApplicationSettings()
except ValidationError as e:
print(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the error output easier to read in my opinion. Any parameter validation error will be in the standard output rather than buried in the stack trace.

raise e

stack = DagResources(
scope=app,
id=app_prefix,
project_name=project_name,
deployment_name=deployment_name,
module_name=module_name,
mwaa_exec_role=mwaa_exec_role,
bucket_policy_arn=bucket_policy_arn,
permission_boundary_arn=permission_boundary_arn,
id=app_settings.seedfarmer_settings.app_prefix,
project_name=app_settings.seedfarmer_settings.project_name,
deployment_name=app_settings.seedfarmer_settings.deployment_name,
module_name=app_settings.seedfarmer_settings.module_name,
mwaa_exec_role=app_settings.module_settings.mwaa_exec_role_arn,
bucket_policy_arn=app_settings.module_settings.bucket_policy_arn,
permission_boundary_arn=app_settings.module_settings.permission_boundary_arn,
env=aws_cdk.Environment(
account=os.environ["CDK_DEFAULT_ACCOUNT"],
region=os.environ["CDK_DEFAULT_REGION"],
account=app_settings.cdk_settings.account,
region=app_settings.cdk_settings.region,
),
)

CfnOutput(
aws_cdk.CfnOutput(
scope=stack,
id="metadata",
value=stack.to_json_string(
Expand All @@ -46,4 +44,14 @@
),
)

aws_cdk.Aspects.of(stack).add(cdk_nag.AwsSolutionsChecks(log_ignores=True))

if app_settings.module_settings.tags:
for tag_key, tag_value in app_settings.module_settings.tags.items():
aws_cdk.Tags.of(app).add(tag_key, tag_value)

aws_cdk.Tags.of(app).add("SeedFarmerDeploymentName", app_settings.seedfarmer_settings.deployment_name)
aws_cdk.Tags.of(app).add("SeedFarmerModuleName", app_settings.seedfarmer_settings.module_name)
aws_cdk.Tags.of(app).add("SeedFarmerProjectName", app_settings.seedfarmer_settings.project_name)

app.synth(force=True)
10 changes: 5 additions & 5 deletions modules/examples/airflow-dags/deployspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ deploy:
- >
echo "SEEDFARMER_PARAMETER_MWAA_EXEC_ROLE_ARN: ${SEEDFARMER_PARAMETER_MWAA_EXEC_ROLE_ARN}"
- cdk deploy --require-approval never --progress events --app "python app.py" --outputs-file ./cdk-exports.json
- export SEEDFARMER_MODULE_METADATA=$(python -c "import json; file=open('cdk-exports.json'); print(json.load(file)['${SEEDFARMER_PROJECT_NAME}-${SEEDFARMER_DEPLOYMENT_NAME}-${SEEDFARMER_MODULE_NAME}']['metadata'])")
- export MLOPS_BUCKET=$(echo ${SEEDFARMER_MODULE_METADATA} | jq -r ".MlOpsBucket")
- export DAG_IAM_ROLE=$(echo ${SEEDFARMER_MODULE_METADATA} | jq -r ".DagRoleArn")
- export SAGEMAKER_IAM_ROLE=$(echo ${SEEDFARMER_MODULE_METADATA} | jq -r ".SageMakerExecutionRole")
- seedfarmer metadata convert -f cdk-exports.json || true
- export MLOPS_BUCKET=$(cat SEEDFARMER_MODULE_METADATA | jq -r ".MlOpsBucket")
- export DAG_IAM_ROLE=$(cat SEEDFARMER_MODULE_METADATA | jq -r ".DagRoleArn")
- export SAGEMAKER_IAM_ROLE=$(cat SEEDFARMER_MODULE_METADATA | jq -r ".SageMakerExecutionRole")
- >
echo "MLOPS_BUCKET: ${MLOPS_BUCKET}"
- >
Expand All @@ -42,4 +42,4 @@ destroy:
- export MLOPS_BUCKET=$(echo ${SEEDFARMER_MODULE_METADATA} | jq -r ".MlOpsBucket")
- aws s3 rm --recursive s3://$SEEDFARMER_PARAMETER_DAG_BUCKET_NAME/$SEEDFARMER_PARAMETER_DAG_PATH/mlops/
- aws s3 rm --recursive s3://$MLOPS_BUCKET/
- cdk destroy --force --app "python app.py"
- cdk destroy --force --app "python app.py"
27 changes: 10 additions & 17 deletions modules/examples/airflow-dags/modulestack.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
AWSTemplateFormatVersion: 2010-09-09
AWSTemplateFormatVersion: "2010-09-09"
Description: This stack deploys a Module specific IAM permissions

Parameters:
# DeploymentName:
# Type: String
# Description: The name of the deployment
# ModuleName:
# Type: String
# Description: The name of the Module
RoleName:
Type: String
Description: The name of the IAM Role
Expand All @@ -20,20 +14,19 @@ Parameters:

Resources:
Policy:
Type: 'AWS::IAM::Policy'
Type: "AWS::IAM::Policy"
Properties:
PolicyName: "modulespecific-policy"
Roles:
- !Ref RoleName
PolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- 's3:Create*'
- 's3:Put*'
- 's3:Delete*'
- 's3:Get*'
- 's3:List*'
- "s3:Put*"
- "s3:Delete*"
- "s3:Get*"
- "s3:List*"
Effect: Allow
Resource:
- 'arn:aws:s3:::cdk*'
- !Sub "arn:aws:s3:::${DagBucketName}/${DagPath}/*"
Version: 2012-10-17
PolicyName: "mlops-modulespecific-policy"
Roles: [!Ref RoleName]
7 changes: 5 additions & 2 deletions modules/examples/airflow-dags/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ fixable = ["ALL"]
[tool.mypy]
python_version = "3.8"
strict = true
ignore_missing_imports = true
disallow_untyped_decorators = false
exclude = "codeseeder.out/|example_dags/"
exclude = "codeseeder.out/|dags/"
warn_unused_ignores = false

plugins = [
"pydantic.mypy"
]

[tool.pytest.ini_options]
addopts = "-v --cov=. --cov-report term"
pythonpath = [
Expand Down
8 changes: 5 additions & 3 deletions modules/examples/airflow-dags/requirements.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
aws-cdk-lib==2.137.0
cdk-nag==2.28.89
aws-cdk-lib~=2.137.0
cdk-nag~=2.28.89
boto3~=1.34.84
attrs==23.2.0
attrs~=23.2.0
pydantic~=2.7.4
pydantic-settings~=2.3.3
23 changes: 17 additions & 6 deletions modules/examples/airflow-dags/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --output-file=requirements.txt requirements.in
#
annotated-types==0.7.0
# via pydantic
attrs==23.2.0
# via
# -r requirements.in
Expand Down Expand Up @@ -33,8 +35,6 @@ constructs==10.1.37
# via
# aws-cdk-lib
# cdk-nag
exceptiongroup==1.2.0
# via cattrs
importlib-resources==6.4.0
# via jsii
jmespath==1.0.0
Expand All @@ -58,10 +58,20 @@ publication==0.0.3
# cdk-nag
# constructs
# jsii
pydantic==2.7.4
# via
# -r requirements.in
# pydantic-settings
pydantic-core==2.18.4
# via pydantic
pydantic-settings==2.3.3
# via -r requirements.in
python-dateutil==2.8.2
# via
# botocore
# jsii
python-dotenv==1.0.1
# via pydantic-settings
s3transfer==0.10.1
# via boto3
six==1.16.0
Expand All @@ -75,8 +85,9 @@ typeguard==2.13.3
# cdk-nag
# jsii
typing-extensions==4.7.1
# via jsii
# via
# jsii
# pydantic
# pydantic-core
urllib3==1.26.18
# via botocore
zipp==3.18.1
# via importlib-resources
74 changes: 74 additions & 0 deletions modules/examples/airflow-dags/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Defines the stack settings."""

from abc import ABC
from typing import Dict, Optional

from pydantic import AliasChoices, Field, computed_field
from pydantic_settings import BaseSettings, SettingsConfigDict


class CdkBaseSettings(BaseSettings, ABC):
    """Shared pydantic-settings configuration inherited by the settings classes below."""

    model_config = SettingsConfigDict(
        # Environment variable names are matched without regard to case.
        case_sensitive=False,
        # "__" separates nesting levels when a variable targets a nested field.
        env_nested_delimiter="__",
        # Inputs that match no declared field are dropped rather than rejected.
        extra="ignore",
        # Fields may be populated by attribute name as well as by alias.
        populate_by_name=True,
        # Empty tuple: no field-name prefixes are treated as protected.
        protected_namespaces=(),
    )


class ModuleSettings(CdkBaseSettings):
    """Module parameters supplied by Seed-Farmer.

    Fields are read from environment variables carrying the
    ``SEEDFARMER_PARAMETER_`` prefix, for example
    ``SEEDFARMER_PARAMETER_MWAA_EXEC_ROLE_ARN``.
    """

    model_config = SettingsConfigDict(env_prefix="SEEDFARMER_PARAMETER_")

    # Required: settings validation fails when no value is provided.
    mwaa_exec_role_arn: str
    bucket_policy_arn: Optional[str] = Field(default=None)
    # The permission boundary used to be read from SEEDFARMER_PERMISSION_BOUNDARY_ARN
    # (no PARAMETER segment). An explicit validation alias is not combined with
    # ``env_prefix``, so both variable names are spelled out in full; the prefixed
    # name is listed first and therefore wins when both are set.
    permission_boundary_arn: Optional[str] = Field(
        default=None,
        validation_alias=AliasChoices(
            "SEEDFARMER_PARAMETER_PERMISSION_BOUNDARY_ARN",
            "SEEDFARMER_PERMISSION_BOUNDARY_ARN",
        ),
    )

    # Optional mapping of tag keys to tag values.
    tags: Optional[Dict[str, str]] = Field(default=None)


class SeedFarmerSettings(CdkBaseSettings):
    """Deployment identifiers provided by Seed-Farmer.

    Values come from ``SEEDFARMER_``-prefixed environment variables; each
    field falls back to an empty string when its variable is not set.
    """

    model_config = SettingsConfigDict(env_prefix="SEEDFARMER_")

    project_name: str = Field(default="")
    deployment_name: str = Field(default="")
    module_name: str = Field(default="")

    @computed_field  # type: ignore
    @property
    def app_prefix(self) -> str:
        """Return the project, deployment and module names joined by hyphens."""
        return f"{self.project_name}-{self.deployment_name}-{self.module_name}"


class CDKSettings(CdkBaseSettings):
    """Default AWS CDK environment.

    Both fields are required and are read from ``CDK_DEFAULT_``-prefixed
    environment variables.
    """

    model_config = SettingsConfigDict(env_prefix="CDK_DEFAULT_")

    # Read from CDK_DEFAULT_ACCOUNT.
    account: str
    # Read from CDK_DEFAULT_REGION.
    region: str


class ApplicationSettings(CdkBaseSettings):
    """Top-level settings object grouping the Seed-Farmer, module and CDK settings."""

    # Each group defaults to a fresh instance of its own settings class, so it
    # is populated using that class's environment-variable prefix.
    seedfarmer_settings: SeedFarmerSettings = Field(default_factory=SeedFarmerSettings)
    module_settings: ModuleSettings = Field(default_factory=ModuleSettings)
    cdk_settings: CDKSettings = Field(default_factory=CDKSettings)
28 changes: 7 additions & 21 deletions modules/examples/airflow-dags/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Any, Optional, cast
from typing import Any, Optional

import aws_cdk.aws_iam as aws_iam
import aws_cdk.aws_s3 as aws_s3
import cdk_nag
from aws_cdk import Aspects, Aws, RemovalPolicy, Stack, Tags
from aws_cdk import Aws, RemovalPolicy, Stack
from cdk_nag import NagPackSuppression, NagSuppressions
from constructs import Construct, IConstruct
from constructs import Construct

_logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -39,7 +38,6 @@ def __init__(
description="This stack deploys Example DAGs resources for MLOps",
**kwargs,
)
Tags.of(scope=cast(IConstruct, self)).add(key="Deployment", value=f"mlops-{deployment_name}")
dep_mod = f"{project_name}-{deployment_name}-{module_name}"
account: str = Aws.ACCOUNT_ID
region: str = Aws.REGION
Expand All @@ -56,6 +54,7 @@ def __init__(
)

self.mlops_assets_bucket = mlops_assets_bucket

# Create Dag IAM Role and policy
dag_statement = aws_iam.PolicyDocument(
statements=[
Expand Down Expand Up @@ -110,26 +109,13 @@ def __init__(
role_name=f"SageMakerExecutionRole-{self.stack_name}",
)

# Add policy to allow access to S3 bucket
sagemaker_execution_role.add_to_policy(
aws_iam.PolicyStatement(
actions=["s3:*"],
resources=[
mlops_assets_bucket.bucket_arn,
f"{mlops_assets_bucket.bucket_arn}/*",
],
)
)

dag_role.add_to_policy(
aws_iam.PolicyStatement(actions=["iam:PassRole"], resources=[sagemaker_execution_role.role_arn])
)
# Add policy to allow access to S3 bucket and IAM pass role
mlops_assets_bucket.grant_read_write(sagemaker_execution_role)
sagemaker_execution_role.grant_pass_role(dag_role)

self.dag_role = dag_role
self.sagemaker_execution_role = sagemaker_execution_role

Aspects.of(self).add(cdk_nag.AwsSolutionsChecks())

NagSuppressions.add_resource_suppressions(
self,
apply_to_children=True,
Expand Down
Loading