diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py index b46d60daecc..f89022871b2 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_sqlserver_query_to_s3_parquet.py @@ -396,7 +396,8 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, jdbc_read_partitions_num = int(args.get('jdbc_read_partitions_num', 0)) - jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 else jdbc_read_partitions_num + jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 \ + else jdbc_read_partitions_num LOGGER.info(f"""jdbc_read_partitions_num = {jdbc_read_partitions_num}""") agg_row_dict = rds_jdbc_conn_obj.get_min_max_pkey_filter( @@ -424,7 +425,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, f"""df_rds_query_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_query_read.rdd.getNumPartitions()}""") df_rds_query_read_columns = df_rds_query_read.columns - LOGGER.info(f"""1. df_rds_query_read_columns = {df_rds_query_read_columns}""") + LOGGER.info(f"""df_rds_query_read_columns = {df_rds_query_read_columns}""") df_rds_query_read_schema = df_rds_query_read.schema LOGGER.info(f"""df_rds_query_read_schema = \n{[obj for obj in df_rds_query_read_schema]}""") @@ -439,19 +440,20 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, f"""df_rds_query_read: After Repartitioning -> {int_repartitions} partitions.""") # ---------------------------------------------------- - rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '') - if rename_output_table_folder == '': - rds_db_table_name = rds_sqlserver_db_table - else: - rds_db_table_name = rename_output_table_folder + rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', None) + prq_table_folder_name = rds_sqlserver_db_table if rename_output_table_folder is None \ + else rename_output_table_folder # --------------------------------------- - prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rds_db_table_name}""" + prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{prq_table_folder_name}""" LOGGER.info(f"""prq_table_folder_path = {prq_table_folder_path}""") validation_only_run = args['validation_only_run'] validation_sample_fraction_float = float(args.get('validation_sample_fraction_float', 0)) + validation_sample_fraction_float = 1.0 if validation_sample_fraction_float > 1 \ + else validation_sample_fraction_float + temp_msg = f"""validation_sample_fraction_float = {validation_sample_fraction_float}""" if validation_only_run != "true": if validation_sample_fraction_float != 0: @@ -467,13 +469,14 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, validation_sample_fraction_float ) write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) + df_rds_query_read.unpersist() else: write_rds_to_s3parquet(df_rds_query_read, prq_table_folder_path) print_existing_s3parquet_stats(prq_table_folder_path) - LOGGER.info(f"""{temp_msg}\nValidation not enabled. Skipping ...""") + LOGGER.warn(f"""{temp_msg}\nValidation not enabled. Skipping ...""") else: - LOGGER.info(f""">> validation_only_run - ENABLED <<""") + LOGGER.warn(f""">> validation_only_run - ENABLED <<""") print_existing_s3parquet_stats(prq_table_folder_path) if validation_sample_fraction_float != 0: @@ -487,7 +490,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame, ) write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) else: - LOGGER.info(f""">> Skipping validation: {temp_msg} <<""") + LOGGER.warn(f"""{temp_msg} => Skipping Validation !""") # --------------------------------------------------------------- job.commit()