From a8249de9559d3bfdc886bc4e21e0da92545edd0f Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 22 Nov 2024 11:44:29 +0000 Subject: [PATCH 1/7] Glue Job added - 2211 - 1 --- .../dms_data_validation_glue_job_v2.tf | 75 +++- ...hash_rows_to_s3_prq_partitionby_yyyy_mm.py | 418 ++++++++++++++++++ .../etl_table_rows_hashvalue_to_parquet.py | 2 +- 3 files changed, 492 insertions(+), 3 deletions(-) create mode 100644 terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf index a36b3d400fd..158ce888cd8 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf @@ -190,8 +190,8 @@ resource "aws_glue_job" "etl_rds_to_s3_parquet_partitionby_yyyy_mm" { "--rds_df_repartition_num" = 0 "--coalesce_int" = 0 "--rename_migrated_prq_tbl_folder" = "" - "--year_partition_bool" = "false" - "--month_partition_bool" = "false" + "--year_partition_bool" = "true" + "--month_partition_bool" = "true" "--extra-py-files" = "s3://${module.s3-glue-job-script-bucket.bucket.id}/${aws_s3_object.aws_s3_object_pyzipfile_to_s3folder.id}" "--rds_to_parquet_output_s3_bucket" = module.s3-dms-target-store-bucket.bucket.id "--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.etl_rds_to_s3_parquet_partitionby_yyyy_mm.name}" @@ -503,3 +503,74 @@ EOF ) } + + + +resource "aws_cloudwatch_log_group" "etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm" { + name = "etl-rds-tbl-hash-rows-to-s3-prq-partitionby-yyyy-mm" + retention_in_days = 14 +} + +resource "aws_s3_object" "etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm" { + bucket = module.s3-glue-job-script-bucket.bucket.id + key = "etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py" + source = "glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py" + etag = filemd5("glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py") +} + +resource "aws_glue_job" "etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm" { + count = local.gluejob_count + + name = "etl-rds-tbl-hash-rows-to-s3-prq-partitionby-yyyy-mm" + description = "Table migration & validation Glue-Job (PySpark)." + role_arn = aws_iam_role.glue_mig_and_val_iam_role.arn + glue_version = "4.0" + worker_type = "G.2X" + number_of_workers = 4 + default_arguments = { + "--script_bucket_name" = module.s3-glue-job-script-bucket.bucket.id + "--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0] + "--rds_db_pwd" = aws_db_instance.database_2022.password + "--rds_sqlserver_db" = "" + "--rds_sqlserver_db_schema" = "dbo" + "--rds_sqlserver_db_table" = "" + "--rds_db_tbl_pkey_column" = "" + "--date_partition_column_name" = "" + "--parallel_jdbc_conn_num" = 1 + "--rds_yyyy_mm_df_repartition_num" = 0 + "--year_partition_bool" = "true" + "--month_partition_bool" = "true" + "--rds_db_table_hashed_rows_parent_dir" = "rds_tables_rows_hashed" + "--rds_query_where_clause" = "" + "--coalesce_int" = 0 + "--extra-py-files" = "s3://${module.s3-glue-job-script-bucket.bucket.id}/${aws_s3_object.aws_s3_object_pyzipfile_to_s3folder.id}" + "--hashed_output_s3_bucket_name" = module.s3-dms-data-validation-bucket.bucket.id + "--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name + "--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.name}" + "--enable-continuous-cloudwatch-log" = "true" + "--enable-continuous-log-filter" = "true" + "--enable-metrics" = "true" + "--enable-auto-scaling" = "true" + "--conf" = < {s3_table_folder_path}/""") + + # ddl_refresh_table_partitions = f"msck repair table {catalog_db.lower()}.{catalog_db_tbl.lower()}" + # LOGGER.info(f"""ddl_refresh_table_partitions:> \n{ddl_refresh_table_partitions}""") + + # # Refresh table prtitions + # execution_id = run_athena_query(ddl_refresh_table_partitions) + # LOGGER.info(f"SQL-Statement execution id: {execution_id}") + + # # Check query execution + # query_status = has_query_succeeded(execution_id=execution_id) + # LOGGER.info(f"Query state: {query_status}") + + +def write_rds_df_to_s3_parquet(df_rds_write: DataFrame, + partition_by_cols, + prq_table_folder_path): + + # s3://dms-rds-to-parquet-20240606144708618700000001/g4s_cap_dw/dbo/F_History/ + + s3_table_folder_path = f"""s3://{HASHED_OUTPUT_S3_BUCKET_NAME}/{prq_table_folder_path}""" + + if S3Methods.check_s3_folder_path_if_exists(HASHED_OUTPUT_S3_BUCKET_NAME, + prq_table_folder_path): + + LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}""") + glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0}) + # -------------------------------------------------------------------- + + # catalog_db, catalog_db_tbl = prq_table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""") + + dydf = DynamicFrame.fromDF(df_rds_write, glueContext, "final_spark_df") + + glueContext.write_dynamic_frame.from_options(frame=dydf, connection_type='s3', format='parquet', + connection_options={ + 'path': f"""{s3_table_folder_path}/""", + "partitionKeys": partition_by_cols + }, + format_options={ + 'useGlueParquetWriter': True, + 'compression': 'snappy', + 'blockSize': 13421773, + 'pageSize': 1048576 + }) + LOGGER.info(f"""'{db_sch_tbl}' table data written to -> {s3_table_folder_path}/""") + +# =================================================================================================== + + +if __name__ == "__main__": + + # VERIFY GIVEN INPUTS - START + # ------------------------------------------- + + if args.get("rds_sqlserver_db", None) is None: + LOGGER.error(f"""'rds_sqlserver_db' runtime input is missing! Exiting ...""") + sys.exit(1) + else: + rds_sqlserver_db = args["rds_sqlserver_db"] + LOGGER.info(f"""Given rds_sqlserver_db = {rds_sqlserver_db}""") + + if args.get("rds_sqlserver_db_schema", None) is None: + LOGGER.error(f"""'rds_sqlserver_db_schema' runtime input is missing! Exiting ...""") + sys.exit(1) + else: + rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"] + LOGGER.info(f"""Given rds_sqlserver_db_schema = {rds_sqlserver_db_schema}""") + # ------------------------------------------- + + rds_jdbc_conn_obj = RDS_JDBC_CONNECTION(RDS_DB_HOST_ENDPOINT, + RDS_DB_INSTANCE_PWD, + rds_sqlserver_db, + rds_sqlserver_db_schema) + + try: + rds_db_name = rds_jdbc_conn_obj.check_if_rds_db_exists()[0] + except IndexError: + LOGGER.error(f"""Given database name not found! >> {args['rds_sqlserver_db']} <<""") + sys.exit(1) + except Exception as e: + LOGGER.error(e) + # ------------------------------------------------------- + + rds_sqlserver_db_tbl_list = rds_jdbc_conn_obj.get_rds_db_tbl_list() + if not rds_sqlserver_db_tbl_list: + LOGGER.error(f"""rds_sqlserver_db_tbl_list - is empty. Exiting ...!""") + sys.exit(1) + else: + message_prefix = f"""Total List of tables available in {rds_db_name}.{rds_sqlserver_db_schema}""" + LOGGER.info(f"""{message_prefix}\n{rds_sqlserver_db_tbl_list}""") + # ------------------------------------------------------- + + if args.get("rds_sqlserver_db_table", None) is None: + LOGGER.error(f"""'rds_sqlserver_db_table' runtime input is missing! Exiting ...""") + sys.exit(1) + else: + rds_sqlserver_db_table = args["rds_sqlserver_db_table"] + table_name_prefix = f"""{rds_db_name}_{rds_sqlserver_db_schema}""" + db_sch_tbl = f"""{table_name_prefix}_{rds_sqlserver_db_table}""" + # -------------------------------------------------------------------- + + if db_sch_tbl not in rds_sqlserver_db_tbl_list: + LOGGER.error(f"""'{db_sch_tbl}' - is not an existing table! Exiting ...""") + sys.exit(1) + else: + LOGGER.info(f""">> Given RDS SqlServer-DB Table: {rds_sqlserver_db_table} <<""") + # ------------------------------------------------------- + + rds_db_tbl_pkey_column = args['rds_db_tbl_pkey_column'] + LOGGER.info(f""">> rds_db_tbl_pkey_column = {rds_db_tbl_pkey_column} <<""") + + rds_db_table_empty_df = rds_jdbc_conn_obj.get_rds_db_table_empty_df( + rds_sqlserver_db_table) + + df_rds_dtype_dict = CustomPysparkMethods.get_dtypes_dict(rds_db_table_empty_df) + int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items() + if dtype in INT_DATATYPES_LIST] + + if rds_db_tbl_pkey_column not in int_dtypes_colname_list: + LOGGER.error( + f"""PrimaryKey column-'{rds_db_tbl_pkey_column}' is not an integer datatype !""") + sys.exit(1) + # --------------------------------------- + + all_columns_except_pkey = list() + conversion_col_list = list( + TBL_COLS_CONVERT_FMT_DICT[ + f"{rds_sqlserver_db_table}"].keys() + ) + for e in rds_db_table_empty_df.schema.fields: + if e.name == rds_db_tbl_pkey_column: + continue + + if e.name in conversion_col_list: + all_columns_except_pkey.append( + TBL_COLS_CONVERT_FMT_DICT[f"{rds_sqlserver_db_table}"][f"{e.name}"] + ) + else: + all_columns_except_pkey.append(f"{e.name}") + + LOGGER.info(f""">> all_columns_except_pkey = {all_columns_except_pkey} <<""") + # --------------------------------------- + + date_partition_column_name = args['date_partition_column_name'] + LOGGER.info(f"""date_partition_column_name = {date_partition_column_name}""") + + parallel_jdbc_conn_num = int(args['parallel_jdbc_conn_num']) + LOGGER.info(f"""parallel_jdbc_conn_num = {parallel_jdbc_conn_num}""") + + rds_yyyy_mm_df_repartition_num = int(args['rds_yyyy_mm_df_repartition_num']) + LOGGER.info(f"""rds_yyyy_mm_df_repartition_num = {rds_yyyy_mm_df_repartition_num}""") + + yyyy_mm_partition_by_cols = list() + if args['year_partition_bool'] == 'true': + yyyy_mm_partition_by_cols.append("year") + + if args['month_partition_bool'] == 'true': + yyyy_mm_partition_by_cols.append("month") + + LOGGER.info(f"""yyyy_mm_partition_by_cols = {yyyy_mm_partition_by_cols}""") + + prq_table_folder_path = f""" + {RDS_DB_TABLE_HASHED_ROWS_PARENT_DIR}/{rds_db_name}/{rds_sqlserver_db_schema}/{rds_sqlserver_db_table}""".lstrip() + # ----------------------------------------- + # VERIFY GIVEN INPUTS - END + # ----------------------------------------- + + agg_row_dict_list = rds_jdbc_conn_obj.get_min_max_groupby_month( + rds_sqlserver_db_table, + date_partition_column_name, + rds_db_tbl_pkey_column, + args.get('rds_query_where_clause', None) + ) + LOGGER.info(f"""agg_row_dict_list:>\n{[agg_row_dict for agg_row_dict in agg_row_dict_list]}""") + + rds_db_select_query_str = f""" + SELECT {rds_db_tbl_pkey_column}, + LOWER(SUBSTRING(CONVERT(VARCHAR(66), + HASHBYTES('SHA2_256', CONCAT_WS('', {', '.join(all_columns_except_pkey)})), 1), 3, 66)) AS RowHash, + {date_partition_column_name} + FROM {rds_sqlserver_db_schema}.[{rds_sqlserver_db_table}] + """.strip() + + rds_query_where_clause = args.get('rds_query_where_clause', None) + + + for agg_row_dict in agg_row_dict_list: + + agg_row_year = agg_row_dict['year'] + agg_row_month = agg_row_dict['month'] + min_pkey_value = agg_row_dict['min_pkey_value'] + max_pkey_value = agg_row_dict['max_pkey_value'] + LOGGER.info(f"""agg_row_year = {agg_row_year}""") + LOGGER.info(f"""agg_row_month = {agg_row_month}""") + LOGGER.info(f"""min_pkey_value = {min_pkey_value}""") + LOGGER.info(f"""max_pkey_value = {max_pkey_value}""") + + pkey_between_clause_str = f""" + WHERE {rds_db_tbl_pkey_column} between {min_pkey_value} and {max_pkey_value}""".strip() + + rds_db_select_query_str = rds_db_select_query_str + pkey_between_clause_str + + if rds_query_where_clause is not None: + rds_query_where_clause = rds_query_where_clause.strip() + rds_db_select_query_str = rds_db_select_query_str + \ + f""" AND {rds_query_where_clause}""" + + rds_hashed_rows_df = rds_jdbc_conn_obj.get_rds_df_read_query_pkey_parallel( + rds_db_select_query_str, + rds_db_tbl_pkey_column, + min_pkey_value, + max_pkey_value, + parallel_jdbc_conn_num + ) + # ---------------------------------------------------------- + temp_msg = f"""{agg_row_year}_{agg_row_month}-rds_hashed_rows_df""" + LOGGER.info( + f"""{temp_msg}: READ PARTITIONS = {rds_hashed_rows_df.rdd.getNumPartitions()}""") + + if 'year' in yyyy_mm_partition_by_cols \ + and 'year' not in rds_hashed_rows_df.columns: + rds_hashed_rows_df = rds_hashed_rows_df.withColumn( + "year", F.year(date_partition_column_name)) + + if 'month' in yyyy_mm_partition_by_cols \ + and 'month' not in rds_hashed_rows_df.columns: + rds_hashed_rows_df = rds_hashed_rows_df.withColumn( + "month", F.month(date_partition_column_name)) + + rds_hashed_rows_df = rds_hashed_rows_df.where( + f"""year = {agg_row_year} and month = {agg_row_month}""") + + if rds_yyyy_mm_df_repartition_num != 0: + # Note: Default 'partitionby_columns' values may not be appropriate for all the scenarios. + # So, the user can edit the list-'partitionby_columns' value(s) if required at runtime. + # Example: partitionby_columns = ['month'] + # The above scenario may be when the rds-source-dataframe filtered on single 'year' value. + partitionby_columns = yyyy_mm_partition_by_cols + [rds_db_tbl_pkey_column] + + LOGGER.info(f"""{temp_msg}: Repartitioning on {partitionby_columns}""") + rds_hashed_rows_df = rds_hashed_rows_df.repartition(rds_yyyy_mm_df_repartition_num, *partitionby_columns) + + LOGGER.info( + f"""{temp_msg}: After Repartitioning -> {rds_hashed_rows_df.rdd.getNumPartitions()} partitions.""") + # ---------------------------------------------------- + + # Note: If many small size parquet files are created for each partition, + # consider using 'orderBy', 'coalesce' features appropriately before writing dataframe into S3 bucket. + # df_rds_write = rds_hashed_rows_df.coalesce(1) + + # NOTE: When filtered rows (ex: based on 'year') are used in separate consecutive batch runs, + # consider to appropriately use the parquet write functions with features in built as per the below details. + # - write_rds_df_to_s3_parquet(): Overwrites the existing partitions by default. + # - write_rds_df_to_s3_parquet_v2(): Adds the new partitions & also the corresponding partitions are updated in athena tables. + coalesce_int = int(args.get('coalesce_int', 0)) + if coalesce_int != 0: + LOGGER.warn(f"""{temp_msg}:> coalesce_int = {coalesce_int}""") + rds_hashed_rows_df_write = rds_hashed_rows_df.coalesce(coalesce_int) + else: + rds_hashed_rows_df_write = rds_hashed_rows_df.alias("rds_hashed_rows_df_write") + + write_rds_df_to_s3_parquet(rds_hashed_rows_df_write, + yyyy_mm_partition_by_cols, + prq_table_folder_path) + + LOGGER.info(f"""Partition - '{prq_table_folder_path}/{agg_row_year}/{agg_row_month}' writing completed.""") + # ----------------------------------------------- + + total_files, total_size = S3Methods.get_s3_folder_info(HASHED_OUTPUT_S3_BUCKET_NAME, + prq_table_folder_path) + msg_part_1 = f"""total_files={total_files}""" + msg_part_2 = f"""total_size_mb={total_size/1024/1024:.2f}""" + LOGGER.info(f"""'{prq_table_folder_path}': {msg_part_1}, {msg_part_2}""") + + job.commit() diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_table_rows_hashvalue_to_parquet.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_table_rows_hashvalue_to_parquet.py index bc6896c1eb6..ffba0e5cad5 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/etl_table_rows_hashvalue_to_parquet.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_table_rows_hashvalue_to_parquet.py @@ -201,7 +201,7 @@ def write_parquet_to_s3(hashed_rows_prq_df_write: DataFrame, hashed_rows_prq_ful FROM {rds_sqlserver_db_schema}.[{rds_sqlserver_db_table}] """.strip() - parallel_jdbc_conn_num = args['parallel_jdbc_conn_num'] + parallel_jdbc_conn_num = int(args['parallel_jdbc_conn_num']) parquet_df_write_repartition_num = int(args.get('parquet_df_write_repartition_num', 0)) From 3f432de77a92589d3affb2a2c65e0473f4d42222 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 22 Nov 2024 12:23:15 +0000 Subject: [PATCH 2/7] Glue Job added - 2211 - 2 --- ...rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py index 0f8219e9da9..2e0d27ae292 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py @@ -266,10 +266,13 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame, # --------------------------------------- all_columns_except_pkey = list() - conversion_col_list = list( - TBL_COLS_CONVERT_FMT_DICT[ - f"{rds_sqlserver_db_table}"].keys() - ) + conversion_col_list = list() + if TBL_COLS_CONVERT_FMT_DICT.get( + f"{rds_sqlserver_db_table}", None) is not None: + conversion_col_list = list( + TBL_COLS_CONVERT_FMT_DICT[ + f"{rds_sqlserver_db_table}"].keys() + ) for e in rds_db_table_empty_df.schema.fields: if e.name == rds_db_tbl_pkey_column: continue From 21130b0488d600bfbadbb0bfef6842c150657cff Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 22 Nov 2024 15:04:38 +0000 Subject: [PATCH 3/7] Glue Job added - 2211 - 3 --- .../etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py index 2e0d27ae292..fb83cc43065 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm.py @@ -323,7 +323,8 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame, SELECT {rds_db_tbl_pkey_column}, LOWER(SUBSTRING(CONVERT(VARCHAR(66), HASHBYTES('SHA2_256', CONCAT_WS('', {', '.join(all_columns_except_pkey)})), 1), 3, 66)) AS RowHash, - {date_partition_column_name} + YEAR({date_partition_column_name}) AS year, + MONTH({date_partition_column_name}) AS month FROM {rds_sqlserver_db_schema}.[{rds_sqlserver_db_table}] """.strip() From 52a8b6d9ccef23ac60b49171502f6860e04d0c14 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Mon, 25 Nov 2024 19:04:16 +0000 Subject: [PATCH 4/7] new GlueJob Added - 2511 - 1 --- .../dms_data_validation_glue_job_v2.tf | 64 +++- ...g4s_emsys_tpims_task_tables_selection.json | 10 + .../etl_rds_sqlserver_query_to_s3_parquet.py | 276 ++++++++++++++++++ .../glue_data_validation_lib.py | 12 +- 4 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf index 158ce888cd8..423999a7702 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf @@ -539,7 +539,7 @@ resource "aws_glue_job" "etl_rds_tbl_hash_rows_to_s3_prq_partitionby_yyyy_mm" { "--parallel_jdbc_conn_num" = 1 "--rds_yyyy_mm_df_repartition_num" = 0 "--year_partition_bool" = "true" - "--month_partition_bool" = "true" + "--month_partition_bool" = "true" "--rds_db_table_hashed_rows_parent_dir" = "rds_tables_rows_hashed" "--rds_query_where_clause" = "" "--coalesce_int" = 0 @@ -574,3 +574,65 @@ EOF ) } + +resource "aws_cloudwatch_log_group" "etl_rds_sqlserver_query_to_s3_parquet" { + name = "etl-rds-sqlserver-query-to-s3-parquet" + retention_in_days = 14 +} + +resource "aws_s3_object" "etl_rds_sqlserver_query_to_s3_parquet" { + bucket = module.s3-glue-job-script-bucket.bucket.id + key = "etl_rds_sqlserver_query_to_s3_parquet.py" + source = "glue-job/etl_rds_sqlserver_query_to_s3_parquet.py" + etag = filemd5("glue-job/etl_rds_sqlserver_query_to_s3_parquet.py") +} + +resource "aws_glue_job" "etl_rds_sqlserver_query_to_s3_parquet" { + count = local.gluejob_count + + name = "etl-rds-sqlserver-query-to-s3-parquet" + description = "DMS Data Validation Glue-Job (PySpark)." + role_arn = aws_iam_role.dms_dv_glue_job_iam_role.arn + glue_version = "4.0" + worker_type = "G.1X" + number_of_workers = 4 + default_arguments = { + "--script_bucket_name" = module.s3-glue-job-script-bucket.bucket.id + "--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0] + "--rds_db_pwd" = aws_db_instance.database_2022.password + "--jdbc_read_partitions_num" = "" + "--rds_sqlserver_db" = "" + "--rds_sqlserver_db_schema" = "dbo" + "--rds_sqlserver_db_table" = "" + "--rds_db_tbl_pkey_column" = "" + "--rds_df_repartition_num" = 0 + "--rename_migrated_prq_tbl_folder" = "" + "--extra-py-files" = "s3://${module.s3-glue-job-script-bucket.bucket.id}/${aws_s3_object.aws_s3_object_pyzipfile_to_s3folder.id}" + "--rds_to_parquet_output_s3_bucket" = module.s3-dms-target-store-bucket.bucket.id + "--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.etl_rds_sqlserver_query_to_s3_parquet.name}" + "--enable-continuous-cloudwatch-log" = "true" + "--enable-continuous-log-filter" = "true" + "--enable-metrics" = "true" + "--enable-auto-scaling" = "true" + "--conf" = < {s3_table_folder_path}/""") + +# =================================================================================================== + + +if __name__ == "__main__": + + # ------------------------------------------- + if args.get("rds_sqlserver_db", None) is None: + LOGGER.error(f"""'rds_sqlserver_db' runtime input is missing! Exiting ...""") + sys.exit(1) + else: + rds_sqlserver_db = args["rds_sqlserver_db"] + LOGGER.info(f"""Given rds_sqlserver_db = {rds_sqlserver_db}""") + + if args.get("rds_sqlserver_db_schema", None) is None: + LOGGER.error( + f"""'rds_sqlserver_db_schema' runtime input is missing! Exiting ...""") + sys.exit(1) + else: + rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"] + LOGGER.info( + f"""Given rds_sqlserver_db_schema = {rds_sqlserver_db_schema}""") + # ------------------------------------------- + + rds_jdbc_conn_obj = RDS_JDBC_CONNECTION(RDS_DB_HOST_ENDPOINT, + RDS_DB_INSTANCE_PWD, + rds_sqlserver_db, + rds_sqlserver_db_schema) + # ------------------------------------------- + + try: + rds_db_name = rds_jdbc_conn_obj.check_if_rds_db_exists()[0] + except IndexError: + LOGGER.error( + f"""Given database name not found! >> {args['rds_sqlserver_db']} <<""") + sys.exit(1) + except Exception as e: + LOGGER.error(e) + # ------------------------------------------------------- + + rds_sqlserver_db_tbl_list = rds_jdbc_conn_obj.get_rds_db_tbl_list() + if not rds_sqlserver_db_tbl_list: + LOGGER.error(f"""rds_sqlserver_db_tbl_list - is empty. Exiting ...!""") + sys.exit(1) + else: + message_prefix = f"""Total List of tables available in {rds_db_name}.{rds_sqlserver_db_schema}""" + LOGGER.info(f"""{message_prefix}\n{rds_sqlserver_db_tbl_list}""") + # ------------------------------------------------------- + + if args.get("rds_sqlserver_db_table", None) is None: + LOGGER.error( + f"""'rds_sqlserver_db_table' runtime input is missing! Exiting ...""") + sys.exit(1) + else: + rds_sqlserver_db_table = args["rds_sqlserver_db_table"] + table_name_prefix = f"""{rds_db_name}_{rds_sqlserver_db_schema}""" + db_sch_tbl = f"""{table_name_prefix}_{rds_sqlserver_db_table}""" + # ------------------------------------------------------- + + if db_sch_tbl not in rds_sqlserver_db_tbl_list: + LOGGER.error(f"""'{db_sch_tbl}' - is not an existing table! Exiting ...""") + sys.exit(1) + else: + LOGGER.info(f""">> Given RDS SqlServer-DB Table: {rds_sqlserver_db_table} <<""") + # ------------------------------------------------------- + + rds_db_tbl_pkey_column = args['rds_db_tbl_pkey_column'] + LOGGER.info(f"""rds_db_tbl_pkey_column = {rds_db_tbl_pkey_column}""") + # ----------------------------------------- + + rds_db_table_empty_df = rds_jdbc_conn_obj.get_rds_db_table_empty_df(rds_sqlserver_db_table) + + df_rds_dtype_dict = CustomPysparkMethods.get_dtypes_dict(rds_db_table_empty_df) + int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items() + if dtype in INT_DATATYPES_LIST] + + if rds_db_tbl_pkey_column not in int_dtypes_colname_list: + LOGGER.error(f"""rds_db_tbl_pkey_column = {rds_db_tbl_pkey_column} is not an integer datatype column! + """.strip()) + sys.exit(1) + # ---------------------------------------------------- + + jdbc_read_partitions_num = int(args.get('jdbc_read_partitions_num', 0)) + + jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 else jdbc_read_partitions_num + LOGGER.info(f"""jdbc_read_partitions_num = {jdbc_read_partitions_num}""") + + agg_row_dict = rds_jdbc_conn_obj.get_min_max_pkey_filter( + rds_sqlserver_db_table, + rds_db_tbl_pkey_column + ) + min_pkey = agg_row_dict['min_value'] + LOGGER.info(f"""min_pkey = {min_pkey}""") + + max_pkey = agg_row_dict['max_value'] + LOGGER.info(f"""max_pkey = {max_pkey}""") + + rds_transformed_query = QUERY_DICT[f"{rds_sqlserver_db_table}"] + LOGGER.info(f"""rds_transformed_query = \n{rds_transformed_query}""") + + df_rds_query_read = rds_jdbc_conn_obj.get_rds_df_read_query_pkey_parallel( + rds_transformed_query, + rds_db_tbl_pkey_column, + min_pkey, + max_pkey, + jdbc_read_partitions_num + ) + + LOGGER.info( + f"""df_rds_query_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_query_read.rdd.getNumPartitions()}""") + + df_rds_query_read_columns = df_rds_query_read.columns + LOGGER.info(f"""1. df_rds_query_read_columns = {df_rds_query_read_columns}""") + + df_rds_query_read_schema = df_rds_query_read.schema + LOGGER.info(f"""df_rds_query_read_schema = \n{[obj for obj in df_rds_query_read_schema]}""") + + rds_df_repartition_num = int(args['rds_df_repartition_num']) + + if rds_df_repartition_num != 0: + df_rds_query_read = df_rds_query_read.repartition(rds_df_repartition_num, rds_db_tbl_pkey_column) + int_repartitions = df_rds_query_read.rdd.getNumPartitions() + LOGGER.info( + f"""df_rds_query_read: After Repartitioning -> {int_repartitions} partitions.""") + # ---------------------------------------------------- + + rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '') + if rename_output_table_folder == '': + rds_db_table_name = rds_sqlserver_db_table + else: + rds_db_table_name = rename_output_table_folder + # --------------------------------------- + + prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rds_db_table_name}""" + LOGGER.info(f"""prq_table_folder_path = {prq_table_folder_path}""") + + write_parquet_to_s3(df_rds_query_read, prq_table_folder_path) + + job.commit() diff --git a/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py b/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py index 3862c16a1b3..af3f2311797 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py @@ -193,7 +193,7 @@ def get_rds_df_read_query_pkey_parallel(self, jdbc_partition_column, jdbc_partition_col_lowerbound, jdbc_partition_col_upperbound, - jdbc_read_partitions_num + jdbc_read_partitions_num=1 ) -> DataFrame: numPartitions = jdbc_read_partitions_num @@ -219,6 +219,16 @@ def get_rds_df_read_query_pkey_parallel(self, .option("numPartitions", numPartitions) .load()) + def get_rds_df_read_query(self, in_db_query) -> DataFrame: + + return (self.spark.read.format("jdbc") + .option("url", self.rds_jdbc_url_v2) + .option("driver", self.RDS_DB_INSTANCE_DRIVER) + .option("user", self.RDS_DB_INSTANCE_USER) + .option("password", self.RDS_DB_INSTANCE_PWD) + .option("dbtable", f"""({in_db_query}) as t""") + .load()) + def get_rds_df_query_min_max_count(self, rds_table_name, From 6d3de1da52ef14a68dc97c8beecd9ba58c265c34 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Tue, 26 Nov 2024 12:00:45 +0000 Subject: [PATCH 5/7] new GlueJob modified - 2611 - 1 --- .../dms_data_validation_glue_job_v2.tf | 44 ++-- .../etl_rds_sqlserver_query_to_s3_parquet.py | 225 +++++++++++++++++- 2 files changed, 246 insertions(+), 23 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf index 423999a7702..1ae59975f47 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job_v2.tf @@ -592,29 +592,35 @@ resource "aws_glue_job" "etl_rds_sqlserver_query_to_s3_parquet" { name = "etl-rds-sqlserver-query-to-s3-parquet" description = "DMS Data Validation Glue-Job (PySpark)." - role_arn = aws_iam_role.dms_dv_glue_job_iam_role.arn + role_arn = aws_iam_role.glue_mig_and_val_iam_role.arn glue_version = "4.0" worker_type = "G.1X" number_of_workers = 4 default_arguments = { - "--script_bucket_name" = module.s3-glue-job-script-bucket.bucket.id - "--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0] - "--rds_db_pwd" = aws_db_instance.database_2022.password - "--jdbc_read_partitions_num" = "" - "--rds_sqlserver_db" = "" - "--rds_sqlserver_db_schema" = "dbo" - "--rds_sqlserver_db_table" = "" - "--rds_db_tbl_pkey_column" = "" - "--rds_df_repartition_num" = 0 - "--rename_migrated_prq_tbl_folder" = "" - "--extra-py-files" = "s3://${module.s3-glue-job-script-bucket.bucket.id}/${aws_s3_object.aws_s3_object_pyzipfile_to_s3folder.id}" - "--rds_to_parquet_output_s3_bucket" = module.s3-dms-target-store-bucket.bucket.id - "--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.etl_rds_sqlserver_query_to_s3_parquet.name}" - "--enable-continuous-cloudwatch-log" = "true" - "--enable-continuous-log-filter" = "true" - "--enable-metrics" = "true" - "--enable-auto-scaling" = "true" - "--conf" = < total_files={total_files}""" + msg_part_2 = f"""> total_size_mb={total_size/1024/1024:.2f}""" + LOGGER.info(f"""{msg_part_1}, {msg_part_2}""") + + +def compare_rds_parquet_samples(rds_jdbc_conn_obj, + rds_db_table_name, + df_rds_query_read: DataFrame, + jdbc_partition_column, + prq_table_folder_path, + validation_sample_fraction_float) -> DataFrame: + + df_dv_output_schema = T.StructType( + [T.StructField("run_datetime", T.TimestampType(), True), + T.StructField("json_row", T.StringType(), True), + T.StructField("validation_msg", T.StringType(), True), + T.StructField("database_name", T.StringType(), True), + T.StructField("full_table_name", T.StringType(), True), + T.StructField("table_to_ap", T.StringType(), True)]) + + df_dv_output = CustomPysparkMethods.get_pyspark_empty_df(df_dv_output_schema) + + s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{prq_table_folder_path}""" + LOGGER.info(f"""Parquet Source being used for comparison: {s3_table_folder_path}""") + + df_parquet_read = spark.read.schema(df_rds_query_read.schema).parquet(s3_table_folder_path) + + df_parquet_read_sample = df_parquet_read.sample(validation_sample_fraction_float) + + df_parquet_read_sample_t1 = df_parquet_read_sample.selectExpr( + *CustomPysparkMethods.get_nvl_select_list( + df_parquet_read_sample, + rds_jdbc_conn_obj, + rds_db_table_name + ) + ) + + validation_sample_df_repartition_num = int(args['validation_sample_df_repartition_num']) + if validation_sample_df_repartition_num != 0: + df_parquet_read_sample_t1 = df_parquet_read_sample_t1.repartition( + validation_sample_df_repartition_num, + jdbc_partition_column + ) + # -------- + + df_rds_read_sample = df_rds_query_read.join(df_parquet_read_sample, + on=jdbc_partition_column, + how='leftsemi') + + df_rds_read_sample_t1 = df_rds_read_sample.selectExpr( + *CustomPysparkMethods.get_nvl_select_list( + df_rds_read_sample, + rds_jdbc_conn_obj, + rds_db_table_name + ) + ) + if validation_sample_df_repartition_num != 0: + df_rds_read_sample_t1 = df_rds_read_sample_t1.repartition( + validation_sample_df_repartition_num, + jdbc_partition_column + ) + # -------- + + df_prq_leftanti_rds = df_parquet_read_sample_t1.alias("L")\ + .join(df_rds_read_sample_t1.alias("R"), + on=df_parquet_read_sample_t1.columns, + how='leftanti') + + # df_prq_leftanti_rds = df_parquet_read_sample_t1.alias("L")\ + # .join(df_rds_read_sample_t1.alias("R"), + # on=jdbc_partition_column, how='left')\ + # .where(" or ".join([f"L.{column} != R.{column}" + # for column in df_rds_read_sample_t1.columns + # if column != jdbc_partition_column]))\ + # .select("L.*") + + df_prq_read_filtered_count = df_prq_leftanti_rds.count() + + LOGGER.info(f"""Rows sample taken = {df_parquet_read_sample.count()}""") + + if df_prq_read_filtered_count == 0: + temp_msg = f"""{validation_sample_fraction_float}-Sample Rows Validated.""" + df_temp_row = spark.sql(f"""select + current_timestamp() as run_datetime, + '' as json_row, + "{temp_msg}" as validation_msg, + '{rds_jdbc_conn_obj.rds_db_name}' as database_name, + '{db_sch_tbl}' as full_table_name, + 'False' as table_to_ap + """.strip()) + + LOGGER.info(f"{rds_db_table_name}: Validation Successful - 1") + df_dv_output = df_dv_output.union(df_temp_row) + else: + + LOGGER.warn( + f"""Parquet-RDS Subtract Report: ({df_prq_read_filtered_count}): Row(s) differences found!""") + + df_subtract_temp = (df_prq_leftanti_rds + .withColumn('json_row', F.to_json(F.struct(*[F.col(c) + for c in df_rds_query_read.columns]))) + .selectExpr("json_row") + .limit(100)) + + temp_msg = f"""{validation_sample_fraction_float}-Rows Sample Used:\n""" + df_subtract_temp = df_subtract_temp.selectExpr( + "current_timestamp as run_datetime", + "json_row", + f""""{temp_msg}>{df_prq_read_filtered_count} Rows - Validation Failed !" as validation_msg""", + f"""'{rds_jdbc_conn_obj.rds_db_name}' as database_name""", + f"""'{db_sch_tbl}' as full_table_name""", + """'False' as table_to_ap""" + ) + LOGGER.warn(f"{rds_db_table_name}: Validation Failed - 2") + df_dv_output = df_dv_output.union(df_subtract_temp) + # ----------------------------------------------------- + + return df_dv_output + + +def write_rds_to_s3parquet(df_rds_query_read: DataFrame, prq_table_folder_path): s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{prq_table_folder_path}""" @@ -136,6 +275,44 @@ def write_parquet_to_s3(df_rds_query_read: DataFrame, prq_table_folder_path): }) LOGGER.info(f"""df_rds_query_read - dataframe written to -> {s3_table_folder_path}/""") + +def write_dv_report_to_s3parquet(df_dv_output: DataFrame, + rds_jdbc_conn_obj, + db_sch_tbl_name): + + db_name = rds_jdbc_conn_obj.rds_db_name + df_dv_output = df_dv_output.repartition(1) + + prq_table_folder_path = f"""{args["glue_catalog_db_name"]}/{args["glue_catalog_tbl_name"]}""" + s3_table_folder_path = f'''s3://{GLUE_CATALOG_DV_BUCKET}/{prq_table_folder_path}''' + + if S3Methods.check_s3_folder_path_if_exists(GLUE_CATALOG_DV_BUCKET, + f'''{prq_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}''' + ): + LOGGER.info( + f"""Purging S3-path: {s3_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}""") + + glueContext.purge_s3_path(f"""{s3_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}""", + options={"retentionPeriod": 0} + ) + # --------------------------------------------------------------------- + + dydf = DynamicFrame.fromDF(df_dv_output, glueContext, "final_spark_df") + + glueContext.write_dynamic_frame.from_options(frame=dydf, connection_type='s3', format='parquet', + connection_options={ + 'path': f"""{s3_table_folder_path}/""", + "partitionKeys": ["database_name", "full_table_name"] + }, + format_options={ + 'useGlueParquetWriter': True, + 'compression': 'snappy', + 'blockSize': 13421773, + 'pageSize': 1048576 + }) + LOGGER.info( + f"""'{db_sch_tbl_name}' validation report written to -> {s3_table_folder_path}/""") + # =================================================================================================== @@ -255,7 +432,8 @@ def write_parquet_to_s3(df_rds_query_read: DataFrame, prq_table_folder_path): rds_df_repartition_num = int(args['rds_df_repartition_num']) if rds_df_repartition_num != 0: - df_rds_query_read = df_rds_query_read.repartition(rds_df_repartition_num, rds_db_tbl_pkey_column) + df_rds_query_read = df_rds_query_read.repartition(rds_df_repartition_num, + rds_db_tbl_pkey_column) int_repartitions = df_rds_query_read.rdd.getNumPartitions() LOGGER.info( f"""df_rds_query_read: After Repartitioning -> {int_repartitions} partitions.""") @@ -271,6 +449,45 @@ def write_parquet_to_s3(df_rds_query_read: DataFrame, prq_table_folder_path): prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rds_db_table_name}""" LOGGER.info(f"""prq_table_folder_path = {prq_table_folder_path}""") - write_parquet_to_s3(df_rds_query_read, prq_table_folder_path) + validation_only_run = args['validation_only_run'] + + validation_sample_fraction_float = float(args.get('validation_sample_fraction_float', 0)) + temp_msg = f"""validation_sample_fraction_float = {validation_sample_fraction_float}""" + if validation_only_run != "true": + if validation_sample_fraction_float != 0: + df_rds_query_read = df_rds_query_read.cache() + write_rds_to_s3parquet(df_rds_query_read, prq_table_folder_path) + print_existing_s3parquet_stats(prq_table_folder_path) + LOGGER.info(f"""> Starting validation: {temp_msg}""") + df_dv_output = compare_rds_parquet_samples(rds_jdbc_conn_obj, + rds_db_table_name, + df_rds_query_read, + rds_db_tbl_pkey_column, + prq_table_folder_path, + validation_sample_fraction_float + ) + write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) + else: + write_rds_to_s3parquet(df_rds_query_read, prq_table_folder_path) + print_existing_s3parquet_stats(prq_table_folder_path) + LOGGER.info(f"""{temp_msg}\nValidation not enabled. Skipping ...""") + else: + LOGGER.info(f""">> validation_only_run - ENABLED <<""") + print_existing_s3parquet_stats(prq_table_folder_path) + + if validation_sample_fraction_float != 0: + LOGGER.info(f"""> Starting validation: {temp_msg}""") + df_dv_output = compare_rds_parquet_samples(rds_jdbc_conn_obj, + rds_db_table_name, + df_rds_query_read, + rds_db_tbl_pkey_column, + prq_table_folder_path, + validation_sample_fraction_float + ) + write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) + else: + LOGGER.info(f""">> Skipping validation: {temp_msg} <<""") + # --------------------------------------------------------------- + job.commit() From 279074849e4f59e57d2969c6cdcfc01fe3870ff0 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Tue, 26 Nov 2024 13:31:52 +0000 Subject: [PATCH 6/7] new GlueJob modified - 2611 - 2 --- .../glue-job/etl_rds_sqlserver_query_to_s3_parquet.py | 4 ++-- .../glue-job/reusable_module/glue_data_validation_lib.py | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py index 87ed7c1ed11..b46d60daecc 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py @@ -460,7 +460,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, print_existing_s3parquet_stats(prq_table_folder_path) LOGGER.info(f"""> Starting validation: {temp_msg}""") df_dv_output = compare_rds_parquet_samples(rds_jdbc_conn_obj, - rds_db_table_name, + rds_sqlserver_db_table, df_rds_query_read, rds_db_tbl_pkey_column, prq_table_folder_path, @@ -479,7 +479,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, if validation_sample_fraction_float != 0: LOGGER.info(f"""> Starting validation: {temp_msg}""") df_dv_output = compare_rds_parquet_samples(rds_jdbc_conn_obj, - rds_db_table_name, + rds_sqlserver_db_table, df_rds_query_read, rds_db_tbl_pkey_column, prq_table_folder_path, diff --git a/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py b/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py index af3f2311797..a78700aaac3 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/reusable_module/glue_data_validation_lib.py @@ -705,10 +705,8 @@ def get_rds_tbl_col_attr_dict(df_col_stats: DataFrame) -> DataFrame: def get_nvl_select_list(in_rds_df: DataFrame, rds_jdbc_conn_obj, in_rds_tbl_name): - df_col_attr = rds_jdbc_conn_obj.get_rds_tbl_col_attributes( - in_rds_tbl_name) - df_col_attr_dict = CustomPysparkMethods.get_rds_tbl_col_attr_dict( - df_col_attr) + df_col_attr = rds_jdbc_conn_obj.get_rds_tbl_col_attributes(in_rds_tbl_name) + df_col_attr_dict = CustomPysparkMethods.get_rds_tbl_col_attr_dict(df_col_attr) df_col_dtype_dict = CustomPysparkMethods.get_dtypes_dict(in_rds_df) temp_select_list = list() From 07927cc54011161dedb00384ce790aa30b51326c Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Tue, 26 Nov 2024 14:32:51 +0000 Subject: [PATCH 7/7] new GlueJob modified - 2611 - 3 --- .../etl_rds_sqlserver_query_to_s3_parquet.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py index b46d60daecc..f89022871b2 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py @@ -396,7 +396,8 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, jdbc_read_partitions_num = int(args.get('jdbc_read_partitions_num', 0)) - jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 else jdbc_read_partitions_num + jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 \ + else jdbc_read_partitions_num LOGGER.info(f"""jdbc_read_partitions_num = {jdbc_read_partitions_num}""") agg_row_dict = rds_jdbc_conn_obj.get_min_max_pkey_filter( @@ -424,7 +425,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, f"""df_rds_query_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_query_read.rdd.getNumPartitions()}""") df_rds_query_read_columns = df_rds_query_read.columns - LOGGER.info(f"""1. df_rds_query_read_columns = {df_rds_query_read_columns}""") + LOGGER.info(f"""df_rds_query_read_columns = {df_rds_query_read_columns}""") df_rds_query_read_schema = df_rds_query_read.schema LOGGER.info(f"""df_rds_query_read_schema = \n{[obj for obj in df_rds_query_read_schema]}""") @@ -439,19 +440,20 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, f"""df_rds_query_read: After Repartitioning -> {int_repartitions} partitions.""") # ---------------------------------------------------- - rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '') - if rename_output_table_folder == '': - rds_db_table_name = rds_sqlserver_db_table - else: - rds_db_table_name = rename_output_table_folder + rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', None) + prq_table_folder_name = rds_sqlserver_db_table if rename_output_table_folder is None \ + else rename_output_table_folder # --------------------------------------- - prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rds_db_table_name}""" + prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{prq_table_folder_name}""" LOGGER.info(f"""prq_table_folder_path = {prq_table_folder_path}""") validation_only_run = args['validation_only_run'] validation_sample_fraction_float = float(args.get('validation_sample_fraction_float', 0)) + validation_sample_fraction_float = 1.0 if validation_sample_fraction_float > 1 \ + else validation_sample_fraction_float + temp_msg = f"""validation_sample_fraction_float = {validation_sample_fraction_float}""" if validation_only_run != "true": if validation_sample_fraction_float != 0: @@ -467,13 +469,14 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, validation_sample_fraction_float ) write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) + df_rds_query_read.unpersist() else: write_rds_to_s3parquet(df_rds_query_read, prq_table_folder_path) print_existing_s3parquet_stats(prq_table_folder_path) - LOGGER.info(f"""{temp_msg}\nValidation not enabled. Skipping ...""") + LOGGER.warn(f"""{temp_msg}\nValidation not enabled. Skipping ...""") else: - LOGGER.info(f""">> validation_only_run - ENABLED <<""") + LOGGER.warn(f""">> validation_only_run - ENABLED <<""") print_existing_s3parquet_stats(prq_table_folder_path) if validation_sample_fraction_float != 0: @@ -487,7 +490,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, ) write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) else: - LOGGER.info(f""">> Skipping validation: {temp_msg} <<""") + LOGGER.warn(f"""{temp_msg} => Skipping Validation !""") # --------------------------------------------------------------- job.commit()