From affdd08820499885a3f743e9139b71a41f7db2de Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Thu, 27 Jun 2024 11:51:14 +0100 Subject: [PATCH 1/6] DV_GlueJob_V3_Altered-1 --- .../dms_data_validation_glue_job.tf | 5 +- .../dms_dv_rds_and_s3_parquet_write_v3.py | 117 +++++++++++++++--- 2 files changed, 101 insertions(+), 21 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index 69b4ffac234..824a00e3c5b 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -154,8 +154,11 @@ resource "aws_glue_job" "dms_dv_glue_job_v3" { "--rds_db_tbl_pkeys_col_list" = "" "--rds_df_trim_str_col_list" = "" "--rds_df_trim_micro_sec_ts_col_list" = "" + "--jdbc_read_500mb_partitions" = "false" + "--jdbc_read_1gb_partitions" = "true" + "--jdbc_read_2gb_partitions" = "false" "--rds_read_rows_fetch_size" = 100000 - "--dataframe_repartitions" = 8 + "--dataframe_repartitions" = 0 "--parquet_src_bucket_name" = aws_s3_bucket.dms_target_ep_s3_bucket.id "--parquet_output_bucket_name" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id "--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py index f2b0a56bf35..70007a786b4 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py @@ -55,6 +55,9 @@ def resolve_args(args_list): "rds_sqlserver_db_table", "rds_db_tbl_pkeys_col_list", "dataframe_repartitions", + "jdbc_read_500mb_partitions", + "jdbc_read_1gb_partitions", + "jdbc_read_2gb_partitions", "rds_read_rows_fetch_size" ] @@ -100,6 +103,8 @@ def resolve_args(args_list): 'timestamp': "to_timestamp('1900-01-01', 'yyyy-MM-dd')", 'date': "to_date('1900-01-01', 'yyyy-MM-dd')"} +INT_DATATYPES_LIST = ['tinyint', 'smallint', 'int', 'bigint'] + """ # Use the below query to fetch the existing primary keys defined in RDS-DB-Schema. # ------------------------------------------------------------------------------------- @@ -221,23 +226,43 @@ def get_rds_db_table_row_count(in_rds_db_name, .load()).collect()[0].row_count -def get_df_read_rds_db_query(in_rds_db_name, - in_table_name, - select_col_list, - num_of_jdbc_connections=None - ) -> DataFrame: +def get_rds_db_table_pkey_col_max_value(in_rds_db_name, in_table_name, + in_pkey_col_name) -> DataFrame: given_rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"] - num_of_rows_per_trip = args.get("rds_read_rows_fetch_size", None) - fetchSize = 10000 if num_of_rows_per_trip is None else num_of_rows_per_trip - # The JDBC fetch size, which determines how many rows to fetch per round trip. - # This can help performance on JDBC drivers which default to low fetch size (e.g. Oracle with 10 rows). 
+ query_str = f""" + SELECT max({in_pkey_col_name}) as max_value + FROM {given_rds_sqlserver_db_schema}.[{in_table_name}] + """.strip() + + return (spark.read.format("jdbc") + .option("url", get_rds_db_jdbc_url(in_rds_db_name)) + .option("driver", RDS_DB_INSTANCE_DRIVER) + .option("user", RDS_DB_INSTANCE_USER) + .option("password", RDS_DB_INSTANCE_PWD) + .option("query", f"""{query_str}""") + .load()).collect()[0].max_value + + +def get_df_read_rds_db_tbl_int_pkey(in_rds_db_name, in_table_name, select_col_list, + jdbc_partition_column, + jdbc_partition_col_upperbound, + jdbc_read_partitions_num, + jdbc_rows_fetch_size + ) -> DataFrame: + given_rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"] - numPartitions = 8 if num_of_jdbc_connections is None else num_of_jdbc_connections + numPartitions = jdbc_read_partitions_num # Note: numPartitions is normally equal to number of executors defined. # The maximum number of partitions that can be used for parallelism in table reading and writing. # This also determines the maximum number of concurrent JDBC connections. - + + fetchSize = jdbc_rows_fetch_size + # The JDBC fetch size, which determines how many rows to fetch per round trip. + # This can help performance on JDBC drivers which default to low fetch size (e.g. Oracle with 10 rows). + # Too Small: => frequent round trips to database + # Too Large: => Consume a lot of memory + query_str = f""" SELECT {', '.join(select_col_list)} FROM {given_rds_sqlserver_db_schema}.[{in_table_name}] @@ -252,9 +277,12 @@ def get_df_read_rds_db_query(in_rds_db_name, .option("driver", RDS_DB_INSTANCE_DRIVER) .option("user", RDS_DB_INSTANCE_USER) .option("password", RDS_DB_INSTANCE_PWD) - .option("query", f"""{query_str} as t""") - .option("fetchSize", fetchSize) + .option("dbtable", f"""({query_str}) as t""") + .option("partitionColumn", jdbc_partition_column) + .option("lowerBound", "0") + .option("upperBound", jdbc_partition_col_upperbound) .option("numPartitions", numPartitions) + .option("fetchSize", fetchSize) .load()) @@ -456,6 +484,7 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in LOGGER.error(f"""Global Dictionary - 'RECORDED_PKEYS_LIST' has no key '{rds_tbl_name}'!""") sys.exit(1) else: + LOGGER.info("""rds_db_tbl_pkeys_col_list = {rds_db_tbl_pkeys_col_list}""") rds_db_tbl_pkeys_col_list = [f"""{column.strip().strip("'").strip('"')}""" for column in args['rds_db_tbl_pkeys_col_list'].split(",")] # ------------------------------------------------------- @@ -463,7 +492,6 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in df_dv_output = get_pyspark_empty_df(df_dv_output_schema) rds_db_table_empty_df = get_rds_db_table_empty_df(rds_db_name, rds_tbl_name) - df_rds_columns_list = rds_db_table_empty_df.columns df_rds_count = get_rds_db_table_row_count(rds_db_name, rds_tbl_name, @@ -510,6 +538,48 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in transform_msg_2 =f"""- micro-seconds trimmed.""" # ------------------------------------------------------- + df_rds_columns_list = rds_db_table_empty_df.columns + df_rds_dtype_dict = get_dtypes_dict(rds_db_table_empty_df) + int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items() + if dtype in INT_DATATYPES_LIST] + if len(rds_db_tbl_pkeys_col_list) == 1 and \ + (rds_db_tbl_pkeys_col_list[0] in int_dtypes_colname_list): + jdbc_partition_column = rds_db_tbl_pkeys_col_list[0] + pkey_max_value = get_rds_db_table_pkey_col_max_value(rds_db_name, 
rds_tbl_name, + jdbc_partition_column) + else: + LOGGER.error(f"""int_dtypes_colname_list = {int_dtypes_colname_list}""") + LOGGER.error(f"""PrimaryKey column(s) are more than one (OR) not an integer datatype column!""") + sys.exit(1) + # ------------------------------------------------------- + + if args.get("jdbc_read_500mb_partitions", "false") == "true": + jdbc_read_partitions_num = int(total_size_mb/500) + elif args.get("jdbc_read_1gb_partitions", "false") == "true": + jdbc_read_partitions_num = int(total_size_mb/1024) + elif args.get("jdbc_read_2gb_partitions", "false") == "true": + jdbc_read_partitions_num = int((total_size_mb/1024)/2) + else: + jdbc_read_partitions_num = total_files + LOGGER.info("""jdbc_read_partitions_num = {jdbc_read_partitions_num}""") + + rows_per_partition_v1 = int(df_rds_count/jdbc_read_partitions_num) + LOGGER.info("""rows_per_partition_v1 = {rows_per_partition_v1}""") + + rows_per_partition_v2 = int(pkey_max_value/jdbc_read_partitions_num) + LOGGER.info("""rows_per_partition_v2 = {rows_per_partition_v2}""") + + jdbc_partition_col_upperbound = rows_per_partition_v1 \ + if rows_per_partition_v1 > rows_per_partition_v2 \ + else rows_per_partition_v2 + LOGGER.info("""jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""") + + jdbc_rows_fetch_size = jdbc_partition_col_upperbound \ + if args["rds_read_rows_fetch_size"] < jdbc_partition_col_upperbound \ + else args["rds_read_rows_fetch_size"] + + LOGGER.info(f"""jdbc_rows_fetch_size = {jdbc_rows_fetch_size}""") + validated_colmn_msg_list = list() for_loop_count = 0 @@ -525,12 +595,18 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in LOGGER.info(f"""{for_loop_count}-Processing - {rds_tbl_name}.{rds_column}.""") LOGGER.info(f"""Using Dataframe-'select' column list: {temp_select_list}""") - df_rds_temp = (get_df_read_rds_db_query(rds_db_name, rds_tbl_name, temp_select_list, - num_of_jdbc_connections=total_files)) + df_rds_temp = (get_df_read_rds_db_tbl_int_pkey(rds_db_name, + rds_tbl_name, + temp_select_list, + jdbc_partition_column, + jdbc_partition_col_upperbound, + jdbc_read_partitions_num, + jdbc_rows_fetch_size)) LOGGER.info(f"""df_rds_temp-{rds_column}: READ PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""") - df_rds_temp = df_rds_temp.repartition(input_repartition_factor) - LOGGER.info(f"""df_rds_temp-{rds_column}: RE-PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""") + if args["dataframe_repartitions"] != 0: + df_rds_temp = df_rds_temp.repartition(input_repartition_factor) + LOGGER.info(f"""df_rds_temp-{rds_column}: RE-PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""") # ------------------------------------------------------- t1_rds_str_col_trimmed = False @@ -568,8 +644,9 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in .select(*temp_select_list)) LOGGER.info(f"""df_prq_temp-{rds_column}: READ PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""") - df_prq_temp = df_prq_temp.repartition(input_repartition_factor) - LOGGER.info(f"""df_prq_temp-{rds_column}: RE-PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""") + if args["dataframe_repartitions"] != 0: + df_prq_temp = df_prq_temp.repartition(input_repartition_factor) + LOGGER.info(f"""df_prq_temp-{rds_column}: RE-PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""") df_prq_temp_t1 = df_prq_temp.selectExpr(*get_nvl_select_list(df_rds_temp, rds_db_name, rds_tbl_name)) From de7a0134c5f57c0488ba3afe31c529b8da093b06 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Thu, 27 
Jun 2024 12:13:11 +0100 Subject: [PATCH 2/6] DV_GlueJob_V3_Altered-2 --- .../glue-job/dms_dv_rds_and_s3_parquet_write_v3.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py index 70007a786b4..a127946314a 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py @@ -484,7 +484,7 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in LOGGER.error(f"""Global Dictionary - 'RECORDED_PKEYS_LIST' has no key '{rds_tbl_name}'!""") sys.exit(1) else: - LOGGER.info("""rds_db_tbl_pkeys_col_list = {rds_db_tbl_pkeys_col_list}""") + LOGGER.info(f"""rds_db_tbl_pkeys_col_list = {rds_db_tbl_pkeys_col_list}""") rds_db_tbl_pkeys_col_list = [f"""{column.strip().strip("'").strip('"')}""" for column in args['rds_db_tbl_pkeys_col_list'].split(",")] # ------------------------------------------------------- @@ -516,6 +516,8 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in LOGGER.info(final_validation_msg) df_dv_output = df_dv_output.union(df_temp_row) return df_dv_output + else: + LOGGER.info(f"""df_rds_count = {df_rds_count}""") # ------------------------------------------------------- rds_df_trim_str_col_str = args.get('rds_df_trim_str_col_list', '') @@ -561,18 +563,18 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in jdbc_read_partitions_num = int((total_size_mb/1024)/2) else: jdbc_read_partitions_num = total_files - LOGGER.info("""jdbc_read_partitions_num = {jdbc_read_partitions_num}""") + LOGGER.info(f"""jdbc_read_partitions_num = {jdbc_read_partitions_num}""") rows_per_partition_v1 = int(df_rds_count/jdbc_read_partitions_num) - LOGGER.info("""rows_per_partition_v1 = {rows_per_partition_v1}""") + LOGGER.info(f"""rows_per_partition_v1 = {rows_per_partition_v1}""") rows_per_partition_v2 = int(pkey_max_value/jdbc_read_partitions_num) - LOGGER.info("""rows_per_partition_v2 = {rows_per_partition_v2}""") + LOGGER.info(f"""rows_per_partition_v2 = {rows_per_partition_v2}""") jdbc_partition_col_upperbound = rows_per_partition_v1 \ if rows_per_partition_v1 > rows_per_partition_v2 \ else rows_per_partition_v2 - LOGGER.info("""jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""") + LOGGER.info(f"""jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""") jdbc_rows_fetch_size = jdbc_partition_col_upperbound \ if args["rds_read_rows_fetch_size"] < jdbc_partition_col_upperbound \ From a834b76a137b0dd15e2d2445b2ca7f38ada6d512 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Thu, 27 Jun 2024 12:30:27 +0100 Subject: [PATCH 3/6] DV_GlueJob_V3_Altered-3 --- .../glue-job/dms_dv_rds_and_s3_parquet_write_v3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py index a127946314a..a13c74cbfbe 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py @@ -484,9 +484,9 @@ def 
process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in LOGGER.error(f"""Global Dictionary - 'RECORDED_PKEYS_LIST' has no key '{rds_tbl_name}'!""") sys.exit(1) else: - LOGGER.info(f"""rds_db_tbl_pkeys_col_list = {rds_db_tbl_pkeys_col_list}""") rds_db_tbl_pkeys_col_list = [f"""{column.strip().strip("'").strip('"')}""" for column in args['rds_db_tbl_pkeys_col_list'].split(",")] + LOGGER.info(f"""rds_db_tbl_pkeys_col_list = {rds_db_tbl_pkeys_col_list}""") # ------------------------------------------------------- df_dv_output = get_pyspark_empty_df(df_dv_output_schema) From 16aa3da39a5e85a82f18b0bd20277293f0ca6417 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Thu, 27 Jun 2024 12:48:05 +0100 Subject: [PATCH 4/6] DV_GlueJob_V3_Altered-4 --- .../dms_data_validation_glue_job.tf | 6 ++++-- .../dms_dv_rds_and_s3_parquet_write_v3.py | 21 ++++++++++++------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index 824a00e3c5b..96d9e6defec 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -154,8 +154,10 @@ resource "aws_glue_job" "dms_dv_glue_job_v3" { "--rds_db_tbl_pkeys_col_list" = "" "--rds_df_trim_str_col_list" = "" "--rds_df_trim_micro_sec_ts_col_list" = "" - "--jdbc_read_500mb_partitions" = "false" - "--jdbc_read_1gb_partitions" = "true" + "--jdbc_read_128mb_partitions" = "false" + "--jdbc_read_256mb_partitions" = "false" + "--jdbc_read_512mb_partitions" = "false" + "--jdbc_read_1gb_partitions" = "false" "--jdbc_read_2gb_partitions" = "false" "--rds_read_rows_fetch_size" = 100000 "--dataframe_repartitions" = 0 diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py index a13c74cbfbe..732a774b901 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py @@ -55,7 +55,9 @@ def resolve_args(args_list): "rds_sqlserver_db_table", "rds_db_tbl_pkeys_col_list", "dataframe_repartitions", - "jdbc_read_500mb_partitions", + "jdbc_read_128mb_partitions", + "jdbc_read_256mb_partitions", + "jdbc_read_512mb_partitions", "jdbc_read_1gb_partitions", "jdbc_read_2gb_partitions", "rds_read_rows_fetch_size" @@ -555,8 +557,12 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in sys.exit(1) # ------------------------------------------------------- - if args.get("jdbc_read_500mb_partitions", "false") == "true": - jdbc_read_partitions_num = int(total_size_mb/500) + if args.get("jdbc_read_128mb_partitions", "false") == "true": + jdbc_read_partitions_num = int(total_size_mb/128) + elif args.get("jdbc_read_256mb_partitions", "false") == "true": + jdbc_read_partitions_num = int(total_size_mb/256) + elif args.get("jdbc_read_512mb_partitions", "false") == "true": + jdbc_read_partitions_num = int(total_size_mb/512) elif args.get("jdbc_read_1gb_partitions", "false") == "true": jdbc_read_partitions_num = int(total_size_mb/1024) elif args.get("jdbc_read_2gb_partitions", "false") == "true": @@ -576,9 +582,10 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, 
total_files, total_size_mb, in else rows_per_partition_v2 LOGGER.info(f"""jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""") + rds_read_rows_fetch_size = int(args["rds_read_rows_fetch_size"]) jdbc_rows_fetch_size = jdbc_partition_col_upperbound \ - if args["rds_read_rows_fetch_size"] < jdbc_partition_col_upperbound \ - else args["rds_read_rows_fetch_size"] + if rds_read_rows_fetch_size < jdbc_partition_col_upperbound \ + else rds_read_rows_fetch_size LOGGER.info(f"""jdbc_rows_fetch_size = {jdbc_rows_fetch_size}""") @@ -606,7 +613,7 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in jdbc_rows_fetch_size)) LOGGER.info(f"""df_rds_temp-{rds_column}: READ PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""") - if args["dataframe_repartitions"] != 0: + if int(args["dataframe_repartitions"]) != 0: df_rds_temp = df_rds_temp.repartition(input_repartition_factor) LOGGER.info(f"""df_rds_temp-{rds_column}: RE-PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""") # ------------------------------------------------------- @@ -646,7 +653,7 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in .select(*temp_select_list)) LOGGER.info(f"""df_prq_temp-{rds_column}: READ PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""") - if args["dataframe_repartitions"] != 0: + if int(args["dataframe_repartitions"]) != 0: df_prq_temp = df_prq_temp.repartition(input_repartition_factor) LOGGER.info(f"""df_prq_temp-{rds_column}: RE-PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""") From dec01d6d9b2bf89c674401cf859f8d3bf70ad89e Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Thu, 27 Jun 2024 13:11:01 +0100 Subject: [PATCH 5/6] DV_GlueJob_V3_Altered-5 --- .../glue-job/dms_dv_rds_and_s3_parquet_write_v3.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py index 732a774b901..5ad98a17c73 100644 --- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py +++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_rds_and_s3_parquet_write_v3.py @@ -558,17 +558,19 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in # ------------------------------------------------------- if args.get("jdbc_read_128mb_partitions", "false") == "true": - jdbc_read_partitions_num = int(total_size_mb/128) + int_partitions_evaluated = int(total_size_mb/128) elif args.get("jdbc_read_256mb_partitions", "false") == "true": - jdbc_read_partitions_num = int(total_size_mb/256) + int_partitions_evaluated = int(total_size_mb/256) elif args.get("jdbc_read_512mb_partitions", "false") == "true": - jdbc_read_partitions_num = int(total_size_mb/512) + int_partitions_evaluated = int(total_size_mb/512) elif args.get("jdbc_read_1gb_partitions", "false") == "true": - jdbc_read_partitions_num = int(total_size_mb/1024) + int_partitions_evaluated = int(total_size_mb/1024) elif args.get("jdbc_read_2gb_partitions", "false") == "true": - jdbc_read_partitions_num = int((total_size_mb/1024)/2) + int_partitions_evaluated = int((total_size_mb/1024)/2) else: - jdbc_read_partitions_num = total_files + int_partitions_evaluated = total_files + + jdbc_read_partitions_num = 1 if int_partitions_evaluated < 1 else int_partitions_evaluated LOGGER.info(f"""jdbc_read_partitions_num = 
{jdbc_read_partitions_num}""") rows_per_partition_v1 = int(df_rds_count/jdbc_read_partitions_num) From 06dc2a0e9b16dcc1d121c32edf474a22fb1703e0 Mon Sep 17 00:00:00 2001 From: Madhu Kadiri Date: Thu, 27 Jun 2024 15:22:56 +0100 Subject: [PATCH 6/6] DV_GlueJob_V3_Altered-6 --- .../dms_data_validation_glue_job.tf | 8 +++----- .../glue-job/dms_dv_rds_and_s3_parquet_write_v3.py | 13 ++++++++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf index 96d9e6defec..f6e33b3f552 100644 --- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf +++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf @@ -113,7 +113,7 @@ resource "aws_glue_job" "dms_dv_glue_job_v2" { "--enable-auto-scaling" = "true" "--conf" = <