diff --git a/internal/service/kinesisanalyticsv2/application.go b/internal/service/kinesisanalyticsv2/application.go index 7c6888046ab..99c2ff3c8fe 100644 --- a/internal/service/kinesisanalyticsv2/application.go +++ b/internal/service/kinesisanalyticsv2/application.go @@ -1380,6 +1380,20 @@ func resourceApplicationUpdate(d *schema.ResourceData, meta interface{}) error { } } + if d.HasChange("application_configuration.0.run_configuration") { + application, err := FindApplicationDetailByName(conn, applicationName) + + if err != nil { + return fmt.Errorf("error reading Kinesis Analytics v2 Application (%s): %w", applicationName, err) + } + + if actual, expected := aws.StringValue(application.ApplicationStatus), kinesisanalyticsv2.ApplicationStatusRunning; actual == expected { + input.RunConfigurationUpdate = expandRunConfigurationUpdate(d.Get("application_configuration.0.run_configuration").([]interface{})) + + updateApplication = true + } + } + input.ApplicationConfigurationUpdate = applicationConfigurationUpdate } @@ -2483,6 +2497,46 @@ func expandVPCConfigurationUpdate(vVpcConfiguration []interface{}) *kinesisanaly return vpcConfigurationUpdate } +func expandRunConfigurationUpdate(vRunConfigurationUpdate []interface{}) *kinesisanalyticsv2.RunConfigurationUpdate { + if len(vRunConfigurationUpdate) == 0 || vRunConfigurationUpdate[0] == nil { + return nil + } + + runConfigurationUpdate := &kinesisanalyticsv2.RunConfigurationUpdate{} + + mRunConfiguration := vRunConfigurationUpdate[0].(map[string]interface{}) + + if vApplicationRestoreConfiguration, ok := mRunConfiguration["application_restore_configuration"].([]interface{}); ok && len(vApplicationRestoreConfiguration) > 0 && vApplicationRestoreConfiguration[0] != nil { + applicationRestoreConfiguration := &kinesisanalyticsv2.ApplicationRestoreConfiguration{} + + mApplicationRestoreConfiguration := vApplicationRestoreConfiguration[0].(map[string]interface{}) + + if vApplicationRestoreType, ok := mApplicationRestoreConfiguration["application_restore_type"].(string); ok && vApplicationRestoreType != "" { + applicationRestoreConfiguration.ApplicationRestoreType = aws.String(vApplicationRestoreType) + } + + if vSnapshotName, ok := mApplicationRestoreConfiguration["snapshot_name"].(string); ok && vSnapshotName != "" { + applicationRestoreConfiguration.SnapshotName = aws.String(vSnapshotName) + } + + runConfigurationUpdate.ApplicationRestoreConfiguration = applicationRestoreConfiguration + } + + if vFlinkRunConfiguration, ok := mRunConfiguration["flink_run_configuration"].([]interface{}); ok && len(vFlinkRunConfiguration) > 0 && vFlinkRunConfiguration[0] != nil { + flinkRunConfiguration := &kinesisanalyticsv2.FlinkRunConfiguration{} + + mFlinkRunConfiguration := vFlinkRunConfiguration[0].(map[string]interface{}) + + if vAllowNonRestoredState, ok := mFlinkRunConfiguration["allow_non_restored_state"].(bool); ok { + flinkRunConfiguration.AllowNonRestoredState = aws.Bool(vAllowNonRestoredState) + } + + runConfigurationUpdate.FlinkRunConfiguration = flinkRunConfiguration + } + + return runConfigurationUpdate +} + func flattenApplicationConfigurationDescription(applicationConfigurationDescription *kinesisanalyticsv2.ApplicationConfigurationDescription) []interface{} { if applicationConfigurationDescription == nil { return []interface{}{} diff --git a/internal/service/kinesisanalyticsv2/application_snapshot_test.go b/internal/service/kinesisanalyticsv2/application_snapshot_test.go index 526ae8e6d39..32bdff7409e 100644 --- a/internal/service/kinesisanalyticsv2/application_snapshot_test.go +++ b/internal/service/kinesisanalyticsv2/application_snapshot_test.go @@ -141,5 +141,5 @@ func testAccCheckApplicationSnapshotExists(n string, v *kinesisanalyticsv2.Snaps } func testAccApplicationSnapshotConfig(rName string) string { - return testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "SKIP_RESTORE_FROM_SNAPSHOT", "") + return testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "SKIP_RESTORE_FROM_SNAPSHOT", "", false) } diff --git a/internal/service/kinesisanalyticsv2/application_test.go b/internal/service/kinesisanalyticsv2/application_test.go index 2cdfccfcdd9..0c8ac635d76 100644 --- a/internal/service/kinesisanalyticsv2/application_test.go +++ b/internal/service/kinesisanalyticsv2/application_test.go @@ -1033,7 +1033,7 @@ func TestAccKinesisAnalyticsV2Application_FlinkApplication_restoreFromSnapshot(t CheckDestroy: testAccCheckApplicationDestroy, Steps: []resource.TestStep{ { - Config: testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "RESTORE_FROM_LATEST_SNAPSHOT", ""), + Config: testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "RESTORE_FROM_LATEST_SNAPSHOT", "", false), Check: resource.ComposeTestCheckFunc( testAccCheckApplicationExists(resourceName, &v), resource.TestCheckResourceAttr(resourceName, "application_configuration.#", "1"), @@ -1162,7 +1162,7 @@ func TestAccKinesisAnalyticsV2Application_FlinkApplication_restoreFromSnapshot(t ImportStateVerifyIgnore: []string{"force_stop", "start_application"}, }, { - Config: testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "RESTORE_FROM_CUSTOM_SNAPSHOT", rName), + Config: testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "RESTORE_FROM_CUSTOM_SNAPSHOT", rName, false), Check: resource.ComposeTestCheckFunc( testAccCheckApplicationExists(resourceName, &v), resource.TestCheckResourceAttr(resourceName, "application_configuration.#", "1"), @@ -3921,6 +3921,158 @@ func TestAccKinesisAnalyticsV2Application_SQLApplicationVPC_update(t *testing.T) }) } +func TestAccKinesisAnalyticsV2Application_RunConfiguration_Update(t *testing.T) { + var v kinesisanalyticsv2.ApplicationDetail + resourceName := "aws_kinesisanalyticsv2_application.test" + iamRoleResourceName := "aws_iam_role.test.0" + s3BucketResourceName := "aws_s3_bucket.test" + s3BucketObjectResourceName := "aws_s3_bucket_object.test" + snapshotResourceName := "aws_kinesisanalyticsv2_application_snapshot.test" + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, kinesisanalyticsv2.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckApplicationDestroy, + Steps: []resource.TestStep{ + { + Config: testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "RESTORE_FROM_LATEST_SNAPSHOT", "", false), + Check: resource.ComposeTestCheckFunc( + testAccCheckApplicationExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "application_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.0.bucket_arn", s3BucketResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.0.file_key", s3BucketObjectResourceName, "key"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.0.object_version", ""), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.text_content", ""), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content_type", "ZIPFILE"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_snapshot_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_snapshot_configuration.0.snapshots_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.environment_properties.#", "1"), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "application_configuration.0.environment_properties.0.property_group.*", map[string]string{ + "property_group_id": "ConsumerConfigProperties", + "property_map.%": "3", + "property_map.flink.inputstream.initpos": "LATEST", + }), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "application_configuration.0.environment_properties.0.property_group.*", map[string]string{ + "property_group_id": "ProducerConfigProperties", + "property_map.%": "3", + "property_map.AggregationEnabled": "false", + }), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.checkpointing_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.checkpoint_interval", "60000"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.configuration_type", "DEFAULT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.min_pause_between_checkpoints", "5000"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.0.configuration_type", "DEFAULT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.0.log_level", "INFO"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.0.metrics_level", "APPLICATION"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.auto_scaling_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.configuration_type", "DEFAULT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.parallelism", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.parallelism_per_kpu", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.application_restore_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.application_restore_configuration.0.application_restore_type", "RESTORE_FROM_LATEST_SNAPSHOT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.flink_run_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.flink_run_configuration.0.allow_non_restored_state", "false"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.sql_application_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.vpc_configuration.#", "0"), + acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckNoResourceAttr(resourceName, "force_stop"), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "runtime_environment", "FLINK-1_11"), + resource.TestCheckResourceAttrPair(resourceName, "service_execution_role", iamRoleResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "start_application", "true"), + resource.TestCheckResourceAttr(resourceName, "status", "RUNNING"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "version_id", "1"), + ), + }, + { + Config: testAccApplicationConfigStartSnapshotableFlinkApplication(rName, "RESTORE_FROM_CUSTOM_SNAPSHOT", rName, true), + Check: resource.ComposeTestCheckFunc( + testAccCheckApplicationExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "application_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.0.bucket_arn", s3BucketResourceName, "arn"), + resource.TestCheckResourceAttrPair(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.0.file_key", s3BucketObjectResourceName, "key"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.s3_content_location.0.object_version", ""), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content.0.text_content", ""), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_code_configuration.0.code_content_type", "ZIPFILE"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_snapshot_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.application_snapshot_configuration.0.snapshots_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.environment_properties.#", "1"), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "application_configuration.0.environment_properties.0.property_group.*", map[string]string{ + "property_group_id": "ConsumerConfigProperties", + "property_map.%": "3", + "property_map.flink.inputstream.initpos": "LATEST", + }), + resource.TestCheckTypeSetElemNestedAttrs(resourceName, "application_configuration.0.environment_properties.0.property_group.*", map[string]string{ + "property_group_id": "ProducerConfigProperties", + "property_map.%": "3", + "property_map.AggregationEnabled": "false", + }), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.checkpointing_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.checkpoint_interval", "60000"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.configuration_type", "DEFAULT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.checkpoint_configuration.0.min_pause_between_checkpoints", "5000"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.0.configuration_type", "DEFAULT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.0.log_level", "INFO"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.monitoring_configuration.0.metrics_level", "APPLICATION"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.auto_scaling_enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.configuration_type", "DEFAULT"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.parallelism", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.flink_application_configuration.0.parallelism_configuration.0.parallelism_per_kpu", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.application_restore_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.application_restore_configuration.0.application_restore_type", "RESTORE_FROM_CUSTOM_SNAPSHOT"), + resource.TestCheckResourceAttrPair(resourceName, "application_configuration.0.run_configuration.0.application_restore_configuration.0.snapshot_name", snapshotResourceName, "snapshot_name"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.flink_run_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.run_configuration.0.flink_run_configuration.0.allow_non_restored_state", "true"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.sql_application_configuration.#", "0"), + resource.TestCheckResourceAttr(resourceName, "application_configuration.0.vpc_configuration.#", "0"), + acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "kinesisanalytics", fmt.Sprintf("application/%s", rName)), + resource.TestCheckResourceAttr(resourceName, "cloudwatch_logging_options.#", "0"), + resource.TestCheckResourceAttrSet(resourceName, "create_timestamp"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckNoResourceAttr(resourceName, "force_stop"), + resource.TestCheckResourceAttrSet(resourceName, "last_update_timestamp"), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "runtime_environment", "FLINK-1_11"), + resource.TestCheckResourceAttrPair(resourceName, "service_execution_role", iamRoleResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "start_application", "true"), + resource.TestCheckResourceAttr(resourceName, "status", "RUNNING"), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "version_id", "2"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"force_stop", "start_application"}, + }, + }, + }) +} + func testAccCheckApplicationDestroy(s *terraform.State) error { conn := acctest.Provider.Meta().(*conns.AWSClient).KinesisAnalyticsV2Conn @@ -4639,7 +4791,7 @@ resource "aws_kinesisanalyticsv2_application" "test" { `, rName)) } -func testAccApplicationConfigStartSnapshotableFlinkApplication(rName, applicationRestoreType, snapshotName string) string { +func testAccApplicationConfigStartSnapshotableFlinkApplication(rName, applicationRestoreType, snapshotName string, allowNonRestoredState bool) string { if snapshotName == "" { snapshotName = "null" } else { @@ -4719,6 +4871,9 @@ resource "aws_kinesisanalyticsv2_application" "test" { application_restore_type = %[2]q snapshot_name = %[3]s } + flink_run_configuration { + allow_non_restored_state = %[4]t + } } } @@ -4729,7 +4884,7 @@ resource "aws_kinesisanalyticsv2_application_snapshot" "test" { application_name = aws_kinesisanalyticsv2_application.test.name snapshot_name = %[1]q } -`, rName, applicationRestoreType, snapshotName)) +`, rName, applicationRestoreType, snapshotName, allowNonRestoredState)) } func testAccApplicationConfigStopSnapshotableFlinkApplication(rName string, forceStop bool) string {