
Merge pull request #7091 from ministryofjustice/DV_GlueJobV4c_Cache_RDS_DF

DV_GluJob_V4c-Archived_V4d_Created
madhu-k-sr2 authored Jul 16, 2024
2 parents 77f9e4a + 75ea4d5 commit 4b14b89
Showing 6 changed files with 1,017 additions and 144 deletions.
@@ -45,24 +45,11 @@ resource "aws_s3_object" "dms_dv_glue_job_s3_object_v2" {
etag = filemd5("glue-job/dms_dv_rds_and_s3_parquet_write_v2.py")
}

resource "aws_s3_object" "dms_dv_glue_job_s3_object_v4a" {
resource "aws_s3_object" "dms_dv_glue_job_s3_object_v4d" {
bucket = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
key = "dms_dv_rds_and_s3_parquet_write_v4a.py"
source = "glue-job/dms_dv_rds_and_s3_parquet_write_v4a.py"
etag = filemd5("glue-job/dms_dv_rds_and_s3_parquet_write_v4a.py")
}

resource "aws_s3_object" "dms_dv_glue_job_s3_object_v4b" {
bucket = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
key = "dms_dv_rds_and_s3_parquet_write_v4b.py"
source = "glue-job/dms_dv_rds_and_s3_parquet_write_v4b.py"
etag = filemd5("glue-job/dms_dv_rds_and_s3_parquet_write_v4b.py")
}
resource "aws_s3_object" "dms_dv_glue_job_s3_object_v4c" {
bucket = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
key = "dms_dv_rds_and_s3_parquet_write_v4c.py"
source = "glue-job/dms_dv_rds_and_s3_parquet_write_v4c.py"
etag = filemd5("glue-job/dms_dv_rds_and_s3_parquet_write_v4c.py")
key = "dms_dv_rds_and_s3_parquet_write_v4d.py"
source = "glue-job/dms_dv_rds_and_s3_parquet_write_v4d.py"
etag = filemd5("glue-job/dms_dv_rds_and_s3_parquet_write_v4d.py")
}

resource "aws_s3_object" "catalog_dv_table_glue_job_s3_object" {
@@ -151,141 +138,35 @@ EOF
)

}
# Note: Make sure 'max_table_size_mb' and 'spark.sql.files.maxPartitionBytes' values are the same.

# "--enable-spark-ui" = "false"
# "--spark-ui-event-logs-path" = "false"
# "--spark-event-logs-path" = "s3://${aws_s3_bucket.dms_dv_glue_job_s3_bucket.id}/spark_logs/"

resource "aws_glue_job" "dms_dv_glue_job_v4a" {
name = "dms-dv-glue-job-v4a"
description = "DMS Data Validation Glue-Job (PySpark)."
role_arn = aws_iam_role.dms_dv_glue_job_iam_role.arn
glue_version = "4.0"
worker_type = "G.1X"
number_of_workers = 16
default_arguments = {
"--script_bucket_name" = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
"--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0]
"--rds_db_pwd" = aws_db_instance.database_2022.password
"--rds_sqlserver_db" = ""
"--rds_sqlserver_db_schema" = ""
"--rds_sqlserver_db_table" = ""
"--rds_db_tbl_pkeys_col_list" = ""
"--rds_df_trim_str_col_list" = ""
"--rds_df_trim_micro_sec_ts_col_list" = ""
"--jdbc_read_256mb_partitions" = "true"
"--jdbc_read_512mb_partitions" = "false"
"--jdbc_read_1gb_partitions" = "false"
"--rds_read_rows_fetch_size" = 100000
"--parquet_src_bucket_name" = aws_s3_bucket.dms_target_ep_s3_bucket.id
"--parquet_output_bucket_name" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id
"--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
"--glue_catalog_tbl_name" = "glue_df_output"
"--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.dms_dv_cw_log_group.name}"
"--enable-continuous-cloudwatch-log" = "true"
"--enable-continuous-log-filter" = "true"
"--enable-metrics" = "true"
"--enable-auto-scaling" = "true"
"--conf" = <<EOF
spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.sql.parquet.aggregatePushdown=true
--conf spark.sql.shuffle.partitions=2001
--conf spark.sql.files.maxPartitionBytes=128m
EOF

}

connections = [aws_glue_connection.glue_rds_sqlserver_db_connection.name]
command {
python_version = "3"
script_location = "s3://${aws_s3_bucket.dms_dv_glue_job_s3_bucket.id}/dms_dv_rds_and_s3_parquet_write_v4a.py"
}

tags = merge(
local.tags,
{
Resource_Type = "Glue-Job that processes data sourced from both RDS and S3",
}
)

}
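
The default_arguments maps in these job definitions only matter if the Glue scripts read them back at runtime. Those scripts are not part of this hunk, so the following is just a sketch of the usual Glue 4.0 pattern for resolving such parameters; the argument names are copied from the Terraform block, the rest is illustrative.

import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Resolve a subset of the parameters Terraform passes as default_arguments.
args = getResolvedOptions(
    sys.argv,
    [
        "script_bucket_name",
        "rds_db_host_ep",
        "rds_db_pwd",
        "rds_sqlserver_db",
        "rds_read_rows_fetch_size",
        "parquet_src_bucket_name",
        "parquet_output_bucket_name",
        "glue_catalog_db_name",
        "glue_catalog_tbl_name",
    ],
)

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Glue hands every value over as a string, so numeric flags need explicit casts.
fetch_size = int(args["rds_read_rows_fetch_size"])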

resource "aws_glue_job" "dms_dv_glue_job_v4b" {
name = "dms-dv-glue-job-v4b"
description = "DMS Data Validation Glue-Job (PySpark)."
role_arn = aws_iam_role.dms_dv_glue_job_iam_role.arn
glue_version = "4.0"
worker_type = "G.1X"
number_of_workers = 16
default_arguments = {
"--script_bucket_name" = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
"--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0]
"--rds_db_pwd" = aws_db_instance.database_2022.password
"--rds_sqlserver_db" = ""
"--rds_sqlserver_db_schema" = ""
"--rds_sqlserver_db_table" = ""
"--rds_db_tbl_pkeys_col_list" = ""
"--rds_df_trim_str_col_list" = ""
"--rds_df_trim_micro_sec_ts_col_list" = ""
"--jdbc_read_256mb_partitions" = "true"
"--jdbc_read_512mb_partitions" = "false"
"--jdbc_read_1gb_partitions" = "false"
"--rds_read_rows_fetch_size" = 100000
"--parquet_src_bucket_name" = aws_s3_bucket.dms_target_ep_s3_bucket.id
"--parquet_output_bucket_name" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id
"--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
"--glue_catalog_tbl_name" = "glue_df_output"
"--continuous-log-logGroup" = "/aws-glue/jobs/${aws_cloudwatch_log_group.dms_dv_cw_log_group.name}"
"--enable-continuous-cloudwatch-log" = "true"
"--enable-continuous-log-filter" = "true"
"--enable-metrics" = "true"
"--enable-auto-scaling" = "true"
"--conf" = <<EOF
spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.sql.parquet.aggregatePushdown=true
--conf spark.sql.shuffle.partitions=2001
--conf spark.sql.files.maxPartitionBytes=128m
EOF

}

connections = [aws_glue_connection.glue_rds_sqlserver_db_connection.name]
command {
python_version = "3"
script_location = "s3://${aws_s3_bucket.dms_dv_glue_job_s3_bucket.id}/dms_dv_rds_and_s3_parquet_write_v4b.py"
}

tags = merge(
local.tags,
{
Resource_Type = "Glue-Job that processes data sourced from both RDS and S3",
}
)

}
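
The v4a and v4b job definitions above expose JDBC read knobs (jdbc_read_256mb_partitions and its 512 MB / 1 GB siblings, rds_read_rows_fetch_size) without the consuming script being visible here. A partitioned SQL Server read in PySpark typically wires such flags in as below; the URL, table, column and bounds are placeholders, not values from this repository.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder endpoint and credentials; the real job builds these from
# --rds_db_host_ep, --rds_sqlserver_db and --rds_db_pwd.
jdbc_url = "jdbc:sqlserver://rds-host.example:1433;databaseName=example_db"

df_rds = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "dbo.example_table")      # placeholder table
    .option("user", "admin")                     # placeholder user
    .option("password", "example-password")      # placeholder password
    .option("partitionColumn", "example_pkey")   # must be numeric or date/time
    .option("lowerBound", 1)
    .option("upperBound", 10_000_000)
    .option("numPartitions", 16)                 # e.g. sized so each slice is ~256 MB
    .option("fetchsize", 100000)                 # mirrors --rds_read_rows_fetch_size
    .load()
)

Each of the numPartitions tasks issues its own range query, which is what the 256 MB / 512 MB / 1 GB partition flags appear to trade off against worker memory.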


resource "aws_glue_job" "dms_dv_glue_job_v4c" {
name = "dms-dv-glue-job-v4c"
resource "aws_glue_job" "dms_dv_glue_job_v4d" {
name = "dms-dv-glue-job-v4d"
description = "DMS Data Validation Glue-Job (PySpark)."
role_arn = aws_iam_role.dms_dv_glue_job_iam_role.arn
glue_version = "4.0"
worker_type = "G.1X"
number_of_workers = 16
worker_type = "G.2X"
number_of_workers = 5
default_arguments = {
"--script_bucket_name" = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
"--rds_db_host_ep" = split(":", aws_db_instance.database_2022.endpoint)[0]
"--rds_db_pwd" = aws_db_instance.database_2022.password
"--parquet_df_repartition_num" = 180
"--rds_df_repartition_num" = 0
"--parallel_jdbc_conn_num" = 45
"--prq_leftanti_join_rds" = "false"
"--parquet_df_repartition_num" = 32
"--parallel_jdbc_conn_num" = 4
"--rds_df_repartition_num" = 16
"--rds_upperbound_factor" = 8
"--rds_sqlserver_db" = ""
"--rds_sqlserver_db_schema" = ""
"--rds_sqlserver_db_table" = ""
"--rds_db_tbl_pkeys_col_list" = ""
"--rds_df_trim_str_col_list" = ""
"--rds_df_trim_str_columns" = "false"
"--rds_df_trim_micro_sec_ts_col_list" = ""
"--rds_upperbound_factor" = 45
"--parquet_src_bucket_name" = aws_s3_bucket.dms_target_ep_s3_bucket.id
"--parquet_output_bucket_name" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id
"--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
@@ -299,15 +180,15 @@ resource "aws_glue_job" "dms_dv_glue_job_v4c" {
spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.sql.parquet.aggregatePushdown=true
--conf spark.sql.shuffle.partitions=2001
--conf spark.sql.files.maxPartitionBytes=128m
--conf spark.sql.files.maxPartitionBytes=1g
EOF

}

connections = [aws_glue_connection.glue_rds_sqlserver_db_connection.name]
command {
python_version = "3"
script_location = "s3://${aws_s3_bucket.dms_dv_glue_job_s3_bucket.id}/dms_dv_rds_and_s3_parquet_write_v4c.py"
script_location = "s3://${aws_s3_bucket.dms_dv_glue_job_s3_bucket.id}/dms_dv_rds_and_s3_parquet_write_v4d.py"
}

tags = merge(
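
Among the new v4d arguments, prq_leftanti_join_rds reads like an optional check that isolates Parquet rows with no counterpart in the RDS extract. The script body sits outside this hunk, so the sketch below only shows the generic shape of such a check in PySpark; df_prq, df_rds and the key column are stand-ins.

# Hypothetical inputs: df_prq from the DMS Parquet output, df_rds from the JDBC read,
# pkey_cols derived from --rds_db_tbl_pkeys_col_list.
pkey_cols = ["example_pkey"]

# Rows present in the Parquet output but missing from RDS.
df_prq_only = df_prq.join(df_rds.select(pkey_cols), on=pkey_cols, how="left_anti")

unmatched = df_prq_only.count()
if unmatched > 0:
    # The real job would record a validation failure row instead of printing.
    print(f"left-anti check: {unmatched} Parquet rows have no RDS match")
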
@@ -709,19 +709,19 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
msg_prefix = f"""df_rds_temp_t3-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: >> RE-PARTITIONING on {jdbc_partition_column} <<""")
df_rds_temp_t4 = df_rds_temp_t3.repartition(rds_df_repartition_num,
jdbc_partition_column)
jdbc_partition_column).cache()

msg_prefix = f"""df_rds_temp_t4-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: RDS-DF-Partitions = {df_rds_temp_t4.rdd.getNumPartitions()}""")
else:
df_rds_temp_t4 = df_rds_temp_t3.alias("df_rds_temp_t4")
df_rds_temp_t4 = df_rds_temp_t3.alias("df_rds_temp_t4").cache()

df_rds_temp_t4_count = df_rds_temp_t4.count()

# -------------------------------------------------------------------------------------------

df_filter_exp = f"""{jdbc_partition_column} between {jdbc_partition_col_lowerbound} and {jdbc_partition_col_upperbound}"""
df_prq_filtered_t3 = df_prq_read_t2.where(df_filter_exp)
df_prq_filtered_t3 = df_prq_read_t2.where(df_filter_exp).cache()
df_prq_filtered_t3_count = df_prq_filtered_t3.count()

if df_rds_temp_t4_count == df_prq_filtered_t3_count:
@@ -767,11 +767,14 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
LOGGER.warn(f"{loop_count}-Validation Failed - 2")
df_dv_output = df_dv_output.union(df_subtract_temp)
# -----------------------------------------------------

else:
LOGGER.error(f"""df_rds_temp_t4_count ({df_rds_temp_t4_count}) != df_prq_filtered_t3_count ({df_prq_filtered_t3_count})""")
sys.exit(1)
# -------------

df_prq_filtered_t3.unpersist(True)
df_rds_temp_t4.unpersist(True)
current_processed_rows += df_rds_temp_t4_count

else:
@@ -803,18 +806,21 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
msg_prefix = f"""df_rds_temp_t3-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: >> RE-PARTITIONING on {jdbc_partition_column} <<""")
df_rds_temp_t4 = df_rds_temp_t3.repartition(int(rds_df_repartition_num/2),
jdbc_partition_column)
jdbc_partition_column).cache()

msg_prefix = f"""df_rds_temp_t4-{rds_tbl_name}"""
LOGGER.info(f"""{loop_count}-{msg_prefix}: RDS-DF-Partitions = {df_rds_temp_t4.rdd.getNumPartitions()}""")
else:
df_rds_temp_t4 = df_rds_temp_t3.alias("df_rds_temp_t4")
df_rds_temp_t4 = df_rds_temp_t3.alias("df_rds_temp_t4").cache()

df_rds_temp_t4_count = df_rds_temp_t4.count()


df_filter_exp = f"""{jdbc_partition_column} between {jdbc_partition_col_lowerbound} and {jdbc_partition_col_upperbound}"""
df_prq_filtered_t3 = df_prq_read_t2.where(df_filter_exp).coalesce(int(parquet_df_repartition_num/2))
df_prq_filtered_t3 = df_prq_read_t2\
.where(df_filter_exp).coalesce(int(parquet_df_repartition_num/2))\
.cache()

df_prq_filtered_t3_count = df_prq_filtered_t3.count()

if df_rds_temp_t4_count == df_prq_filtered_t3_count:
Expand Down Expand Up @@ -849,6 +855,8 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb) ->
sys.exit(1)
# ---------------------------------------------------------

df_prq_filtered_t3.unpersist(True)
df_rds_temp_t4.unpersist(True)
current_processed_rows += df_rds_temp_t4_count

LOGGER.info(f"""Total RDS fetch batch count: {loop_count}""")
@@ -532,8 +532,8 @@ def process_dv_for_table(rds_db_name,
trim_str_msg = ""
t2_rds_str_col_trimmed = False
if args.get("rds_df_trim_str_columns", "false") == "true":
msg_prefix = f"""Given -> rds_df_trim_str_columns = 'true'"""
LOGGER.info(f"""{msg_prefix}. Stripping string column spaces.""")
LOGGER.info(f"""Given -> rds_df_trim_str_columns = 'true'""")
LOGGER.warn(f""">> Stripping string column spaces <<""")

df_rds_temp_t2 = df_rds_temp_t1.transform(rds_df_trim_str_columns)
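
The hunk above only changes the logging around rds_df_trim_str_columns; the transform itself is defined elsewhere in the script and is not shown. For orientation, a minimal version of such a transform (the repository's implementation may differ) trims every string-typed column:

from pyspark.sql import DataFrame, functions as F
from pyspark.sql.types import StringType

def rds_df_trim_str_columns(df: DataFrame) -> DataFrame:
    """Strip leading/trailing spaces from all string columns (illustrative sketch)."""
    for field in df.schema.fields:
        if isinstance(field.dataType, StringType):
            df = df.withColumn(field.name, F.trim(F.col(field.name)))
    return df

# Usage mirrors the call shown in the diff:
# df_rds_temp_t2 = df_rds_temp_t1.transform(rds_df_trim_str_columns)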


