Commit

Merge pull request #9146 from ministryofjustice/ELM-3031_Partitioned_DV_Job_v7

partitioned_parquet_output_data_validation - 1912 - 1
madhu-k-sr2 authored Dec 19, 2024
2 parents e7fb0af + 27a3f83 commit 0a97ce7
Showing 4 changed files with 67 additions and 15 deletions.
@@ -578,6 +578,7 @@ resource "aws_glue_job" "etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm" {
"--rds_db_table_hashed_rows_parent_dir" = "rds_tables_rows_hashed"
"--incremental_run_bool" = "false"
"--rds_query_where_clause" = ""
"--skip_columns_for_hashing" = ""
"--coalesce_int" = 0
"--extra-py-files" = "s3://${module.s3-glue-job-script-bucket.bucket.id}/${aws_s3_object.aws_s3_object_pyzipfile_to_s3folder.id}"
"--hashed_output_s3_bucket_name" = module.s3-dms-data-validation-bucket.bucket.id
@@ -821,6 +822,7 @@ resource "aws_glue_job" "dms_dv_on_rows_hashvalue_partitionby_yyyy_mm" {
"--dms_prq_table_folder" = ""
"--rds_only_where_clause" = ""
"--prq_df_where_clause" = ""
"--skip_columns_for_hashing" = ""
"--rds_hashed_rows_prq_bucket" = module.s3-dms-data-validation-bucket.bucket.id
"--glue_catalog_dv_bucket" = module.s3-dms-data-validation-bucket.bucket.id
"--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
@@ -854,4 +856,4 @@ EOF
}
)

}
}
@@ -60,7 +60,8 @@

OPTIONAL_INPUTS = [
"rds_only_where_clause",
"prq_df_where_clause"
"prq_df_where_clause",
"skip_columns_for_hashing"
]

AVAILABLE_ARGS_LIST = CustomPysparkMethods.resolve_args(DEFAULT_INPUTS_LIST+OPTIONAL_INPUTS)
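# --- Illustrative sketch only (not this repo's resolve_args helper): one common
# --- way an optional Glue argument such as --skip_columns_for_hashing, which the
# --- Terraform job definitions above default to an empty string, can be resolved
# --- inside a Glue job script - include it only when it was actually supplied.
import sys
from awsglue.utils import getResolvedOptions

_optional_supplied = [arg for arg in OPTIONAL_INPUTS if f"--{arg}" in sys.argv]
_example_args = getResolvedOptions(sys.argv, DEFAULT_INPUTS_LIST + _optional_supplied)
_skip_columns_csv = _example_args.get("skip_columns_for_hashing", "")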
@@ -236,15 +237,41 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
# |1970|12 |1130650220 |5330506101 |9 |
# --------------------------------------------------------------------------------------

rds_db_table_empty_df = rds_jdbc_conn_obj.get_rds_db_table_empty_df(rds_table_orignal_name)

skip_columns_for_hashing_str = args.get("skip_columns_for_hashing", None)

skip_columns_for_hashing = list()
skipped_struct_fields_list = list()

if skip_columns_for_hashing_str is not None:
    skip_columns_for_hashing = [f"""{col_name.strip().strip("'").strip('"')}"""
                                for col_name in skip_columns_for_hashing_str.split(",")]
    LOGGER.warn(f"""WARNING ! >> Given skip_columns_for_hashing = {skip_columns_for_hashing}""")

for e in rds_db_table_empty_df.schema:
    if e.name in skip_columns_for_hashing:
        # Append the StructField object itself; get_pyspark_hashed_table_schema()
        # adds each one to the hashed-rows schema via StructType.add().
        skipped_struct_fields_list.append(e)

LOGGER.warn(f"""WARNING ! >> skipped_struct_fields_list = {skipped_struct_fields_list}""")


group_by_cols_list = ['year', 'month']
prq_df_where_clause = args.get("prq_df_where_clause", None)


rds_hashed_rows_prq_df = CustomPysparkMethods.get_s3_parquet_df_v2(
    rds_hashed_rows_fulls3path,
    CustomPysparkMethods.get_pyspark_hashed_table_schema(
        TABLE_PKEY_COLUMN)
)
if skipped_struct_fields_list:
    rds_hashed_rows_prq_df = CustomPysparkMethods.get_s3_parquet_df_v2(
        rds_hashed_rows_fulls3path,
        CustomPysparkMethods.get_pyspark_hashed_table_schema(
            TABLE_PKEY_COLUMN, skipped_struct_fields_list)
    )
else:
    rds_hashed_rows_prq_df = CustomPysparkMethods.get_s3_parquet_df_v2(
        rds_hashed_rows_fulls3path,
        CustomPysparkMethods.get_pyspark_hashed_table_schema(
            TABLE_PKEY_COLUMN)
    )

if prq_df_where_clause is not None:
rds_hashed_rows_prq_df = rds_hashed_rows_prq_df.where(f"{prq_df_where_clause}")
@@ -273,8 +300,6 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
# |1970|12 |1130650220 |5330506101 |9 |
# --------------------------------------------------------------------------------------

rds_db_table_empty_df = rds_jdbc_conn_obj.get_rds_db_table_empty_df(rds_table_orignal_name)

migrated_prq_yyyy_mm_df = CustomPysparkMethods.get_s3_parquet_df_v3(
dms_output_fulls3path,
rds_db_table_empty_df.schema)
@@ -64,7 +64,8 @@
"coalesce_int",
"parallel_jdbc_conn_num",
"pkey_lower_bound_int",
"pkey_upper_bound_int"
"pkey_upper_bound_int",
"skip_columns_for_hashing"
]

AVAILABLE_ARGS_LIST = CustomPysparkMethods.resolve_args(DEFAULT_INPUTS_LIST+OPTIONAL_INPUTS)
@@ -246,13 +247,21 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame,
sys.exit(1)
# ---------------------------------------

skip_columns_for_hashing_str = args.get("skip_columns_for_hashing", None)
skip_columns_for_hashing = list()
if skip_columns_for_hashing_str is not None:
    skip_columns_for_hashing = [f"""{col_name.strip().strip("'").strip('"')}"""
                                for col_name in skip_columns_for_hashing_str.split(",")]
    LOGGER.warn(f"""WARNING ! >> Given skip_columns_for_hashing = {skip_columns_for_hashing}""")

all_columns_except_pkey = list()
conversion_col_list = list()
if TRANSFORM_COLS_FOR_HASHING_DICT.get(f"{db_sch_tbl}", None) is not None:
conversion_col_list = list(TRANSFORM_COLS_FOR_HASHING_DICT[f"{db_sch_tbl}"].keys())

for e in rds_db_table_empty_df.schema.fields:
if e.name == rds_db_tbl_pkey_column:
if (e.name == rds_db_tbl_pkey_column) \
or (e.name in skip_columns_for_hashing):
continue

if e.name in conversion_col_list:
@@ -286,8 +295,12 @@
# VERIFY GIVEN INPUTS - END
# -----------------------------------------

partial_select_str = f"""SELECT {rds_db_tbl_pkey_column}, """
if skip_columns_for_hashing_str is not None:
    partial_select_str = partial_select_str + ', '.join(skip_columns_for_hashing) + ', '

rds_db_hash_cols_query_str = f"""
SELECT {rds_db_tbl_pkey_column},
{partial_select_str}
LOWER(SUBSTRING(CONVERT(VARCHAR(66),
HASHBYTES('SHA2_256', CONCAT_WS('', {', '.join(all_columns_except_pkey)})), 1), 3, 66)) AS RowHash,
YEAR({date_partition_column_name}) AS year,
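# --- Illustration only (hypothetical table columns): with a skip column supplied,
# --- it is selected as-is while the remaining non-key columns feed the SHA2_256
# --- row hash; without it, the query reduces to "SELECT <pkey>, LOWER(...) AS RowHash".
_pkey = "offender_id"
_skip_cols = ["audit_json"]
_hash_cols = ["first_name", "last_name"]   # pkey and skipped columns already excluded

_select_prefix = f"SELECT {_pkey}, " + ', '.join(_skip_cols) + ', '
_example_query = (
    f"{_select_prefix}\n"
    f"LOWER(SUBSTRING(CONVERT(VARCHAR(66), "
    f"HASHBYTES('SHA2_256', CONCAT_WS('', {', '.join(_hash_cols)})), 1), 3, 66)) AS RowHash"
)
# _example_query:
#   SELECT offender_id, audit_json,
#   LOWER(SUBSTRING(CONVERT(VARCHAR(66), HASHBYTES('SHA2_256', CONCAT_WS('', first_name, last_name)), 1), 3, 66)) AS RowHash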
@@ -821,8 +821,20 @@ def get_rds_db_tbl_customized_cols_schema_object(in_df_rds: DataFrame,
return altered_schema_object

@staticmethod
def get_pyspark_hashed_table_schema(in_pkey_column):
    return T.StructType([
        T.StructField(f"{in_pkey_column}", T.LongType(), False),
        T.StructField("RowHash", T.StringType(), False)]
    )
def get_pyspark_hashed_table_schema(in_pkey_column, sf_list=None):
    if sf_list is None:
        return T.StructType([
            T.StructField(f"{in_pkey_column}", T.LongType(), False),
            T.StructField("RowHash", T.StringType(), False)]
        )
    else:
        schema = T.StructType([
            T.StructField(f"{in_pkey_column}", T.LongType(), False)]
        )

        for sf in sf_list:
            schema = schema.add(sf)

        schema = schema.add(T.StructField("RowHash", T.StringType(), False))

        return schema
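# --- Illustration only (hypothetical column name): the two shapes the amended
# --- helper can return; sf_list is expected to hold pyspark StructField objects.
# --- CustomPysparkMethods is the class patched above; its import path is repo-specific.
import pyspark.sql.types as T

_default_schema = CustomPysparkMethods.get_pyspark_hashed_table_schema("offender_id")
# StructType([StructField('offender_id', LongType(), False),
#             StructField('RowHash', StringType(), False)])

_extended_schema = CustomPysparkMethods.get_pyspark_hashed_table_schema(
    "offender_id", [T.StructField("audit_json", T.StringType(), True)])
# StructType([StructField('offender_id', LongType(), False),
#             StructField('audit_json', StringType(), True),
#             StructField('RowHash', StringType(), False)])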
