EM: Edit step function to include athena log validation #6604

Merged · 49 commits · Jun 19, 2024
Commits
f4c687d
add new step function config
matt-heery Jun 14, 2024
f564e5f
finalising the step func
matt-heery Jun 14, 2024
c7d8a70
correct structure
matt-heery Jun 14, 2024
3a0e48e
comma!
matt-heery Jun 14, 2024
df50d05
jsonencoding
matt-heery Jun 14, 2024
0676ad9
removing constantly repeating tf
matt-heery Jun 14, 2024
7fceb4c
small edits to get athena working
matt-heery Jun 14, 2024
d288257
strings
matt-heery Jun 14, 2024
5e75509
iam update and context
matt-heery Jun 14, 2024
ce6b237
new lambda to transform output
matt-heery Jun 14, 2024
5e7c2cb
s3 bucket perms
matt-heery Jun 14, 2024
d0f670d
partitions
matt-heery Jun 14, 2024
f6270e4
remove result set
matt-heery Jun 14, 2024
d9751c5
id to arn
matt-heery Jun 14, 2024
8c2b276
small changes
matt-heery Jun 14, 2024
336e823
logging
matt-heery Jun 14, 2024
572141f
copy paste
matt-heery Jun 14, 2024
b014594
string
matt-heery Jun 14, 2024
8bec685
adding more falses
matt-heery Jun 14, 2024
acfe00c
update step func
matt-heery Jun 17, 2024
660e8b5
updating bucket version
matt-heery Jun 17, 2024
056e0e7
test in dev
matt-heery Jun 17, 2024
a053698
CI/CD database naming
matt-heery Jun 17, 2024
7cde83d
selector for dev
matt-heery Jun 17, 2024
8976449
changing glue job to include date
matt-heery Jun 17, 2024
829da47
adding new col in glue job
matt-heery Jun 18, 2024
ff0347b
glue fix
matt-heery Jun 18, 2024
314cd08
update step func
matt-heery Jun 18, 2024
1b4457f
add iam perms
matt-heery Jun 18, 2024
3143455
making pandas available
matt-heery Jun 18, 2024
57b3f92
containerise
matt-heery Jun 18, 2024
48987d4
make this as straightforward as possible
matt-heery Jun 18, 2024
0c38031
arn64
matt-heery Jun 18, 2024
6ddcfd6
removing timeout and handler
matt-heery Jun 18, 2024
270a5d0
iam update
matt-heery Jun 18, 2024
ccf55d1
arnarnarn
matt-heery Jun 18, 2024
fe26fe4
minor updates
matt-heery Jun 18, 2024
af264fa
small tweaks to lambda
matt-heery Jun 18, 2024
8c214dc
table to ap??
matt-heery Jun 18, 2024
b6f49b4
to to to t
matt-heery Jun 18, 2024
b458807
removing commented lines and adding locals
matt-heery Jun 18, 2024
0b90b4a
Merge branch 'main' into edit-step-function-athena-validation
matt-heery Jun 18, 2024
c49e27b
adding madhu old change after i removed
matt-heery Jun 19, 2024
53b7890
small tweak to glue job
matt-heery Jun 19, 2024
e6af545
minor updates to PR
matt-heery Jun 19, 2024
e2877f8
lambdas update
matt-heery Jun 19, 2024
e306d31
Merge branch 'main' into edit-step-function-athena-validation
matt-heery Jun 19, 2024
2058cf2
update glue job w date null value
matt-heery Jun 19, 2024
9e11330
remove csv value
matt-heery Jun 19, 2024
Diff view
@@ -1,7 +1,10 @@
module "dms_task" {
source = "./modules/dms"

for_each = toset(var.database_list)
for_each = toset(local.is-production? [
"g4s_cap_dw",
"g4s_emsys_mvp"
] : ["test"])

database_name = each.key

@@ -0,0 +1,17 @@
{
"rules": [
{
"rule-type": "selection",
"rule-id": "01",
"rule-name": "all",
"object-locator": {
"schema-name": "%",
"table-name": "%"
},
"rule-action": "include",
"filters": [],
"parallel-load": null,
"isAutoSegmentationChecked": false
}
]
}
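This selection-rules document includes every schema and table ("%" locators, no filters). Purely as a hedged illustration of where such a document ends up (the repo wires it through its Terraform dms module; the ARNs and identifiers below are placeholders), DMS expects the rules JSON as the TableMappings string of a replication task:

# Illustration only: how a selection-rules document like the one above is supplied
# to a DMS replication task. ARNs and identifiers are placeholders.
import boto3

with open("table_mappings.json") as f:  # hypothetical local copy of the rules above
    table_mappings = f.read()

dms = boto3.client("dms")
dms.create_replication_task(
    ReplicationTaskIdentifier="example-full-load-task",
    SourceEndpointArn="arn:aws:dms:eu-west-2:111111111111:endpoint:source",
    TargetEndpointArn="arn:aws:dms:eu-west-2:111111111111:endpoint:target",
    ReplicationInstanceArn="arn:aws:dms:eu-west-2:111111111111:rep:instance",
    MigrationType="full-load",
    TableMappings=table_mappings,  # the JSON rules document, passed as a string
)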
@@ -1,13 +1,3 @@
# This variable needs to be supplied with the list of database names to be migrated
variable "database_list" {
type = list(string)
# cap_dw
default = [
"g4s_cap_dw",
"g4s_emsys_mvp"
]
}

variable "dms_replication_instance_class" {
description = "Name of the replication instance class to be used"
type = string
@@ -0,0 +1,4 @@
module "ecr_lambda_repo" {
source = "./modules/ecr"
ecr_name = "lambdas/update_log_table"
}
@@ -74,7 +74,8 @@ def has_query_succeeded(execution_id):
CREATE EXTERNAL TABLE IF NOT EXISTS `{GLUE_CATALOG_DB_NAME}`.`{GLUE_CATALOG_TBL_NAME}`(
`run_datetime` timestamp,
`json_row` string,
`validation_msg` string)
`validation_msg` string,
`table_to_ap` string)
PARTITIONED BY (
`database_name` string,
`full_table_name` string)
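The hunk above extends the CREATE EXTERNAL TABLE statement for the validation log with a table_to_ap column; the surrounding helper, has_query_succeeded, implies the script polls Athena until each query finishes. A generic boto3 sketch of that polling pattern (not this repository's implementation) is:

# Generic sketch of Athena query polling, for context only; not code from this PR.
import time
import boto3

athena = boto3.client("athena")

def has_query_succeeded(execution_id, max_attempts=30):
    """Poll an Athena query execution and return True only if it reaches SUCCEEDED."""
    for _ in range(max_attempts):
        state = athena.get_query_execution(QueryExecutionId=execution_id)[
            "QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state == "SUCCEEDED"
        time.sleep(2)
    return False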
@@ -96,7 +96,7 @@ def resolve_args(args_list):

NVL_DTYPE_DICT = {'string': "''", 'int': 0, 'double': 0, 'float': 0, 'smallint': 0, 'bigint':0,
'boolean': False,
'timestamp': "to_timestamp('1900-01-01', 'yyyy-MM-dd')"}
'timestamp': "to_timestamp('1900-01-01', 'yyyy-MM-dd')", 'date': "to_date('1900-01-01', 'yyyy-MM-dd')"}

# ===============================================================================
# USER-DEFINED-FUNCTIONS
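The hunk above adds a 'date' entry to NVL_DTYPE_DICT so date columns can be given a typed placeholder rather than a raw null. Purely to illustrate how such a dtype-to-default map can drive null handling in Spark SQL (the helper and example columns below are hypothetical, not taken from this script):

# Hypothetical illustration of applying a dtype-to-default map in Spark SQL; the
# helper name and example columns are not from this script.
NVL_DTYPE_DICT = {'string': "''", 'int': 0, 'double': 0, 'float': 0, 'smallint': 0,
                  'bigint': 0, 'boolean': False,
                  'timestamp': "to_timestamp('1900-01-01', 'yyyy-MM-dd')",
                  'date': "to_date('1900-01-01', 'yyyy-MM-dd')"}

def nvl_select_exprs(columns):
    """Build "nvl(col, default) as col" expressions for (name, dtype) pairs."""
    return [f"nvl({name}, {NVL_DTYPE_DICT[dtype]}) as {name}" for name, dtype in columns]

# Example:
# nvl_select_exprs([("order_date", "date"), ("amount", "double")])
# -> ["nvl(order_date, to_date('1900-01-01', 'yyyy-MM-dd')) as order_date",
#     "nvl(amount, 0) as amount"]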
@@ -311,7 +311,8 @@ def process_dv_for_table(rds_db_name, rds_tbl_name, total_files, total_size_mb,
cast(null as string) as json_row,
cast(null as string) as validation_msg,
cast(null as string) as database_name,
cast(null as string) as full_table_name
cast(null as string) as full_table_name,
cast(null as string) as table_to_ap
""".strip()

df_dv_output = spark.sql(sql_select_str).repartition(input_repartition_factor)
@@ -381,7 +382,8 @@ def process_dv_for_table(rds_db_name, rds_tbl_name, total_files, total_size_mb,
"'' as json_row",
f"""'{rds_tbl_name} - Validated.{additional_message}' as validation_msg""",
f"""'{rds_db_name}' as database_name""",
f"""'{db_sch_tbl}' as full_table_name"""
f"""'{db_sch_tbl}' as full_table_name""",
"""'False' as table_to_ap"""
)
LOGGER.info(f"Validation Successful - 1")
df_dv_output = df_dv_output.union(df_temp)
@@ -396,7 +398,8 @@ def process_dv_for_table(rds_db_name, rds_tbl_name, total_files, total_size_mb,
"json_row",
validation_msg,
f"""'{rds_db_name}' as database_name""",
f"""'{db_sch_tbl}' as full_table_name"""
f"""'{db_sch_tbl}' as full_table_name""",
"""'False' as table_to_ap"""
)
LOGGER.warn(f"Validation Failed - 2")
df_dv_output = df_dv_output.union(df_temp)
@@ -408,7 +411,8 @@ def process_dv_for_table(rds_db_name, rds_tbl_name, total_files, total_size_mb,
"'' as json_row",
validation_msg,
f"""'{rds_db_name}' as database_name""",
f"""'{db_sch_tbl}' as full_table_name"""
f"""'{db_sch_tbl}' as full_table_name""",
"""'False' as table_to_ap"""
)
LOGGER.warn(f"Validation Failed - 3")
df_dv_output = df_dv_output.union(df_temp)
@@ -423,7 +427,8 @@ def process_dv_for_table(rds_db_name, rds_tbl_name, total_files, total_size_mb,
"'' as json_row",
f"""'{db_sch_tbl} - S3-Parquet folder path does not exist !' as validation_msg""",
f"""'{rds_db_name}' as database_name""",
f"""'{db_sch_tbl}' as full_table_name"""
f"""'{db_sch_tbl}' as full_table_name""",
"""'False' as table_to_ap"""
)
LOGGER.warn(f"Validation not applicable - 4")
df_dv_output = df_dv_output.union(df_temp)
@@ -482,19 +487,24 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, table):

# -------------------------------------------------------
if args.get("select_rds_db_tbls", None) is None:

exclude_rds_db_tbls_list = [f"""{args['rds_sqlserver_db']}_{given_rds_sqlserver_db_schema}_{tbl.strip().strip("'").strip('"')}"""
for tbl in args['exclude_rds_db_tbls'].split(",")]
LOGGER.warn(f"""Given list of tables being exluded:\n{exclude_rds_db_tbls_list}""")

filtered_rds_sqlserver_db_tbl_list = [tbl for tbl in rds_sqlserver_db_tbl_list

if args.get("exclude_rds_db_tbls", None) is None:
exclude_rds_db_tbls_list = list()
else:
exclude_rds_db_tbls_list = [f"""{args['rds_sqlserver_db']}_{given_rds_sqlserver_db_schema}_{tbl.strip().strip("'").strip('"')}"""
for tbl in args['exclude_rds_db_tbls'].split(",")]
LOGGER.warn(f"""Given list of tables being exluded:\n{exclude_rds_db_tbls_list}""")
filtered_rds_sqlserver_db_tbl_list = [tbl for tbl in rds_sqlserver_db_tbl_list
if tbl not in exclude_rds_db_tbls_list]

if not filtered_rds_sqlserver_db_tbl_list:
LOGGER.error(f"""filtered_rds_sqlserver_db_tbl_list - is empty. Exiting ...!""")
LOGGER.error(
f"""filtered_rds_sqlserver_db_tbl_list - is empty. Exiting ...!""")
sys.exit(1)
else:
LOGGER.info(f"""List of tables to be processed: {filtered_rds_sqlserver_db_tbl_list}""")
LOGGER.info(
f"""List of tables to be processed: {filtered_rds_sqlserver_db_tbl_list}""")


for db_sch_tbl in filtered_rds_sqlserver_db_tbl_list:
rds_db_name, rds_tbl_name = db_sch_tbl.split(f"_{given_rds_sqlserver_db_schema}_")[0], \
@@ -145,8 +145,7 @@ resource "aws_iam_policy" "glue_user_restricted_notebook_service_role_iam_policy
],
"Resource": [
"arn:aws:iam::*:role/service-role/AwsGlueSessionServiceRoleUserRestrictedForNotebook*",
"arn:aws:iam::976799291502:role/${aws_iam_role.glue_notebook_iam_role.name}",
"arn:aws:iam::800964199911:role/${aws_iam_role.glue_notebook_iam_role.name}"
"arn:aws:iam::${local.env_account_id}:role/${aws_iam_role.glue_notebook_iam_role.name}"
],
"Condition": {
"StringLike": {
Binary file not shown.
@@ -0,0 +1,4 @@
def handler(event, context):
    # Rows[0] is the header row Athena returns with query results, so skip it
    data = event["queryOutput"]["ResultSet"]["Rows"][1:]
    output_list = [{row["Data"][0]["VarCharValue"]: row["Data"][1]["VarCharValue"]} for row in data]
    return output_list
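This new Lambda reshapes the Athena GetQueryResults payload passed in by the state machine under queryOutput into a flat list of key/value pairs. A minimal sketch of the expected input and output, with made-up column values:

# Sketch of the event shape this handler expects; the column values are invented.
sample_event = {
    "queryOutput": {
        "ResultSet": {
            "Rows": [
                # Rows[0] is Athena's header row and is skipped by the handler
                {"Data": [{"VarCharValue": "full_table_name"}, {"VarCharValue": "validation_msg"}]},
                {"Data": [{"VarCharValue": "g4s_cap_dw_dbo_Financials"}, {"VarCharValue": "Financials - Validated."}]},
            ]
        }
    }
}

# handler(sample_event, None) would return:
# [{"g4s_cap_dw_dbo_Financials": "Financials - Validated."}]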
@@ -56,4 +56,4 @@ def handler(event, context):
logger.error(msg)
raise Exception(msg)

return {"statusCode": 200, "body": json.dumps(f"{copy_object} has been Successfully Copied to the AP")}
return (database_name, schema_name, table_name)
@@ -0,0 +1,9 @@
FROM public.ecr.aws/lambda/python:3.11

COPY requirements.txt .

RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

COPY update_log_table.py ${LAMBDA_TASK_ROOT}

CMD ["update_log_table.handler"]
@@ -0,0 +1,34 @@
APP_NAME = update_log_table
APP_VERSION := $(shell terraform -chdir=../../ output -raw account_suffix)

.PHONY: print-account-suffix
print-account-suffix:
@echo APP_VERSION=$(APP_VERSION)

AWS_ECR_ACCOUNT_ID := $(shell terraform -chdir=../../ output -raw account_id)

.PHONY: print-account-id
print-account-id:
@echo AWS_ECR_ACCOUNT_ID=$(AWS_ECR_ACCOUNT_ID)


AWS_ECR_REGION = eu-west-2
AWS_ECR_REPO = lambdas/$(APP_NAME)

TAG = $(APP_VERSION)

.PHONY: docker/build docker/push docker/run docker/test

docker/build :
docker build -t $(APP_NAME):$(APP_VERSION) .

docker/push: docker/build
aws ecr get-login-password --region $(AWS_ECR_REGION) | docker login --username AWS --password-stdin $(AWS_ECR_ACCOUNT_ID).dkr.ecr.$(AWS_ECR_REGION).amazonaws.com
docker tag $(APP_NAME):$(APP_VERSION) $(AWS_ECR_ACCOUNT_ID).dkr.ecr.$(AWS_ECR_REGION).amazonaws.com/$(AWS_ECR_REPO):$(TAG)
docker push $(AWS_ECR_ACCOUNT_ID).dkr.ecr.$(AWS_ECR_REGION).amazonaws.com/$(AWS_ECR_REPO):$(TAG)

docker/run:
docker run -p 9000:8080 $(AWS_ECR_ACCOUNT_ID).dkr.ecr.$(AWS_ECR_REGION).amazonaws.com/$(AWS_ECR_REPO):$(TAG)

docker/test:
curl -XPOST 'http://localhost:9000/2015-03-31/functions/function/invocations' -d '{"input": {"test/*/Financials": "s3://dms-rds-to-parquet-20240606142913727200000001/test/dbo/Financials/LOAD00000001.parquet","db_info": ["test","dbo","Financials"]},"inputDetails": {"truncated": false},"resource": "arn:aws:lambda:eu-west-2:800964199911:function:update_log_table"}'
@@ -0,0 +1 @@
pandas[pyarrow]==2.2.1
@@ -0,0 +1,87 @@
import pandas as pd
import boto3
import logging
import os

logger = logging.getLogger(__name__)

logger.setLevel(logging.INFO)

S3_LOG_BUCKET = os.environ.get("S3_LOG_BUCKET")
DATABASE_NAME = os.environ.get("DATABASE_NAME")
TABLE_NAME = os.environ.get("TABLE_NAME")

def s3_path_to_bucket_key(s3_path):
    """
    Splits out s3 file path to bucket key combination
    """
    return s3_path.replace("s3://", "").split("/", 1)


def bucket_key_to_s3_path(bucket, key):
    """
    Takes an S3 bucket and key combination and returns the
    full S3 path to that location.
    """
    return f"s3://{bucket}/{key}"


def _add_slash(s):
    """
    Adds slash to end of string
    """
    return s if s[-1] == "/" else s + "/"


def get_filepaths_from_s3_folder(
    s3_folder_path, file_extension=None, exclude_zero_byte_files=True
):
    """
    Get a list of filepaths from a bucket. If extension is set to a string
    then only return files with that extension otherwise if set to None (default)
    all filepaths are returned.
    :param s3_folder_path: "s3://...."
    :param file_extension: file extension, e.g. .json
    :param exclude_zero_byte_files: Whether to filter out results of zero size: True
    :return: A list of full s3 paths that were in the given s3 folder path
    """

    s3_resource = boto3.resource("s3")

    if file_extension is not None:
        if file_extension[0] != ".":
            file_extension = "." + file_extension

    # This guarantees that the path the user has given is really a 'folder'.
    s3_folder_path = _add_slash(s3_folder_path)

    bucket, key = s3_path_to_bucket_key(s3_folder_path)

    s3b = s3_resource.Bucket(bucket)
    obs = s3b.objects.filter(Prefix=key)

    if file_extension is not None:
        obs = [o for o in obs if o.key.endswith(file_extension)]

    if exclude_zero_byte_files:
        obs = [o for o in obs if o.size != 0]

    ob_keys = [o.key for o in obs]

    paths = sorted([bucket_key_to_s3_path(bucket, o) for o in ob_keys])

    return paths

def handler(event, context):
    database_name, schema_name, table_name = event.get("db_info")
    s3_path = f"s3://{S3_LOG_BUCKET}/{DATABASE_NAME}/{TABLE_NAME}/database_name={database_name}/full_table_name={database_name}_{schema_name}_{table_name}"
    file_names = [file.split("/")[-1] for file in get_filepaths_from_s3_folder(s3_path)]
    log_table = pd.read_parquet(s3_path)
    log_table["table_to_ap"] = "True"
    try:
        log_table.to_parquet(f"{s3_path}/{file_names[0]}")
    except Exception as e:
        msg = f"An error has occurred: {e}"
        logger.error(msg)
        raise Exception(msg)
    return {}
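For a rough local smoke test of this handler, assuming AWS credentials and an existing log prefix; the bucket, database and table values below are placeholders, and the event shape is inferred from the handler rather than from the state machine definition:

# Rough local smoke test; all names are placeholders, not values from this repo.
import os

# The module reads these at import time, so set them first.
os.environ["S3_LOG_BUCKET"] = "example-dms-validation-logs"
os.environ["DATABASE_NAME"] = "example_validation_db"
os.environ["TABLE_NAME"] = "example_log_table"

import update_log_table

event = {"db_info": ["g4s_cap_dw", "dbo", "Financials"]}
# Rewrites the first parquet file under the partition with table_to_ap = "True".
update_log_table.handler(event, None)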