Skip to content

Commit

Permalink
Add support for setting Pub/Sub Cloud Storage subscription max_messag…
Browse files Browse the repository at this point in the history
…es and use_topic_schema (GoogleCloudPlatform#11583)
  • Loading branch information
lahuang4 authored Sep 2, 2024
1 parent e169437 commit 692bae2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 20 deletions.
8 changes: 8 additions & 0 deletions mmv1/products/pubsub/Subscription.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ properties:
description: |
The maximum bytes that can be written to a Cloud Storage file before a new file is created. Min 1 KB, max 10 GiB.
The maxBytes limit may be exceeded in cases where messages are larger than the limit.
- !ruby/object:Api::Type::Integer
name: 'maxMessages'
description: |
The maximum messages that can be written to a Cloud Storage file before a new file is created. Min 1000 messages.
- !ruby/object:Api::Type::Enum
name: 'state'
description: |
Expand All @@ -230,6 +234,10 @@ properties:
name: 'writeMetadata'
description: |
When true, write the subscription name, messageId, publishTime, attributes, and orderingKey as additional fields in the output.
- !ruby/object:Api::Type::Boolean
name: 'useTopicSchema'
description: |
When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.
- !ruby/object:Api::Type::String
name: 'serviceAccountEmail'
description: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ resource "google_pubsub_subscription" "<%= ctx[:primary_resource_id] %>" {

max_bytes = 1000
max_duration = "300s"
max_messages = 1000
}
depends_on = [
google_storage_bucket.<%= ctx[:primary_resource_id] %>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ resource "google_pubsub_subscription" "<%= ctx[:primary_resource_id] %>" {

max_bytes = 1000
max_duration = "300s"
max_messages = 1000

avro_config {
write_metadata = true
use_topic_schema = true
}
}
depends_on = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestAccPubsubSubscriptionBigQuery_serviceAccount(t *testing.T) {
})
}

func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
func TestAccPubsubSubscriptionCloudStorage_updateText(t *testing.T) {
t.Parallel()

bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10))
Expand All @@ -263,7 +263,7 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", ""),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -272,7 +272,41 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
ImportStateId: subscriptionShort,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccPubsubSubscriptionCloudStorage_updateAvro(t *testing.T) {
t.Parallel()

bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10))
topic := fmt.Sprintf("tf-test-topic-%s", acctest.RandString(t, 10))
subscriptionShort := fmt.Sprintf("tf-test-sub-%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "avro"),
},
{
ResourceName: "google_pubsub_subscription.foo",
ImportStateId: subscriptionShort,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "avro"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -297,7 +331,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa"),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -306,7 +340,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand All @@ -315,7 +349,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
ImportStateVerify: true,
},
{
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa2"),
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa2", "avro"),
},
{
ResourceName: "google_pubsub_subscription.foo",
Expand Down Expand Up @@ -597,10 +631,10 @@ resource "google_pubsub_subscription" "foo" {
}

func testAccPubsubSubscriptionBigQuery_basic(dataset, table, topic, subscription string, useTableSchema bool, serviceAccountId string) string {
serivceAccountEmailField := ""
serivceAccountResource := ""
serviceAccountEmailField := ""
serviceAccountResource := ""
if serviceAccountId != "" {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_service_account" "bq_write_service_account" {
account_id = "%s"
display_name = "BQ Write Service Account"
Expand All @@ -617,9 +651,9 @@ resource "google_project_iam_member" "editor" {
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.bq_write_service_account.email}"
}`, serviceAccountId)
serivceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email"
serviceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email"
} else {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_project_iam_member" "viewer" {
project = data.google_project.project.project_id
role = "roles/bigquery.metadataViewer"
Expand Down Expand Up @@ -679,10 +713,10 @@ resource "google_pubsub_subscription" "foo" {
google_project_iam_member.editor
]
}
`, serivceAccountResource, dataset, table, topic, subscription, useTableSchema, serivceAccountEmailField)
`, serviceAccountResource, dataset, table, topic, subscription, useTableSchema, serviceAccountEmailField)
}

func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, serviceAccountId string) string {
func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, maxMessages int, serviceAccountId, outputFormat string) string {
filenamePrefixString := ""
if filenamePrefix != "" {
filenamePrefixString = fmt.Sprintf(`filename_prefix = "%s"`, filenamePrefix)
Expand All @@ -703,11 +737,15 @@ func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, fi
if maxDuration != "" {
maxDurationString = fmt.Sprintf(`max_duration = "%s"`, maxDuration)
}
maxMessagesString := ""
if maxMessages != 0 {
maxMessagesString = fmt.Sprintf(`max_messages = %d`, maxMessages)
}

serivceAccountEmailField := ""
serivceAccountResource := ""
serviceAccountEmailField := ""
serviceAccountResource := ""
if serviceAccountId != "" {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_service_account" "storage_write_service_account" {
account_id = "%s"
display_name = "Write Service Account"
Expand All @@ -724,14 +762,23 @@ resource "google_project_iam_member" "editor" {
role = "roles/bigquery.dataEditor"
member = "serviceAccount:${google_service_account.storage_write_service_account.email}"
}`, serviceAccountId)
serivceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email"
serviceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email"
} else {
serivceAccountResource = fmt.Sprintf(`
serviceAccountResource = fmt.Sprintf(`
resource "google_storage_bucket_iam_member" "admin" {
bucket = google_storage_bucket.test.name
role = "roles/storage.admin"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}`)
}
outputFormatString := ""
if outputFormat == "avro" {
outputFormatString = `
avro_config {
write_metadata = true
use_topic_schema = true
}
`
}
return fmt.Sprintf(`
data "google_project" "project" { }
Expand All @@ -758,15 +805,17 @@ resource "google_pubsub_subscription" "foo" {
%s
%s
%s
%s
%s
%s
%s
}
depends_on = [
google_storage_bucket.test,
google_storage_bucket_iam_member.admin,
]
}
`, bucket, serivceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, serivceAccountEmailField)
`, bucket, serviceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, maxMessagesString, serviceAccountEmailField, outputFormatString)
}

func testAccPubsubSubscription_topicOnly(topic string) string {
Expand Down

0 comments on commit 692bae2

Please sign in to comment.