DPR2-147: Fix streaming job not ingesting events after running idle (#…
koladeadewuyi-moj authored Oct 12, 2023
1 parent 241ff12 commit f9c762b
Showing 3 changed files with 78 additions and 58 deletions.
@@ -12,6 +12,8 @@
"reporting_hub_worker_type": "G.1X",
"reporting_hub_num_workers": 4,
"reporting_hub_batch_duration_seconds": 30,
"reporting_hub_add_idle_time_between_reads": true,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
@@ -87,6 +89,8 @@
"reporting_hub_worker_type": "G.1X",
"reporting_hub_num_workers": 4,
"reporting_hub_batch_duration_seconds": 30,
"reporting_hub_add_idle_time_between_reads": true,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
@@ -162,6 +166,8 @@
"reporting_hub_worker_type": "G.2X",
"reporting_hub_num_workers": 4,
"reporting_hub_batch_duration_seconds": 30,
"reporting_hub_add_idle_time_between_reads": false,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
@@ -237,6 +243,8 @@
"reporting_hub_worker_type": "G.2X",
"reporting_hub_num_workers": 6,
"reporting_hub_batch_duration_seconds": 40,
"reporting_hub_add_idle_time_between_reads": false,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
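The four hunks above add the same pair of settings to each environment block of the application config: the idle-time delay is switched on at 140 ms in the first two environments and left off in the last two. The names line up with the documented AWS Glue Kinesis connection options addIdleTimeBetweenReads and idleTimeBetweenReadsInMs, which insert a minimum delay between consecutive GetRecords calls on each shard. Below is a minimal Scala sketch of the option map these settings presumably drive; the option names come from the Glue documentation, while the stream ARN and starting position are illustrative placeholders, not the DPR job's real configuration.

// Sketch only: how the two new settings surface as AWS Glue Kinesis
// connection options. Option names are documented by AWS Glue; the
// other entries are placeholders.
val kinesisOptions = Map(
  "streamARN"                -> "arn:aws:kinesis:eu-west-2:000000000000:stream/example", // placeholder
  "startingPosition"         -> "TRIM_HORIZON",                                          // placeholder
  "addIdleTimeBetweenReads"  -> "true", // reporting_hub_add_idle_time_between_reads
  "idleTimeBetweenReadsInMs" -> "140"   // reporting_hub_idle_time_between_reads_in_millis
)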
5 changes: 4 additions & 1 deletion terraform/environments/digital-prison-reporting/locals.tf
@@ -53,7 +53,10 @@ locals {
reporting_hub_num_workers = local.application_data.accounts[local.environment].reporting_hub_num_workers
reporting_hub_log_level = local.application_data.accounts[local.environment].reporting_hub_spark_log_level

- reporting_hub_batch_duration_seconds = local.application_data.accounts[local.environment].reporting_hub_batch_duration_seconds
+ reporting_hub_batch_duration_seconds = local.application_data.accounts[local.environment].reporting_hub_batch_duration_seconds
+ reporting_hub_add_idle_time_between_reads = local.application_data.accounts[local.environment].reporting_hub_add_idle_time_between_reads
+
+ reporting_hub_idle_time_between_reads_in_millis = local.application_data.accounts[local.environment].reporting_hub_idle_time_between_reads_in_millis

# Refresh Job
refresh_job_worker_type = local.application_data.accounts[local.environment].refresh_job_worker_type
123 changes: 66 additions & 57 deletions terraform/environments/digital-prison-reporting/main.tf
@@ -17,21 +17,22 @@ module "glue_reporting_hub_job" {
job_language = "scala"
create_security_configuration = local.create_sec_conf
temp_dir = "s3://${module.s3_glue_job_bucket.bucket_id}/tmp/${local.project}-reporting-hub-${local.env}/"
- checkpoint_dir = "s3://${module.s3_glue_job_bucket.bucket_id}/checkpoint/${local.project}-reporting-hub-${local.env}/"
+ # Use s3a for the checkpoint to align with Hadoop 3's S3A filesystem support
+ checkpoint_dir = "s3a://${module.s3_glue_job_bucket.bucket_id}/checkpoint/${local.project}-reporting-hub-${local.env}/"
spark_event_logs = "s3://${module.s3_glue_job_bucket.bucket_id}/spark-logs/${local.project}-reporting-hub-${local.env}/"
# Placeholder Script Location
- script_location = local.glue_placeholder_script_location
- enable_continuous_log_filter = false
- project_id = local.project
- aws_kms_key = local.s3_kms_arn
- additional_policies = module.kinesis_stream_ingestor.kinesis_stream_iam_policy_admin_arn
- execution_class = "STANDARD"
- worker_type = local.reporting_hub_worker_type
- number_of_workers = local.reporting_hub_num_workers
- max_concurrent = 1
- region = local.account_region
- account = local.account_id
- log_group_retention_in_days = 1
+ script_location = local.glue_placeholder_script_location
+ enable_continuous_log_filter = false
+ project_id = local.project
+ aws_kms_key = local.s3_kms_arn
+ additional_policies = module.kinesis_stream_ingestor.kinesis_stream_iam_policy_admin_arn
+ execution_class = "STANDARD"
+ worker_type = local.reporting_hub_worker_type
+ number_of_workers = local.reporting_hub_num_workers
+ max_concurrent = 1
+ region = local.account_region
+ account = local.account_id
+ log_group_retention_in_days = 1

tags = merge(
local.all_tags,
@@ -43,28 +44,30 @@
)

arguments = {
"--extra-jars" = local.glue_jobs_latest_jar_location
"--job-bookmark-option" = "job-bookmark-disable"
"--class" = "uk.gov.justice.digital.job.DataHubJob"
"--dpr.kinesis.stream.arn" = module.kinesis_stream_ingestor.kinesis_stream_arn
"--dpr.aws.region" = local.account_region
"--dpr.curated.s3.path" = "s3://${module.s3_curated_bucket.bucket_id}/"
"--dpr.batchDurationSeconds" = local.reporting_hub_batch_duration_seconds
"--dpr.raw.s3.path" = "s3://${module.s3_raw_bucket.bucket_id}/"
"--dpr.structured.s3.path" = "s3://${module.s3_structured_bucket.bucket_id}/"
"--dpr.violations.s3.path" = "s3://${module.s3_violation_bucket.bucket_id}/"
"--enable-metrics" = true
"--enable-spark-ui" = false
"--enable-auto-scaling" = true
"--enable-job-insights" = true
"--dpr.aws.dynamodb.endpointUrl" = "https://dynamodb.${local.account_region}.amazonaws.com"
"--dpr.contract.registryName" = trimprefix(module.glue_registry_avro.registry_name, "${local.glue_avro_registry[0]}/")
"--dpr.domain.registry" = "${local.project}-domain-registry-${local.environment}"
"--dpr.domain.target.path" = "s3://${module.s3_domain_bucket.bucket_id}"
"--dpr.domain.catalog.db" = module.glue_data_domain_database.db_name
"--dpr.redshift.secrets.name" = "${local.project}-redshift-secret-${local.environment}"
"--dpr.datamart.db.name" = "datamart"
"--dpr.log.level" = local.reporting_hub_log_level
"--extra-jars" = local.glue_jobs_latest_jar_location
"--job-bookmark-option" = "job-bookmark-disable"
"--class" = "uk.gov.justice.digital.job.DataHubJob"
"--dpr.kinesis.stream.arn" = module.kinesis_stream_ingestor.kinesis_stream_arn
"--dpr.aws.region" = local.account_region
"--dpr.curated.s3.path" = "s3://${module.s3_curated_bucket.bucket_id}/"
"--dpr.batchDurationSeconds" = local.reporting_hub_batch_duration_seconds
"--dpr.add.idle.time.between.reads" = local.reporting_hub_add_idle_time_between_reads
"--dpr.idle.time.between.reads.millis" = local.reporting_hub_idle_time_between_reads_in_millis
"--dpr.raw.s3.path" = "s3://${module.s3_raw_bucket.bucket_id}/"
"--dpr.structured.s3.path" = "s3://${module.s3_structured_bucket.bucket_id}/"
"--dpr.violations.s3.path" = "s3://${module.s3_violation_bucket.bucket_id}/"
"--enable-metrics" = true
"--enable-spark-ui" = false
"--enable-auto-scaling" = true
"--enable-job-insights" = true
"--dpr.aws.dynamodb.endpointUrl" = "https://dynamodb.${local.account_region}.amazonaws.com"
"--dpr.contract.registryName" = trimprefix(module.glue_registry_avro.registry_name, "${local.glue_avro_registry[0]}/")
"--dpr.domain.registry" = "${local.project}-domain-registry-${local.environment}"
"--dpr.domain.target.path" = "s3://${module.s3_domain_bucket.bucket_id}"
"--dpr.domain.catalog.db" = module.glue_data_domain_database.db_name
"--dpr.redshift.secrets.name" = "${local.project}-redshift-secret-${local.environment}"
"--dpr.datamart.db.name" = "datamart"
"--dpr.log.level" = local.reporting_hub_log_level
}
}

@@ -82,19 +85,19 @@ module "glue_domain_refresh_job" {
checkpoint_dir = "s3://${module.s3_glue_job_bucket.bucket_id}/checkpoint/${local.project}-domain-refresh-${local.env}/"
spark_event_logs = "s3://${module.s3_glue_job_bucket.bucket_id}/spark-logs/${local.project}-domain-refresh-${local.env}/"
# Placeholder Script Location
- script_location = local.glue_placeholder_script_location
- enable_continuous_log_filter = false
- project_id = local.project
- aws_kms_key = local.s3_kms_arn
- additional_policies = module.kinesis_stream_ingestor.kinesis_stream_iam_policy_admin_arn
+ script_location = local.glue_placeholder_script_location
+ enable_continuous_log_filter = false
+ project_id = local.project
+ aws_kms_key = local.s3_kms_arn
+ additional_policies = module.kinesis_stream_ingestor.kinesis_stream_iam_policy_admin_arn
# timeout = 1440
- execution_class = "FLEX"
- worker_type = local.refresh_job_worker_type
- number_of_workers = local.refresh_job_num_workers
- max_concurrent = 64
- region = local.account_region
- account = local.account_id
- log_group_retention_in_days = 1
+ execution_class = "FLEX"
+ worker_type = local.refresh_job_worker_type
+ number_of_workers = local.refresh_job_num_workers
+ max_concurrent = 64
+ region = local.account_region
+ account = local.account_id
+ log_group_retention_in_days = 1

tags = merge(
local.all_tags,
@@ -168,7 +171,7 @@
source = "./modules/glue_registry"
enable_glue_registry = true
name = "${local.project}-glue-registry-avro-${local.env}"
- tags = merge(
+ tags = merge(
local.all_tags,
{
Name = "${local.project}-glue-registry-avro-${local.env}"
@@ -194,7 +197,7 @@ module "glue_raw_table" {
# AWS Glue catalog table
glue_catalog_table_description = "Glue Table for raw data, managed by Terraform."
glue_catalog_table_table_type = "EXTERNAL_TABLE"
- glue_catalog_table_parameters = {
+ glue_catalog_table_parameters = {
EXTERNAL = "TRUE"
"parquet.compression" = "SNAPPY"
"classification" = "parquet"
@@ -256,7 +259,7 @@ module "glue_reconciliation_table" {
# AWS Glue catalog table
glue_catalog_table_description = "Glue Table for reconciliation data, managed by Terraform."
glue_catalog_table_table_type = "EXTERNAL_TABLE"
- glue_catalog_table_parameters = {
+ glue_catalog_table_parameters = {
EXTERNAL = "TRUE"
"parquet.compression" = "SNAPPY"
"classification" = "parquet"
@@ -594,9 +597,9 @@ module "ec2_kinesis_agent" {
ebs_encrypted = true
ebs_delete_on_termination = false
# s3_policy_arn = aws_iam_policy.read_s3_read_access_policy.arn # TBC
- region = local.account_region
- account = local.account_id
- env = local.env
+ region = local.account_region
+ account = local.account_id
+ env = local.env


tags = merge(
@@ -626,7 +629,9 @@ module "datamart" {
create_subnet_group = true
kms_key_arn = aws_kms_key.redshift-kms-key.arn
enhanced_vpc_routing = false
- subnet_ids = [data.aws_subnet.private_subnets_a.id, data.aws_subnet.private_subnets_b.id, data.aws_subnet.private_subnets_c.id]
+ subnet_ids = [
+   data.aws_subnet.private_subnets_a.id, data.aws_subnet.private_subnets_b.id, data.aws_subnet.private_subnets_c.id
+ ]
vpc = data.aws_vpc.shared.id
cidr = [data.aws_vpc.shared.cidr_block, local.cloud_platform_cidr]
iam_role_arns = [aws_iam_role.redshift-role.arn, aws_iam_role.redshift-spectrum-role.arn]
@@ -636,7 +641,7 @@

# Scheduled actions
create_scheduled_action_iam_role = true
- scheduled_actions = {
+ scheduled_actions = {
pause = {
name = "${local.redshift_cluster_name}-pause"
description = "Pause cluster every night"
@@ -690,7 +695,9 @@ module "dms_nomis_ingestor" {
migration_type = "full-load-and-cdc"
replication_instance_version = "3.4.7" # Upgrade
replication_instance_class = "dms.t3.medium"
- subnet_ids = [data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id]
+ subnet_ids = [
+   data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id
+ ]

vpc_role_dependency = [aws_iam_role.dmsvpcrole]
cloudwatch_role_dependency = [aws_iam_role.dms_cloudwatch_logs_role]
@@ -740,7 +747,9 @@ module "dms_fake_data_ingestor" {
migration_type = "full-load-and-cdc"
replication_instance_version = "3.4.7" # Rollback
replication_instance_class = "dms.t3.medium"
- subnet_ids = [data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id]
+ subnet_ids = [
+   data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id
+ ]

vpc_role_dependency = [aws_iam_role.dmsvpcrole]
cloudwatch_role_dependency = [aws_iam_role.dms_cloudwatch_logs_role]
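Beyond the formatting churn, the main.tf changes do three things: pass the two new locals to the streaming job as --dpr.add.idle.time.between.reads and --dpr.idle.time.between.reads.millis, move the job checkpoint onto the s3a:// scheme used by Hadoop 3's S3A filesystem, and split the long subnet_ids lists over several lines. A hedged Scala sketch of how a Glue streaming job might consume the new arguments follows; DataHubJob's real source lives in the separate DPR jobs repository, so the argument parsing, the checkpoint argument name and the sink here are assumptions, and only the two Kinesis option names are documented Glue behaviour.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wiring, not the actual DataHubJob: apply the idle-time
// options to the Kinesis reader and checkpoint over s3a://.
object IdleTimeSketch {
  def run(spark: SparkSession, args: Map[String, String]): Unit = {
    val events: DataFrame = spark.readStream
      .format("kinesis")
      .option("streamARN", args("dpr.kinesis.stream.arn"))
      .option("startingPosition", "TRIM_HORIZON") // assumption
      // Insert a minimum delay between consecutive GetRecords calls so the
      // consumer backs off instead of spinning against empty shards.
      .option("addIdleTimeBetweenReads", args.getOrElse("dpr.add.idle.time.between.reads", "false"))
      .option("idleTimeBetweenReadsInMs", args.getOrElse("dpr.idle.time.between.reads.millis", "1000"))
      .load()

    events.writeStream
      .option("checkpointLocation", args("dpr.checkpoint.location")) // hypothetical argument name
      .foreachBatch { (batch: DataFrame, _: Long) =>
        batch.write.mode("append").parquet(args("dpr.raw.s3.path")) // simplified sink
      }
      .start()
      .awaitTermination()
  }
}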
