Skip to content

Commit

Permalink
provider/aws: Added SQS FIFO queues (#10614)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ninir authored and stack72 committed Dec 12, 2016
1 parent ba4db17 commit 5e1cdcc
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 22 deletions.
30 changes: 28 additions & 2 deletions import_aws_sqs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,40 @@ func TestAccAWSSQSQueue_importBasic(t *testing.T) {
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
resource.TestStep{
{
Config: testAccAWSSQSConfigWithDefaults(queueName),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("aws_sqs_queue.queue", "fifo_queue", "false"),
),
},
},
})
}

func TestAccAWSSQSQueue_importFifo(t *testing.T) {
resourceName := "aws_sqs_queue.queue"
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(5))

resource.TestStep{
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSSQSFifoConfigWithDefaults(queueName),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("aws_sqs_queue.queue", "fifo_queue", "true"),
),
},
},
})
Expand Down
77 changes: 63 additions & 14 deletions resource_aws_sqs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import (
)

var AttributeMap = map[string]string{
"delay_seconds": "DelaySeconds",
"max_message_size": "MaximumMessageSize",
"message_retention_seconds": "MessageRetentionPeriod",
"receive_wait_time_seconds": "ReceiveMessageWaitTimeSeconds",
"visibility_timeout_seconds": "VisibilityTimeout",
"policy": "Policy",
"redrive_policy": "RedrivePolicy",
"arn": "QueueArn",
"delay_seconds": "DelaySeconds",
"max_message_size": "MaximumMessageSize",
"message_retention_seconds": "MessageRetentionPeriod",
"receive_wait_time_seconds": "ReceiveMessageWaitTimeSeconds",
"visibility_timeout_seconds": "VisibilityTimeout",
"policy": "Policy",
"redrive_policy": "RedrivePolicy",
"arn": "QueueArn",
"fifo_queue": "FifoQueue",
"content_based_deduplication": "ContentBasedDeduplication",
}

// A number of these are marked as computed because if you don't
Expand Down Expand Up @@ -90,6 +92,17 @@ func resourceAwsSqsQueue() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},
"fifo_queue": {
Type: schema.TypeBool,
Default: false,
ForceNew: true,
Optional: true,
},
"content_based_deduplication": {
Type: schema.TypeBool,
Default: false,
Optional: true,
},
},
}
}
Expand All @@ -98,6 +111,22 @@ func resourceAwsSqsQueueCreate(d *schema.ResourceData, meta interface{}) error {
sqsconn := meta.(*AWSClient).sqsconn

name := d.Get("name").(string)
fq := d.Get("fifo_queue").(bool)
cbd := d.Get("content_based_deduplication").(bool)

if fq {
if errors := validateSQSFifoQueueName(name, "name"); len(errors) > 0 {
return fmt.Errorf("Error validating the FIFO queue name: %v", errors)
}
} else {
if errors := validateSQSQueueName(name, "name"); len(errors) > 0 {
return fmt.Errorf("Error validating SQS queue name: %v", errors)
}
}

if !fq && cbd {
return fmt.Errorf("Content based deduplication can only be set with FIFO queues")
}

log.Printf("[DEBUG] SQS queue create: %s", name)

Expand All @@ -112,9 +141,12 @@ func resourceAwsSqsQueueCreate(d *schema.ResourceData, meta interface{}) error {
for k, s := range resource.Schema {
if attrKey, ok := AttributeMap[k]; ok {
if value, ok := d.GetOk(k); ok {
if s.Type == schema.TypeInt {
switch s.Type {
case schema.TypeInt:
attributes[attrKey] = aws.String(strconv.Itoa(value.(int)))
} else {
case schema.TypeBool:
attributes[attrKey] = aws.String(strconv.FormatBool(value.(bool)))
default:
attributes[attrKey] = aws.String(value.(string))
}
}
Expand Down Expand Up @@ -147,9 +179,12 @@ func resourceAwsSqsQueueUpdate(d *schema.ResourceData, meta interface{}) error {
if d.HasChange(k) {
log.Printf("[DEBUG] Updating %s", attrKey)
_, n := d.GetChange(k)
if s.Type == schema.TypeInt {
switch s.Type {
case schema.TypeInt:
attributes[attrKey] = aws.String(strconv.Itoa(n.(int)))
} else {
case schema.TypeBool:
attributes[attrKey] = aws.String(strconv.FormatBool(n.(bool)))
default:
attributes[attrKey] = aws.String(n.(string))
}
}
Expand Down Expand Up @@ -201,21 +236,35 @@ func resourceAwsSqsQueueRead(d *schema.ResourceData, meta interface{}) error {
// iKey = internal struct key, oKey = AWS Attribute Map key
for iKey, oKey := range AttributeMap {
if attrmap[oKey] != nil {
if resource.Schema[iKey].Type == schema.TypeInt {
switch resource.Schema[iKey].Type {
case schema.TypeInt:
value, err := strconv.Atoi(*attrmap[oKey])
if err != nil {
return err
}
d.Set(iKey, value)
log.Printf("[DEBUG] Reading %s => %s -> %d", iKey, oKey, value)
} else {
case schema.TypeBool:
value, err := strconv.ParseBool(*attrmap[oKey])
if err != nil {
return err
}
d.Set(iKey, value)
log.Printf("[DEBUG] Reading %s => %s -> %t", iKey, oKey, value)
default:
log.Printf("[DEBUG] Reading %s => %s -> %s", iKey, oKey, *attrmap[oKey])
d.Set(iKey, *attrmap[oKey])
}
}
}
}

// Since AWS does not send the FifoQueue attribute back when the queue
// is a standard one (even to false), this enforces the queue to be set
// to the correct value.
d.Set("fifo_queue", d.Get("fifo_queue").(bool))
d.Set("content_based_deduplication", d.Get("content_based_deduplication").(bool))

return nil
}

Expand Down
138 changes: 132 additions & 6 deletions resource_aws_sqs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
"github.com/jen20/awspolicyequivalence"
"regexp"
)

func TestAccAWSSQSQueue_basic(t *testing.T) {
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(5))
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(10))
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
Expand Down Expand Up @@ -43,8 +44,8 @@ func TestAccAWSSQSQueue_basic(t *testing.T) {
}

func TestAccAWSSQSQueue_policy(t *testing.T) {
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(5))
topicName := fmt.Sprintf("sns-topic-%s", acctest.RandString(5))
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(10))
topicName := fmt.Sprintf("sns-topic-%s", acctest.RandString(10))

expectedPolicyText := fmt.Sprintf(
`{"Version": "2012-10-17","Id": "sqspolicy","Statement":[{"Sid": "Stmt1451501026839","Effect": "Allow","Principal":"*","Action":"sqs:SendMessage","Resource":"arn:aws:sqs:us-west-2:470663696735:%s","Condition":{"ArnEquals":{"aws:SourceArn":"arn:aws:sns:us-west-2:470663696735:%s"}}}]}`,
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestAccAWSSQSQueue_redrivePolicy(t *testing.T) {
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSSQSConfigWithRedrive(acctest.RandStringFromCharSet(5, acctest.CharSetAlpha)),
Config: testAccAWSSQSConfigWithRedrive(acctest.RandString(10)),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSSQSExistsWithDefaults("aws_sqs_queue.my_dead_letter_queue"),
),
Expand All @@ -83,8 +84,8 @@ func TestAccAWSSQSQueue_redrivePolicy(t *testing.T) {

// Tests formatting and compacting of Policy, Redrive json
func TestAccAWSSQSQueue_Policybasic(t *testing.T) {
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(5))
topicName := fmt.Sprintf("sns-topic-%s", acctest.RandString(5))
queueName := fmt.Sprintf("sqs-queue-%s", acctest.RandString(10))
topicName := fmt.Sprintf("sns-topic-%s", acctest.RandString(10))
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
Expand All @@ -100,6 +101,70 @@ func TestAccAWSSQSQueue_Policybasic(t *testing.T) {
})
}

func TestAccAWSSQSQueue_FIFO(t *testing.T) {

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSSQSConfigWithFIFO(acctest.RandString(10)),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSSQSExists("aws_sqs_queue.queue"),
resource.TestCheckResourceAttr("aws_sqs_queue.queue", "fifo_queue", "true"),
),
},
},
})
}

func TestAccAWSSQSQueue_FIFOExpectNameError(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSSQSConfigWithFIFOExpectError(acctest.RandString(10)),
ExpectError: regexp.MustCompile(`Error validating the FIFO queue name`),
},
},
})
}

func TestAccAWSSQSQueue_FIFOWithContentBasedDeduplication(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSSQSConfigWithFIFOContentBasedDeduplication(acctest.RandString(10)),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSSQSExists("aws_sqs_queue.queue"),
resource.TestCheckResourceAttr("aws_sqs_queue.queue", "fifo_queue", "true"),
resource.TestCheckResourceAttr("aws_sqs_queue.queue", "content_based_deduplication", "true"),
),
},
},
})
}

func TestAccAWSSQSQueue_ExpectContentBasedDeduplicationError(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSSQSQueueDestroy,
Steps: []resource.TestStep{
{
Config: testAccExpectContentBasedDeduplicationError(acctest.RandString(10)),
ExpectError: regexp.MustCompile(`Content based deduplication can only be set with FIFO queues`),
},
},
})
}

func testAccCheckAWSSQSQueueDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).sqsconn

Expand Down Expand Up @@ -169,6 +234,21 @@ func testAccCheckAWSQSHasPolicy(n string, expectedPolicyText string) resource.Te
}
}

func testAccCheckAWSSQSExists(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}

if rs.Primary.ID == "" {
return fmt.Errorf("No Queue URL specified!")
}

return nil
}
}

func testAccCheckAWSSQSExistsWithDefaults(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
Expand Down Expand Up @@ -277,6 +357,15 @@ resource "aws_sqs_queue" "queue" {
`, r)
}

func testAccAWSSQSFifoConfigWithDefaults(r string) string {
return fmt.Sprintf(`
resource "aws_sqs_queue" "queue" {
name = "%s.fifo"
fifo_queue = true
}
`, r)
}

func testAccAWSSQSConfigWithOverrides(r string) string {
return fmt.Sprintf(`
resource "aws_sqs_queue" "queue" {
Expand Down Expand Up @@ -362,3 +451,40 @@ resource "aws_sns_topic_subscription" "test_queue_target" {
}
`, topic, queue)
}

func testAccAWSSQSConfigWithFIFO(queue string) string {
return fmt.Sprintf(`
resource "aws_sqs_queue" "queue" {
name = "%s.fifo"
fifo_queue = true
}
`, queue)
}

func testAccAWSSQSConfigWithFIFOContentBasedDeduplication(queue string) string {
return fmt.Sprintf(`
resource "aws_sqs_queue" "queue" {
name = "%s.fifo"
fifo_queue = true
content_based_deduplication = true
}
`, queue)
}

func testAccAWSSQSConfigWithFIFOExpectError(queue string) string {
return fmt.Sprintf(`
resource "aws_sqs_queue" "queue" {
name = "%s"
fifo_queue = true
}
`, queue)
}

func testAccExpectContentBasedDeduplicationError(queue string) string {
return fmt.Sprintf(`
resource "aws_sqs_queue" "queue" {
name = "%s"
content_based_deduplication = true
}
`, queue)
}
Loading

0 comments on commit 5e1cdcc

Please sign in to comment.