diff --git a/mmv1/products/pubsub/Subscription.yaml b/mmv1/products/pubsub/Subscription.yaml index ed39701ebc28..e97292922ada 100644 --- a/mmv1/products/pubsub/Subscription.yaml +++ b/mmv1/products/pubsub/Subscription.yaml @@ -212,6 +212,10 @@ properties: description: | The maximum bytes that can be written to a Cloud Storage file before a new file is created. Min 1 KB, max 10 GiB. The maxBytes limit may be exceeded in cases where messages are larger than the limit. + - !ruby/object:Api::Type::Integer + name: 'maxMessages' + description: | + The maximum messages that can be written to a Cloud Storage file before a new file is created. Min 1000 messages. - !ruby/object:Api::Type::Enum name: 'state' description: | @@ -230,6 +234,10 @@ properties: name: 'writeMetadata' description: | When true, write the subscription name, messageId, publishTime, attributes, and orderingKey as additional fields in the output. + - !ruby/object:Api::Type::Boolean + name: 'useTopicSchema' + description: | + When true, the output Cloud Storage file will be serialized using the topic schema, if it exists. - !ruby/object:Api::Type::String name: 'serviceAccountEmail' description: | diff --git a/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage.tf.erb b/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage.tf.erb index e95f12169611..2f000e5adcf7 100644 --- a/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage.tf.erb +++ b/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage.tf.erb @@ -21,6 +21,7 @@ resource "google_pubsub_subscription" "<%= ctx[:primary_resource_id] %>" { max_bytes = 1000 max_duration = "300s" + max_messages = 1000 } depends_on = [ google_storage_bucket.<%= ctx[:primary_resource_id] %>, diff --git a/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage_avro.tf.erb b/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage_avro.tf.erb index b26352b1483e..3a61c489517e 100644 --- a/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage_avro.tf.erb +++ b/mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage_avro.tf.erb @@ -21,9 +21,11 @@ resource "google_pubsub_subscription" "<%= ctx[:primary_resource_id] %>" { max_bytes = 1000 max_duration = "300s" + max_messages = 1000 avro_config { write_metadata = true + use_topic_schema = true } } depends_on = [ diff --git a/mmv1/third_party/terraform/services/pubsub/resource_pubsub_subscription_test.go b/mmv1/third_party/terraform/services/pubsub/resource_pubsub_subscription_test.go index fa98e1d0d51f..af40a3411333 100644 --- a/mmv1/third_party/terraform/services/pubsub/resource_pubsub_subscription_test.go +++ b/mmv1/third_party/terraform/services/pubsub/resource_pubsub_subscription_test.go @@ -250,7 +250,7 @@ func TestAccPubsubSubscriptionBigQuery_serviceAccount(t *testing.T) { }) } -func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) { +func TestAccPubsubSubscriptionCloudStorage_updateText(t *testing.T) { t.Parallel() bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10)) @@ -263,7 +263,7 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) { CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t), Steps: []resource.TestStep{ { - Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", ""), + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "text"), }, { ResourceName: "google_pubsub_subscription.foo", @@ -272,7 +272,41 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) { ImportStateVerify: true, }, { - Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""), + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"), + }, + { + ResourceName: "google_pubsub_subscription.foo", + ImportStateId: subscriptionShort, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccPubsubSubscriptionCloudStorage_updateAvro(t *testing.T) { + t.Parallel() + + bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10)) + topic := fmt.Sprintf("tf-test-topic-%s", acctest.RandString(t, 10)) + subscriptionShort := fmt.Sprintf("tf-test-sub-%s", acctest.RandString(t, 10)) + + acctest.VcrTest(t, resource.TestCase{ + PreCheck: func() { acctest.AccTestPreCheck(t) }, + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t), + CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t), + Steps: []resource.TestStep{ + { + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "avro"), + }, + { + ResourceName: "google_pubsub_subscription.foo", + ImportStateId: subscriptionShort, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "avro"), }, { ResourceName: "google_pubsub_subscription.foo", @@ -297,7 +331,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) { CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t), Steps: []resource.TestStep{ { - Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa"), + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa", "text"), }, { ResourceName: "google_pubsub_subscription.foo", @@ -306,7 +340,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) { ImportStateVerify: true, }, { - Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""), + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"), }, { ResourceName: "google_pubsub_subscription.foo", @@ -315,7 +349,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) { ImportStateVerify: true, }, { - Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa2"), + Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa2", "avro"), }, { ResourceName: "google_pubsub_subscription.foo", @@ -597,10 +631,10 @@ resource "google_pubsub_subscription" "foo" { } func testAccPubsubSubscriptionBigQuery_basic(dataset, table, topic, subscription string, useTableSchema bool, serviceAccountId string) string { - serivceAccountEmailField := "" - serivceAccountResource := "" + serviceAccountEmailField := "" + serviceAccountResource := "" if serviceAccountId != "" { - serivceAccountResource = fmt.Sprintf(` + serviceAccountResource = fmt.Sprintf(` resource "google_service_account" "bq_write_service_account" { account_id = "%s" display_name = "BQ Write Service Account" @@ -617,9 +651,9 @@ resource "google_project_iam_member" "editor" { role = "roles/bigquery.dataEditor" member = "serviceAccount:${google_service_account.bq_write_service_account.email}" }`, serviceAccountId) - serivceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email" + serviceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email" } else { - serivceAccountResource = fmt.Sprintf(` + serviceAccountResource = fmt.Sprintf(` resource "google_project_iam_member" "viewer" { project = data.google_project.project.project_id role = "roles/bigquery.metadataViewer" @@ -679,10 +713,10 @@ resource "google_pubsub_subscription" "foo" { google_project_iam_member.editor ] } - `, serivceAccountResource, dataset, table, topic, subscription, useTableSchema, serivceAccountEmailField) + `, serviceAccountResource, dataset, table, topic, subscription, useTableSchema, serviceAccountEmailField) } -func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, serviceAccountId string) string { +func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, maxMessages int, serviceAccountId, outputFormat string) string { filenamePrefixString := "" if filenamePrefix != "" { filenamePrefixString = fmt.Sprintf(`filename_prefix = "%s"`, filenamePrefix) @@ -703,11 +737,15 @@ func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, fi if maxDuration != "" { maxDurationString = fmt.Sprintf(`max_duration = "%s"`, maxDuration) } + maxMessagesString := "" + if maxMessages != 0 { + maxMessagesString = fmt.Sprintf(`max_messages = %d`, maxMessages) + } - serivceAccountEmailField := "" - serivceAccountResource := "" + serviceAccountEmailField := "" + serviceAccountResource := "" if serviceAccountId != "" { - serivceAccountResource = fmt.Sprintf(` + serviceAccountResource = fmt.Sprintf(` resource "google_service_account" "storage_write_service_account" { account_id = "%s" display_name = "Write Service Account" @@ -724,14 +762,23 @@ resource "google_project_iam_member" "editor" { role = "roles/bigquery.dataEditor" member = "serviceAccount:${google_service_account.storage_write_service_account.email}" }`, serviceAccountId) - serivceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email" + serviceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email" } else { - serivceAccountResource = fmt.Sprintf(` + serviceAccountResource = fmt.Sprintf(` resource "google_storage_bucket_iam_member" "admin" { bucket = google_storage_bucket.test.name role = "roles/storage.admin" member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com" }`) + } + outputFormatString := "" + if outputFormat == "avro" { + outputFormatString = ` + avro_config { + write_metadata = true + use_topic_schema = true + } +` } return fmt.Sprintf(` data "google_project" "project" { } @@ -758,7 +805,9 @@ resource "google_pubsub_subscription" "foo" { %s %s %s - %s + %s + %s + %s } depends_on = [ @@ -766,7 +815,7 @@ resource "google_pubsub_subscription" "foo" { google_storage_bucket_iam_member.admin, ] } -`, bucket, serivceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, serivceAccountEmailField) +`, bucket, serviceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, maxMessagesString, serviceAccountEmailField, outputFormatString) } func testAccPubsubSubscription_topicOnly(topic string) string {