diff --git a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf
index 1c4ab9f49ef..6a2983a398d 100644
--- a/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf
+++ b/terraform/environments/electronic-monitoring-data/dms_data_validation_glue_job.tf
@@ -222,38 +222,41 @@ resource "aws_glue_job" "rds_to_s3_parquet_migration" {
   worker_type       = "G.2X"
   number_of_workers = 5
   default_arguments = {
-    "--script_bucket_name"                = aws_s3_bucket.dms_dv_glue_job_s3_bucket.id
-    "--rds_db_host_ep"                    = split(":", aws_db_instance.database_2022.endpoint)[0]
-    "--rds_db_pwd"                        = aws_db_instance.database_2022.password
-    "--rds_sqlserver_db"                  = ""
-    "--rds_sqlserver_db_schema"           = "dbo"
-    "--rds_sqlserver_db_table"            = ""
-    "--rds_db_tbl_pkeys_col_list"         = ""
-    "--rds_table_total_size_mb"           = ""
-    "--rds_table_total_rows"              = ""
-    "--date_partition_column_name"        = ""
-    "--other_partitionby_columns"         = ""
-    "--validation_sample_fraction_float"  = 0
-    "--validation_sample_df_repartition"  = 0
-    "--jdbc_read_256mb_partitions"        = "false"
-    "--jdbc_read_512mb_partitions"        = "false"
-    "--jdbc_read_1gb_partitions"          = "false"
-    "--jdbc_read_2gb_partitions"          = "false"
-    "--rename_migrated_prq_tbl_folder"    = ""
-    "--year_partition"                    = "false"
-    "--month_partition"                   = "false"
-    "--day_partition"                     = "false"
-    "--rds_to_parquet_output_s3_bucket"   = aws_s3_bucket.dms_target_ep_s3_bucket.id
-    "--dv_parquet_output_s3_bucket"       = aws_s3_bucket.dms_dv_parquet_s3_bucket.id
-    "--glue_catalog_db_name"              = aws_glue_catalog_database.dms_dv_glue_catalog_db.name
-    "--glue_catalog_tbl_name"             = "glue_df_output"
-    "--continuous-log-logGroup"           = "/aws-glue/jobs/${aws_cloudwatch_log_group.rds_to_s3_parquet_migration.name}"
-    "--enable-continuous-cloudwatch-log"  = "true"
-    "--enable-continuous-log-filter"      = "true"
-    "--enable-metrics"                    = "true"
-    "--enable-auto-scaling"               = "true"
-    "--conf"                              = <
 DataFrame:
             numPartitions = jdbc_read_partitions_num
@@ -213,7 +214,7 @@ def get_df_read_rds_db_tbl_int_pkey(self,
             .option("password", RDS_DB_INSTANCE_PWD)
             .option("dbtable", f"""({query_str}) as t""")
             .option("partitionColumn", jdbc_partition_column)
-            .option("lowerBound", 0)
+            .option("lowerBound", jdbc_partition_col_lowerbound)
             .option("upperBound", jdbc_partition_col_upperbound)
             .option("numPartitions", numPartitions)
             .load())
@@ -256,6 +257,29 @@ def get_rds_db_table_empty_df(self) -> DataFrame:
                 .option("query", f"""{query_str}""")
                 .load())
 
+    # def get_min_max_pkey_given_year(self,
+    #                                 pkey_col_name,
+    #                                 year,
+    #                                 agg_str) -> DataFrame:
+
+    #     if agg_str not in ['min', 'max']:
+    #         raise ValueError(""">> The 'aggregate' function must be either 'min' or 'max' <<""")
+    #         sys.exit(1)
+
+    #     query_str = f"""
+    #     SELECT {agg_str}({pkey_col_name}) as agg_value
+    #     FROM {self.rds_db_schema_name}.[{self.rds_db_table_name}]
+    #     WHERE YEAR(RecordedDatetime) = {year}
+    #     """.strip()
+
+    #     return (spark.read.format("jdbc")
+    #             .option("url", self.rds_jdbc_url_v2)
+    #             .option("driver", RDS_DB_INSTANCE_DRIVER)
+    #             .option("user", RDS_DB_INSTANCE_USER)
+    #             .option("password", RDS_DB_INSTANCE_PWD)
+    #             .option("query", f"""{query_str}""")
+    #             .load()).collect()[0].agg_value
+
 # ---------------------------------------------------------------------
 # PYTHON CLASS 'RDS_JDBC_CONNECTION' - END
 # ---------------------------------------------------------------------
@@ -319,6 +343,50 @@ def check_s3_folder_path_if_exists(in_bucket_name, in_folder_path):
 # ==================================================================
 
+def write_rds_df_to_s3_parquet_v2(df_rds_read: DataFrame,
+                                  partition_by_cols,
+                                  table_folder_path):
+    """
+    Write dynamic frame in S3 and catalog it.
+    """
+
+    # s3://dms-rds-to-parquet-20240606144708618700000001/g4s_emsys_mvp/dbo/GPSPosition/
+
+    s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}"""
+
+    # if check_s3_folder_path_if_exists(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path):
+
+    #     LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}""")
+    #     glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0})
+    # # --------------------------------------------------------------------
+
+    dynamic_df_write = glueContext.getSink(
+        format_options = {
+            "compression": "snappy",
+            "useGlueParquetWriter": True
+        },
+        path = f"""{s3_table_folder_path}/""",
+        connection_type = "s3",
+        updateBehavior = "UPDATE_IN_DATABASE",
+        partitionKeys = partition_by_cols,
+        enableUpdateCatalog = True,
+        transformation_ctx = "dynamic_df_write",
+    )
+
+    catalog_db, catalog_db_tbl = table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""")
+    dynamic_df_write.setCatalogInfo(
+        catalogDatabase = catalog_db.lower(),
+        catalogTableName = catalog_db_tbl.lower()
+    )
+
+    dynamic_df_write.setFormat("glueparquet")
+
+    dydf_rds_read = DynamicFrame.fromDF(df_rds_read, glueContext, "final_spark_df")
+    dynamic_df_write.writeFrame(dydf_rds_read)
+
+    LOGGER.info(f"""{db_sch_tbl} table data written to -> {s3_table_folder_path}/""")
+
+
 def write_rds_df_to_s3_parquet(df_rds_read: DataFrame,
                                partition_by_cols,
                                table_folder_path):
@@ -333,6 +401,8 @@ def write_rds_df_to_s3_parquet(df_rds_read: DataFrame,
         glueContext.purge_s3_path(s3_table_folder_path, options={"retentionPeriod": 0})
     # --------------------------------------------------------------------
 
+    # catalog_db, catalog_db_tbl = table_folder_path.split(f"""/{args['rds_sqlserver_db_schema']}/""")
+
     dydf = DynamicFrame.fromDF(df_rds_read, glueContext, "final_spark_df")
     glueContext.write_dynamic_frame.from_options(frame=dydf,
                                                  connection_type='s3',
                                                  format='parquet',
@@ -346,7 +416,7 @@ def write_rds_df_to_s3_parquet(df_rds_read: DataFrame,
                                                      'blockSize': 13421773,
                                                      'pageSize': 1048576
                                                  })
-    LOGGER.info(f"""{db_sch_tbl} table data written to -> {s3_table_folder_path}/""")
+    LOGGER.info(f"""'{db_sch_tbl}' table data written to -> {s3_table_folder_path}/""")
 
 
 def compare_rds_parquet_samples(rds_jdbc_conn_obj,
@@ -368,10 +438,10 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj,
     s3_table_folder_path = f"""s3://{PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}"""
     LOGGER.info(f"""Parquet Source being used for comparison: {s3_table_folder_path}""")
 
-    df_parquet_read = spark.read.schema(df_rds_read.schema).parquet(s3_table_folder_path).cache()
+    df_parquet_read = spark.read.schema(df_rds_read.schema).parquet(s3_table_folder_path)
 
     df_compare_columns_list = [col for col in df_parquet_read.columns
-                               if col not in ['year', 'month', 'day']]
+                                  if col not in ['year', 'month', 'day']]
 
     df_parquet_read_sample = df_parquet_read.sample(validation_sample_fraction_float)\
                                             .select(*df_compare_columns_list)
@@ -380,9 +450,9 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj,
         *get_nvl_select_list(rds_jdbc_conn_obj, df_parquet_read_sample))
 
-    validation_sample_df_repartition = int(args['validation_sample_df_repartition'])
-    if validation_sample_df_repartition != 0:
-        df_parquet_read_sample_t1 = df_parquet_read_sample_t1.repartition(validation_sample_df_repartition,
+    validation_sample_df_repartition_num = int(args['validation_sample_df_repartition_num'])
+    if validation_sample_df_repartition_num != 0:
+        df_parquet_read_sample_t1 = df_parquet_read_sample_t1.repartition(validation_sample_df_repartition_num,
                                                                           jdbc_partition_column)
     # --------
@@ -394,8 +464,8 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj,
     df_rds_read_sample_t1 = df_rds_read_sample.selectExpr(
         *get_nvl_select_list(rds_jdbc_conn_obj, df_rds_read_sample))
 
-    if validation_sample_df_repartition != 0:
-        df_rds_read_sample_t1 = df_rds_read_sample_t1.repartition(validation_sample_df_repartition,
+    if validation_sample_df_repartition_num != 0:
+        df_rds_read_sample_t1 = df_rds_read_sample_t1.repartition(validation_sample_df_repartition_num,
                                                                   jdbc_partition_column)
     # --------
@@ -454,26 +524,25 @@ def compare_rds_parquet_samples(rds_jdbc_conn_obj,
         df_dv_output = df_dv_output.union(df_subtract_temp)
     # -----------------------------------------------------
 
-    df_parquet_read.unpersist(True)
-
     return df_dv_output
 
 
-def write_to_s3_parquet(df_dv_output: DataFrame,
-                        database,
+def write_to_s3_parquet(df_dv_output: DataFrame,
+                        rds_jdbc_conn_obj,
                         db_sch_tbl_name):
 
+    db_name = rds_jdbc_conn_obj.rds_db_name
     df_dv_output = df_dv_output.repartition(1)
 
     table_folder_path = f"""{args["glue_catalog_db_name"]}/{args["glue_catalog_tbl_name"]}"""
    s3_table_folder_path = f'''s3://{DV_PARQUET_OUTPUT_S3_BUCKET_NAME}/{table_folder_path}'''
 
     if check_s3_folder_path_if_exists(DV_PARQUET_OUTPUT_S3_BUCKET_NAME,
-                                      f'''{table_folder_path}/database_name={database}/full_table_name={db_sch_tbl_name}'''
+                                      f'''{table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}'''
                                       ):
-        LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}/database_name={database}/full_table_name={db_sch_tbl_name}""")
+        LOGGER.info(f"""Purging S3-path: {s3_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}""")
 
-        glueContext.purge_s3_path(f"""{s3_table_folder_path}/database_name={database}/full_table_name={db_sch_tbl_name}""",
+        glueContext.purge_s3_path(f"""{s3_table_folder_path}/database_name={db_name}/full_table_name={db_sch_tbl_name}""",
                                   options={"retentionPeriod": 0}
                                   )
     # ---------------------------------------------------------------------
@@ -491,7 +560,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
                                                      'blockSize': 13421773,
                                                      'pageSize': 1048576
                                                  })
-    LOGGER.info(f"""{db_sch_tbl_name} validation report written to -> {s3_table_folder_path}/""")
+    LOGGER.info(f"""'{db_sch_tbl_name}' validation report written to -> {s3_table_folder_path}/""")
 
 
 # ===================================================================================================
@@ -525,7 +594,6 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
     rds_jdbc_conn_obj = RDS_JDBC_CONNECTION(args['rds_sqlserver_db'],
                                             args['rds_sqlserver_db_schema'],
                                             args['rds_sqlserver_db_table'])
-
     # -------------------------------------------
 
     try:
@@ -562,6 +630,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
             jdbc_read_partitions_num = int(int(int(args['rds_table_total_size_mb'])/1024)/2)
         else:
             jdbc_read_partitions_num = 1
+        # ------------------------------
 
         jdbc_read_partitions_num = 1 if jdbc_read_partitions_num <= 0 \
                                         else jdbc_read_partitions_num
@@ -573,8 +642,7 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
         df_rds_dtype_dict = get_dtypes_dict(rds_db_table_empty_df)
         int_dtypes_colname_list = [colname for colname, dtype in df_rds_dtype_dict.items()
                                    if dtype in INT_DATATYPES_LIST]
-        # -------------------------------------------------------
-
+
         if args.get('rds_db_tbl_pkeys_col_list', None) is None:
             try:
                 rds_db_tbl_pkeys_col_list = [column.strip()
@@ -609,40 +677,51 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
                                                       jdbc_read_partitions_num))
         LOGGER.info(f"""df_rds_read-{db_sch_tbl}: READ PARTITIONS = {df_rds_read.rdd.getNumPartitions()}""")
 
-        partition_by_cols = list()
-
-        if args.get('date_partition_column_name', None) is not None:
-            given_date_column = args['date_partition_column_name']
-            LOGGER.info(f"""given_date_column = {given_date_column}""")
-
-            if args['year_partition'] == 'true':
-                df_rds_read = df_rds_read.withColumn("year", F.year(given_date_column))
-                partition_by_cols.append("year")
-
-            if args['month_partition'] == 'true':
-                df_rds_read = df_rds_read.withColumn("month", F.month(given_date_column))
-                partition_by_cols.append("month")
-
-            if args['day_partition'] == 'true':
-                df_rds_read = df_rds_read.withColumn("day", F.dayofmonth(given_date_column))
-                partition_by_cols.append("day")
-
-            #df_rds_read = df_rds_read.repartition("year", "month", "day")
-        # ----------------------------------------------------
-
-        if args.get('other_partitionby_columns', None) is not None:
-            other_partitionby_columns = [f"""{column.strip().strip("'").strip('"')}"""
-                                         for column in args['other_partitionby_columns'].split(",")]
-            LOGGER.info(f"""other_partitionby_columns = {other_partitionby_columns}""")
-            partition_by_cols.extend(other_partitionby_columns)
-        # ----------------------------------------------------
-
-        if partition_by_cols:
-            LOGGER.info(f"""df_rds_read-Repartitioning on columns: {partition_by_cols}""")
-            df_rds_read = df_rds_read.repartition(jdbc_read_partitions_num, *partition_by_cols).cache()
+        rds_df_repartition_num = int(args['rds_df_repartition_num'])
+        if rds_df_repartition_num != 0:
+            LOGGER.info(f"""df_rds_read-Repartitioning: {rds_df_repartition_num}""")
+            df_rds_read = df_rds_read.repartition(rds_df_repartition_num,
+                                                  jdbc_partition_column)
+        # ---------------------------------------------------------------
+
+        validation_only_run = args['validation_only_run']
+        if validation_only_run != "true":
+            partition_by_cols = list()
+
+            if args.get('date_partition_column_name', None) is not None:
+                given_date_column = args['date_partition_column_name']
+                LOGGER.info(f"""given_date_column = {given_date_column}""")
+
+                if args['year_partition_bool'] == 'true':
+                    df_rds_read = df_rds_read.withColumn("year", F.year(given_date_column))
+                    partition_by_cols.append("year")
+
+                if args['month_partition_bool'] == 'true':
+                    df_rds_read = df_rds_read.withColumn("month", F.month(given_date_column))
+                    partition_by_cols.append("month")
+
+                if args['day_partition_bool'] == 'true':
+                    df_rds_read = df_rds_read.withColumn("day", F.dayofmonth(given_date_column))
+                    partition_by_cols.append("day")
+
+                #df_rds_read = df_rds_read.repartition("year", "month", "day")
+            # ----------------------------------------------------
+
+            if args.get('other_partitionby_columns', None) is not None:
+                other_partitionby_columns = [f"""{column.strip().strip("'").strip('"')}"""
+                                             for column in args['other_partitionby_columns'].split(",")]
+                LOGGER.info(f"""other_partitionby_columns = {other_partitionby_columns}""")
+                partition_by_cols.extend(other_partitionby_columns)
+            # ----------------------------------------------------
+
+            if partition_by_cols:
+                orderby_columns = partition_by_cols + [jdbc_read_partitions_num]
+                LOGGER.info(f"""df_rds_read-OrderBy on partitionBy columns: {partition_by_cols}""")
+                df_rds_read = df_rds_read.orderBy(*orderby_columns)
+            # -----------------------------------
         else:
-            df_rds_read = df_rds_read.cache()
-        # -----------------------------------
+            LOGGER.info(f"""** validation_only_run = '{validation_only_run}' **""")
+        # ---------------------------------------
 
         rename_output_table_folder = args.get('rename_migrated_prq_tbl_folder', '')
         if rename_output_table_folder == '':
@@ -651,10 +730,13 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
             table_folder_path = f"""{rds_db_name}/{rds_sqlserver_db_schema}/{rename_output_table_folder}"""
         # ---------------------------------------
 
+        df_rds_read = df_rds_read.cache()
 
-        write_rds_df_to_s3_parquet(df_rds_read,
-                                   partition_by_cols,
-                                   table_folder_path)
+        if validation_only_run != "true":
+            write_rds_df_to_s3_parquet(df_rds_read,
+                                       partition_by_cols,
+                                       table_folder_path)
+        # -----------------------------------------------
 
         total_files, total_size = get_s3_folder_info(PARQUET_OUTPUT_S3_BUCKET_NAME, table_folder_path)
         msg_part_1 = f"""total_files={total_files}"""
@@ -670,10 +752,10 @@ def write_to_s3_parquet(df_dv_output: DataFrame,
                                                  table_folder_path,
                                                  validation_sample_fraction_float)
 
-        df_rds_read.unpersist(True)
-
-        write_to_s3_parquet(df_dv_output, rds_db_name, db_sch_tbl)
+        write_to_s3_parquet(df_dv_output, rds_jdbc_conn_obj, db_sch_tbl)
         # ------------------------------------------------------------
+        df_rds_read.unpersist(True)
+
 
 job.commit()
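
A minimal usage sketch of how the renamed job parameters might be supplied when starting this Glue job with boto3. The argument keys mirror the args[...] lookups in the script above; the job name and every value shown are illustrative assumptions, not values confirmed by this change.

    import boto3

    glue = boto3.client("glue")

    # Start one run of the migration/validation job with the renamed arguments.
    # JobName is assumed from the Terraform resource label; all values are examples.
    response = glue.start_job_run(
        JobName="rds_to_s3_parquet_migration",
        Arguments={
            "--rds_sqlserver_db": "g4s_emsys_mvp",                # example value
            "--rds_sqlserver_db_schema": "dbo",
            "--rds_sqlserver_db_table": "GPSPosition",            # example value
            "--date_partition_column_name": "RecordedDatetime",   # example value
            "--year_partition_bool": "true",
            "--month_partition_bool": "true",
            "--day_partition_bool": "false",
            "--validation_only_run": "false",                 # "true" skips the parquet write
            "--rds_df_repartition_num": "0",                  # 0 disables the extra repartition
            "--validation_sample_df_repartition_num": "0",
            "--validation_sample_fraction_float": "0.01",
        },
    )
    print(response["JobRunId"])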