DPR2-147: Fix streaming job not ingesting events after running idle #3604

Merged: 4 commits, Oct 12, 2023
@@ -12,6 +12,8 @@
"reporting_hub_worker_type": "G.1X",
"reporting_hub_num_workers": 4,
"reporting_hub_batch_duration_seconds": 30,
"reporting_hub_add_idle_time_between_reads": true,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
@@ -87,6 +89,8 @@
"reporting_hub_worker_type": "G.1X",
"reporting_hub_num_workers": 4,
"reporting_hub_batch_duration_seconds": 30,
"reporting_hub_add_idle_time_between_reads": true,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
@@ -162,6 +166,8 @@
"reporting_hub_worker_type": "G.2X",
"reporting_hub_num_workers": 4,
"reporting_hub_batch_duration_seconds": 30,
"reporting_hub_add_idle_time_between_reads": false,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
@@ -237,6 +243,8 @@
"reporting_hub_worker_type": "G.2X",
"reporting_hub_num_workers": 6,
"reporting_hub_batch_duration_seconds": 40,
"reporting_hub_add_idle_time_between_reads": false,
"reporting_hub_idle_time_between_reads_in_millis": 140,
"refresh_job_worker_type": "G.1X",
"refresh_job_num_workers": 2,
"refresh_job_log_level": "INFO",
5 changes: 4 additions & 1 deletion terraform/environments/digital-prison-reporting/locals.tf
@@ -53,7 +53,10 @@ locals {
reporting_hub_num_workers = local.application_data.accounts[local.environment].reporting_hub_num_workers
reporting_hub_log_level = local.application_data.accounts[local.environment].reporting_hub_spark_log_level

reporting_hub_batch_duration_seconds = local.application_data.accounts[local.environment].reporting_hub_batch_duration_seconds
reporting_hub_add_idle_time_between_reads = local.application_data.accounts[local.environment].reporting_hub_add_idle_time_between_reads
reporting_hub_idle_time_between_reads_in_millis = local.application_data.accounts[local.environment].reporting_hub_idle_time_between_reads_in_millis

# Refresh Job
refresh_job_worker_type = local.application_data.accounts[local.environment].refresh_job_worker_type
129 changes: 69 additions & 60 deletions terraform/environments/digital-prison-reporting/main.tf
@@ -17,21 +17,22 @@ module "glue_reporting_hub_job" {
job_language = "scala"
create_security_configuration = local.create_sec_conf
temp_dir = "s3://${module.s3_glue_job_bucket.bucket_id}/tmp/${local.project}-reporting-hub-${local.env}/"
checkpoint_dir = "s3://${module.s3_glue_job_bucket.bucket_id}/checkpoint/${local.project}-reporting-hub-${local.env}/"
# Using s3a for the checkpoint to align with Hadoop 3 S3 support
checkpoint_dir = "s3a://${module.s3_glue_job_bucket.bucket_id}/checkpoint/${local.project}-reporting-hub-${local.env}/"
Contributor:

How come we're using s3a?

Contributor Author:

There are other Hadoop connectors to S3, but only S3A is actively maintained by the Hadoop project itself.

https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Other_S3_Connectors

Contributor:

Yeah, so up until this point we've been using Amazon's connector, which supports the s3:// scheme. I'm confused why we're specifically changing the checkpoint prefix to s3a but not all the other paths that also use the connector (dpr.raw.s3.path, for example).

Contributor Author:

The issue only affects checkpointing, where the warning below appears in the logs:

23/10/06 21:58:17 WARN CheckpointFileManager: Could not use FileContext API for managing Structured Streaming checkpoint files at s3://dpr-glue-jobs-development/checkpoint/dpr-reporting-hub-development. Using FileSystem API instead for managing log files. If the implementation of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of your Structured Streaming is not guaranteed.
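For reference, a minimal Structured Streaming sketch of where the checkpoint URI is supplied, which is the setting the s3a:// change above targets. This is not the DataHubJob code from this repository; the bucket, path, and object names are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch only: illustrates where the checkpoint URI is passed to Structured
// Streaming. Bucket and path names below are placeholders, not the real job's paths.
object CheckpointLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("checkpoint-location-sketch").getOrCreate()

    // The "rate" source needs no external infrastructure and stands in for the Kinesis source.
    val events = spark.readStream.format("rate").load()

    events.writeStream
      .format("parquet")
      .option("path", "s3a://example-bucket/raw/")
      // The PR switches this prefix from s3:// to s3a:// (the Hadoop-maintained S3 connector)
      // in response to the CheckpointFileManager warning quoted above.
      .option("checkpointLocation", "s3a://example-bucket/checkpoint/example-job/")
      .start()
      .awaitTermination()
  }
}
```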

spark_event_logs = "s3://${module.s3_glue_job_bucket.bucket_id}/spark-logs/${local.project}-reporting-hub-${local.env}/"
# Placeholder Script Location
script_location = local.glue_placeholder_script_location
enable_continuous_log_filter = false
project_id = local.project
aws_kms_key = local.s3_kms_arn
additional_policies = module.kinesis_stream_ingestor.kinesis_stream_iam_policy_admin_arn
execution_class = "STANDARD"
worker_type = local.reporting_hub_worker_type
number_of_workers = local.reporting_hub_num_workers
max_concurrent = 1
region = local.account_region
account = local.account_id
log_group_retention_in_days = 1

tags = merge(
local.all_tags,
@@ -43,28 +44,30 @@
)

arguments = {
"--extra-jars" = local.glue_jobs_latest_jar_location
"--job-bookmark-option" = "job-bookmark-disable"
"--class" = "uk.gov.justice.digital.job.DataHubJob"
"--dpr.kinesis.stream.arn" = module.kinesis_stream_ingestor.kinesis_stream_arn
"--dpr.aws.region" = local.account_region
"--dpr.curated.s3.path" = "s3://${module.s3_curated_bucket.bucket_id}/"
"--dpr.batchDurationSeconds" = local.reporting_hub_batch_duration_seconds
"--dpr.raw.s3.path" = "s3://${module.s3_raw_bucket.bucket_id}/"
"--dpr.structured.s3.path" = "s3://${module.s3_structured_bucket.bucket_id}/"
"--dpr.violations.s3.path" = "s3://${module.s3_violation_bucket.bucket_id}/"
"--enable-metrics" = true
"--enable-spark-ui" = false
"--enable-auto-scaling" = true
"--enable-job-insights" = true
"--dpr.aws.dynamodb.endpointUrl" = "https://dynamodb.${local.account_region}.amazonaws.com"
"--dpr.contract.registryName" = trimprefix(module.glue_registry_avro.registry_name, "${local.glue_avro_registry[0]}/")
"--dpr.domain.registry" = "${local.project}-domain-registry-${local.environment}"
"--dpr.domain.target.path" = "s3://${module.s3_domain_bucket.bucket_id}"
"--dpr.domain.catalog.db" = module.glue_data_domain_database.db_name
"--dpr.redshift.secrets.name" = "${local.project}-redshift-secret-${local.environment}"
"--dpr.datamart.db.name" = "datamart"
"--dpr.log.level" = local.reporting_hub_log_level
"--extra-jars" = local.glue_jobs_latest_jar_location
"--job-bookmark-option" = "job-bookmark-disable"
"--class" = "uk.gov.justice.digital.job.DataHubJob"
"--dpr.kinesis.stream.arn" = module.kinesis_stream_ingestor.kinesis_stream_arn
"--dpr.aws.region" = local.account_region
"--dpr.curated.s3.path" = "s3://${module.s3_curated_bucket.bucket_id}/"
"--dpr.batchDurationSeconds" = local.reporting_hub_batch_duration_seconds
"--dpr.add.idle.time.between.reads" = local.reporting_hub_add_idle_time_between_reads
"--dpr.idle.time.between.reads.millis" = local.reporting_hub_idle_time_between_reads_in_millis
"--dpr.raw.s3.path" = "s3://${module.s3_raw_bucket.bucket_id}/"
"--dpr.structured.s3.path" = "s3://${module.s3_structured_bucket.bucket_id}/"
"--dpr.violations.s3.path" = "s3://${module.s3_violation_bucket.bucket_id}/"
"--enable-metrics" = true
"--enable-spark-ui" = false
"--enable-auto-scaling" = true
"--enable-job-insights" = true
"--dpr.aws.dynamodb.endpointUrl" = "https://dynamodb.${local.account_region}.amazonaws.com"
"--dpr.contract.registryName" = trimprefix(module.glue_registry_avro.registry_name, "${local.glue_avro_registry[0]}/")
"--dpr.domain.registry" = "${local.project}-domain-registry-${local.environment}"
"--dpr.domain.target.path" = "s3://${module.s3_domain_bucket.bucket_id}"
"--dpr.domain.catalog.db" = module.glue_data_domain_database.db_name
"--dpr.redshift.secrets.name" = "${local.project}-redshift-secret-${local.environment}"
"--dpr.datamart.db.name" = "datamart"
"--dpr.log.level" = local.reporting_hub_log_level
}
}
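The two new arguments above, --dpr.add.idle.time.between.reads and --dpr.idle.time.between.reads.millis, carry the idle-time settings from Terraform into the streaming job. Below is a minimal sketch of how such flags could be forwarded to a Kinesis streaming read. It is not the DataHubJob implementation, and the "kinesis" source name and option keys (addIdleTimeBetweenReads, idleTimeBetweenReadsInMs) are assumptions based on the similarly named AWS Glue Kinesis connection options.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch only: shows one way arguments such as --dpr.add.idle.time.between.reads
// could be forwarded to a Kinesis streaming source. The "kinesis" format string and the
// option keys below are assumptions; the actual wiring inside DataHubJob is not part of
// this diff.
object IdleTimeBetweenReadsSketch {

  // Parse "--key value" pairs (the shape Glue uses for job arguments) into a map.
  private def parseArgs(args: Array[String]): Map[String, String] =
    args.sliding(2, 2).collect { case Array(k, v) if k.startsWith("--") => k.drop(2) -> v }.toMap

  def main(args: Array[String]): Unit = {
    val conf  = parseArgs(args)
    val spark = SparkSession.builder().appName("idle-time-between-reads-sketch").getOrCreate()

    val frame = spark.readStream
      .format("kinesis") // assumed source name
      .option("streamARN", conf.getOrElse("dpr.kinesis.stream.arn",
        "arn:aws:kinesis:eu-west-2:000000000000:stream/example")) // placeholder ARN
      .option("startingPosition", "LATEST")
      // These two options, driven by the new Terraform settings, add a configurable pause
      // between consecutive reads of the Kinesis stream.
      .option("addIdleTimeBetweenReads", conf.getOrElse("dpr.add.idle.time.between.reads", "true"))
      .option("idleTimeBetweenReadsInMs", conf.getOrElse("dpr.idle.time.between.reads.millis", "140"))
      .load()

    frame.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/idle-time-between-reads-sketch/")
      .start()
      .awaitTermination()
  }
}
```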

@@ -82,19 +85,19 @@ module "glue_domain_refresh_job" {
checkpoint_dir = "s3://${module.s3_glue_job_bucket.bucket_id}/checkpoint/${local.project}-domain-refresh-${local.env}/"
spark_event_logs = "s3://${module.s3_glue_job_bucket.bucket_id}/spark-logs/${local.project}-domain-refresh-${local.env}/"
# Placeholder Script Location
script_location = local.glue_placeholder_script_location
enable_continuous_log_filter = false
project_id = local.project
aws_kms_key = local.s3_kms_arn
additional_policies = module.kinesis_stream_ingestor.kinesis_stream_iam_policy_admin_arn
# timeout = 1440
execution_class = "FLEX"
worker_type = local.refresh_job_worker_type
number_of_workers = local.refresh_job_num_workers
max_concurrent = 64
region = local.account_region
account = local.account_id
log_group_retention_in_days = 1

tags = merge(
local.all_tags,
@@ -168,7 +171,7 @@ module "glue_registry_avro" {
source = "./modules/glue_registry"
enable_glue_registry = true
name = "${local.project}-glue-registry-avro-${local.env}"
tags = merge(
local.all_tags,
{
Name = "${local.project}-glue-registry-avro-${local.env}"
@@ -194,7 +197,7 @@ module "glue_raw_table" {
# AWS Glue catalog table
glue_catalog_table_description = "Glue Table for raw data, managed by Terraform."
glue_catalog_table_table_type = "EXTERNAL_TABLE"
glue_catalog_table_parameters = {
EXTERNAL = "TRUE"
"parquet.compression" = "SNAPPY"
"classification" = "parquet"
@@ -256,7 +259,7 @@ module "glue_reconciliation_table" {
# AWS Glue catalog table
glue_catalog_table_description = "Glue Table for reconciliation data, managed by Terraform."
glue_catalog_table_table_type = "EXTERNAL_TABLE"
glue_catalog_table_parameters = {
EXTERNAL = "TRUE"
"parquet.compression" = "SNAPPY"
"classification" = "parquet"
@@ -594,9 +597,9 @@ module "ec2_kinesis_agent" {
ebs_encrypted = true
ebs_delete_on_termination = false
# s3_policy_arn = aws_iam_policy.read_s3_read_access_policy.arn # TBC
region = local.account_region
account = local.account_id
env = local.env


tags = merge(
@@ -626,7 +629,9 @@ module "datamart" {
create_subnet_group = true
kms_key_arn = aws_kms_key.redshift-kms-key.arn
enhanced_vpc_routing = false
subnet_ids = [data.aws_subnet.private_subnets_a.id, data.aws_subnet.private_subnets_b.id, data.aws_subnet.private_subnets_c.id]
subnet_ids = [
data.aws_subnet.private_subnets_a.id, data.aws_subnet.private_subnets_b.id, data.aws_subnet.private_subnets_c.id
]
vpc = data.aws_vpc.shared.id
cidr = [data.aws_vpc.shared.cidr_block, local.cloud_platform_cidr]
iam_role_arns = [aws_iam_role.redshift-role.arn, aws_iam_role.redshift-spectrum-role.arn]
@@ -636,7 +641,7 @@

# Scheduled actions
create_scheduled_action_iam_role = true
scheduled_actions = {
pause = {
name = "${local.redshift_cluster_name}-pause"
description = "Pause cluster every night"
@@ -690,7 +695,9 @@ module "dms_nomis_ingestor" {
migration_type = "full-load-and-cdc"
replication_instance_version = "3.4.7" # Upgrade
replication_instance_class = "dms.t3.medium"
subnet_ids = [data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id]
subnet_ids = [
data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id
]

vpc_role_dependency = [aws_iam_role.dmsvpcrole]
cloudwatch_role_dependency = [aws_iam_role.dms_cloudwatch_logs_role]
@@ -740,7 +747,9 @@ module "dms_fake_data_ingestor" {
migration_type = "full-load-and-cdc"
replication_instance_version = "3.4.7" # Rollback
replication_instance_class = "dms.t3.medium"
subnet_ids = [data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id]
subnet_ids = [
data.aws_subnet.data_subnets_a.id, data.aws_subnet.data_subnets_b.id, data.aws_subnet.data_subnets_c.id
]

vpc_role_dependency = [aws_iam_role.dmsvpcrole]
cloudwatch_role_dependency = [aws_iam_role.dms_cloudwatch_logs_role]
@@ -759,9 +768,9 @@ module "dms_fake_data_ingestor" {
tags = merge(
local.all_tags,
{
Name = "${local.project}-dms-fake-data-ingestor-${local.env}"
Resource_Type = "DMS Replication"
Postgres_Source = "DPS"
}
)
}