Skip to content

Commit

Permalink
new GlueJob modified - 2611 - 3
Browse files: browse the repository at this point in the history
  • Loading branch information
madhu-k-sr2 committed Nov 26, 2024
1 parent 2790748 commit 07927cc
Showing 1 changed file with 14 additions and 11 deletions.
Original file line number | Diff line number | Diff line change
@@ -396,7 +396,8 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame,

jdbc_read_partitions_num = int(args.get('jdbc_read_partitions_num', 0))

jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 else jdbc_read_partitions_num
jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 \
else jdbc_read_partitions_num
LOGGER.info(f"""jdbc_read_partitions_num = {jdbc_read_partitions_num}""")

agg_row_dict = rds_jdbc_conn_obj.get_min_max_pkey_filter(
@@ -424,7 +425,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame,
f"""df_rds_query_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_query_read.rdd.getNumPartitions()}""")

df_rds_query_read_columns = df_rds_query_read.columns
LOGGER.info(f"""1. df_rds_query_read_columns = {df_rds_query_read_columns}""")
LOGGER.info(f"""df_rds_query_read_columns = {df_rds_query_read_columns}""")

df_rds_query_read_schema = df_rds_query_read.schema
LOGGER.info(f"""df_rds_query_read_schema = \n{[obj for obj in df_rds_query_read_schema]}""")
@@ -439,19 +440,20 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame,
f"""df_rds_query_read: After Repartitioning -> {int_repartitions} partitions.""")
# ----------------------------------------------------

rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '')
if rename_output_table_folder == '':
rds_db_table_name = rds_sqlserver_db_table
else:
rds_db_table_name = rename_output_table_folder
rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', None)
prq_table_folder_name = rds_sqlserver_db_table if rename_output_table_folder is None \
else rename_output_table_folder
# ---------------------------------------

prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rds_db_table_name}"""
prq_table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{prq_table_folder_name}"""
LOGGER.info(f"""prq_table_folder_path = {prq_table_folder_path}""")

validation_only_run = args['validation_only_run']

validation_sample_fraction_float = float(args.get('validation_sample_fraction_float', 0))
validation_sample_fraction_float = 1.0 if validation_sample_fraction_float > 1 \
else validation_sample_fraction_float

temp_msg = f"""validation_sample_fraction_float = {validation_sample_fraction_float}"""
if validation_only_run != "true":
if validation_sample_fraction_float != 0:
@@ -467,13 +469,14 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame,
validation_sample_fraction_float
)
write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl)
df_rds_query_read.unpersist()
else:
write_rds_to_s3parquet(df_rds_query_read, prq_table_folder_path)
print_existing_s3parquet_stats(prq_table_folder_path)
LOGGER.info(f"""{temp_msg}\nValidation not enabled. Skipping ...""")
LOGGER.warn(f"""{temp_msg}\nValidation not enabled. Skipping ...""")

else:
LOGGER.info(f""">> validation_only_run - ENABLED <<""")
LOGGER.warn(f""">> validation_only_run - ENABLED <<""")
print_existing_s3parquet_stats(prq_table_folder_path)

if validation_sample_fraction_float != 0:
@@ -487,7 +490,7 @@ def write_dv_report_to_s3parquet(df_dv_output: DataFrame,
)
write_dv_report_to_s3parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl)
else:
LOGGER.info(f""">> Skipping validation: {temp_msg} <<""")
LOGGER.warn(f"""{temp_msg} => Skipping Validation !""")
# ---------------------------------------------------------------

job.commit()

0 comments on commit 07927cc

Please sign in to comment.