Skip to content

Commit

Permalink
Add expiration policy to pubsub subscription resource
Browse files Browse the repository at this point in the history
Signed-off-by: Modular Magician <[email protected]>
  • Loading branch information
Ty Larrabee authored and modular-magician committed Apr 30, 2019
1 parent 974f30e commit 965c899
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
73 changes: 73 additions & 0 deletions google/resource_pubsub_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ func resourcePubsubSubscription() *schema.Resource {
Computed: true,
Optional: true,
},
"expiration_policy": {
Type: schema.TypeList,
Computed: true,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"ttl": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
"labels": {
Type: schema.TypeMap,
Optional: true,
Expand Down Expand Up @@ -153,6 +167,12 @@ func resourcePubsubSubscriptionCreate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("retain_acked_messages"); !isEmptyValue(reflect.ValueOf(retainAckedMessagesProp)) && (ok || !reflect.DeepEqual(v, retainAckedMessagesProp)) {
obj["retainAckedMessages"] = retainAckedMessagesProp
}
expirationPolicyProp, err := expandPubsubSubscriptionExpirationPolicy(d.Get("expiration_policy"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("expiration_policy"); !isEmptyValue(reflect.ValueOf(expirationPolicyProp)) && (ok || !reflect.DeepEqual(v, expirationPolicyProp)) {
obj["expirationPolicy"] = expirationPolicyProp
}

url, err := replaceVars(d, config, "https://pubsub.googleapis.com/v1/projects/{{project}}/subscriptions/{{name}}")
if err != nil {
Expand Down Expand Up @@ -224,6 +244,9 @@ func resourcePubsubSubscriptionRead(d *schema.ResourceData, meta interface{}) er
if err := d.Set("retain_acked_messages", flattenPubsubSubscriptionRetainAckedMessages(res["retainAckedMessages"], d)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}
if err := d.Set("expiration_policy", flattenPubsubSubscriptionExpirationPolicy(res["expirationPolicy"], d)); err != nil {
return fmt.Errorf("Error reading Subscription: %s", err)
}

return nil
}
Expand Down Expand Up @@ -262,6 +285,12 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
} else if v, ok := d.GetOkExists("retain_acked_messages"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, retainAckedMessagesProp)) {
obj["retainAckedMessages"] = retainAckedMessagesProp
}
expirationPolicyProp, err := expandPubsubSubscriptionExpirationPolicy(d.Get("expiration_policy"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("expiration_policy"); !isEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, expirationPolicyProp)) {
obj["expirationPolicy"] = expirationPolicyProp
}

obj, err = resourcePubsubSubscriptionUpdateEncoder(d, meta, obj)
if err != nil {
Expand Down Expand Up @@ -295,6 +324,10 @@ func resourcePubsubSubscriptionUpdate(d *schema.ResourceData, meta interface{})
if d.HasChange("retain_acked_messages") {
updateMask = append(updateMask, "retainAckedMessages")
}

if d.HasChange("expiration_policy") {
updateMask = append(updateMask, "expirationPolicy")
}
// updateMask is a URL parameter but not present in the schema, so replaceVars
// won't set it
url, err = addQueryParams(url, map[string]string{"updateMask": strings.Join(updateMask, ",")})
Expand Down Expand Up @@ -404,6 +437,23 @@ func flattenPubsubSubscriptionRetainAckedMessages(v interface{}, d *schema.Resou
return v
}

func flattenPubsubSubscriptionExpirationPolicy(v interface{}, d *schema.ResourceData) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["ttl"] =
flattenPubsubSubscriptionExpirationPolicyTtl(original["ttl"], d)
return []interface{}{transformed}
}
func flattenPubsubSubscriptionExpirationPolicyTtl(v interface{}, d *schema.ResourceData) interface{} {
return v
}

func expandPubsubSubscriptionName(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
project, err := getProject(d, config)
if err != nil {
Expand Down Expand Up @@ -511,6 +561,29 @@ func expandPubsubSubscriptionRetainAckedMessages(v interface{}, d TerraformResou
return v, nil
}

func expandPubsubSubscriptionExpirationPolicy(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedTtl, err := expandPubsubSubscriptionExpirationPolicyTtl(original["ttl"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedTtl); val.IsValid() && !isEmptyValue(val) {
transformed["ttl"] = transformedTtl
}

return transformed, nil
}

func expandPubsubSubscriptionExpirationPolicyTtl(v interface{}, d TerraformResourceData, config *Config) (interface{}, error) {
return v, nil
}

func resourcePubsubSubscriptionUpdateEncoder(d *schema.ResourceData, meta interface{}, obj map[string]interface{}) (map[string]interface{}, error) {
newObj := make(map[string]interface{})
newObj["subscription"] = obj
Expand Down
19 changes: 19 additions & 0 deletions website/docs/r/pubsub_subscription.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ The following arguments are supported:
messages are not expunged from the subscription's backlog, even if
they are acknowledged, until they fall out of the
messageRetentionDuration window.

* `expiration_policy` -
(Optional)
A policy that specifies the conditions for this subscription's expiration.
A subscription is considered active as long as any connected subscriber
is successfully consuming messages from the subscription or is issuing
operations on the subscription. If expirationPolicy is not set, a default
policy with ttl of 31 days will be used. The minimum allowed value for
expirationPolicy.ttl is 1 day. Structure is documented below.
* `project` - (Optional) The ID of the project in which the resource belongs.
If it is not provided, the provider project is used.

Expand Down Expand Up @@ -198,6 +207,16 @@ The `push_config` block supports:
- v1beta1: uses the push format defined in the v1beta1 Pub/Sub API.
- v1 or v1beta2: uses the push format defined in the v1 Pub/Sub API.

The `expiration_policy` block supports:

* `ttl` -
(Optional)
Specifies the "time-to-live" duration for an associated resource. The
resource expires if it is not active for a period of ttl. The definition
of "activity" depends on the type of the associated resource. The minimum
and maximum allowed values for ttl depend on the type of the associated
resource, as well. If ttl is not set, the associated resource never expires.

## Attributes Reference

In addition to the arguments listed above, the following computed attributes are exported:
Expand Down

0 comments on commit 965c899

Please sign in to comment.