From ea4d4ee4176064e380779f8b29d9a5cd22d711dd Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 08:50:55 +0100 Subject: [PATCH 1/8] CodeLogicImprovements --- .../dms_data_validation_glue_job.tf | 1 + .../glue-job/rds_to_s3_parquet_migration.py | 81 ++++++++++--------- 2 files changed, 42 insertions(+), 40 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index 1c4ab9f49ef..f7ce13cd200 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -243,6 +243,7 @@ resource "aws_glue_job" "rds_to_s3_parquet_migration" { "--year_partition" = "false" "--month_partition" = "false" "--day_partition" = "false" + "--validation_only_run" = "false" "--rds_to_parquet_output_s3_bucket" = aws_s3_bucket.dms_target_ep_s3_bucket.id "--dv_parquet_output_s3_bucket" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id "--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name diff --git a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py index 5b5155c5dc6..edcf7410c7a 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py @@ -71,7 +71,8 @@ def resolve_args(args_list): "rds_table_total_rows", "year_partition", "month_partition", - "day_partition" + "day_partition", + "validation_only_run" ] OPTIONAL_INPUTS = [ @@ -368,10 +369,10 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj, s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}""" LOGGER.info(f"""Parquet Source being used for comparison: {s3_table_folder_path}""") - df_parquet_read = spark.read.schema(df_rds_read.schema).parquet(s3_table_folder_path).cache() + df_parquet_read = spark.read.schema(df_rds_read.schema).parquet(s3_table_folder_path) df_compare_columns_list = [col for col in df_parquet_read.columns - if col not in ['year', 'month', 'day']] + if col not in ['year', 'month', 'day']] df_parquet_read_sample = df_parquet_read.sample(validation_sample_fraction_float)\ .select(*df_compare_columns_list) @@ -454,8 +455,6 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj, df_dv_output = df_dv_output.union(df_subtract_temp) # ----------------------------------------------------- - df_parquet_read.unpersist(True) - return df_dv_output @@ -525,7 +524,6 @@ def write_to_s3_parquet(df_dv_output: DataFrame, rds_jdbc_conn_obj = RDS_JDBC_CONNECTION(args['rds_sqlserver_db'], args['rds_sqlserver_db_schema'], args['rds_sqlserver_db_table']) - # ------------------------------------------- try: @@ -562,6 +560,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame, jdbc_read_partitions_num = int(int(int(args['rds_table_total_size_mb'])/1024)/2) else: jdbc_read_partitions_num = 1 + # ------------------------------ jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 \ else jdbc_read_partitions_num @@ -573,8 +572,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame, df_rds_dtype_dict = get_dtypes_dict(rds_db_table_empty_df) int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items() if dtype in INT_DATATYPES_LIST] - # 
------------------------------------------------------- - + if args.get('rds_db_tbl_pkeys_col_list', None) is None: try: rds_db_tbl_pkeys_col_list = [column.strip() @@ -609,40 +607,40 @@ def write_to_s3_parquet(df_dv_output: DataFrame, jdbc_read_partitions_num)) LOGGER.info(f"""df_rds_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_read.rdd.getNumPartitions()}""") - partition_by_cols = list() + if args['validation_only_run'] != "true": + partition_by_cols = list() - if args.get('date_partition_column_name', None) is not None: - given_date_column = args['date_partition_column_name'] - LOGGER.info(f"""given_date_column = {given_date_column}""") + if args.get('date_partition_column_name', None) is not None: + given_date_column = args['date_partition_column_name'] + LOGGER.info(f"""given_date_column = {given_date_column}""") - if args['year_partition'] == 'true': - df_rds_read = df_rds_read.withColumn("year", F.year(given_date_column)) - partition_by_cols.append("year") + if args['year_partition'] == 'true': + df_rds_read = df_rds_read.withColumn("year", F.year(given_date_column)) + partition_by_cols.append("year") - if args['month_partition'] == 'true': - df_rds_read = df_rds_read.withColumn("month", F.month(given_date_column)) - partition_by_cols.append("month") + if args['month_partition'] == 'true': + df_rds_read = df_rds_read.withColumn("month", F.month(given_date_column)) + partition_by_cols.append("month") - if args['day_partition'] == 'true': - df_rds_read = df_rds_read.withColumn("day", F.dayofmonth(given_date_column)) - partition_by_cols.append("day") + if args['day_partition'] == 'true': + df_rds_read = df_rds_read.withColumn("day", F.dayofmonth(given_date_column)) + partition_by_cols.append("day") - #df_rds_read = df_rds_read.repartition("year", "month", "day") - # ---------------------------------------------------- + #df_rds_read = df_rds_read.repartition("year", "month", "day") + # ---------------------------------------------------- - if args.get('other_partitionby_columns', None) is not None: - other_partitionby_columns = [f"""{column.strip().strip("'").strip('"')}""" - for column in args['other_partitionby_columns'].split(",")] - LOGGER.info(f"""other_partitionby_columns = {other_partitionby_columns}""") - partition_by_cols.extend(other_partitionby_columns) - # ---------------------------------------------------- + if args.get('other_partitionby_columns', None) is not None: + other_partitionby_columns = [f"""{column.strip().strip("'").strip('"')}""" + for column in args['other_partitionby_columns'].split(",")] + LOGGER.info(f"""other_partitionby_columns = {other_partitionby_columns}""") + partition_by_cols.extend(other_partitionby_columns) + # ---------------------------------------------------- - if partition_by_cols: - LOGGER.info(f"""df_rds_read-Repartitioning on columns: {partition_by_cols}""") - df_rds_read = df_rds_read.repartition(jdbc_read_partitions_num, *partition_by_cols).cache() - else: - df_rds_read = df_rds_read.cache() - # ----------------------------------- + if partition_by_cols: + LOGGER.info(f"""df_rds_read-OrderBy on partitionBy columns: {partition_by_cols}""") + df_rds_read = df_rds_read.orderBy(*partition_by_cols) + # ----------------------------------- + # --------------------------------------- rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '') if rename_output_table_folder == '': @@ -651,10 +649,13 @@ def write_to_s3_parquet(df_dv_output: DataFrame, table_folder_path = 
f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rename_output_table_folder}""" # --------------------------------------- + df_rds_read = df_rds_read.cache() - write_rds_df_to_s3_parquet(df_rds_read, - partition_by_cols, - table_folder_path) + if args['validation_only_run'] != "true": + write_rds_df_to_s3_parquet(df_rds_read, + partition_by_cols, + table_folder_path) + # ----------------------------------------------- total_files, total_size = get_s3_folder_info(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path) msg_part_1 = f"""total_files={total_files}""" @@ -670,10 +671,10 @@ def write_to_s3_parquet(df_dv_output: DataFrame, table_folder_path, validation_sample_fraction_float) - df_rds_read.unpersist(True) - write_to_s3_parquet(df_dv_output, rds_db_name, db_sch_tbl) # ------------------------------------------------------------ + df_rds_read.unpersist(True) + job.commit() From 75af11f6fdcf239171556728ae5c173fa732f1e9 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 09:07:29 +0100 Subject: [PATCH 2/8] CodeLogicImprovements-2 --- .../dms_data_validation_glue_job.tf | 1 + .../glue-job/rds_to_s3_parquet_migration.py | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index f7ce13cd200..02579a74e90 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -231,6 +231,7 @@ resource "aws_glue_job" "rds_to_s3_parquet_migration" { "--rds_db_tbl_pkeys_col_list" = "" "--rds_table_total_size_mb" = "" "--rds_table_total_rows" = "" + "--rds_df_repartition_num" = 0 "--date_partition_column_name" = "" "--other_partitionby_columns" = "" "--validation_sample_fraction_float" = 0 diff --git a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py index edcf7410c7a..bab1f27f53c 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py @@ -69,6 +69,7 @@ def resolve_args(args_list): "rds_db_tbl_pkeys_col_list", "rds_table_total_size_mb", "rds_table_total_rows", + "rds_df_repartition_num", "year_partition", "month_partition", "day_partition", @@ -607,6 +608,13 @@ def write_to_s3_parquet(df_dv_output: DataFrame, jdbc_read_partitions_num)) LOGGER.info(f"""df_rds_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_read.rdd.getNumPartitions()}""") + rds_df_repartition_num = args['rds_df_repartition_num'] + if rds_df_repartition_num != 0: + LOGGER.info(f"""df_rds_read-Repartitioning: {rds_df_repartition_num}""") + df_rds_read = df_rds_read.repartition(rds_df_repartition_num, + jdbc_read_partitions_num) + # --------------------------------------------------------------- + if args['validation_only_run'] != "true": partition_by_cols = list() From 58ec59c4515d1323ca60e7fe0c539b766c48daa0 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 12:51:35 +0100 Subject: [PATCH 3/8] CodeCorrections-0719-1 --- .../dms_data_validation_glue_job.tf | 66 ++++++++--------- .../glue-job/rds_to_s3_parquet_migration.py | 70 +++++++++++++------ 2 files changed, 82 insertions(+), 54 deletions(-) diff --git 
a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index 02579a74e90..6c8e47ed5d2 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -222,39 +222,39 @@ resource "aws_glue_job" "rds_to_s3_parquet_migration" { worker_type = "G.2X" number_of_workers = 5 default_arguments = { - "--script_bucket_name" = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id - "--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0] - "--rds_db_pwd" = aws_db_instance.database_2022.password - "--rds_sqlserver_db" = "" - "--rds_sqlserver_db_schema" = "dbo" - "--rds_sqlserver_db_table" = "" - "--rds_db_tbl_pkeys_col_list" = "" - "--rds_table_total_size_mb" = "" - "--rds_table_total_rows" = "" - "--rds_df_repartition_num" = 0 - "--date_partition_column_name" = "" - "--other_partitionby_columns" = "" - "--validation_sample_fraction_float" = 0 - "--validation_sample_df_repartition" = 0 - "--jdbc_read_256mb_partitions" = "false" - "--jdbc_read_512mb_partitions" = "false" - "--jdbc_read_1gb_partitions" = "false" - "--jdbc_read_2gb_partitions" = "false" - "--rename_migrated_prq_tbl_folder" = "" - "--year_partition" = "false" - "--month_partition" = "false" - "--day_partition" = "false" - "--validation_only_run" = "false" - "--rds_to_parquet_output_s3_bucket" = aws_s3_bucket.dms_target_ep_s3_bucket.id - "--dv_parquet_output_s3_bucket" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id - "--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name - "--glue_catalog_tbl_name" = "glue_df_output" - "--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.rds_to_s3_parquet_migration.name}" - "--enable-continuous-cloudwatch-log" = "true" - "--enable-continuous-log-filter" = "true" - "--enable-metrics" = "true" - "--enable-auto-scaling" = "true" - "--conf" = < DataFrame: .option("query", f"""{query_str}""") .load()) + # def get_min_max_pkey_given_year(self, + # pkey_col_name, + # year, + # agg_str) -> DataFrame: + + # if agg_str not in ['min', 'max']: + # raise ValueError(""">> The 'aggregate' function must be either 'min' or 'max' <<""") + # sys.exit(1) + + # query_str = f""" + # SELECT {agg_str}({pkey_col_name}) as agg_value + # FROM {self.rds_db_schema_name}.[{self.rds_db_table_name}] + # WHERE YEAR(RecordedDatetime) = {year} + # """.strip() + + # return (spark.read.format("jdbc") + # .option("url", self.rds_jdbc_url_v2) + # .option("driver", RDS_DB_INSTANCE_DRIVER) + # .option("user", RDS_DB_INSTANCE_USER) + # .option("password", RDS_DB_INSTANCE_PWD) + # .option("query", f"""{query_str}""") + # .load()).collect()[0].agg_value + # --------------------------------------------------------------------- # PYTHON CLASS 'RDS_JDBC_CONNECTION' - END # --------------------------------------------------------------------- @@ -382,9 +405,9 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj, *get_nvl_select_list(rds_jdbc_conn_obj, df_parquet_read_sample)) - validation_sample_df_repartition = int(args['validation_sample_df_repartition']) - if validation_sample_df_repartition != 0: - df_parquet_read_sample_t1 = df_parquet_read_sample_t1.repartition(validation_sample_df_repartition, + validation_sample_df_repartition_num = int(args['validation_sample_df_repartition_num']) + if validation_sample_df_repartition_num != 0: + df_parquet_read_sample_t1 = 
df_parquet_read_sample_t1.repartition(validation_sample_df_repartition_num, jdbc_partition_column) # -------- @@ -396,8 +419,8 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj, df_rds_read_sample_t1 = df_rds_read_sample.selectExpr( *get_nvl_select_list(rds_jdbc_conn_obj, df_rds_read_sample)) - if validation_sample_df_repartition != 0: - df_rds_read_sample_t1 = df_rds_read_sample_t1.repartition(validation_sample_df_repartition, + if validation_sample_df_repartition_num != 0: + df_rds_read_sample_t1 = df_rds_read_sample_t1.repartition(validation_sample_df_repartition_num, jdbc_partition_column) # -------- @@ -459,21 +482,22 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj, return df_dv_output -def write_to_s3_parquet(df_dv_output: DataFrame, - database, +def write_to_s3_parquet(df_dv_output: DataFrame, + rds_jdbc_conn_obj, db_sch_tbl_name): + db_name = rds_jdbc_conn_obj.rds_db_name df_dv_output = df_dv_output.repartition(1) table_folder_path = f"""{args["glue_catalog_db_name"]}/{args["glue_catalog_tbl_name"]}""" s3_table_folder_path = f'''s3://{DV_PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}''' if check_s3_folder_path_if_exists(DV_PARQUET_OUTPUT_S3_BUCKET_NAME, - f'''{table_folder_path}/database_name={database}/full_table_name={db_sch_tbl_name}''' + f'''{table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}''' ): - LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}/database_name={database}/full_table_name={db_sch_tbl_name}""") + LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}""") - glueContext.purge_s3_path(f"""{s3_table_folder_path}/database_name={database}/full_table_name={db_sch_tbl_name}""", + glueContext.purge_s3_path(f"""{s3_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}""", options={"retentionPeriod": 0} ) # --------------------------------------------------------------------- @@ -615,22 +639,23 @@ def write_to_s3_parquet(df_dv_output: DataFrame, jdbc_read_partitions_num) # --------------------------------------------------------------- - if args['validation_only_run'] != "true": + validation_only_run = args['validation_only_run'] + if validation_only_run != "true": partition_by_cols = list() if args.get('date_partition_column_name', None) is not None: given_date_column = args['date_partition_column_name'] LOGGER.info(f"""given_date_column = {given_date_column}""") - if args['year_partition'] == 'true': + if args['year_partition_bool'] == 'true': df_rds_read = df_rds_read.withColumn("year", F.year(given_date_column)) partition_by_cols.append("year") - if args['month_partition'] == 'true': + if args['month_partition_bool'] == 'true': df_rds_read = df_rds_read.withColumn("month", F.month(given_date_column)) partition_by_cols.append("month") - if args['day_partition'] == 'true': + if args['day_partition_bool'] == 'true': df_rds_read = df_rds_read.withColumn("day", F.dayofmonth(given_date_column)) partition_by_cols.append("day") @@ -645,9 +670,12 @@ def write_to_s3_parquet(df_dv_output: DataFrame, # ---------------------------------------------------- if partition_by_cols: + orderby_columns = partition_by_cols + [jdbc_read_partitions_num] LOGGER.info(f"""df_rds_read-OrderBy on partitionBy columns: {partition_by_cols}""") - df_rds_read = df_rds_read.orderBy(*partition_by_cols) + df_rds_read = df_rds_read.orderBy(*orderby_columns) # ----------------------------------- + else: + LOGGER.info(f"""** validation_only_run = {validation_only_run} **""") # 
--------------------------------------- rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '') @@ -659,7 +687,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame, df_rds_read = df_rds_read.cache() - if args['validation_only_run'] != "true": + if validation_only_run != "true": write_rds_df_to_s3_parquet(df_rds_read, partition_by_cols, table_folder_path) @@ -680,7 +708,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame, validation_sample_fraction_float) - write_to_s3_parquet(df_dv_output, rds_db_name, db_sch_tbl) + write_to_s3_parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl) # ------------------------------------------------------------ df_rds_read.unpersist(True) From 13d14e9ffb86eeb7f90ceb9d7975fad1b6360ae9 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 13:22:50 +0100 Subject: [PATCH 4/8] CodeCorrections-0719-2 --- .../glue-job/rds_to_s3_parquet_migration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py index 075d945ca69..7b4b29f7b4b 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py @@ -632,11 +632,11 @@ def write_to_s3_parquet(df_dv_output: DataFrame, jdbc_read_partitions_num)) LOGGER.info(f"""df_rds_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_read.rdd.getNumPartitions()}""") - rds_df_repartition_num = args['rds_df_repartition_num'] + rds_df_repartition_num = int(args['rds_df_repartition_num']) if rds_df_repartition_num != 0: LOGGER.info(f"""df_rds_read-Repartitioning: {rds_df_repartition_num}""") df_rds_read = df_rds_read.repartition(rds_df_repartition_num, - jdbc_read_partitions_num) + jdbc_partition_column) # --------------------------------------------------------------- validation_only_run = args['validation_only_run'] @@ -675,7 +675,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame, df_rds_read = df_rds_read.orderBy(*orderby_columns) # ----------------------------------- else: - LOGGER.info(f"""** validation_only_run = {validation_only_run} **""") + LOGGER.info(f"""** validation_only_run = '{validation_only_run}' **""") # --------------------------------------- rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '') From 73bcaca07ac4e10254a4e580d57f89c2e6a08bd5 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 13:53:28 +0100 Subject: [PATCH 5/8] CodeCorrections-0719-3 --- .../glue-job/rds_to_s3_parquet_migration.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py index 7b4b29f7b4b..b37d76db4e2 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py @@ -195,7 +195,8 @@ def get_rds_db_tbl_list(self): def get_df_read_rds_db_tbl_int_pkey(self, jdbc_partition_column, jdbc_partition_col_upperbound, - jdbc_read_partitions_num + jdbc_read_partitions_num, + jdbc_partition_col_lowerbound=0, ) -> DataFrame: numPartitions = jdbc_read_partitions_num @@ -215,7 +216,7 @@ def get_df_read_rds_db_tbl_int_pkey(self, 
.option("password", RDS_DB_INSTANCE_PWD) .option("dbtable", f"""({query_str}) as t""") .option("partitionColumn", jdbc_partition_column) - .option("lowerBound", 0) + .option("lowerBound", jdbc_partition_col_lowerbound) .option("upperBound", jdbc_partition_col_upperbound) .option("numPartitions", numPartitions) .load()) From 51755148297f9be7e666feed3ff8c3691c2463e0 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 15:03:12 +0100 Subject: [PATCH 6/8] CodeCorrections-0719-4 --- .../glue-job/rds_to_s3_parquet_migration.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py index b37d76db4e2..6ecfb52ca00 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py @@ -350,21 +350,29 @@ def write_rds_df_to_s3_parquet(df_rds_read: DataFrame, table_folder_path): # s3://dms-rds-to-parquet-20240606144708618700000001/g4s_cap_dw/dbo/F_History/ + # s3://dms-rds-to-parquet-20240606144708618700000001/g4s_emsys_mvp/dbo/GPSPosition/ s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}""" - if check_s3_folder_path_if_exists(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path): + # if check_s3_folder_path_if_exists(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path): - LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}""") - glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0}) + # LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}""") + # glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0}) # -------------------------------------------------------------------- + catalog_db, catalog_db_tbl = table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""") + dydf = DynamicFrame.fromDF(df_rds_read, glueContext, "final_spark_df") glueContext.write_dynamic_frame.from_options(frame=dydf, connection_type='s3', format='parquet', connection_options={ 'path': f"""{s3_table_folder_path}/""", - "partitionKeys": partition_by_cols + "partitionKeys": partition_by_cols, + "mode": "overwrite", + "enableUpdateCatalog": True, # Enable updating the Glue Data Catalog + "updateBehavior": "UPDATE_IN_DATABASE", + "database": catalog_db.lower(), # Glue database name + "tableName": catalog_db_tbl.lower() # Glue table name }, format_options={ 'useGlueParquetWriter': True, From dc443a30540fecf5298202a876e2a891b937a718 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 16:06:30 +0100 Subject: [PATCH 7/8] CodeCorrections-0719-5 --- .../glue-job/rds_to_s3_parquet_migration.py | 50 ++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py index 6ecfb52ca00..d6fbd878c7d 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/rds_to_s3_parquet_migration.py @@ -345,6 +345,37 @@ def check_s3_folder_path_if_exists(in_bucket_name, in_folder_path): # ================================================================== +def write_rds_df_to_s3_parquet_v2(df_rds_read: 
DataFrame, + partition_by_cols, + table_folder_path): + """ + Write dynamic frame in S3 and catalog it. + """ + s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}""" + + catalog_db, catalog_db_tbl = table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""") + + dynamic_df_write = glueContext.getSink( + format_options = { + "compression": "snappy", + "useGlueParquetWriter": True + }, + path = f"""{s3_table_folder_path}/""", + connection_type = "s3", + updateBehavior = "UPDATE_IN_DATABASE", + partitionKeys = partition_by_cols, + enableUpdateCatalog = True, + transformation_ctx = "dynamic_df_write", + ) + dynamic_df_write.setCatalogInfo( + catalogDatabase = catalog_db.lower(), + catalogTableName = catalog_db_tbl.lower() + ) + dynamic_df_write.setFormat("glueparquet") + dynamic_df_write.writeFrame(df_rds_read) + LOGGER.info(f"""{db_sch_tbl} table data written to -> {s3_table_folder_path}/""") + + def write_rds_df_to_s3_parquet(df_rds_read: DataFrame, partition_by_cols, table_folder_path): @@ -354,25 +385,20 @@ def write_rds_df_to_s3_parquet(df_rds_read: DataFrame, s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}""" - # if check_s3_folder_path_if_exists(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path): + if check_s3_folder_path_if_exists(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path): - # LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}""") - # glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0}) + LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}""") + glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0}) # -------------------------------------------------------------------- - catalog_db, catalog_db_tbl = table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""") + # catalog_db, catalog_db_tbl = table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""") dydf = DynamicFrame.fromDF(df_rds_read, glueContext, "final_spark_df") glueContext.write_dynamic_frame.from_options(frame=dydf, connection_type='s3', format='parquet', connection_options={ 'path': f"""{s3_table_folder_path}/""", - "partitionKeys": partition_by_cols, - "mode": "overwrite", - "enableUpdateCatalog": True, # Enable updating the Glue Data Catalog - "updateBehavior": "UPDATE_IN_DATABASE", - "database": catalog_db.lower(), # Glue database name - "tableName": catalog_db_tbl.lower() # Glue table name + "partitionKeys": partition_by_cols }, format_options={ 'useGlueParquetWriter': True, @@ -380,7 +406,7 @@ def write_rds_df_to_s3_parquet(df_rds_read: DataFrame, 'blockSize': 13421773, 'pageSize': 1048576 }) - LOGGER.info(f"""{db_sch_tbl} table data written to -> {s3_table_folder_path}/""") + LOGGER.info(f"""'{db_sch_tbl}' table data written to -> {s3_table_folder_path}/""") def compare_rds_parquet_samples(rds_jdbc_conn_obj, @@ -524,7 +550,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame, 'blockSize': 13421773, 'pageSize': 1048576 }) - LOGGER.info(f"""{db_sch_tbl_name} validation report written to -> {s3_table_folder_path}/""") + LOGGER.info(f"""'{db_sch_tbl_name}' validation report written to -> {s3_table_folder_path}/""") # =================================================================================================== From 4b1d8968a4dc553a58650eb5a30c6ada05112bd8 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Fri, 19 Jul 2024 16:44:22 +0100 Subject: [PATCH 8/8] CodeCorrections-0719-6 --- .../dms_data_validation_glue_job.tf | 1 + 
.../glue-job/rds_to_s3_parquet_migration.py | 20 ++++++++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index 6c8e47ed5d2..6a2983a398d 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -256,6 +256,7 @@ resource "aws_glue_job" "rds_to_s3_parquet_migration" { "--enable-auto-scaling" = "true" "--conf" = < {s3_table_folder_path}/""") @@ -381,7 +392,6 @@ def write_rds_df_to_s3_parquet(df_rds_read: DataFrame, table_folder_path): # s3://dms-rds-to-parquet-20240606144708618700000001/g4s_cap_dw/dbo/F_History/ - # s3://dms-rds-to-parquet-20240606144708618700000001/g4s_emsys_mvp/dbo/GPSPosition/ s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}"""
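
Note on the partitioned JDBC read these patches keep tuning: get_df_read_rds_db_tbl_int_pkey hands partitionColumn, lowerBound, upperBound and numPartitions to the Spark JDBC reader, and patch 5/8 makes the lower bound a parameter instead of a hard-coded 0. The sketch below is illustrative only — the function name and sample values are hypothetical, not taken from the patches — and approximates the per-partition WHERE clauses Spark derives from those bounds. It shows why a lower bound far below the table's real minimum key can leave the first partitions empty and pile the rows into the last ones, which is what a configurable lower bound avoids.

    # Illustrative sketch (not part of the patches): approximate the per-partition
    # WHERE clauses Spark builds from partitionColumn/lowerBound/upperBound/numPartitions.
    # Function name and sample values are hypothetical.
    def jdbc_partition_predicates(partition_column: str,
                                  lower_bound: int,
                                  upper_bound: int,
                                  num_partitions: int) -> list:
        stride = (upper_bound - lower_bound) // num_partitions
        predicates, current = [], lower_bound
        for i in range(num_partitions):
            if i == 0:
                # First partition is open-ended below and also picks up NULL keys.
                predicates.append(f"{partition_column} < {current + stride} OR {partition_column} IS NULL")
            elif i == num_partitions - 1:
                # Last partition is open-ended above.
                predicates.append(f"{partition_column} >= {current}")
            else:
                predicates.append(f"{partition_column} >= {current} AND {partition_column} < {current + stride}")
            current += stride
        return predicates

    if __name__ == "__main__":
        # With lowerBound=0 against a table whose smallest key is around 3,000,000,
        # only the last of these four ranges would return any rows.
        for clause in jdbc_partition_predicates("TableRowId", 0, 4_000_000, 4):
            print(clause)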