diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf
index 5025ae3cc0c..559b87ded21 100644
--- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf
+++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf
@@ -578,6 +578,7 @@ resource "aws_glue_job" "etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm" {
     "--rds_db_table_hashed_rows_parent_dir" = "rds_tables_rows_hashed"
     "--incremental_run_bool"                = "false"
     "--rds_query_where_clause"              = ""
+    "--skip_columns_for_hashing"            = ""
     "--coalesce_int"                        = 0
     "--extra-py-files"                      = "s3://${module.s3-glue-job-script-bucket.bucket.id}/${aws_s3_object.aws_s3_object_pyzipfile_to_s3folder.id}"
     "--hashed_output_s3_bucket_name"        = module.s3-dms-data-validation-bucket.bucket.id
@@ -821,6 +822,7 @@ resource "aws_glue_job" "dms_dv_on_rows_hashvalue_partitionby_yyyy_mm" {
     "--dms_prq_table_folder"       = ""
     "--rds_only_where_clause"      = ""
     "--prq_df_where_clause"        = ""
+    "--skip_columns_for_hashing"   = ""
     "--rds_hashed_rows_prq_bucket" = module.s3-dms-data-validation-bucket.bucket.id
     "--glue_catalog_dv_bucket"     = module.s3-dms-data-validation-bucket.bucket.id
     "--glue_catalog_db_name"       = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
@@ -854,4 +856,4 @@
 EOF
   }
   )
-}
\ No newline at end of file
+}
diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py
index c94a5039bd2..03e7274d4db 100644
--- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py
+++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py
@@ -60,7 +60,8 @@

 OPTIONAL_INPUTS = [
     "rds_only_where_clause",
-    "prq_df_where_clause"
+    "prq_df_where_clause",
+    "skip_columns_for_hashing"
 ]

 AVAILABLE_ARGS_LIST = CustomPysparkMethods.resolve_args(DEFAULT_INPUTS_LIST+OPTIONAL_INPUTS)
@@ -236,15 +237,41 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
     # |1970|12  |1130650220    |5330506101  |9         |
     # --------------------------------------------------------------------------------------

+    rds_db_table_empty_df = rds_jdbc_conn_obj.get_rds_db_table_empty_df(rds_table_orignal_name)
+
+    skip_columns_for_hashing_str = args.get("skip_columns_for_hashing", None)
+
+    skip_columns_for_hashing = list()
+    skipped_struct_fields_list = list()
+
+    if skip_columns_for_hashing_str is not None:
+        skip_columns_for_hashing = [col_name.strip().strip("'").strip('"')
+                                    for col_name in skip_columns_for_hashing_str.split(",")]
+        LOGGER.warn(f"""WARNING ! >> Given skip_columns_for_hashing = {skip_columns_for_hashing}""")
+
+    for e in rds_db_table_empty_df.schema:
+        if e.name in skip_columns_for_hashing:
+            skipped_struct_fields_list.append(e)
+
+    LOGGER.warn(f"""WARNING !
+>> skipped_struct_fields_list = {skipped_struct_fields_list}""")
+

     group_by_cols_list = ['year', 'month']

     prq_df_where_clause = args.get("prq_df_where_clause", None)

-    rds_hashed_rows_prq_df = CustomPysparkMethods.get_s3_parquet_df_v2(
+    if skipped_struct_fields_list:
+        rds_hashed_rows_prq_df = CustomPysparkMethods.get_s3_parquet_df_v2(
+            rds_hashed_rows_fulls3path,
+            CustomPysparkMethods.get_pyspark_hashed_table_schema(
+                TABLE_PKEY_COLUMN, skipped_struct_fields_list)
+        )
+    else:
+        rds_hashed_rows_prq_df = CustomPysparkMethods.get_s3_parquet_df_v2(
             rds_hashed_rows_fulls3path,
             CustomPysparkMethods.get_pyspark_hashed_table_schema(
-            TABLE_PKEY_COLUMN)
-    )
+                TABLE_PKEY_COLUMN)
+        )

     if prq_df_where_clause is not None:
         rds_hashed_rows_prq_df = rds_hashed_rows_prq_df.where(f"{prq_df_where_clause}")
@@ -273,8 +300,6 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
     # |1970|12  |1130650220    |5330506101  |9         |
     # --------------------------------------------------------------------------------------

-    rds_db_table_empty_df = rds_jdbc_conn_obj.get_rds_db_table_empty_df(rds_table_orignal_name)
-
     migrated_prq_yyyy_mm_df = CustomPysparkMethods.get_s3_parquet_df_v3(
         dms_output_fulls3path,
         rds_db_table_empty_df.schema)
diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py
index 7a02275c150..294027bf303 100644
--- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py
+++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py
@@ -64,7 +64,8 @@
     "coalesce_int",
     "parallel_jdbc_conn_num",
     "pkey_lower_bound_int",
-    "pkey_upper_bound_int"
+    "pkey_upper_bound_int",
+    "skip_columns_for_hashing"
 ]

 AVAILABLE_ARGS_LIST = CustomPysparkMethods.resolve_args(DEFAULT_INPUTS_LIST+OPTIONAL_INPUTS)
@@ -246,13 +247,21 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame,
         sys.exit(1)
     # ---------------------------------------

+    skip_columns_for_hashing_str = args.get("skip_columns_for_hashing", None)
+    skip_columns_for_hashing = list()
+    if skip_columns_for_hashing_str is not None:
+        skip_columns_for_hashing = [col_name.strip().strip("'").strip('"')
+                                    for col_name in skip_columns_for_hashing_str.split(",")]
+        LOGGER.warn(f"""WARNING !
+>> Given skip_columns_for_hashing = {skip_columns_for_hashing}""")
+
     all_columns_except_pkey = list()
     conversion_col_list = list()
     if TRANSFORM_COLS_FOR_HASHING_DICT.get(f"{db_sch_tbl}", None) is not None:
         conversion_col_list = list(TRANSFORM_COLS_FOR_HASHING_DICT[f"{db_sch_tbl}"].keys())

     for e in rds_db_table_empty_df.schema.fields:
-        if e.name == rds_db_tbl_pkey_column:
+        if (e.name == rds_db_tbl_pkey_column) \
+                or (e.name in skip_columns_for_hashing):
             continue

         if e.name in conversion_col_list:
@@ -286,8 +295,12 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame,
     # VERIFY GIVEN INPUTS - END
     # -----------------------------------------

+    partial_select_str = f"""SELECT {rds_db_tbl_pkey_column}, """
+    if skip_columns_for_hashing_str is not None:
+        partial_select_str = partial_select_str + ', '.join(skip_columns_for_hashing) + ', '
+
     rds_db_hash_cols_query_str = f"""
-    SELECT {rds_db_tbl_pkey_column},
+    {partial_select_str}
           LOWER(SUBSTRING(CONVERT(VARCHAR(66), HASHBYTES('SHA2_256',
           CONCAT_WS('', {', '.join(all_columns_except_pkey)})), 1), 3, 66)) AS RowHash,
           YEAR({date_partition_column_name}) AS year,
diff --git a/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py b/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py
index b9474b7c9b9..fadd9ae1f96 100644
--- a/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py
+++ b/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py
@@ -821,8 +821,20 @@ def get_rds_db_tbl_customized_cols_schema_object(in_df_rds: DataFrame,
         return altered_schema_object

     @staticmethod
-    def get_pyspark_hashed_table_schema(in_pkey_column):
-        return T.StructType([
-            T.StructField(f"{in_pkey_column}", T.LongType(), False),
-            T.StructField("RowHash", T.StringType(), False)]
-        )
+    def get_pyspark_hashed_table_schema(in_pkey_column, sf_list=None):
+        if sf_list is None:
+            return T.StructType([
+                T.StructField(f"{in_pkey_column}", T.LongType(), False),
+                T.StructField("RowHash", T.StringType(), False)]
+            )
+        else:
+            schema = T.StructType([
+                T.StructField(f"{in_pkey_column}", T.LongType(), False)]
+            )
+
+            for sf in sf_list:
+                schema = schema.add(sf)
+
+            schema = schema.add(T.StructField("RowHash", T.StringType(), False))
+
+            return schema
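A minimal local sketch of how the new skip_columns_for_hashing argument is intended to flow, runnable with only pyspark installed. The helper names (parse_skip_columns, hashed_table_schema) and the sample table schema are illustrative stand-ins, not part of this PR: the parsing mirrors the two Glue scripts (comma-separated string, names stripped of whitespace and stray quotes), and the schema construction mirrors get_pyspark_hashed_table_schema, with the primary key first, any skipped columns kept with their original StructField types in the middle, and RowHash appended last.

import pyspark.sql.types as T

def parse_skip_columns(raw_arg):
    # Same parsing as the Glue scripts (plus an empty-string guard):
    # split on commas, strip whitespace and stray single/double quotes.
    if not raw_arg:
        return []
    return [name.strip().strip("'").strip('"') for name in raw_arg.split(",")]

def hashed_table_schema(pkey_column, skipped_fields=None):
    # Same shape as get_pyspark_hashed_table_schema(): pkey first,
    # skipped columns (if any) in the middle, RowHash last.
    schema = T.StructType([T.StructField(pkey_column, T.LongType(), False)])
    for sf in skipped_fields or []:
        schema = schema.add(sf)
    return schema.add(T.StructField("RowHash", T.StringType(), False))

# Hypothetical RDS table schema, standing in for get_rds_db_table_empty_df().schema.
table_schema = T.StructType([
    T.StructField("id", T.LongType(), False),
    T.StructField("notes", T.StringType(), True),
    T.StructField("amount", T.DecimalType(10, 2), True),
])

skip_cols = parse_skip_columns(" notes , 'amount' ")  # --skip_columns_for_hashing value
skipped = [f for f in table_schema if f.name in skip_cols]
print(hashed_table_schema("id", skipped).fieldNames())
# ['id', 'notes', 'amount', 'RowHash']

Since the skipped columns are excluded from the CONCAT_WS input to HASHBYTES but still selected alongside RowHash, they appear to be carried through the hashed-rows parquet so they can presumably be compared directly rather than via the hash.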