Skip to content

Commit

Permalink
Add retry policy option for Cloud PubSub (hashicorp#6623)
Browse files Browse the repository at this point in the history
* Add retry_policy option
* Add test case for the new option
mookjp committed Jun 18, 2020
1 parent eecde6e commit 8848ede
Showing 2 changed files with 148 additions and 0 deletions.
103 changes: 103 additions & 0 deletions google/resource_pubsub_subscription.go
Original file line number Diff line number Diff line change
@@ -272,6 +272,31 @@ messages are not expunged from the subscription's backlog, even if
they are acknowledged, until they fall out of the
messageRetentionDuration window.`,
},
"retry_policy": {
Type: schema.TypeList,
Optional: true,
Description: `A policy that specifies how Pub/Sub retries message delivery for this subscription.
If not set, the default retry policy is applied.
This generally implies that messages will be retried as soon as possible for healthy subscribers.
RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"minimum_backoff": {
Type: schema.TypeInt,
Optional: true,
Description: `The minimum delay between consecutive deliveries of a given message.
Value should be between 0 and 600 seconds. Defaults to 10 seconds.`,
},
"maximum_backoff": {
Type: schema.TypeInt,
Required: true,
Description: `The maximum delay between consecutive deliveries of a given message.
Value should be between 0 and 600 seconds. Defaults to 600 seconds.`,
},
},
},
},
"path": {
Type: schema.TypeString,
Computed: true,
@@ -344,6 +369,12 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("dead_letter_policy"); ok || !reflect.DeepEqual(v, deadLetterPolicyProp) {
obj["deadLetterPolicy"] = deadLetterPolicyProp
}
retryPolicyProp, err := expandPubsubSubscriptionRetryPolicy(d.Get("retry_policy"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("retry_policy"); !isEmptyValue(reflect.ValueOf(retryPolicyProp)) && (ok || !reflect.DeepEqual(v, retryPolicyProp)) {
obj["retryPolicy"] = retryPolicyProp
}

obj, err = resourcePubsubSubscriptionEncoder(d, meta, obj)
if err != nil {
@@ -475,6 +506,9 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er
if err := d.Set("dead_letter_policy", flattenPubsubSubscriptionDeadLetterPolicy(res["deadLetterPolicy"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("retry_policy", flattenPubsubSubscriptionRetryPolicy(res["retryPolicy"], d, config)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}

return nil
}
@@ -530,6 +564,12 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("dead_letter_policy"); ok || !reflect.DeepEqual(v, deadLetterPolicyProp) {
obj["deadLetterPolicy"] = deadLetterPolicyProp
}
retryPolicyProp, err := expandPubsubSubscriptionRetryPolicy(d.Get("retry_policy"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("retry_policy"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, retryPolicyProp)) {
obj["retryPolicy"] = retryPolicyProp
}

obj, err = resourcePubsubSubscriptionUpdateEncoder(d, meta, obj)
if err != nil {
@@ -571,6 +611,11 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
if d.HasChange("dead_letter_policy") {
updateMask = append(updateMask, "deadLetterPolicy")
}

if d.HasChange("retry_policy") {
updateMask = append(updateMask, "retryPolicy")
}

// updateMask is a URL parameter but not present in the schema, so replaceVars
// won't set it
url, err = addQueryParams(url, map[string]string{"updateMask": strings.Join(updateMask, ",")})
@@ -772,6 +817,30 @@ func flattenPubsubSubscriptionDeadLetterPolicyMaxDeliveryAttempts(v interface{},
return v // let terraform core handle it otherwise
}

func flattenPubsubSubscriptionRetryPolicy(v interface{}, d *schema.ResourceData, config *Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["minimum_backoff"] =
flattenPubsubSubscriptionRetryPolicyMinimumBackoff(original["minimumBackoff"], d, config)
transformed["maximum_backoff"] =
flattenPubsubSubscriptionRetryPolicyMaximumBackoff(original["maximumBackoff"], d, config)
return []interface{}{transformed}
}

func flattenPubsubSubscriptionRetryPolicyMinimumBackoff(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func flattenPubsubSubscriptionRetryPolicyMaximumBackoff(v interface{}, d *schema.ResourceData, config *Config) interface{} {
return v
}

func expandPubsubSubscriptionName(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return replaceVars(d, config, "projects/{{project}}/subscriptions/{{name}}")
}
@@ -963,6 +1032,40 @@ func expandPubsubSubscriptionDeadLetterPolicyMaxDeliveryAttempts(v interface{},
return v, nil
}

func expandPubsubSubscriptionRetryPolicy(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedMinimumBackoff, err := expandPubsubSubscriptionRetryPolicyMinimumBackoff(original["minimum_backoff"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMinimumBackoff); val.IsValid() && !isEmptyValue(val) {
transformed["minimumBackoff"] = transformedMinimumBackoff
}

transformedMaximumBackoff, err := expandPubsubSubscriptionRetryPolicyMaximumBackoff(original["maximum_backoff"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMaximumBackoff); val.IsValid() && !isEmptyValue(val) {
transformed["maximumBackoff"] = transformedMaximumBackoff
}

return transformed, nil
}

func expandPubsubSubscriptionRetryPolicyMinimumBackoff(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func expandPubsubSubscriptionRetryPolicyMaximumBackoff(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func resourcePubsubSubscriptionEncoder(d *schema.ResourceData, meta interface{}, obj map[string]interface{}) (map[string]interface{}, error) {
delete(obj, "name")
return obj, nil
45 changes: 45 additions & 0 deletions google/resource_pubsub_subscription_test.go
Original file line number Diff line number Diff line change
@@ -118,6 +118,30 @@ func TestAccPubsubSubscription_push(t *testing.T) {
})
}

func TestAccPubsubSubscription_retryPolicy(t *testing.T) {
t.Parallel()

topic := fmt.Sprintf("tf-test-topic-%s", randString(t, 10))
subscription := fmt.Sprintf("tf-test-sub-%s", randString(t, 10))

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscription_retryPolicy(topic, subscription, "bar", 30, 300),
},
{
ResourceName: "google_pubsub_subscription.foo",
ImportStateId: subscription,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

// Context: terraform-providers/terraform-provider-google#4993
// This test makes a call to GET an subscription before it is actually created.
// The PubSub API negative-caches responses so this tests we are
@@ -212,6 +236,27 @@ resource "google_pubsub_subscription" "foo" {
`, saAccount, topicFoo, subscription)
}

func testAccPubsubSubscription_retryPolicy(topic, subscription, label string, min, max int) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
name = "%s"
}
resource "google_pubsub_subscription" "foo" {
name = "%s"
topic = google_pubsub_topic.foo.id
labels = {
foo = "%s"
}
ack_deadline_seconds = 10
retry_policy {
minimum_backoff = %d
maximum_backoff = %d
}
}
`, topic, subscription, label, min, max)
}

func testAccPubsubSubscription_basic(topic, subscription, label string, deadline int) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {

0 comments on commit 8848ede

Please sign in to comment.