
Commit

Merge pull request #6768 from ministryofjustice/ELM_2041_DV_V3_GlueJob_to_handle_bigdata_v4

DV_GlueJob_V3_Altered
madhu-k-sr2 authored Jun 27, 2024
2 parents 8c0921a + 06dc2a0 commit aa1351f
Showing 2 changed files with 125 additions and 27 deletions.
File 1 of 2: Terraform (Glue job definitions)
@@ -113,7 +113,7 @@ resource "aws_glue_job" "dms_dv_glue_job_v2" {
"--enable-auto-scaling" = "true"
"--conf" = <<EOF
spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=2g
--conf spark.memory.offHeap.size=4g
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
@@ -154,8 +154,13 @@ resource "aws_glue_job" "dms_dv_glue_job_v3" {
"--rds_db_tbl_pkeys_col_list" = ""
"--rds_df_trim_str_col_list" = ""
"--rds_df_trim_micro_sec_ts_col_list" = ""
"--jdbc_read_128mb_partitions" = "false"
"--jdbc_read_256mb_partitions" = "false"
"--jdbc_read_512mb_partitions" = "false"
"--jdbc_read_1gb_partitions" = "false"
"--jdbc_read_2gb_partitions" = "false"
"--rds_read_rows_fetch_size" = 100000
"--dataframe_repartitions" = 8
"--dataframe_repartitions" = 0
"--parquet_src_bucket_name" = aws_s3_bucket.dms_target_ep_s3_bucket.id
"--parquet_output_bucket_name" = aws_s3_bucket.dms_dv_parquet_s3_bucket.id
"--glue_catalog_db_name" = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
@@ -168,14 +173,12 @@ resource "aws_glue_job" "dms_dv_glue_job_v3" {
"--enable-metrics" = "true"
"--enable-auto-scaling" = "true"
"--conf" = <<EOF
spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=8g
--conf spark.sql.adaptive.enabled=true
spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
--conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED
--conf spark.sql.parquet.aggregatePushdown=true
--conf spark.sql.files.maxPartitionBytes=1047527424
--conf spark.sql.files.maxPartitionBytes=256m
EOF

}
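For reference, a minimal PySpark sketch (not part of this commit, and assuming a plain local SparkSession rather than the Glue runtime) that applies the same settings the --conf default argument above passes to the job:

from pyspark.sql import SparkSession

# Mirror of the job's --conf string; the values come from the heredoc above,
# the builder itself is only an illustration.
spark = (SparkSession.builder
         .appName("dms_dv_glue_job_v3_local_test")  # hypothetical app name
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
         .config("spark.sql.adaptive.skewJoin.enabled", "true")
         .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
         .config("spark.sql.parquet.aggregatePushdown", "true")
         .config("spark.sql.files.maxPartitionBytes", "256m")  # replaces the earlier 1047527424-byte (~999 MB) value
         .getOrCreate())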
File 2 of 2: Python (Glue data validation job script)
@@ -6,6 +6,7 @@
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
@@ -16,7 +17,13 @@

# ===============================================================================

sc = SparkContext.getOrCreate()
sc = SparkContext()
sc._jsc.hadoopConfiguration().set("spark.driver.memory", "4g")
sc._jsc.hadoopConfiguration().set("spark.memory.offHeap.enabled", "true")
sc._jsc.hadoopConfiguration().set("spark.memory.offHeap.size", "2g")
sc._jsc.hadoopConfiguration().set("spark.dynamicAllocation.enabled", "true")
sc._jsc.hadoopConfiguration().set("spark.dynamicAllocation.minExecutors", "2")

glueContext = GlueContext(sc)
spark = glueContext.spark_session

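For comparison, a minimal sketch (not part of this commit) of supplying the same values through SparkConf when the context is created, which is the documented route for Spark runtime properties such as spark.memory.offHeap.*:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Same property names and values as above, passed via SparkConf at launch.
# Note: in a Glue job, driver and executor sizing is governed by the worker
# type and job arguments, so these values are illustrative.
conf = (SparkConf()
        .set("spark.driver.memory", "4g")
        .set("spark.memory.offHeap.enabled", "true")
        .set("spark.memory.offHeap.size", "2g")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "2"))
sc = SparkContext(conf=conf)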
@@ -55,6 +62,11 @@ def resolve_args(args_list):
"rds_sqlserver_db_table",
"rds_db_tbl_pkeys_col_list",
"dataframe_repartitions",
"jdbc_read_128mb_partitions",
"jdbc_read_256mb_partitions",
"jdbc_read_512mb_partitions",
"jdbc_read_1gb_partitions",
"jdbc_read_2gb_partitions",
"rds_read_rows_fetch_size"
]

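Glue passes every default argument to the script as a string, so the new partition-size flags are compared against the literal "true"/"false" later in this file. A short illustrative sketch (hypothetical subset of the argument list) using the same getResolvedOptions helper imported at the top of the script:

import sys
from awsglue.utils import getResolvedOptions

# Illustration only: resolve two of the new arguments and coerce them
# from their string form.
args = getResolvedOptions(sys.argv, ["jdbc_read_256mb_partitions",
                                     "rds_read_rows_fetch_size"])
use_256mb_partitions = args["jdbc_read_256mb_partitions"] == "true"
fetch_size = int(args["rds_read_rows_fetch_size"])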
@@ -100,6 +112,8 @@ def resolve_args(args_list):
'timestamp': "to_timestamp('1900-01-01', 'yyyy-MM-dd')",
'date': "to_date('1900-01-01', 'yyyy-MM-dd')"}

INT_DATATYPES_LIST = ['tinyint', 'smallint', 'int', 'bigint']

"""
# Use the below query to fetch the existing primary keys defined in RDS-DB-Schema.
# -------------------------------------------------------------------------------------
@@ -221,23 +235,43 @@ def get_rds_db_table_row_count(in_rds_db_name,
.load()).collect()[0].row_count


def get_df_read_rds_db_query(in_rds_db_name,
in_table_name,
select_col_list,
num_of_jdbc_connections=None
) -> DataFrame:
def get_rds_db_table_pkey_col_max_value(in_rds_db_name, in_table_name,
in_pkey_col_name) -> DataFrame:
given_rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"]

num_of_rows_per_trip = args.get("rds_read_rows_fetch_size", None)
fetchSize = 10000 if num_of_rows_per_trip is None else num_of_rows_per_trip
# The JDBC fetch size, which determines how many rows to fetch per round trip.
# This can help performance on JDBC drivers which default to low fetch size (e.g. Oracle with 10 rows).
query_str = f"""
SELECT max({in_pkey_col_name}) as max_value
FROM {given_rds_sqlserver_db_schema}.[{in_table_name}]
""".strip()

return (spark.read.format("jdbc")
.option("url", get_rds_db_jdbc_url(in_rds_db_name))
.option("driver", RDS_DB_INSTANCE_DRIVER)
.option("user", RDS_DB_INSTANCE_USER)
.option("password", RDS_DB_INSTANCE_PWD)
.option("query", f"""{query_str}""")
.load()).collect()[0].max_value


def get_df_read_rds_db_tbl_int_pkey(in_rds_db_name, in_table_name, select_col_list,
jdbc_partition_column,
jdbc_partition_col_upperbound,
jdbc_read_partitions_num,
jdbc_rows_fetch_size
) -> DataFrame:
given_rds_sqlserver_db_schema = args["rds_sqlserver_db_schema"]

numPartitions = 8 if num_of_jdbc_connections is None else num_of_jdbc_connections
numPartitions = jdbc_read_partitions_num
# Note: numPartitions is normally equal to number of executors defined.
# The maximum number of partitions that can be used for parallelism in table reading and writing.
# This also determines the maximum number of concurrent JDBC connections.


fetchSize = jdbc_rows_fetch_size
# The JDBC fetch size, which determines how many rows to fetch per round trip.
# This can help performance on JDBC drivers which default to low fetch size (e.g. Oracle with 10 rows).
# Too Small: => frequent round trips to database
# Too Large: => Consume a lot of memory

query_str = f"""
SELECT {', '.join(select_col_list)}
FROM {given_rds_sqlserver_db_schema}.[{in_table_name}]
@@ -252,9 +286,12 @@ def get_df_read_rds_db_query(in_rds_db_name,
.option("driver", RDS_DB_INSTANCE_DRIVER)
.option("user", RDS_DB_INSTANCE_USER)
.option("password", RDS_DB_INSTANCE_PWD)
.option("query", f"""{query_str} as t""")
.option("fetchSize", fetchSize)
.option("dbtable", f"""({query_str}) as t""")
.option("partitionColumn", jdbc_partition_column)
.option("lowerBound", "0")
.option("upperBound", jdbc_partition_col_upperbound)
.option("numPartitions", numPartitions)
.option("fetchSize", fetchSize)
.load())


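For reference, a rough sketch of how Spark's JDBC source turns partitionColumn, lowerBound, upperBound and numPartitions into per-partition range predicates (illustrative stride arithmetic only; the column name and values are hypothetical, and Spark also routes NULL and out-of-range keys to the first and last partitions):

def jdbc_partition_predicates(column, lower, upper, num_partitions):
    # One range predicate per partition; each predicate becomes its own JDBC query.
    stride = (upper - lower) // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lo + stride
        if i == 0:
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
    return predicates

# e.g. jdbc_partition_predicates("pkey_id", 0, 1_500_000, 4)  # hypothetical values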
@@ -458,12 +495,12 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in
else:
rds_db_tbl_pkeys_col_list = [f"""{column.strip().strip("'").strip('"')}"""
for column in args['rds_db_tbl_pkeys_col_list'].split(",")]
LOGGER.info(f"""rds_db_tbl_pkeys_col_list = {rds_db_tbl_pkeys_col_list}""")
# -------------------------------------------------------

df_dv_output = get_pyspark_empty_df(df_dv_output_schema)

rds_db_table_empty_df = get_rds_db_table_empty_df(rds_db_name, rds_tbl_name)
df_rds_columns_list = rds_db_table_empty_df.columns

df_rds_count = get_rds_db_table_row_count(rds_db_name,
rds_tbl_name,
@@ -488,6 +525,8 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in
LOGGER.info(final_validation_msg)
df_dv_output = df_dv_output.union(df_temp_row)
return df_dv_output
else:
LOGGER.info(f"""df_rds_count = {df_rds_count}""")
# -------------------------------------------------------

rds_df_trim_str_col_str = args.get('rds_df_trim_str_col_list', '')
@@ -510,6 +549,55 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in
transform_msg_2 =f"""- micro-seconds trimmed."""
# -------------------------------------------------------

df_rds_columns_list = rds_db_table_empty_df.columns
df_rds_dtype_dict = get_dtypes_dict(rds_db_table_empty_df)
int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items()
if dtype in INT_DATATYPES_LIST]
if len(rds_db_tbl_pkeys_col_list) == 1 and \
(rds_db_tbl_pkeys_col_list[0] in int_dtypes_colname_list):
jdbc_partition_column = rds_db_tbl_pkeys_col_list[0]
pkey_max_value = get_rds_db_table_pkey_col_max_value(rds_db_name, rds_tbl_name,
jdbc_partition_column)
else:
LOGGER.error(f"""int_dtypes_colname_list = {int_dtypes_colname_list}""")
LOGGER.error(f"""PrimaryKey column(s) are more than one (OR) not an integer datatype column!""")
sys.exit(1)
# -------------------------------------------------------

if args.get("jdbc_read_128mb_partitions", "false") == "true":
int_partitions_evaluated = int(total_size_mb/128)
elif args.get("jdbc_read_256mb_partitions", "false") == "true":
int_partitions_evaluated = int(total_size_mb/256)
elif args.get("jdbc_read_512mb_partitions", "false") == "true":
int_partitions_evaluated = int(total_size_mb/512)
elif args.get("jdbc_read_1gb_partitions", "false") == "true":
int_partitions_evaluated = int(total_size_mb/1024)
elif args.get("jdbc_read_2gb_partitions", "false") == "true":
int_partitions_evaluated = int((total_size_mb/1024)/2)
else:
int_partitions_evaluated = total_files

jdbc_read_partitions_num = 1 if int_partitions_evaluated < 1 else int_partitions_evaluated
LOGGER.info(f"""jdbc_read_partitions_num = {jdbc_read_partitions_num}""")

rows_per_partition_v1 = int(df_rds_count/jdbc_read_partitions_num)
LOGGER.info(f"""rows_per_partition_v1 = {rows_per_partition_v1}""")

rows_per_partition_v2 = int(pkey_max_value/jdbc_read_partitions_num)
LOGGER.info(f"""rows_per_partition_v2 = {rows_per_partition_v2}""")

jdbc_partition_col_upperbound = rows_per_partition_v1 \
if rows_per_partition_v1 > rows_per_partition_v2 \
else rows_per_partition_v2
LOGGER.info(f"""jdbc_partition_col_upperbound = {jdbc_partition_col_upperbound}""")

rds_read_rows_fetch_size = int(args["rds_read_rows_fetch_size"])
jdbc_rows_fetch_size = jdbc_partition_col_upperbound \
if rds_read_rows_fetch_size < jdbc_partition_col_upperbound \
else rds_read_rows_fetch_size

LOGGER.info(f"""jdbc_rows_fetch_size = {jdbc_rows_fetch_size}""")

validated_colmn_msg_list = list()

for_loop_count = 0
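Worked through with hypothetical inputs, the sizing logic above resolves as follows (illustration only; none of these numbers come from the commit):

# Hypothetical inputs.
total_size_mb = 2048                # with "--jdbc_read_256mb_partitions" = "true"
df_rds_count = 10_000_000
pkey_max_value = 12_000_000
rds_read_rows_fetch_size = 100_000

jdbc_read_partitions_num = max(1, int(total_size_mb / 256))                          # 8
rows_per_partition_v1 = int(df_rds_count / jdbc_read_partitions_num)                 # 1,250,000
rows_per_partition_v2 = int(pkey_max_value / jdbc_read_partitions_num)               # 1,500,000
jdbc_partition_col_upperbound = max(rows_per_partition_v1, rows_per_partition_v2)    # 1,500,000
jdbc_rows_fetch_size = max(rds_read_rows_fetch_size, jdbc_partition_col_upperbound)  # 1,500,000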
@@ -525,12 +613,18 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in
LOGGER.info(f"""{for_loop_count}-Processing - {rds_tbl_name}.{rds_column}.""")
LOGGER.info(f"""Using Dataframe-'select' column list: {temp_select_list}""")

df_rds_temp = (get_df_read_rds_db_query(rds_db_name, rds_tbl_name, temp_select_list,
num_of_jdbc_connections=total_files))
df_rds_temp = (get_df_read_rds_db_tbl_int_pkey(rds_db_name,
rds_tbl_name,
temp_select_list,
jdbc_partition_column,
jdbc_partition_col_upperbound,
jdbc_read_partitions_num,
jdbc_rows_fetch_size))
LOGGER.info(f"""df_rds_temp-{rds_column}: READ PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""")

df_rds_temp = df_rds_temp.repartition(input_repartition_factor)
LOGGER.info(f"""df_rds_temp-{rds_column}: RE-PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""")
if int(args["dataframe_repartitions"]) != 0:
df_rds_temp = df_rds_temp.repartition(jdbc_partition_column, input_repartition_factor)
LOGGER.info(f"""df_rds_temp-{rds_column}: RE-PARTITIONS = {df_rds_temp.rdd.getNumPartitions()}""")
# -------------------------------------------------------

t1_rds_str_col_trimmed = False
@@ -568,8 +662,9 @@ def process_dv_for_table(rds_db_name, db_sch_tbl, total_files, total_size_mb, in
.select(*temp_select_list))
LOGGER.info(f"""df_prq_temp-{rds_column}: READ PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""")

df_prq_temp = df_prq_temp.repartition(input_repartition_factor)
LOGGER.info(f"""df_prq_temp-{rds_column}: RE-PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""")
if int(args["dataframe_repartitions"]) != 0:
df_prq_temp = df_prq_temp.repartition(jdbc_partition_column, input_repartition_factor)
LOGGER.info(f"""df_prq_temp-{rds_column}: RE-PARTITIONS = {df_prq_temp.rdd.getNumPartitions()}""")

df_prq_temp_t1 = df_prq_temp.selectExpr(*get_nvl_select_list(df_rds_temp, rds_db_name, rds_tbl_name))

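A minimal sketch of column-based repartitioning with an explicit partition count, assuming the intent of the two branches above is to co-partition both DataFrames on the primary-key column before they are compared (the partition count and column name are hypothetical):

# Illustration only: DataFrame.repartition takes the target partition count
# first, followed by the partitioning column(s).
df_rds_temp = df_rds_temp.repartition(8, "pkey_id")
df_prq_temp = df_prq_temp.repartition(8, "pkey_id")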
