diff --git a/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala b/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala
index 075e88e..61b8f4b 100644
--- a/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala
+++ b/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala
@@ -5,13 +5,14 @@ import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO}
 import org.apache.beam.sdk.io.fs.ResourceId
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
 import org.apache.beam.sdk.options.PipelineOptionsFactory
-import org.apache.beam.sdk.options.ValueProvider.{NestedValueProvider, StaticValueProvider}
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider
 import org.apache.beam.sdk.transforms.SerializableFunction
 import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
-import org.joda.time.{Duration, Instant}
+import org.joda.time.Duration

 object CloudStorageLoader {
   def main(args: Array[String]): Unit = {
+    PipelineOptionsFactory.register(classOf[Options])
     val options = PipelineOptionsFactory
       .fromArgs(args: _*)
       .withValidation
@@ -25,19 +26,14 @@ object CloudStorageLoader {
   def run(options: Options): Unit = {
     val sc = ScioContext(options)

-    val input = sc.pubsubSubscription[String](options.getInputSubscription).withName("input")
-      .applyTransform(
-        Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration)))
-      )
-
-    input
-      .saveAsCustomOutput("output", TextIO.write()
+    val inputIO = PubsubIO.readStrings().fromSubscription(options.getInputSubscription)
+    val outputIO = TextIO.write()
       .withWindowedWrites
       .withNumShards(options.getNumShards)
       .withWritableByteChannelFactory(
         FileBasedSink.CompressionType.fromCanonical(getCompression(options.getCompression)))
       .withTempDirectory(NestedValueProvider.of(
-        StaticValueProvider.of(options.getOutputDirectory),
+        options.getOutputDirectory,
         new SerializableFunction[String, ResourceId] {
           def apply(input: String): ResourceId =
             FileBasedSink.convertToFileResourceIfPossible(input)
@@ -49,7 +45,14 @@ object CloudStorageLoader {
         options.getShardTemplate,
         options.getOutputFilenameSuffix
       ))
-      )
+
+
+    sc
+      .customInput("input", inputIO)
+      .applyTransform(
+        Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong)))
+      ).withName("windowed")
+      .saveAsCustomOutput("output", outputIO)

     sc.close()
   }
diff --git a/src/main/scala/com/snowplowanalytics/Options.scala b/src/main/scala/com/snowplowanalytics/Options.scala
index 30dd7f8..32b02cd 100644
--- a/src/main/scala/com/snowplowanalytics/Options.scala
+++ b/src/main/scala/com/snowplowanalytics/Options.scala
@@ -5,29 +5,30 @@ import org.apache.beam.sdk.options.Validation.Required

 trait Options extends PipelineOptions with StreamingOptions {
   @Description("The Cloud Pub/Sub subscription to read from")
-  @Default.String("projects/project/subscriptions/subscription")
-  def getInputSubscription: String
-  def setInputSubscription(value: String): Unit
+  @Required
+  def getInputSubscription: ValueProvider[String]
+  def setInputSubscription(value: ValueProvider[String]): Unit

   @Description("The Cloud Storage directory to output files to, ends with /")
-  @Default.String("gs://tmp/")
-  def getOutputDirectory: String
-  def setOutputDirectory(value: String): Unit
+  @Required
+  def getOutputDirectory: ValueProvider[String]
+  def setOutputDirectory(value: ValueProvider[String]): Unit

   @Description("The Cloud Storage prefix to output files to")
-  @Default.String("subscription-")
-  def getOutputFilenamePrefix: String
-  def setOutputFilenamePrefix(value: String): Unit
+  @Default.String("output")
+  @Required
+  def getOutputFilenamePrefix: ValueProvider[String]
+  def setOutputFilenamePrefix(value: ValueProvider[String]): Unit

   @Description("The shard template which will be part of the filennams")
   @Default.String("-W-P-SSSSS-of-NNNNN")
-  def getShardTemplate: String
-  def setShardTemplate(value: String): Unit
+  def getShardTemplate: ValueProvider[String]
+  def setShardTemplate(value: ValueProvider[String]): Unit

   @Description("The suffix of the filenames written out")
   @Default.String(".txt")
-  def getOutputFilenameSuffix: String
-  def setOutputFilenameSuffix(value: String): Unit
+  def getOutputFilenameSuffix: ValueProvider[String]
+  def setOutputFilenameSuffix(value: ValueProvider[String]): Unit

   @Description("The window duration in minutes, defaults to 5")
   @Default.Integer(5)
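
Editor's note, not part of the patch: the switch to ValueProvider-typed options plus the new PipelineOptionsFactory.register call is what allows these values to be supplied at template launch time rather than at graph construction time. A minimal sketch of how that surfaces, assuming the Options trait above is on the classpath; the object name TemplateOptionsSketch and the println calls are illustrative only.

package com.snowplowanalytics

import org.apache.beam.sdk.options.{PipelineOptionsFactory, ValueProvider}

object TemplateOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Same registration the patch adds to CloudStorageLoader.main: it lets
    // PipelineOptionsFactory validate and document the custom Options interface.
    PipelineOptionsFactory.register(classOf[Options])
    val options = PipelineOptionsFactory
      .fromArgs(args: _*)
      .withValidation
      .as(classOf[Options])

    // A ValueProvider may only become readable at runtime (e.g. when launching
    // a Dataflow template), which is why the options are no longer plain Strings.
    val sub: ValueProvider[String] = options.getInputSubscription
    if (sub.isAccessible) println(s"subscription = ${sub.get}")
    else println("subscription is a runtime parameter, not yet available")
  }
}
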
diff --git a/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala b/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala
index 94da2a5..f809b5f 100644
--- a/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala
+++ b/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala
@@ -5,16 +5,17 @@ import org.apache.beam.sdk.io.FileBasedSink.{FilenamePolicy, OutputFileHints}
 import org.apache.beam.sdk.io.FileSystems
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
 import org.apache.beam.sdk.io.fs.ResourceId
+import org.apache.beam.sdk.options.ValueProvider
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
 import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat

 final case class WindowedFilenamePolicy(
-  outputDirectory: String,
-  outputFilenamePrefix: String,
-  shardTemplate: String,
-  outputFilenameSuffix: String
+  outputDirectory: ValueProvider[String],
+  outputFilenamePrefix: ValueProvider[String],
+  shardTemplate: ValueProvider[String],
+  outputFilenameSuffix: ValueProvider[String]
 ) extends FilenamePolicy {
   override def windowedFilename(
     shardNumber: Int,
@@ -24,9 +25,9 @@ final case class WindowedFilenamePolicy(
     outputFileHints: OutputFileHints
   ): ResourceId = {
     val outputFile = resolveWithDateTemplates(outputDirectory, window)
-      .resolve(outputFilenamePrefix, StandardResolveOptions.RESOLVE_FILE)
+      .resolve(outputFilenamePrefix.get, StandardResolveOptions.RESOLVE_FILE)
     val policy = DefaultFilenamePolicy.fromStandardParameters(
-      StaticValueProvider.of(outputFile), shardTemplate, outputFilenameSuffix, windowedWrites = true)
+      StaticValueProvider.of(outputFile), shardTemplate.get, outputFilenameSuffix.get, windowedWrites = true)
     policy.windowedFilename(shardNumber, numShards, window, paneInfo, outputFileHints)
   }

@@ -37,10 +38,10 @@ final case class WindowedFilenamePolicy(
   ): ResourceId =
     throw new UnsupportedOperationException("This policy only supports windowed files")
   private def resolveWithDateTemplates(
-    outputDirectory: String,
+    outputDirectory: ValueProvider[String],
     window: BoundedWindow
   ): ResourceId = {
-    val outputDir = FileSystems.matchNewResource(outputDirectory, isDirectory = true)
+    val outputDir = FileSystems.matchNewResource(outputDirectory.get, isDirectory = true)

     window match {
       case w: IntervalWindow =>
@@ -57,3 +58,17 @@ final case class WindowedFilenamePolicy(
     ((s: String) => s.replace("dd", DateTimeFormat.forPattern("dd").print(t))) andThen
     ((s: String) => s.replace("HH", DateTimeFormat.forPattern("HH").print(t)))
   }
+
+object WindowedFilenamePolicy {
+  def apply(
+    outputDirectory: String,
+    outputFilenamePrefix: String,
+    shardTemplate: String,
+    outputFilenameSuffix: String
+  ): WindowedFilenamePolicy = WindowedFilenamePolicy(
+    StaticValueProvider.of(outputDirectory),
+    StaticValueProvider.of(outputFilenamePrefix),
+    StaticValueProvider.of(shardTemplate),
+    StaticValueProvider.of(outputFilenameSuffix)
+  )
+}
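
Editor's note, not part of the patch: the String-based apply added in the companion object keeps existing call sites that pass plain Strings (such as the one in CloudStorageLoader.run) compiling unchanged, by wrapping each argument in StaticValueProvider. A small sketch under that assumption; the object name WindowedFilenamePolicySketch and the literal values are illustrative only.

package com.snowplowanalytics

import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider

object WindowedFilenamePolicySketch {
  // String overload from the new companion object: arguments are wrapped for us.
  val fromStrings: WindowedFilenamePolicy =
    WindowedFilenamePolicy("gs://out-dir/", "output", "-W-P-SSSSS-of-NNNNN", ".txt")

  // Equivalent call through the case-class constructor with explicit wrapping.
  val fromProviders: WindowedFilenamePolicy =
    WindowedFilenamePolicy(
      StaticValueProvider.of("gs://out-dir/"),
      StaticValueProvider.of("output"),
      StaticValueProvider.of("-W-P-SSSSS-of-NNNNN"),
      StaticValueProvider.of(".txt")
    )
}
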
diff --git a/src/test/scala/com/snowplowanalytics/CloudStorageLoaderSpec.scala b/src/test/scala/com/snowplowanalytics/CloudStorageLoaderSpec.scala
index 4c50c73..d4e2178 100644
--- a/src/test/scala/com/snowplowanalytics/CloudStorageLoaderSpec.scala
+++ b/src/test/scala/com/snowplowanalytics/CloudStorageLoaderSpec.scala
@@ -6,9 +6,10 @@ class CloudStorageLoaderSpec extends PipelineSpec {
   val expected = (1 to 10).map(_.toString)

   "CloudStorageLoader" should "output a file" in {
+    val sub = "projects/project/subscriptions/sub"
     JobTest[CloudStorageLoader.type]
-      .args("--inputSubscription=in", "--outputDirectory=gs://out-dir/")
-      .input(PubsubIO("in"), expected)
+      .args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/")
+      .input(CustomIO[String]("input"), expected)
       .output(CustomIO[String]("output"))(_ should containInAnyOrder(expected))
       .run()
   }
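
Editor's note, circling back to the withTempDirectory change in CloudStorageLoader.scala above: a standalone sketch, not from the patch, of how NestedValueProvider defers the String-to-ResourceId conversion until the wrapped value is actually read. It assumes a local path so it can run without a GCS filesystem registered; the object name TempDirectorySketch and the path are illustrative only.

import org.apache.beam.sdk.io.FileBasedSink
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.options.ValueProvider.{NestedValueProvider, StaticValueProvider}
import org.apache.beam.sdk.transforms.SerializableFunction

object TempDirectorySketch {
  def main(args: Array[String]): Unit = {
    // Stands in for options.getOutputDirectory; in a templated run it could be
    // a runtime parameter rather than a static value.
    val outputDirectory: ValueProvider[String] = StaticValueProvider.of("/tmp/out-dir/")

    // Same shape as the withTempDirectory argument in the patch: the function
    // is applied only when get() is called, i.e. once the value is available.
    val tempDirectory: ValueProvider[ResourceId] = NestedValueProvider.of(
      outputDirectory,
      new SerializableFunction[String, ResourceId] {
        def apply(input: String): ResourceId =
          FileBasedSink.convertToFileResourceIfPossible(input)
      }
    )

    println(tempDirectory.get)
  }
}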