From f69c0d7d09358dadb420219234f31e44a0aa4820 Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Mon, 17 May 2021 17:03:03 -0700 Subject: [PATCH 01/18] Initial implementation of self_managed_event_source for aws_lambda_event_source_mapping. --- ...esource_aws_lambda_event_source_mapping.go | 104 ++++++++++- ...ce_aws_lambda_event_source_mapping_test.go | 167 ++++++++++++++++++ aws/structure.go | 80 +++++++++ 3 files changed, 343 insertions(+), 8 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index c32af92e210..56de42572b9 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -34,7 +34,7 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Schema: map[string]*schema.Schema{ "event_source_arn": { Type: schema.TypeString, - Required: true, + Optional: true, ForceNew: true, }, "function_name": { @@ -80,12 +80,19 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { return false } - eventSourceARN, err := arn.Parse(d.Get("event_source_arn").(string)) - if err != nil { - return false + serviceName := "" + if v, ok := d.GetOk("event_source_arn"); ok { + eventSourceARN, err := arn.Parse(v.(string)) + if err != nil { + return false + } + serviceName = eventSourceARN.Service + } else { + // self managed kafka does not have an event_source_arn + serviceName = "kafka" } - switch eventSourceARN.Service { - // kafka.ServiceName is "Kafka". + switch serviceName { + // kafka.ServiceName is "kafka". case dynamodb.ServiceName, kinesis.ServiceName, "kafka": if old == "100" { return true @@ -156,6 +163,66 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, + /* + "self_managed_event_source": { + Type: schema.TypeList, + Optional: true, + MinItems: 1, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "endpoints": { + Type: schema.TypeMap, + Required: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, + }, + */ + "self_managed_event_source": { + Type: schema.TypeList, + Optional: true, + MinItems: 1, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "endpoints": { + Type: schema.TypeList, + Required: true, + MaxItems: 1, + MinItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "kafka_bootstrap_servers": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, + }, + }, + }, + }, + "source_access_configuration": { + Type: schema.TypeList, + Optional: true, + MinItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "type": { + Type: schema.TypeString, + Required: true, + }, + "uri": { + Type: schema.TypeString, + Required: true, + }, + }, + }, + }, "function_arn": { Type: schema.TypeString, Computed: true, @@ -190,8 +257,11 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte conn := meta.(*AWSClient).lambdaconn input := &lambda.CreateEventSourceMappingInput{ - Enabled: aws.Bool(d.Get("enabled").(bool)), - FunctionName: aws.String(d.Get("function_name").(string)), + Enabled: aws.Bool(d.Get("enabled").(bool)), + } + + if v, ok := d.GetOk("function_name"); ok { + input.FunctionName = aws.String(v.(string)) } if v, ok := d.GetOk("batch_size"); ok { @@ -236,6 +306,14 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.StartingPositionTimestamp = aws.Time(t) } + if v, ok := d.GetOk("self_managed_event_source"); ok { + input.SelfManagedEventSource = expandLambdaEventSourceMappingSelfManagedEventSource(v.([]interface{})) + } + + if v, ok := d.GetOk("source_access_configuration"); ok { + input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(v.([]interface{})) + } + if v, ok := d.GetOk("topics"); ok && v.(*schema.Set).Len() > 0 { input.Topics = expandStringSet(v.(*schema.Set)) } @@ -322,6 +400,12 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil { return fmt.Errorf("error setting topics: %w", err) } + if err := d.Set("self_managed_event_source", flattenLambdaEventSourceMappingSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)); err != nil { + return fmt.Errorf("error setting self_managed_event_source: %w", err) + } + if err := d.Set("source_access_configuration", flattenLambdaEventSourceMappingSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations, d)); err != nil { + return fmt.Errorf("error setting source_access_configuration: %w", err) + } d.Set("starting_position", eventSourceMappingConfiguration.StartingPosition) if eventSourceMappingConfiguration.StartingPositionTimestamp != nil { @@ -439,6 +523,10 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte input.ParallelizationFactor = aws.Int64(int64(d.Get("parallelization_factor").(int))) } + if d.HasChange("source_access_configuration") { + input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(d.Get("source_access_configuration").([]interface{})) + } + err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { _, err := conn.UpdateEventSourceMapping(input) diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index f3113da2e39..9a8bed4a96c 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -665,6 +665,52 @@ func TestAccAWSLambdaEventSourceMapping_MSK(t *testing.T) { }) } +func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { + var v lambda.EventSourceMappingConfiguration + resourceName := "aws_lambda_event_source_mapping.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "kafka"), //using kafka.EndpointsID will import kafka and make linters sad + Providers: testAccProviders, + CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "100"), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), + + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.#", "1"), + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.0.kafka_bootstrap_servers.#", "1"), + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.0.kafka_bootstrap_servers.0", "test:9092"), + resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), + resource.TestCheckResourceAttr(resourceName, "source_access_configuration.0.type", "VPC_SUBNET"), + resource.TestCheckResourceAttr(resourceName, "source_access_configuration.1.type", "VPC_SUBNET"), + resource.TestCheckResourceAttr(resourceName, "source_access_configuration.2.type", "VPC_SECURITY_GROUP"), + testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), + resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), + ), + }, + // batch_size became optional. Ensure that if the user supplies the default + // value, but then moves to not providing the value, that we don't consider this + // a diff. + { + PlanOnly: true, + Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null"), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc { return func(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).lambdaconn @@ -1185,6 +1231,127 @@ resource "aws_lambda_event_source_mapping" "test" { `, rName, batchSize)) } +func testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, batchSize string) string { + if batchSize == "" { + batchSize = "null" + } + + return composeConfig(testAccAvailableAZsNoOptInConfig(), fmt.Sprintf(` +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = < 0 { + if config, ok := vSource[0].(map[string]interface{}); ok { + if vEndpoints, ok := config["endpoints"].([]interface{}); ok { + mEndpoints := vEndpoints[0].(map[string]interface{}) + if kafkaBootstrapServers, ok := mEndpoints["kafka_bootstrap_servers"]; ok { + source.Endpoints["KAFKA_BOOTSTRAP_SERVERS"] = expandStringList(kafkaBootstrapServers.([]interface{})) + } + } + } + } + return source +} + +func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource) []interface{} { + mSource := map[string]interface{}{} + mEndpoints := map[string]interface{}{} + if source != nil { + if source.Endpoints != nil { + if kafkaBootstrapBrokers, ok := source.Endpoints["KAFKA_BOOTSTRAP_SERVERS"]; ok { + mEndpoints["kafka_bootstrap_servers"] = flattenStringList(kafkaBootstrapBrokers) + mSource["endpoints"] = []interface{}{mEndpoints} + } + } + } + + if len(mSource) == 0 { + return nil + } + + return []interface{}{mSource} +} + +func expandLambdaEventSourceMappingSourceAccessConfigurations(v []interface{}) []*lambda.SourceAccessConfiguration { + accesses := make([]*lambda.SourceAccessConfiguration, 0, len(v)) + for _, m := range v { + config := m.(map[string]interface{}) + accesses = append(accesses, &lambda.SourceAccessConfiguration{ + Type: aws.String(config["type"].(string)), + URI: aws.String(config["uri"].(string)), + }) + } + return accesses +} + +func flattenLambdaEventSourceMappingSourceAccessConfigurations(accesses []*lambda.SourceAccessConfiguration, d *schema.ResourceData) []map[string]interface{} { + if accesses == nil { + return nil + } + settings := make([]map[string]interface{}, len(accesses)) + + for i, access := range accesses { + setting := make(map[string]interface{}) + setting["type"] = access.Type + setting["uri"] = access.URI + settings[i] = setting + } + // The result returned from AWS is sorted so try to order it like the original to prevent spurious diffs + if curCount, ok := d.Get("source_access_configuration.#").(int); ok { + for i := 0; i < curCount; i++ { + if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { + for j := 0; j < len(settings); j++ { + if curSetting["type"] == *settings[j]["type"].(*string) && + curSetting["uri"] == *settings[j]["uri"].(*string) { + settings[i], settings[j] = settings[j], settings[i] + } + } + } + } + } + return settings +} + func flattenLambdaLayers(layers []*lambda.Layer) []interface{} { arns := make([]*string, len(layers)) for i, layer := range layers { From baa30dbeacf022e9036b15b8805a4b0cfd76c9ba Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Tue, 18 May 2021 09:42:58 -0700 Subject: [PATCH 02/18] Remove the import test for the moment. --- aws/resource_aws_lambda_event_source_mapping_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 9a8bed4a96c..d8b2fec6b17 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -48,11 +48,13 @@ func TestAccAWSLambdaEventSourceMapping_Kinesis_basic(t *testing.T) { PlanOnly: true, Config: testAccAWSLambdaEventSourceMappingConfigKinesisBatchSize(rName, "null"), }, - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - }, + /* + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + */ { Config: testAccAWSLambdaEventSourceMappingConfigKinesisUpdateFunctionName(rName), Check: resource.ComposeTestCheckFunc( From 0926eeec8ba7098413e6c395993aea06809c5e50 Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Tue, 18 May 2021 09:50:34 -0700 Subject: [PATCH 03/18] Add changelog entry for PR#19425 --- .changelog/19425.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/19425.txt diff --git a/.changelog/19425.txt b/.changelog/19425.txt new file mode 100644 index 00000000000..8fd11e459b3 --- /dev/null +++ b/.changelog/19425.txt @@ -0,0 +1,3 @@ +```release-notes:enhancement +resource/aws_lambda_event_source_mapping: Add `self_managed_event_source`, `source_access_configuration` to allow for self managed kafka cluster. +``` From 37007a37862a8c6d3e14e7f02d000c27d068f9ad Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Wed, 19 May 2021 10:38:44 -0700 Subject: [PATCH 04/18] Switch the self_managed_event_source endpoints to a map[string]string. Add some documentation updates. --- ...esource_aws_lambda_event_source_mapping.go | 35 +------- ...ce_aws_lambda_event_source_mapping_test.go | 26 ++---- aws/structure.go | 83 +++++++++++++------ .../lambda_event_source_mapping.html.markdown | 43 +++++++++- 4 files changed, 110 insertions(+), 77 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 56de42572b9..d412cf5c62c 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -163,23 +163,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, - /* - "self_managed_event_source": { - Type: schema.TypeList, - Optional: true, - MinItems: 1, - MaxItems: 1, - Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "endpoints": { - Type: schema.TypeMap, - Required: true, - Elem: &schema.Schema{Type: schema.TypeString}, - }, - }, - }, - }, - */ "self_managed_event_source": { Type: schema.TypeList, Optional: true, @@ -188,20 +171,10 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "endpoints": { - Type: schema.TypeList, + Type: schema.TypeMap, Required: true, - MaxItems: 1, - MinItems: 1, - Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "kafka_bootstrap_servers": { - Type: schema.TypeList, - Required: true, - ForceNew: true, - Elem: &schema.Schema{Type: schema.TypeString}, - }, - }, - }, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, }, }, }, @@ -400,7 +373,7 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil { return fmt.Errorf("error setting topics: %w", err) } - if err := d.Set("self_managed_event_source", flattenLambdaEventSourceMappingSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)); err != nil { + if err := d.Set("self_managed_event_source", flattenLambdaEventSourceMappingSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource, d)); err != nil { return fmt.Errorf("error setting self_managed_event_source: %w", err) } if err := d.Set("source_access_configuration", flattenLambdaEventSourceMappingSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations, d)); err != nil { diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index d8b2fec6b17..94367b53e60 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -48,13 +48,11 @@ func TestAccAWSLambdaEventSourceMapping_Kinesis_basic(t *testing.T) { PlanOnly: true, Config: testAccAWSLambdaEventSourceMappingConfigKinesisBatchSize(rName, "null"), }, - /* - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - }, - */ + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, { Config: testAccAWSLambdaEventSourceMappingConfigKinesisUpdateFunctionName(rName), Check: resource.ComposeTestCheckFunc( @@ -684,10 +682,7 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), - - resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.#", "1"), - resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.0.kafka_bootstrap_servers.#", "1"), - resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.0.kafka_bootstrap_servers.0", "test:9092"), + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test2:9092,test1:9092"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.0.type", "VPC_SUBNET"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.1.type", "VPC_SUBNET"), @@ -704,11 +699,6 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { PlanOnly: true, Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null"), }, - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - }, }, }) } @@ -1333,8 +1323,8 @@ resource "aws_lambda_event_source_mapping" "test" { starting_position = "TRIM_HORIZON" self_managed_event_source { - endpoints { - kafka_bootstrap_servers = [ "test:9092" ] + endpoints = { + KAFKA_BOOTSTRAP_SERVERS = "test2:9092,test1:9092" } } diff --git a/aws/structure.go b/aws/structure.go index 9d68d11998a..0cb141ede4e 100644 --- a/aws/structure.go +++ b/aws/structure.go @@ -1674,20 +1674,22 @@ func flattenLambdaEventSourceMappingDestinationConfig(dest *lambda.DestinationCo return []interface{}{mDest} } -func expandLambdaEventSourceMappingSelfManagedEventSource(vSource []interface{}) *lambda.SelfManagedEventSource { - if len(vSource) == 0 { +func expandLambdaEventSourceMappingSelfManagedEventSource(configured []interface{}) *lambda.SelfManagedEventSource { + if len(configured) == 0 { return nil } source := &lambda.SelfManagedEventSource{} source.Endpoints = map[string][]*string{} - if len(vSource) > 0 { - if config, ok := vSource[0].(map[string]interface{}); ok { - if vEndpoints, ok := config["endpoints"].([]interface{}); ok { - mEndpoints := vEndpoints[0].(map[string]interface{}) - if kafkaBootstrapServers, ok := mEndpoints["kafka_bootstrap_servers"]; ok { - source.Endpoints["KAFKA_BOOTSTRAP_SERVERS"] = expandStringList(kafkaBootstrapServers.([]interface{})) + if config, ok := configured[0].(map[string]interface{}); ok { + if endpoints, ok := config["endpoints"].(map[string]interface{}); ok { + for key, value := range endpoints { + values := strings.Split(value.(string), ",") + source.Endpoints[key] = make([]*string, len(values)) + for i, value := range values { + valueCopy := value + source.Endpoints[key][i] = &valueCopy } } } @@ -1695,28 +1697,54 @@ func expandLambdaEventSourceMappingSelfManagedEventSource(vSource []interface{}) return source } -func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource) []interface{} { - mSource := map[string]interface{}{} - mEndpoints := map[string]interface{}{} - if source != nil { - if source.Endpoints != nil { - if kafkaBootstrapBrokers, ok := source.Endpoints["KAFKA_BOOTSTRAP_SERVERS"]; ok { - mEndpoints["kafka_bootstrap_servers"] = flattenStringList(kafkaBootstrapBrokers) - mSource["endpoints"] = []interface{}{mEndpoints} +func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource, d *schema.ResourceData) []interface{} { + if source == nil { + return nil + } + + if source.Endpoints == nil { + return nil + } + + endpoints := map[string]string{} + for key, values := range source.Endpoints { + sValues := make([]string, len(values)) + for i, value := range values { + sValues[i] = *value + } + // The AWS API sorts the list of brokers so try to order the string by what + // is in the TF file to prevent spurious diffs. + curValue, ok := d.Get("self_managed_event_source.0.endpoints." + key).(string) + if !ok { + curValue = "" + } + curValues := strings.Split(curValue, ",") + if len(sValues) == len(curValues) { + for i := 0; i < len(curValues); i++ { + for j := 0; j < len(sValues); j++ { + if curValues[i] == sValues[j] { + sValues[i], sValues[j] = sValues[j], sValues[i] + break + } + } } } + endpoints[key] = strings.Join(sValues, ",") } - if len(mSource) == 0 { + if len(endpoints) == 0 { return nil } - return []interface{}{mSource} + config := map[string]interface{}{} + config["endpoints"] = endpoints + + return []interface{}{config} } -func expandLambdaEventSourceMappingSourceAccessConfigurations(v []interface{}) []*lambda.SourceAccessConfiguration { - accesses := make([]*lambda.SourceAccessConfiguration, 0, len(v)) - for _, m := range v { +func expandLambdaEventSourceMappingSourceAccessConfigurations(configured []interface{}) []*lambda.SourceAccessConfiguration { + accesses := make([]*lambda.SourceAccessConfiguration, 0, len(configured)) + for _, m := range configured { config := m.(map[string]interface{}) accesses = append(accesses, &lambda.SourceAccessConfiguration{ Type: aws.String(config["type"].(string)), @@ -1740,12 +1768,13 @@ func flattenLambdaEventSourceMappingSourceAccessConfigurations(accesses []*lambd } // The result returned from AWS is sorted so try to order it like the original to prevent spurious diffs if curCount, ok := d.Get("source_access_configuration.#").(int); ok { - for i := 0; i < curCount; i++ { - if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { - for j := 0; j < len(settings); j++ { - if curSetting["type"] == *settings[j]["type"].(*string) && - curSetting["uri"] == *settings[j]["uri"].(*string) { - settings[i], settings[j] = settings[j], settings[i] + if curCount == len(settings) { + for i := 0; i < curCount; i++ { + if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { + for j := 0; j < len(settings); j++ { + if curSetting["type"] == *settings[j]["type"].(*string) && curSetting["uri"] == *settings[j]["uri"].(*string) { + settings[i], settings[j] = settings[j], settings[i] + } } } } diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 9ffda81bef3..923a93fe0c5 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -46,6 +46,36 @@ resource "aws_lambda_event_source_mapping" "example" { } ``` +### Self Managed Apache Kafka + +```terraform +resource "aws_lambda_event_source_mapping" "example" { + function_name = aws_lambda_function.example.arn + topics = ["Example"] + starting_position = "TRIM_HORIZON" + + self_managed_event_source { + endpoints = { + KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9092,kafka2.example.com:9092" + } + } + + source_access_configuration { + type = "VPC_SUBNET" + uri = "subnet:subnet-example1" + } + + source_access_configuration { + type = "VPC_SUBNET" + uri = "subnet:subnet-example2" + } + + source_access_configuration { + type = "VPC_SECURITY_GROUP" + uri = "security_group:sg-example" + } +}``` + ### SQS ```terraform @@ -59,7 +89,7 @@ resource "aws_lambda_event_source_mapping" "example" { * `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB, Kinesis and MSK, `10` for SQS. * `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function. Only available for stream sources (DynamoDB and Kinesis) and SQS standard queues. -* `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, SQS queue or MSK cluster. +* `event_source_arn` - (Optional) The event source ARN - this is required for Kinesis stream, DynamoDB stream, SQS queue or MSK cluster. It is incompatible with a Self Managed Kafka source. * `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`. * `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events. * `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis, DynamoDB or MSK. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType). @@ -70,6 +100,8 @@ resource "aws_lambda_event_source_mapping" "example" { * `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`. * `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified. * `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below. +* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. Detailed below. +* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. Detailed below. ### destination_config Configuration Block @@ -79,6 +111,15 @@ resource "aws_lambda_event_source_mapping" "example" { * `destination_arn` - (Required) The Amazon Resource Name (ARN) of the destination resource. +### self_managed_event_source Configuration Block + +* `endpoints` - (Required) A map of endpoints for the self managed source. For Kafka self-managed sources, the key should be `KAFKA_BOOTSTRAP_SERVERS` and the value should be a string with a comma separated list of broker endpoints. + +### source_access_configuration Configuration Block + +* `type` - (Required) The type of this configuration. For Self Managed Kafka you will need to supply blocks for type `VPC_SUBNET` and `VPC_SECURITY_GROUP`. +* `uri` - (Required) The URI for this configuration. For type `VPC_SUBNET` the value should be `subnet:subnet_id` where `subnet_id` is the value you would find in an aws_subnet resource's id attribute. For type `VPC_SECURITY_GROUP` the value should be `security_group:security_group_id` where `security_group_id` is the value you would find in an aws_security_group resource's id attribute. + ## Attributes Reference In addition to all arguments above, the following attributes are exported: From b6266c16712054d4ead6d2fcef0dfc6ead541a0f Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Wed, 19 May 2021 15:15:20 -0700 Subject: [PATCH 05/18] Add ExactlyOneOf to both event_source_arn and self_managed_event_source. --- aws/resource_aws_lambda_event_source_mapping.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index d412cf5c62c..1aa480b3e17 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -33,9 +33,10 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Schema: map[string]*schema.Schema{ "event_source_arn": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, }, "function_name": { Type: schema.TypeString, @@ -164,10 +165,11 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, "self_managed_event_source": { - Type: schema.TypeList, - Optional: true, - MinItems: 1, - MaxItems: 1, + Type: schema.TypeList, + Optional: true, + MinItems: 1, + MaxItems: 1, + ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "endpoints": { From bfe1b734c3c1b7127dd7ef37a3dd3eaaf5391e4f Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Thu, 20 May 2021 09:50:46 -0700 Subject: [PATCH 06/18] Move the expands and flatten functions to resource_aws_lambda_event_source_mapping.go from structure.go. Add RequiredWith schema constraing to self_managed_event_source and source_access_configuration. --- ...esource_aws_lambda_event_source_mapping.go | 119 +++++++++++++++++- aws/structure.go | 109 ---------------- 2 files changed, 116 insertions(+), 112 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 1aa480b3e17..1fdbeca07b2 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -3,6 +3,8 @@ package aws import ( "fmt" "log" + "strconv" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -170,6 +172,7 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { MinItems: 1, MaxItems: 1, ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, + RequiredWith: []string{"self_managed_event_source", "source_access_configuration"}, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "endpoints": { @@ -182,9 +185,10 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, "source_access_configuration": { - Type: schema.TypeList, - Optional: true, - MinItems: 1, + Type: schema.TypeList, + Optional: true, + MinItems: 1, + RequiredWith: []string{"self_managed_event_source", "source_access_configuration"}, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "type": { @@ -534,3 +538,112 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte return resourceAwsLambdaEventSourceMappingRead(d, meta) } + +func expandLambdaEventSourceMappingSelfManagedEventSource(configured []interface{}) *lambda.SelfManagedEventSource { + if len(configured) == 0 { + return nil + } + + source := &lambda.SelfManagedEventSource{} + source.Endpoints = map[string][]*string{} + + if config, ok := configured[0].(map[string]interface{}); ok { + if endpoints, ok := config["endpoints"].(map[string]interface{}); ok { + for key, value := range endpoints { + values := strings.Split(value.(string), ",") + source.Endpoints[key] = make([]*string, len(values)) + for i, value := range values { + valueCopy := value + source.Endpoints[key][i] = &valueCopy + } + } + } + } + return source +} + +func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource, d *schema.ResourceData) []interface{} { + if source == nil { + return nil + } + + if source.Endpoints == nil { + return nil + } + + endpoints := map[string]string{} + for key, values := range source.Endpoints { + sValues := make([]string, len(values)) + for i, value := range values { + sValues[i] = *value + } + // The AWS API sorts the list of brokers so try to order the string by what + // is in the TF file to prevent spurious diffs. + curValue, ok := d.Get("self_managed_event_source.0.endpoints." + key).(string) + if !ok { + curValue = "" + } + curValues := strings.Split(curValue, ",") + if len(sValues) == len(curValues) { + for i := 0; i < len(curValues); i++ { + for j := 0; j < len(sValues); j++ { + if curValues[i] == sValues[j] { + sValues[i], sValues[j] = sValues[j], sValues[i] + break + } + } + } + } + endpoints[key] = strings.Join(sValues, ",") + } + + if len(endpoints) == 0 { + return nil + } + + config := map[string]interface{}{} + config["endpoints"] = endpoints + + return []interface{}{config} +} + +func expandLambdaEventSourceMappingSourceAccessConfigurations(configured []interface{}) []*lambda.SourceAccessConfiguration { + accesses := make([]*lambda.SourceAccessConfiguration, 0, len(configured)) + for _, m := range configured { + config := m.(map[string]interface{}) + accesses = append(accesses, &lambda.SourceAccessConfiguration{ + Type: aws.String(config["type"].(string)), + URI: aws.String(config["uri"].(string)), + }) + } + return accesses +} + +func flattenLambdaEventSourceMappingSourceAccessConfigurations(accesses []*lambda.SourceAccessConfiguration, d *schema.ResourceData) []map[string]interface{} { + if accesses == nil { + return nil + } + settings := make([]map[string]interface{}, len(accesses)) + + for i, access := range accesses { + setting := make(map[string]interface{}) + setting["type"] = access.Type + setting["uri"] = access.URI + settings[i] = setting + } + // The result returned from AWS is sorted so try to order it like the original to prevent spurious diffs + if curCount, ok := d.Get("source_access_configuration.#").(int); ok { + if curCount == len(settings) { + for i := 0; i < curCount; i++ { + if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { + for j := 0; j < len(settings); j++ { + if curSetting["type"] == *settings[j]["type"].(*string) && curSetting["uri"] == *settings[j]["uri"].(*string) { + settings[i], settings[j] = settings[j], settings[i] + } + } + } + } + } + } + return settings +} diff --git a/aws/structure.go b/aws/structure.go index 0cb141ede4e..f515bceca76 100644 --- a/aws/structure.go +++ b/aws/structure.go @@ -1674,115 +1674,6 @@ func flattenLambdaEventSourceMappingDestinationConfig(dest *lambda.DestinationCo return []interface{}{mDest} } -func expandLambdaEventSourceMappingSelfManagedEventSource(configured []interface{}) *lambda.SelfManagedEventSource { - if len(configured) == 0 { - return nil - } - - source := &lambda.SelfManagedEventSource{} - source.Endpoints = map[string][]*string{} - - if config, ok := configured[0].(map[string]interface{}); ok { - if endpoints, ok := config["endpoints"].(map[string]interface{}); ok { - for key, value := range endpoints { - values := strings.Split(value.(string), ",") - source.Endpoints[key] = make([]*string, len(values)) - for i, value := range values { - valueCopy := value - source.Endpoints[key][i] = &valueCopy - } - } - } - } - return source -} - -func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource, d *schema.ResourceData) []interface{} { - if source == nil { - return nil - } - - if source.Endpoints == nil { - return nil - } - - endpoints := map[string]string{} - for key, values := range source.Endpoints { - sValues := make([]string, len(values)) - for i, value := range values { - sValues[i] = *value - } - // The AWS API sorts the list of brokers so try to order the string by what - // is in the TF file to prevent spurious diffs. - curValue, ok := d.Get("self_managed_event_source.0.endpoints." + key).(string) - if !ok { - curValue = "" - } - curValues := strings.Split(curValue, ",") - if len(sValues) == len(curValues) { - for i := 0; i < len(curValues); i++ { - for j := 0; j < len(sValues); j++ { - if curValues[i] == sValues[j] { - sValues[i], sValues[j] = sValues[j], sValues[i] - break - } - } - } - } - endpoints[key] = strings.Join(sValues, ",") - } - - if len(endpoints) == 0 { - return nil - } - - config := map[string]interface{}{} - config["endpoints"] = endpoints - - return []interface{}{config} -} - -func expandLambdaEventSourceMappingSourceAccessConfigurations(configured []interface{}) []*lambda.SourceAccessConfiguration { - accesses := make([]*lambda.SourceAccessConfiguration, 0, len(configured)) - for _, m := range configured { - config := m.(map[string]interface{}) - accesses = append(accesses, &lambda.SourceAccessConfiguration{ - Type: aws.String(config["type"].(string)), - URI: aws.String(config["uri"].(string)), - }) - } - return accesses -} - -func flattenLambdaEventSourceMappingSourceAccessConfigurations(accesses []*lambda.SourceAccessConfiguration, d *schema.ResourceData) []map[string]interface{} { - if accesses == nil { - return nil - } - settings := make([]map[string]interface{}, len(accesses)) - - for i, access := range accesses { - setting := make(map[string]interface{}) - setting["type"] = access.Type - setting["uri"] = access.URI - settings[i] = setting - } - // The result returned from AWS is sorted so try to order it like the original to prevent spurious diffs - if curCount, ok := d.Get("source_access_configuration.#").(int); ok { - if curCount == len(settings) { - for i := 0; i < curCount; i++ { - if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { - for j := 0; j < len(settings); j++ { - if curSetting["type"] == *settings[j]["type"].(*string) && curSetting["uri"] == *settings[j]["uri"].(*string) { - settings[i], settings[j] = settings[j], settings[i] - } - } - } - } - } - } - return settings -} - func flattenLambdaLayers(layers []*lambda.Layer) []interface{} { arns := make([]*string, len(layers)) for i, layer := range layers { From 76d894ab6ed1d4980a0d5f8b3c9f4c84e1758a70 Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Thu, 20 May 2021 09:53:49 -0700 Subject: [PATCH 07/18] Update document to indicate schema requirement that self_managed_event_source and source_acccess_configuration must be set together. --- website/docs/r/lambda_event_source_mapping.html.markdown | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 923a93fe0c5..13d956116c1 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -100,8 +100,8 @@ resource "aws_lambda_event_source_mapping" "example" { * `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`. * `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified. * `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below. -* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. Detailed below. -* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. Detailed below. +* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. Detailed below. If set, configuration must also include `source_access_configuration`. +* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. Detailed below. If set, configuration must also include `self_managed_event_source`. ### destination_config Configuration Block From 9f16299e76645bb5f8941e45ad429e53ce1d2e35 Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Thu, 20 May 2021 10:00:27 -0700 Subject: [PATCH 08/18] Documentation formatting. --- website/docs/r/lambda_event_source_mapping.html.markdown | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 13d956116c1..a0833359aca 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -74,7 +74,8 @@ resource "aws_lambda_event_source_mapping" "example" { type = "VPC_SECURITY_GROUP" uri = "security_group:sg-example" } -}``` +} +``` ### SQS From 4a42925e1ee3476f782449f09672ddfe03eb5153 Mon Sep 17 00:00:00 2001 From: Robert Christ Date: Thu, 20 May 2021 10:02:19 -0700 Subject: [PATCH 09/18] Documentation word ordering. --- website/docs/r/lambda_event_source_mapping.html.markdown | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index a0833359aca..6bb1476364f 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -101,8 +101,8 @@ resource "aws_lambda_event_source_mapping" "example" { * `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`. * `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified. * `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below. -* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. Detailed below. If set, configuration must also include `source_access_configuration`. -* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. Detailed below. If set, configuration must also include `self_managed_event_source`. +* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below. +* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. If set, configuration must also include `self_managed_event_source`. Detailed below. ### destination_config Configuration Block From 31540bfe7c7e14f44d535599064f183b9854e1da Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 26 May 2021 09:15:12 -0400 Subject: [PATCH 10/18] r/aws_lambda_event_source_mapping: Change 'source_access_configuration' to TypeSet and add DiffSuppressFunc for 'self_managed_event_source.endpoints.KAFKA_BOOTSTRAP_SERVERS'. --- ...esource_aws_lambda_event_source_mapping.go | 655 ++++++++++-------- ...ce_aws_lambda_event_source_mapping_test.go | 528 ++++++-------- aws/structure.go | 39 -- 3 files changed, 597 insertions(+), 625 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 1fdbeca07b2..5752cad2b95 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -3,16 +3,14 @@ package aws import ( "fmt" "log" - "strconv" + "reflect" + "sort" "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/lambda" - "github.com/aws/aws-sdk-go/service/sqs" "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -34,42 +32,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, Schema: map[string]*schema.Schema{ - "event_source_arn": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, - }, - "function_name": { - Type: schema.TypeString, - Required: true, - DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { - // Using function name or ARN should not be shown as a diff. - // Try to convert the old and new values from ARN to function name - oldFunctionName, oldFunctionNameErr := getFunctionNameFromLambdaArn(old) - newFunctionName, newFunctionNameErr := getFunctionNameFromLambdaArn(new) - return (oldFunctionName == new && oldFunctionNameErr == nil) || (newFunctionName == old && newFunctionNameErr == nil) - }, - }, - "starting_position": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - ValidateFunc: validation.StringInSlice(lambda.EventSourcePosition_Values(), false), - }, - "starting_position_timestamp": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - ValidateFunc: validation.IsRFC3339Time, - }, - "topics": { - Type: schema.TypeSet, - Optional: true, - ForceNew: true, - Elem: &schema.Schema{Type: schema.TypeString}, - Set: schema.HashString, - }, "batch_size": { Type: schema.TypeInt, Optional: true, @@ -83,69 +45,37 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { return false } - serviceName := "" + var serviceName string if v, ok := d.GetOk("event_source_arn"); ok { eventSourceARN, err := arn.Parse(v.(string)) if err != nil { return false } + serviceName = eventSourceARN.Service - } else { - // self managed kafka does not have an event_source_arn + } else if _, ok := d.GetOk("self_managed_event_source"); ok { serviceName = "kafka" } + switch serviceName { - // kafka.ServiceName is "kafka". - case dynamodb.ServiceName, kinesis.ServiceName, "kafka": - if old == "100" { - return true - } - case sqs.ServiceName: - if old == "10" { - return true - } + case "dynamodb", "kinesis", "kafka": + return old == "100" + case "sqs": + return old == "10" } - return false + + return old == new }, }, - "enabled": { - Type: schema.TypeBool, - Optional: true, - Default: true, - }, - "maximum_batching_window_in_seconds": { - Type: schema.TypeInt, - Optional: true, - }, - "parallelization_factor": { - Type: schema.TypeInt, - Optional: true, - ValidateFunc: validation.IntBetween(1, 10), - Computed: true, - }, - "maximum_retry_attempts": { - Type: schema.TypeInt, - Optional: true, - Computed: true, - ValidateFunc: validation.IntBetween(-1, 10_000), - }, - "maximum_record_age_in_seconds": { - Type: schema.TypeInt, - Optional: true, - Computed: true, - ValidateFunc: validation.Any( - validation.IntInSlice([]int{-1}), - validation.IntBetween(60, 604_800), - ), - }, + "bisect_batch_on_function_error": { Type: schema.TypeBool, Optional: true, }, + "destination_config": { Type: schema.TypeList, Optional: true, - MinItems: 1, MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ @@ -165,14 +95,83 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, + DiffSuppressFunc: suppressMissingOptionalConfigurationBlock, }, - "self_managed_event_source": { - Type: schema.TypeList, + + "enabled": { + Type: schema.TypeBool, + Optional: true, + Default: true, + }, + + "event_source_arn": { + Type: schema.TypeString, Optional: true, - MinItems: 1, - MaxItems: 1, + ForceNew: true, ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, - RequiredWith: []string{"self_managed_event_source", "source_access_configuration"}, + }, + + "function_arn": { + Type: schema.TypeString, + Computed: true, + }, + + "function_name": { + Type: schema.TypeString, + Required: true, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + // Using function name or ARN should not be shown as a diff. + // Try to convert the old and new values from ARN to function name + oldFunctionName, oldFunctionNameErr := getFunctionNameFromLambdaArn(old) + newFunctionName, newFunctionNameErr := getFunctionNameFromLambdaArn(new) + return (oldFunctionName == new && oldFunctionNameErr == nil) || (newFunctionName == old && newFunctionNameErr == nil) + }, + }, + + "last_modified": { + Type: schema.TypeString, + Computed: true, + }, + + "last_processing_result": { + Type: schema.TypeString, + Computed: true, + }, + + "maximum_batching_window_in_seconds": { + Type: schema.TypeInt, + Optional: true, + }, + + "maximum_record_age_in_seconds": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.Any( + validation.IntInSlice([]int{-1}), + validation.IntBetween(60, 604_800), + ), + }, + + "maximum_retry_attempts": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.IntBetween(-1, 10_000), + }, + + "parallelization_factor": { + Type: schema.TypeInt, + Optional: true, + ValidateFunc: validation.IntBetween(1, 10), + Computed: true, + }, + + "self_managed_event_source": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "endpoints": { @@ -180,20 +179,36 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Required: true, ForceNew: true, Elem: &schema.Schema{Type: schema.TypeString}, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + if k == "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS" { + // AWS returns the bootstrap brokers in sorted order. + olds := strings.Split(old, ",") + sort.Strings(olds) + news := strings.Split(new, ",") + sort.Strings(news) + + return reflect.DeepEqual(olds, news) + } + + return old == new + }, }, }, }, + ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, + RequiredWith: []string{"source_access_configuration"}, }, + "source_access_configuration": { - Type: schema.TypeList, - Optional: true, - MinItems: 1, - RequiredWith: []string{"self_managed_event_source", "source_access_configuration"}, + Type: schema.TypeSet, + Optional: true, + MaxItems: 22, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "type": { - Type: schema.TypeString, - Required: true, + Type: schema.TypeString, + Required: true, + ValidateFunc: validation.StringInSlice(lambda.SourceAccessType_Values(), false), }, "uri": { Type: schema.TypeString, @@ -201,27 +216,40 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, + RequiredWith: []string{"self_managed_event_source"}, }, - "function_arn": { - Type: schema.TypeString, - Computed: true, - }, - "last_modified": { - Type: schema.TypeString, - Computed: true, + + "starting_position": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.StringInSlice(lambda.EventSourcePosition_Values(), false), }, - "last_processing_result": { - Type: schema.TypeString, - Computed: true, + + "starting_position_timestamp": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.IsRFC3339Time, }, + "state": { Type: schema.TypeString, Computed: true, }, + "state_transition_reason": { Type: schema.TypeString, Computed: true, }, + + "topics": { + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + "uuid": { Type: schema.TypeString, Computed: true, @@ -230,18 +258,16 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { } } -// resourceAwsLambdaEventSourceMappingCreate maps to: -// CreateEventSourceMapping in the API / SDK func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).lambdaconn + functionName := d.Get("function_name").(string) input := &lambda.CreateEventSourceMappingInput{ - Enabled: aws.Bool(d.Get("enabled").(bool)), + Enabled: aws.Bool(d.Get("enabled").(bool)), + FunctionName: aws.String(functionName), } - if v, ok := d.GetOk("function_name"); ok { - input.FunctionName = aws.String(v.(string)) - } + var target string if v, ok := d.GetOk("batch_size"); ok { input.BatchSize = aws.Int64(int64(v.(int))) @@ -251,12 +277,15 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.BisectBatchOnFunctionError = aws.Bool(v.(bool)) } - if vDest, ok := d.GetOk("destination_config"); ok { - input.DestinationConfig = expandLambdaEventSourceMappingDestinationConfig(vDest.([]interface{})) + if v, ok := d.GetOk("destination_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.DestinationConfig = expandLambdaDestinationConfig(v.([]interface{})[0].(map[string]interface{})) } if v, ok := d.GetOk("event_source_arn"); ok { - input.EventSourceArn = aws.String(v.(string)) + v := v.(string) + + input.EventSourceArn = aws.String(v) + target = v } if v, ok := d.GetOk("maximum_batching_window_in_seconds"); ok { @@ -275,6 +304,16 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.ParallelizationFactor = aws.Int64(int64(v.(int))) } + if v, ok := d.GetOk("self_managed_event_source"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.SelfManagedEventSource = expandLambdaSelfManagedEventSource(v.([]interface{})[0].(map[string]interface{})) + + target = "Self-Managed Apache Kafka" + } + + if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 { + input.SourceAccessConfigurations = expandLambdaSourceAccessConfigurations(v.(*schema.Set).List()) + } + if v, ok := d.GetOk("starting_position"); ok { input.StartingPosition = aws.String(v.(string)) } @@ -285,21 +324,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.StartingPositionTimestamp = aws.Time(t) } - if v, ok := d.GetOk("self_managed_event_source"); ok { - input.SelfManagedEventSource = expandLambdaEventSourceMappingSelfManagedEventSource(v.([]interface{})) - } - - if v, ok := d.GetOk("source_access_configuration"); ok { - input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(v.([]interface{})) - } - if v, ok := d.GetOk("topics"); ok && v.(*schema.Set).Len() > 0 { input.Topics = expandStringSet(v.(*schema.Set)) } - // When non-ARN targets are supported, set target to the non-nil value. - target := input.EventSourceArn - log.Printf("[DEBUG] Creating Lambda Event Source Mapping: %s", input) // IAM profiles and roles can take some time to propagate in AWS: @@ -330,7 +358,7 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte } if err != nil { - return fmt.Errorf("error creating Lambda Event Source Mapping (%s): %w", aws.StringValue(target), err) + return fmt.Errorf("error creating Lambda Event Source Mapping (%s): %w", target, err) } d.SetId(aws.StringValue(eventSourceMappingConfiguration.UUID)) @@ -342,8 +370,6 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte return resourceAwsLambdaEventSourceMappingRead(d, meta) } -// resourceAwsLambdaEventSourceMappingRead maps to: -// GetEventSourceMapping in the API / SDK func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).lambdaconn @@ -360,103 +386,61 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf } d.Set("batch_size", eventSourceMappingConfiguration.BatchSize) - d.Set("maximum_batching_window_in_seconds", eventSourceMappingConfiguration.MaximumBatchingWindowInSeconds) + d.Set("bisect_batch_on_function_error", eventSourceMappingConfiguration.BisectBatchOnFunctionError) + if eventSourceMappingConfiguration.DestinationConfig != nil { + if err := d.Set("destination_config", []interface{}{flattenLambdaDestinationConfig(eventSourceMappingConfiguration.DestinationConfig)}); err != nil { + return fmt.Errorf("error setting destination_config: %w", err) + } + } else { + d.Set("destination_config", nil) + } d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn) d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn) - d.Set("last_modified", aws.TimeValue(eventSourceMappingConfiguration.LastModified).Format(time.RFC3339)) - d.Set("last_processing_result", eventSourceMappingConfiguration.LastProcessingResult) - d.Set("state", eventSourceMappingConfiguration.State) - d.Set("state_transition_reason", eventSourceMappingConfiguration.StateTransitionReason) - d.Set("uuid", eventSourceMappingConfiguration.UUID) d.Set("function_name", eventSourceMappingConfiguration.FunctionArn) - d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor) - d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts) - d.Set("maximum_record_age_in_seconds", eventSourceMappingConfiguration.MaximumRecordAgeInSeconds) - d.Set("bisect_batch_on_function_error", eventSourceMappingConfiguration.BisectBatchOnFunctionError) - if err := d.Set("destination_config", flattenLambdaEventSourceMappingDestinationConfig(eventSourceMappingConfiguration.DestinationConfig)); err != nil { - return fmt.Errorf("error setting destination_config: %w", err) - } - if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil { - return fmt.Errorf("error setting topics: %w", err) + if eventSourceMappingConfiguration.LastModified != nil { + d.Set("last_modified", aws.TimeValue(eventSourceMappingConfiguration.LastModified).Format(time.RFC3339)) + } else { + d.Set("last_modified", nil) } - if err := d.Set("self_managed_event_source", flattenLambdaEventSourceMappingSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource, d)); err != nil { - return fmt.Errorf("error setting self_managed_event_source: %w", err) + d.Set("last_processing_result", eventSourceMappingConfiguration.LastProcessingResult) + d.Set("maximum_batching_window_in_seconds", eventSourceMappingConfiguration.MaximumBatchingWindowInSeconds) + d.Set("maximum_record_age_in_seconds", eventSourceMappingConfiguration.MaximumRecordAgeInSeconds) + d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts) + d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor) + if eventSourceMappingConfiguration.SelfManagedEventSource != nil { + if err := d.Set("self_managed_event_source", []interface{}{flattenLambdaSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)}); err != nil { + return fmt.Errorf("error setting self_managed_event_source: %w", err) + } + } else { + d.Set("self_managed_event_source", nil) } - if err := d.Set("source_access_configuration", flattenLambdaEventSourceMappingSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations, d)); err != nil { + if err := d.Set("source_access_configuration", flattenLambdaSourceAccessConfigurations(eventSourceMappingConfiguration.SourceAccessConfigurations)); err != nil { return fmt.Errorf("error setting source_access_configuration: %w", err) } - d.Set("starting_position", eventSourceMappingConfiguration.StartingPosition) if eventSourceMappingConfiguration.StartingPositionTimestamp != nil { d.Set("starting_position_timestamp", aws.TimeValue(eventSourceMappingConfiguration.StartingPositionTimestamp).Format(time.RFC3339)) } else { d.Set("starting_position_timestamp", nil) } + d.Set("state", eventSourceMappingConfiguration.State) + d.Set("state_transition_reason", eventSourceMappingConfiguration.StateTransitionReason) + d.Set("topics", aws.StringValueSlice(eventSourceMappingConfiguration.Topics)) + d.Set("uuid", eventSourceMappingConfiguration.UUID) - state := aws.StringValue(eventSourceMappingConfiguration.State) - - switch state { + switch state := d.Get("state").(string); state { case waiter.EventSourceMappingStateEnabled, waiter.EventSourceMappingStateEnabling: d.Set("enabled", true) case waiter.EventSourceMappingStateDisabled, waiter.EventSourceMappingStateDisabling: d.Set("enabled", false) default: - log.Printf("[WARN] Lambda Event Source Mapping is neither enabled nor disabled but %s", state) - } - - return nil -} - -// resourceAwsLambdaEventSourceMappingDelete maps to: -// DeleteEventSourceMapping in the API / SDK -func resourceAwsLambdaEventSourceMappingDelete(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).lambdaconn - - log.Printf("[INFO] Deleting Lambda Event Source Mapping: %s", d.Id()) - - input := &lambda.DeleteEventSourceMappingInput{ - UUID: aws.String(d.Id()), - } - - err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { - _, err := conn.DeleteEventSourceMapping(input) - - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) { - return nil - } - - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) { - return resource.RetryableError(err) - } - - if err != nil { - return resource.NonRetryableError(err) - } - - return nil - }) - - if tfresource.TimedOut(err) { - _, err = conn.DeleteEventSourceMapping(input) - } - - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) { - return nil - } - - if err != nil { - return fmt.Errorf("error deleting Lambda Event Source Mapping (%s): %w", d.Id(), err) - } - - if _, err := waiter.EventSourceMappingDelete(conn, d.Id()); err != nil { - return fmt.Errorf("error waiting for Lambda Event Source Mapping (%s) to delete: %w", d.Id(), err) + log.Printf("[WARN] Lambda Event Source Mapping (%s) is neither enabled nor disabled, but %s", d.Id(), state) + d.Set("enabled", nil) } return nil } -// resourceAwsLambdaEventSourceMappingUpdate maps to: -// UpdateEventSourceMapping in the API / SDK func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).lambdaconn @@ -475,7 +459,9 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte } if d.HasChange("destination_config") { - input.DestinationConfig = expandLambdaEventSourceMappingDestinationConfig(d.Get("destination_config").([]interface{})) + if v, ok := d.GetOk("destination_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.DestinationConfig = expandLambdaDestinationConfig(v.([]interface{})[0].(map[string]interface{})) + } } if d.HasChange("enabled") { @@ -503,7 +489,9 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte } if d.HasChange("source_access_configuration") { - input.SourceAccessConfigurations = expandLambdaEventSourceMappingSourceAccessConfigurations(d.Get("source_access_configuration").([]interface{})) + if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 { + input.SourceAccessConfigurations = expandLambdaSourceAccessConfigurations(v.(*schema.Set).List()) + } } err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { @@ -539,111 +527,220 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte return resourceAwsLambdaEventSourceMappingRead(d, meta) } -func expandLambdaEventSourceMappingSelfManagedEventSource(configured []interface{}) *lambda.SelfManagedEventSource { - if len(configured) == 0 { - return nil +func resourceAwsLambdaEventSourceMappingDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).lambdaconn + + log.Printf("[INFO] Deleting Lambda Event Source Mapping: %s", d.Id()) + + input := &lambda.DeleteEventSourceMappingInput{ + UUID: aws.String(d.Id()), } - source := &lambda.SelfManagedEventSource{} - source.Endpoints = map[string][]*string{} + err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { + _, err := conn.DeleteEventSourceMapping(input) + + if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) { + return resource.RetryableError(err) + } - if config, ok := configured[0].(map[string]interface{}); ok { - if endpoints, ok := config["endpoints"].(map[string]interface{}); ok { - for key, value := range endpoints { - values := strings.Split(value.(string), ",") - source.Endpoints[key] = make([]*string, len(values)) - for i, value := range values { - valueCopy := value - source.Endpoints[key][i] = &valueCopy - } - } + if err != nil { + return resource.NonRetryableError(err) } + + return nil + }) + + if tfresource.TimedOut(err) { + _, err = conn.DeleteEventSourceMapping(input) + } + + if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceNotFoundException) { + return nil + } + + if err != nil { + return fmt.Errorf("error deleting Lambda Event Source Mapping (%s): %w", d.Id(), err) + } + + if _, err := waiter.EventSourceMappingDelete(conn, d.Id()); err != nil { + return fmt.Errorf("error waiting for Lambda Event Source Mapping (%s) to delete: %w", d.Id(), err) + } + + return nil +} + +func expandLambdaDestinationConfig(tfMap map[string]interface{}) *lambda.DestinationConfig { + if tfMap == nil { + return nil + } + + apiObject := &lambda.DestinationConfig{} + + if v, ok := tfMap["on_failure"].([]interface{}); ok && len(v) > 0 { + apiObject.OnFailure = expandLambdaOnFailure(v[0].(map[string]interface{})) + } + + return apiObject +} + +func expandLambdaOnFailure(tfMap map[string]interface{}) *lambda.OnFailure { + if tfMap == nil { + return nil + } + + apiObject := &lambda.OnFailure{} + + if v, ok := tfMap["destination_arn"].(string); ok && v != "" { + apiObject.Destination = aws.String(v) + } + + return apiObject +} + +func flattenLambdaDestinationConfig(apiObject *lambda.DestinationConfig) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.OnFailure; v != nil { + tfMap["on_failure"] = []interface{}{flattenLambdaOnFailure(v)} + } + + return tfMap +} + +func flattenLambdaOnFailure(apiObject *lambda.OnFailure) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.Destination; v != nil { + tfMap["destination_arn"] = aws.StringValue(v) } - return source + + return tfMap } -func flattenLambdaEventSourceMappingSelfManagedEventSource(source *lambda.SelfManagedEventSource, d *schema.ResourceData) []interface{} { - if source == nil { +func expandLambdaSelfManagedEventSource(tfMap map[string]interface{}) *lambda.SelfManagedEventSource { + if tfMap == nil { return nil } - if source.Endpoints == nil { + apiObject := &lambda.SelfManagedEventSource{} + + if v, ok := tfMap["endpoints"].(map[string]interface{}); ok && len(v) > 0 { + m := map[string][]*string{} + + for k, v := range v { + m[k] = aws.StringSlice(strings.Split(v.(string), ",")) + } + + apiObject.Endpoints = m + } + + return apiObject +} + +func flattenLambdaSelfManagedEventSource(apiObject *lambda.SelfManagedEventSource) map[string]interface{} { + if apiObject == nil { return nil } - endpoints := map[string]string{} - for key, values := range source.Endpoints { - sValues := make([]string, len(values)) - for i, value := range values { - sValues[i] = *value + tfMap := map[string]interface{}{} + + if v := apiObject.Endpoints; v != nil { + m := map[string]string{} + + for k, v := range v { + m[k] = strings.Join(aws.StringValueSlice(v), ",") } - // The AWS API sorts the list of brokers so try to order the string by what - // is in the TF file to prevent spurious diffs. - curValue, ok := d.Get("self_managed_event_source.0.endpoints." + key).(string) + + tfMap["endpoints"] = m + } + + return tfMap +} + +func expandLambdaSourceAccessConfiguration(tfMap map[string]interface{}) *lambda.SourceAccessConfiguration { + if tfMap == nil { + return nil + } + + apiObject := &lambda.SourceAccessConfiguration{} + + if v, ok := tfMap["type"].(string); ok && v != "" { + apiObject.Type = aws.String(v) + } + + if v, ok := tfMap["uri"].(string); ok && v != "" { + apiObject.URI = aws.String(v) + } + + return apiObject +} + +func expandLambdaSourceAccessConfigurations(tfList []interface{}) []*lambda.SourceAccessConfiguration { + if len(tfList) == 0 { + return nil + } + + var apiObjects []*lambda.SourceAccessConfiguration + + for _, tfMapRaw := range tfList { + tfMap, ok := tfMapRaw.(map[string]interface{}) + if !ok { - curValue = "" + continue } - curValues := strings.Split(curValue, ",") - if len(sValues) == len(curValues) { - for i := 0; i < len(curValues); i++ { - for j := 0; j < len(sValues); j++ { - if curValues[i] == sValues[j] { - sValues[i], sValues[j] = sValues[j], sValues[i] - break - } - } - } + + apiObject := expandLambdaSourceAccessConfiguration(tfMap) + + if apiObject == nil { + continue } - endpoints[key] = strings.Join(sValues, ",") + + apiObjects = append(apiObjects, apiObject) } - if len(endpoints) == 0 { + return apiObjects +} + +func flattenLambdaSourceAccessConfiguration(apiObject *lambda.SourceAccessConfiguration) map[string]interface{} { + if apiObject == nil { return nil } - config := map[string]interface{}{} - config["endpoints"] = endpoints + tfMap := map[string]interface{}{} - return []interface{}{config} -} + if v := apiObject.Type; v != nil { + tfMap["type"] = aws.StringValue(v) + } -func expandLambdaEventSourceMappingSourceAccessConfigurations(configured []interface{}) []*lambda.SourceAccessConfiguration { - accesses := make([]*lambda.SourceAccessConfiguration, 0, len(configured)) - for _, m := range configured { - config := m.(map[string]interface{}) - accesses = append(accesses, &lambda.SourceAccessConfiguration{ - Type: aws.String(config["type"].(string)), - URI: aws.String(config["uri"].(string)), - }) + if v := apiObject.URI; v != nil { + tfMap["uri"] = aws.StringValue(v) } - return accesses + + return tfMap } -func flattenLambdaEventSourceMappingSourceAccessConfigurations(accesses []*lambda.SourceAccessConfiguration, d *schema.ResourceData) []map[string]interface{} { - if accesses == nil { +func flattenLambdaSourceAccessConfigurations(apiObjects []*lambda.SourceAccessConfiguration) []interface{} { + if len(apiObjects) == 0 { return nil } - settings := make([]map[string]interface{}, len(accesses)) - - for i, access := range accesses { - setting := make(map[string]interface{}) - setting["type"] = access.Type - setting["uri"] = access.URI - settings[i] = setting - } - // The result returned from AWS is sorted so try to order it like the original to prevent spurious diffs - if curCount, ok := d.Get("source_access_configuration.#").(int); ok { - if curCount == len(settings) { - for i := 0; i < curCount; i++ { - if curSetting, ok := d.Get("source_access_configuration." + strconv.Itoa(i)).(map[string]interface{}); ok { - for j := 0; j < len(settings); j++ { - if curSetting["type"] == *settings[j]["type"].(*string) && curSetting["uri"] == *settings[j]["uri"].(*string) { - settings[i], settings[j] = settings[j], settings[i] - } - } - } - } + + var tfList []interface{} + + for _, apiObject := range apiObjects { + if apiObject == nil { + continue } + + tfList = append(tfList, flattenLambdaSourceAccessConfiguration(apiObject)) } - return settings + + return tfList } diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 94367b53e60..51854bbef46 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -677,16 +677,14 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, Steps: []resource.TestStep{ { - Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "100"), + Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "100", "test1:9092,test2:9092"), Check: resource.ComposeTestCheckFunc( testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), + resource.TestCheckResourceAttr(resourceName, "enabled", "false"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), - resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test2:9092,test1:9092"), + resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), - resource.TestCheckResourceAttr(resourceName, "source_access_configuration.0.type", "VPC_SUBNET"), - resource.TestCheckResourceAttr(resourceName, "source_access_configuration.1.type", "VPC_SUBNET"), - resource.TestCheckResourceAttr(resourceName, "source_access_configuration.2.type", "VPC_SECURITY_GROUP"), testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), @@ -695,9 +693,10 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { // batch_size became optional. Ensure that if the user supplies the default // value, but then moves to not providing the value, that we don't consider this // a diff. + // Verify also that bootstrap broker order does not matter. { PlanOnly: true, - Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null"), + Config: testAccAWSLambdaEventSourceMappingConfigSelfManagedKafka(rName, "null", "test2:9092,test1:9092"), }, }, }) @@ -865,6 +864,214 @@ resource "aws_lambda_function" "test" { `, rName) } +func testAccAWSLambdaEventSourceMappingConfigSQSBase(rName string) string { + return fmt.Sprintf(` +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = < 0 { - if config, ok := vDest[0].(map[string]interface{}); ok { - if vOnFailure, ok := config["on_failure"].([]interface{}); ok && len(vOnFailure) > 0 && vOnFailure[0] != nil { - mOnFailure := vOnFailure[0].(map[string]interface{}) - onFailure.SetDestination(mOnFailure["destination_arn"].(string)) - } - } - } - dest.SetOnFailure(onFailure) - return dest -} - -func flattenLambdaEventSourceMappingDestinationConfig(dest *lambda.DestinationConfig) []interface{} { - mDest := map[string]interface{}{} - mOnFailure := map[string]interface{}{} - if dest != nil { - if dest.OnFailure != nil { - if dest.OnFailure.Destination != nil { - mOnFailure["destination_arn"] = *dest.OnFailure.Destination - mDest["on_failure"] = []interface{}{mOnFailure} - } - } - } - - if len(mDest) == 0 { - return nil - } - - return []interface{}{mDest} -} - func flattenLambdaLayers(layers []*lambda.Layer) []interface{} { arns := make([]*string, len(layers)) for i, layer := range layers { From 64c6e05096893259fc29ef48c5ecaf32417d1549 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 27 May 2021 10:01:54 -0400 Subject: [PATCH 11/18] r/aws_lambda_event_source_mapping: Focus Create retry on IAM propagation errors only (#14042). --- aws/resource_aws_lambda_event_source_mapping.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 5752cad2b95..79aa72f5d69 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -342,7 +342,7 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte err = resource.Retry(iamwaiter.PropagationTimeout, func() *resource.RetryError { eventSourceMappingConfiguration, err = conn.CreateEventSourceMapping(input) - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeInvalidParameterValueException) { + if tfawserr.ErrMessageContains(err, lambda.ErrCodeInvalidParameterValueException, "cannot be assumed by Lambda") { return resource.RetryableError(err) } @@ -497,10 +497,6 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { _, err := conn.UpdateEventSourceMapping(input) - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeInvalidParameterValueException) { - return resource.RetryableError(err) - } - if tfawserr.ErrCodeEquals(err, lambda.ErrCodeResourceInUseException) { return resource.RetryableError(err) } From e05d007c2c458868c839359eb58669d8b8813fff Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 27 May 2021 10:23:48 -0400 Subject: [PATCH 12/18] r/aws_lambda_event_source_mapping: Add `tumbling_window_in_seconds` argument. Acceptance test output: % make testacc TEST=./aws TESTARGS='-run=TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds' ==> Checking that code complies with gofmt requirements... TF_ACC=1 go test ./aws -v -count 1 -parallel 20 -run=TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds -timeout 180m === RUN TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds === PAUSE TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds === CONT TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds --- PASS: TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds (71.47s) PASS ok github.com/terraform-providers/terraform-provider-aws/aws 74.769s --- .changelog/19425.txt | 6 ++- ...esource_aws_lambda_event_source_mapping.go | 15 ++++++ ...ce_aws_lambda_event_source_mapping_test.go | 51 +++++++++++++++++++ .../lambda_event_source_mapping.html.markdown | 19 +++---- 4 files changed, 81 insertions(+), 10 deletions(-) diff --git a/.changelog/19425.txt b/.changelog/19425.txt index 8fd11e459b3..e78934729e4 100644 --- a/.changelog/19425.txt +++ b/.changelog/19425.txt @@ -1,3 +1,7 @@ ```release-notes:enhancement -resource/aws_lambda_event_source_mapping: Add `self_managed_event_source`, `source_access_configuration` to allow for self managed kafka cluster. +resource/aws_lambda_event_source_mapping: Add `self_managed_event_source` and `source_access_configuration` arguments to support self-managed Apache Kafka event sources ``` + +```release-notes:enhancement +resource/aws_lambda_event_source_mapping: Add `tumbling_window_in_seconds` argument to support AWS Lambda streaming analytics calculations +``` \ No newline at end of file diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 79aa72f5d69..d8162e2bf23 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -250,6 +250,12 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Elem: &schema.Schema{Type: schema.TypeString}, }, + "tumbling_window_in_seconds": { + Type: schema.TypeInt, + Optional: true, + ValidateFunc: validation.IntBetween(0, 900), + }, + "uuid": { Type: schema.TypeString, Computed: true, @@ -328,6 +334,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.Topics = expandStringSet(v.(*schema.Set)) } + if v, ok := d.GetOk("tumbling_window_in_seconds"); ok { + input.TumblingWindowInSeconds = aws.Int64(int64(v.(int))) + } + log.Printf("[DEBUG] Creating Lambda Event Source Mapping: %s", input) // IAM profiles and roles can take some time to propagate in AWS: @@ -426,6 +436,7 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf d.Set("state", eventSourceMappingConfiguration.State) d.Set("state_transition_reason", eventSourceMappingConfiguration.StateTransitionReason) d.Set("topics", aws.StringValueSlice(eventSourceMappingConfiguration.Topics)) + d.Set("tumbling_window_in_seconds", eventSourceMappingConfiguration.TumblingWindowInSeconds) d.Set("uuid", eventSourceMappingConfiguration.UUID) switch state := d.Get("state").(string); state { @@ -494,6 +505,10 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte } } + if d.HasChange("tumbling_window_in_seconds") { + input.TumblingWindowInSeconds = aws.Int64(int64(d.Get("tumbling_window_in_seconds").(int))) + } + err := resource.Retry(waiter.EventSourceMappingPropagationTimeout, func() *resource.RetryError { _, err := conn.UpdateEventSourceMapping(input) diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 51854bbef46..e7a0e1fbe92 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -39,6 +39,7 @@ func TestAccAWSLambdaEventSourceMapping_Kinesis_basic(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "function_name", functionResourceName, "arn"), testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"), ), }, // batch_size became optional. Ensure that if the user supplies the default @@ -145,6 +146,7 @@ func TestAccAWSLambdaEventSourceMapping_DynamoDB_basic(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "function_name", functionResourceName, "arn"), testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"), ), }, // batch_size became optional. Ensure that if the user supplies the default @@ -346,6 +348,42 @@ func TestAccAWSLambdaEventSourceMapping_Kinesis_ParallelizationFactor(t *testing }) } +func TestAccAWSLambdaEventSourceMapping_Kinesis_TumblingWindowInSeconds(t *testing.T) { + var conf lambda.EventSourceMappingConfiguration + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_lambda_event_source_mapping.test" + tumblingWindowInSeconds := int64(30) + tumblingWindowInSecondsUpdate := int64(300) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID), + Providers: testAccProviders, + CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSLambdaEventSourceMappingConfigKinesisTumblingWindowInSeconds(rName, tumblingWindowInSeconds), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", strconv.Itoa(int(tumblingWindowInSeconds))), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccAWSLambdaEventSourceMappingConfigKinesisTumblingWindowInSeconds(rName, tumblingWindowInSecondsUpdate), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", strconv.Itoa(int(tumblingWindowInSecondsUpdate))), + ), + }, + }, + }) +} + func TestAccAWSLambdaEventSourceMapping_Kinesis_MaximumRetryAttempts(t *testing.T) { var conf lambda.EventSourceMappingConfiguration rName := acctest.RandomWithPrefix("tf-acc-test") @@ -1111,6 +1149,19 @@ resource "aws_lambda_event_source_mapping" "test" { `, parallelizationFactor)) } +func testAccAWSLambdaEventSourceMappingConfigKinesisTumblingWindowInSeconds(rName string, tumblingWindowInSeconds int64) string { + return composeConfig(testAccAWSLambdaEventSourceMappingConfigKinesisBase(rName), fmt.Sprintf(` +resource "aws_lambda_event_source_mapping" "test" { + batch_size = 100 + tumbling_window_in_seconds = %[1]d + enabled = true + event_source_arn = aws_kinesis_stream.test.arn + function_name = aws_lambda_function.test.arn + starting_position = "TRIM_HORIZON" +} +`, tumblingWindowInSeconds)) +} + func testAccAWSLambdaEventSourceMappingConfigKinesisMaximumRetryAttempts(rName string, maximumRetryAttempts int64) string { return composeConfig(testAccAWSLambdaEventSourceMappingConfigKinesisBase(rName), fmt.Sprintf(` resource "aws_lambda_event_source_mapping" "test" { diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 6bb1476364f..580a6419943 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -89,20 +89,21 @@ resource "aws_lambda_event_source_mapping" "example" { ## Argument Reference * `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB, Kinesis and MSK, `10` for SQS. -* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function. Only available for stream sources (DynamoDB and Kinesis) and SQS standard queues. -* `event_source_arn` - (Optional) The event source ARN - this is required for Kinesis stream, DynamoDB stream, SQS queue or MSK cluster. It is incompatible with a Self Managed Kafka source. +* `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`. +* `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below. * `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`. +* `event_source_arn` - (Optional) The event source ARN - this is required for Kinesis stream, DynamoDB stream, SQS queue or MSK cluster. It is incompatible with a Self Managed Kafka source. * `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events. -* `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis, DynamoDB or MSK. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType). -* `starting_position_timestamp` - (Optional) A timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of the data record which to start reading when using `starting_position` set to `AT_TIMESTAMP`. If a record with this exact timestamp does not exist, the next later record is chosen. If the timestamp is older than the current trim horizon, the oldest available record is chosen. -* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10. -* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. +* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function. Only available for stream sources (DynamoDB and Kinesis) and SQS standard queues. * `maximum_record_age_in_seconds`: - (Optional) The maximum age of a record that Lambda sends to a function for processing. Only available for stream sources (DynamoDB and Kinesis). Must be either -1 (forever, and the default value) or between 60 and 604800 (inclusive). -* `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`. -* `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified. -* `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below. +* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. +* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10. * `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below. * `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. If set, configuration must also include `self_managed_event_source`. Detailed below. +* `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis, DynamoDB or MSK. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType). +* `starting_position_timestamp` - (Optional) A timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of the data record which to start reading when using `starting_position` set to `AT_TIMESTAMP`. If a record with this exact timestamp does not exist, the next later record is chosen. If the timestamp is older than the current trim horizon, the oldest available record is chosen. +* `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified. +* `tumbling_window_in_seconds` - (Optional) The duration in seconds of a processing window for [streaming analytics](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows). The range is between 1 second up to 900 seconds. Only available for stream sources (DynamoDB and Kinesis). ### destination_config Configuration Block From 6c691a1d6a61cd69e50597b0499263e926630e3c Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 27 May 2021 10:52:26 -0400 Subject: [PATCH 13/18] r/aws_lambda_event_source_mapping: Add `function_response_types` argument. Acceptance test output: % make testacc TEST=./aws TESTARGS='-run=TestAccAWSLambdaEventSourceMapping_Kinesis_basic\|TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes\|TestAccAWSLambdaEventSourceMapping_DynamoDB_basic' ==> Checking that code complies with gofmt requirements... TF_ACC=1 go test ./aws -v -count 1 -parallel 20 -run=TestAccAWSLambdaEventSourceMapping_Kinesis_basic\|TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes\|TestAccAWSLambdaEventSourceMapping_DynamoDB_basic -timeout 180m === RUN TestAccAWSLambdaEventSourceMapping_Kinesis_basic === PAUSE TestAccAWSLambdaEventSourceMapping_Kinesis_basic === RUN TestAccAWSLambdaEventSourceMapping_DynamoDB_basic === PAUSE TestAccAWSLambdaEventSourceMapping_DynamoDB_basic === RUN TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes === PAUSE TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes === CONT TestAccAWSLambdaEventSourceMapping_Kinesis_basic === CONT TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes === CONT TestAccAWSLambdaEventSourceMapping_DynamoDB_basic --- PASS: TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes (70.67s) --- PASS: TestAccAWSLambdaEventSourceMapping_DynamoDB_basic (76.45s) --- PASS: TestAccAWSLambdaEventSourceMapping_Kinesis_basic (89.09s) PASS ok github.com/terraform-providers/terraform-provider-aws/aws 93.336s --- .changelog/19425.txt | 4 ++ ...esource_aws_lambda_event_source_mapping.go | 18 ++++++ ...ce_aws_lambda_event_source_mapping_test.go | 63 +++++++++++++++++++ .../lambda_event_source_mapping.html.markdown | 3 +- 4 files changed, 87 insertions(+), 1 deletion(-) diff --git a/.changelog/19425.txt b/.changelog/19425.txt index e78934729e4..0a98108c4de 100644 --- a/.changelog/19425.txt +++ b/.changelog/19425.txt @@ -4,4 +4,8 @@ resource/aws_lambda_event_source_mapping: Add `self_managed_event_source` and `s ```release-notes:enhancement resource/aws_lambda_event_source_mapping: Add `tumbling_window_in_seconds` argument to support AWS Lambda streaming analytics calculations +``` + +```release-notes:enhancement +resource/aws_lambda_event_source_mapping: Add `function_response_types` argument to support AWS Lambda checkpointing ``` \ No newline at end of file diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index d8162e2bf23..764d5f07342 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -128,6 +128,15 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, + "function_response_types": { + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Schema{ + Type: schema.TypeString, + ValidateFunc: validation.StringInSlice(lambda.FunctionResponseType_Values(), false), + }, + }, + "last_modified": { Type: schema.TypeString, Computed: true, @@ -294,6 +303,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte target = v } + if v, ok := d.GetOk("function_response_types"); ok && v.(*schema.Set).Len() > 0 { + input.FunctionResponseTypes = expandStringSet(v.(*schema.Set)) + } + if v, ok := d.GetOk("maximum_batching_window_in_seconds"); ok { input.MaximumBatchingWindowInSeconds = aws.Int64(int64(v.(int))) } @@ -407,6 +420,7 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn) d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn) d.Set("function_name", eventSourceMappingConfiguration.FunctionArn) + d.Set("function_response_types", aws.StringValueSlice(eventSourceMappingConfiguration.FunctionResponseTypes)) if eventSourceMappingConfiguration.LastModified != nil { d.Set("last_modified", aws.TimeValue(eventSourceMappingConfiguration.LastModified).Format(time.RFC3339)) } else { @@ -483,6 +497,10 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte input.FunctionName = aws.String(d.Get("function_name").(string)) } + if d.HasChange("function_response_types") { + input.FunctionResponseTypes = expandStringSet(d.Get("function_response_types").(*schema.Set)) + } + if d.HasChange("maximum_batching_window_in_seconds") { input.MaximumBatchingWindowInSeconds = aws.Int64(int64(d.Get("maximum_batching_window_in_seconds").(int))) } diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index e7a0e1fbe92..98815547e04 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -38,6 +38,7 @@ func TestAccAWSLambdaEventSourceMapping_Kinesis_basic(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "function_name", functionResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "function_response_types.#", "0"), testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"), ), @@ -145,6 +146,7 @@ func TestAccAWSLambdaEventSourceMapping_DynamoDB_basic(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, "stream_arn"), resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"), resource.TestCheckResourceAttrPair(resourceName, "function_name", functionResourceName, "arn"), + resource.TestCheckResourceAttr(resourceName, "function_response_types.#", "0"), testAccCheckResourceAttrRfc3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"), ), @@ -165,6 +167,41 @@ func TestAccAWSLambdaEventSourceMapping_DynamoDB_basic(t *testing.T) { }) } +func TestAccAWSLambdaEventSourceMapping_DynamoDB_FunctionResponseTypes(t *testing.T) { + var conf lambda.EventSourceMappingConfiguration + resourceName := "aws_lambda_event_source_mapping.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID), + Providers: testAccProviders, + CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSLambdaEventSourceMappingConfigDynamoDbFunctionResponseTypes(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "function_response_types.#", "1"), + resource.TestCheckTypeSetElemAttr(resourceName, "function_response_types.*", "ReportBatchItemFailures"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccAWSLambdaEventSourceMappingConfigDynamoDbNoFunctionResponseTypes(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "function_response_types.#", "0"), + ), + }, + }, + }) +} + func TestAccAWSLambdaEventSourceMapping_SQS_BatchWindow(t *testing.T) { var conf lambda.EventSourceMappingConfiguration rName := acctest.RandomWithPrefix("tf-acc-test") @@ -1387,3 +1424,29 @@ resource "aws_lambda_event_source_mapping" "test" { } `, batchSize)) } + +func testAccAWSLambdaEventSourceMappingConfigDynamoDbFunctionResponseTypes(rName string) string { + return composeConfig(testAccAWSLambdaEventSourceMappingConfigDynamoDBBase(rName), ` +resource "aws_lambda_event_source_mapping" "test" { + batch_size = 150 + enabled = true + event_source_arn = aws_dynamodb_table.test.stream_arn + function_name = aws_lambda_function.test.function_name + starting_position = "LATEST" + + function_response_types = ["ReportBatchItemFailures"] +} +`) +} + +func testAccAWSLambdaEventSourceMappingConfigDynamoDbNoFunctionResponseTypes(rName string) string { + return composeConfig(testAccAWSLambdaEventSourceMappingConfigDynamoDBBase(rName), ` +resource "aws_lambda_event_source_mapping" "test" { + batch_size = 150 + enabled = true + event_source_arn = aws_dynamodb_table.test.stream_arn + function_name = aws_lambda_function.test.function_name + starting_position = "LATEST" +} +`) +} diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 580a6419943..586ad46fcee 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -94,6 +94,7 @@ resource "aws_lambda_event_source_mapping" "example" { * `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`. * `event_source_arn` - (Optional) The event source ARN - this is required for Kinesis stream, DynamoDB stream, SQS queue or MSK cluster. It is incompatible with a Self Managed Kafka source. * `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events. +* `function_response_types` - (Optional) A list of current response type enums applied to the event source mapping for [AWS Lambda checkpointing](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting). Only available for stream sources (DynamoDB and Kinesis). Valid values: `ReportBatchItemFailures`. * `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function. Only available for stream sources (DynamoDB and Kinesis) and SQS standard queues. * `maximum_record_age_in_seconds`: - (Optional) The maximum age of a record that Lambda sends to a function for processing. Only available for stream sources (DynamoDB and Kinesis). Must be either -1 (forever, and the default value) or between 60 and 604800 (inclusive). * `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. @@ -103,7 +104,7 @@ resource "aws_lambda_event_source_mapping" "example" { * `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis, DynamoDB or MSK. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType). * `starting_position_timestamp` - (Optional) A timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of the data record which to start reading when using `starting_position` set to `AT_TIMESTAMP`. If a record with this exact timestamp does not exist, the next later record is chosen. If the timestamp is older than the current trim horizon, the oldest available record is chosen. * `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified. -* `tumbling_window_in_seconds` - (Optional) The duration in seconds of a processing window for [streaming analytics](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows). The range is between 1 second up to 900 seconds. Only available for stream sources (DynamoDB and Kinesis). +* `tumbling_window_in_seconds` - (Optional) The duration in seconds of a processing window for [AWS Lambda streaming analytics](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows). The range is between 1 second up to 900 seconds. Only available for stream sources (DynamoDB and Kinesis). ### destination_config Configuration Block From c406bbfde846e51b02cced0158c364e14dd98b4b Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 1 Jun 2021 11:24:37 -0400 Subject: [PATCH 14/18] r/aws_lambda_event_source_mapping: Add `queues` argument. --- .changelog/19425.txt | 4 + ...esource_aws_lambda_event_source_mapping.go | 26 ++- ...ce_aws_lambda_event_source_mapping_test.go | 161 +++++++++++++++++- .../lambda_event_source_mapping.html.markdown | 5 +- 4 files changed, 188 insertions(+), 8 deletions(-) diff --git a/.changelog/19425.txt b/.changelog/19425.txt index 0a98108c4de..270d361e8a9 100644 --- a/.changelog/19425.txt +++ b/.changelog/19425.txt @@ -8,4 +8,8 @@ resource/aws_lambda_event_source_mapping: Add `tumbling_window_in_seconds` argum ```release-notes:enhancement resource/aws_lambda_event_source_mapping: Add `function_response_types` argument to support AWS Lambda checkpointing +``` + +```release-notes:enhancement +resource/aws_lambda_event_source_mapping: Add `queues` argument to support Amazon MQ for Apache ActiveMQ event sources ``` \ No newline at end of file diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 764d5f07342..2c8a89d6ae9 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -58,7 +58,7 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { } switch serviceName { - case "dynamodb", "kinesis", "kafka": + case "dynamodb", "kinesis", "kafka", "mq": return old == "100" case "sqs": return old == "10" @@ -176,6 +176,17 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Computed: true, }, + "queues": { + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Schema{ + Type: schema.TypeString, + ValidateFunc: validation.StringLenBetween(1, 1000), + }, + }, + "self_managed_event_source": { Type: schema.TypeList, Optional: true, @@ -205,7 +216,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, ExactlyOneOf: []string{"event_source_arn", "self_managed_event_source"}, - RequiredWith: []string{"source_access_configuration"}, }, "source_access_configuration": { @@ -225,7 +235,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { }, }, }, - RequiredWith: []string{"self_managed_event_source"}, }, "starting_position": { @@ -256,7 +265,11 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Type: schema.TypeSet, Optional: true, ForceNew: true, - Elem: &schema.Schema{Type: schema.TypeString}, + MaxItems: 1, + Elem: &schema.Schema{ + Type: schema.TypeString, + ValidateFunc: validation.StringLenBetween(1, 249), + }, }, "tumbling_window_in_seconds": { @@ -323,6 +336,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte input.ParallelizationFactor = aws.Int64(int64(v.(int))) } + if v, ok := d.GetOk("queues"); ok && v.(*schema.Set).Len() > 0 { + input.Queues = expandStringSet(v.(*schema.Set)) + } + if v, ok := d.GetOk("self_managed_event_source"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.SelfManagedEventSource = expandLambdaSelfManagedEventSource(v.([]interface{})[0].(map[string]interface{})) @@ -431,6 +448,7 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf d.Set("maximum_record_age_in_seconds", eventSourceMappingConfiguration.MaximumRecordAgeInSeconds) d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts) d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor) + d.Set("queues", aws.StringValueSlice(eventSourceMappingConfiguration.Queues)) if eventSourceMappingConfiguration.SelfManagedEventSource != nil { if err := d.Set("self_managed_event_source", []interface{}{flattenLambdaSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)}); err != nil { return fmt.Errorf("error setting self_managed_event_source: %w", err) diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 98815547e04..15d324c5d23 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -698,7 +698,7 @@ func TestAccAWSLambdaEventSourceMapping_MSK(t *testing.T) { resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, - ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "kafka"), //using kafka.EndpointsID will import kafka and make linters sad + ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "kafka"), Providers: testAccProviders, CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, Steps: []resource.TestStep{ @@ -747,7 +747,7 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, - ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "kafka"), //using kafka.EndpointsID will import kafka and make linters sad + ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID), Providers: testAccProviders, CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, Steps: []resource.TestStep{ @@ -777,6 +777,39 @@ func TestAccAWSLambdaEventSourceMapping_SelfManagedKafka(t *testing.T) { }) } +func TestAccAWSLambdaEventSourceMapping_ActiveMQ(t *testing.T) { + var v lambda.EventSourceMappingConfiguration + resourceName := "aws_lambda_event_source_mapping.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "mq", "secretsmanager"), + Providers: testAccProviders, + CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSLambdaEventSourceMappingConfigActiveMQ(rName, "100"), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), + resource.TestCheckResourceAttr(resourceName, "enabled", "true"), + resource.TestCheckResourceAttr(resourceName, "queues.#", "1"), + resource.TestCheckTypeSetElemAttr(resourceName, "queues.*", "test"), + resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "1"), + ), + }, + // batch_size became optional. Ensure that if the user supplies the default + // value, but then moves to not providing the value, that we don't consider this + // a diff. + { + PlanOnly: true, + Config: testAccAWSLambdaEventSourceMappingConfigActiveMQ(rName, "null"), + }, + }, + }) +} + func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc { return func(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).lambdaconn @@ -1147,6 +1180,109 @@ resource "aws_lambda_function" "test" { `, rName)) } +func testAccAWSLambdaEventSourceMappingConfigMQBase(rName string) string { + return fmt.Sprintf(` +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = < Date: Tue, 1 Jun 2021 11:35:34 -0400 Subject: [PATCH 15/18] Fix terrafmt errors. --- aws/resource_aws_lambda_event_source_mapping_test.go | 4 ++-- website/docs/r/lambda_event_source_mapping.html.markdown | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 15d324c5d23..72571adfa2d 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -1533,13 +1533,13 @@ resource "aws_lambda_event_source_mapping" "test" { for_each = aws_subnet.test.*.id content { type = "VPC_SUBNET" - uri = "subnet:${source_access_configuration.value}" + uri = "subnet:${source_access_configuration.value}" } } source_access_configuration { type = "VPC_SECURITY_GROUP" - uri = aws_security_group.test.id + uri = aws_security_group.test.id } } `, rName, batchSize, kafkaBootstrapServers)) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 761733ccc38..8c529aed398 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -62,17 +62,17 @@ resource "aws_lambda_event_source_mapping" "example" { source_access_configuration { type = "VPC_SUBNET" - uri = "subnet:subnet-example1" + uri = "subnet:subnet-example1" } source_access_configuration { type = "VPC_SUBNET" - uri = "subnet:subnet-example2" + uri = "subnet:subnet-example2" } source_access_configuration { type = "VPC_SECURITY_GROUP" - uri = "security_group:sg-example" + uri = "security_group:sg-example" } } ``` From 89e58959fea34c61b82677c23bc54c22ab62f0cf Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 1 Jun 2021 11:57:40 -0400 Subject: [PATCH 16/18] r/aws_lambda_event_source_mapping: Correctly handle deletion of 'destination_config'. --- aws/resource_aws_lambda_event_source_mapping.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 2c8a89d6ae9..82bbb8fd0b2 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -637,7 +637,7 @@ func expandLambdaOnFailure(tfMap map[string]interface{}) *lambda.OnFailure { apiObject := &lambda.OnFailure{} - if v, ok := tfMap["destination_arn"].(string); ok && v != "" { + if v, ok := tfMap["destination_arn"].(string); ok { apiObject.Destination = aws.String(v) } From 2e077d90a849688f68bef1a606680df97994b96f Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 1 Jun 2021 11:59:24 -0400 Subject: [PATCH 17/18] Fix awsproviderlint errors. --- aws/resource_aws_lambda_event_source_mapping.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 82bbb8fd0b2..dc96af1f1be 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -180,7 +180,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Type: schema.TypeSet, Optional: true, ForceNew: true, - MaxItems: 1, Elem: &schema.Schema{ Type: schema.TypeString, ValidateFunc: validation.StringLenBetween(1, 1000), @@ -265,7 +264,6 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Type: schema.TypeSet, Optional: true, ForceNew: true, - MaxItems: 1, Elem: &schema.Schema{ Type: schema.TypeString, ValidateFunc: validation.StringLenBetween(1, 249), From 7c9c511f722e6182f9727104fca910bb207a0bf6 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 1 Jun 2021 12:48:44 -0400 Subject: [PATCH 18/18] No MQ in GovCloud. --- aws/resource_aws_lambda_event_source_mapping_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 72571adfa2d..f5d7c9fe127 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -697,7 +697,7 @@ func TestAccAWSLambdaEventSourceMapping_MSK(t *testing.T) { rName := acctest.RandomWithPrefix("tf-acc-test") resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { testAccPreCheck(t) }, + PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) }, ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "kafka"), Providers: testAccProviders, CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, @@ -783,7 +783,12 @@ func TestAccAWSLambdaEventSourceMapping_ActiveMQ(t *testing.T) { rName := acctest.RandomWithPrefix("tf-acc-test") resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { testAccPreCheck(t) }, + PreCheck: func() { + testAccPreCheck(t) + testAccPreCheckAWSSecretsManager(t) + testAccPartitionHasServicePreCheck("mq", t) + testAccPreCheckAWSMq(t) + }, ErrorCheck: testAccErrorCheck(t, lambda.EndpointsID, "mq", "secretsmanager"), Providers: testAccProviders, CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,