From ecdf54e639da0853f6f5ac02adf02a7f05b65d5b Mon Sep 17 00:00:00 2001 From: Piotr Limanowski Date: Mon, 2 Mar 2020 17:07:21 +0100 Subject: [PATCH] Add support for GCP labels in dataflow launcher docker image (closes #42) --- README.md | 2 ++ .../storage/googlecloudstorage/loader/Options.scala | 3 ++- .../googlecloudstorage/loader/CloudStorageLoaderSpec.scala | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 24ae914..195cc43 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ Once unzipped the artifact can be run as follows: --compression=none \ # optional, gzip, bz2 or none --numShards=1 \ # optional --dateFormat=YYYY/MM/dd/HH/ \ # optional + --labels={\"label\": \"value\"} \ # optional --partitionedOuptutDirectory=gs://[BUCKET]/[SUBDIR] # optional ``` @@ -91,6 +92,7 @@ docker run \ --compression=none \ # optional, gzip, bz2 or none --numShards=1 \ # optional --dateFormat=YYYY/MM/dd/HH/ \ # optional + --labels={\"label\": \"value\"} \ # optional --partitionedOuptutDirectory=gs://[BUCKET]/[SUBDIR] # optional ``` diff --git a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala index 38c8517..1b83c2b 100644 --- a/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala +++ b/src/main/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/Options.scala @@ -16,9 +16,10 @@ package com.snowplowanalytics.storage.googlecloudstorage.loader import org.apache.beam.sdk.options._ import org.apache.beam.sdk.options.Validation.Required +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions /** Trait regrouping the loader's configuration. 
*/ -trait Options extends PipelineOptions with StreamingOptions { +trait Options extends PipelineOptions with StreamingOptions with DataflowPipelineOptions { @Description("The Cloud Pub/Sub subscription to read from, formatted as projects/[PROJECT]/subscriptions/[SUB]") @Required def getInputSubscription: ValueProvider[String] diff --git a/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala index 3431bfe..ac32e40 100644 --- a/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala +++ b/src/test/scala/com/snowplowanalytics/storage/googlecloudstorage/loader/CloudStorageLoaderSpec.scala @@ -26,7 +26,7 @@ class CloudStorageLoaderSpec extends PipelineSpec { "CloudStorageLoader" should "output a file" in { val sub = "projects/project/subscriptions/sub" JobTest[CloudStorageLoader.type] - .args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/", "--partitionedOutputDirectory=gs://out-dir/partitioned") + .args("--project=test", s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/", "--partitionedOutputDirectory=gs://out-dir/partitioned", """--labels={"env": "dev"}""") .input(CustomIO[String]("input"), expected) .output(CustomIO[String]("output"))(_ should containInAnyOrder(expected)) .run()