From 12c3b3676347e3b5368d5e382bfdcd25c601396b Mon Sep 17 00:00:00 2001 From: Ben Fradet Date: Thu, 18 Oct 2018 14:04:24 +0100 Subject: [PATCH] Add windowed filename policy (closes #5) --- build.sbt | 7 +- .../CloudStorageLoader.scala | 31 ++++++--- .../scala/com/snowplowanalytics/Options.scala | 21 ++++-- .../WindowedFilenamePolicy.scala | 58 ++++++++++++++++ .../WindowedFilenamePolicySpec.scala | 68 +++++++++++++++++++ 5 files changed, 169 insertions(+), 16 deletions(-) create mode 100644 src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala create mode 100644 src/test/scala/com/snowplowanalytics/WindowedFilenamePolicySpec.scala diff --git a/build.sbt b/build.sbt index 41f7c30..2a29608 100644 --- a/build.sbt +++ b/build.sbt @@ -4,6 +4,9 @@ import Keys._ val scioVersion = "0.6.1" val beamVersion = "2.6.0" val scalaMacrosVersion = "2.1.1" +val slf4jVersion = "1.7.25" +val scalatestVersion = "3.0.5" +val mockitoVersion = "2.23.0" lazy val compilerOptions = Seq( "-target:jvm-1.8", @@ -49,7 +52,9 @@ lazy val root: Project = project "com.spotify" %% "scio-core" % scioVersion, "com.spotify" %% "scio-test" % scioVersion % Test, "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion, - "org.slf4j" % "slf4j-simple" % "1.7.25" + "org.slf4j" % "slf4j-simple" % slf4jVersion, + "org.scalatest" %% "scalatest" % scalatestVersion % Test, + "org.mockito" % "mockito-core" % mockitoVersion % Test ) ) .enablePlugins(PackPlugin) diff --git a/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala b/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala index 7df8b1c..48e789a 100644 --- a/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala +++ b/src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala @@ -2,10 +2,13 @@ package com.snowplowanalytics import com.spotify.scio._ import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO} +import org.apache.beam.sdk.io.fs.ResourceId import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubIO import org.apache.beam.sdk.options.PipelineOptionsFactory -import org.apache.beam.sdk.values.PDone -import org.joda.time.Duration +import org.apache.beam.sdk.options.ValueProvider.{NestedValueProvider, StaticValueProvider} +import org.apache.beam.sdk.transforms.SerializableFunction +import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window} +import org.joda.time.{Duration, Instant} object CloudStorageLoader { def main(args: Array[String]): Unit = { @@ -23,19 +26,29 @@ object CloudStorageLoader { val sc = ScioContext(options) val input = sc.pubsubSubscription[String](options.getInputSubscription).withName("input") + .applyTransform( + Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration))) + ) - val windowed = input - .withFixedWindows(Duration.standardMinutes(options.getWindowDuration)).withName("windowed") - - windowed + input .saveAsCustomOutput("output", TextIO.write() .withWindowedWrites .withNumShards(options.getNumShards) - .withSuffix(options.getOutputFilenameSuffix) - .withShardNameTemplate("SSSS-NNNN") .withWritableByteChannelFactory( FileBasedSink.CompressionType.fromCanonical(Compression.BZIP2)) - .to(options.getOutputPath) + .withTempDirectory(NestedValueProvider.of( + StaticValueProvider.of(options.getOutputDirectory), + new SerializableFunction[String, ResourceId] { + def apply(input: String): ResourceId = + FileBasedSink.convertToFileResourceIfPossible(input) + } + )) + .to(WindowedFilenamePolicy( + options.getOutputDirectory, + options.getOutputFilenamePrefix, + options.getShardTemplate, + options.getOutputFilenameSuffix + )) ) sc.close() diff --git a/src/main/scala/com/snowplowanalytics/Options.scala b/src/main/scala/com/snowplowanalytics/Options.scala index 0b04004..d72971e 100644 --- a/src/main/scala/com/snowplowanalytics/Options.scala +++ b/src/main/scala/com/snowplowanalytics/Options.scala @@ -2,7 +2,6 @@ package com.snowplowanalytics import 
org.apache.beam.sdk.options._ import org.apache.beam.sdk.options.Validation.Required -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider trait Options extends PipelineOptions with StreamingOptions { @Description("The Cloud Pub/Sub subscription to read from") @@ -10,13 +9,23 @@ trait Options extends PipelineOptions with StreamingOptions { def getInputSubscription: String def setInputSubscription(value: String): Unit - @Description("The Cloud Storage path to output files to, ends with the filenames suffix") - @Default.String("gs://tmp/subscription-") - def getOutputPath: String - def setOutputPath(value: String): Unit + @Description("The Cloud Storage directory to output files to, ends with /") + @Default.String("gs://tmp/") + def getOutputDirectory: String + def setOutputDirectory(value: String): Unit + + @Description("The Cloud Storage prefix to output files to") + @Default.String("subscription-") + def getOutputFilenamePrefix: String + def setOutputFilenamePrefix(value: String): Unit + + @Description("The shard template which will be part of the filennams") + @Default.String("W-P-SSSSS-of-NNNNN") + def getShardTemplate: String + def setShardTemplate(value: String): Unit @Description("The suffix of the filenames written out") - @Default.String(".bz2") + @Default.String(".txt") def getOutputFilenameSuffix: String def setOutputFilenameSuffix(value: String): Unit diff --git a/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala b/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala new file mode 100644 index 0000000..d29983b --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala @@ -0,0 +1,58 @@ +package com.snowplowanalytics + +import org.apache.beam.sdk.io.DefaultFilenamePolicy +import org.apache.beam.sdk.io.FileBasedSink.{FilenamePolicy, OutputFileHints} +import org.apache.beam.sdk.io.FileSystems +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions +import 
org.apache.beam.sdk.io.fs.ResourceId
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
+import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+/**
+ * Filename policy for windowed writes which expands the `YYYY`, `MM`, `dd` and `HH` tokens
+ * found in `outputDirectory` using the end of the window being written.
+ */
+final case class WindowedFilenamePolicy(
+  outputDirectory: String,
+  outputFilenamePrefix: String,
+  shardTemplate: String,
+  outputFilenameSuffix: String
+) extends FilenamePolicy {
+  /** Resolves the date-expanded directory, then delegates to a [[DefaultFilenamePolicy]]. */
+  override def windowedFilename(
+    shardNumber: Int, numShards: Int, window: BoundedWindow, paneInfo: PaneInfo,
+    outputFileHints: OutputFileHints
+  ): ResourceId = {
+    val outputFile = resolveWithDateTemplates(outputDirectory, window)
+      .resolve(outputFilenamePrefix, StandardResolveOptions.RESOLVE_FILE)
+    val policy = DefaultFilenamePolicy.fromStandardParameters(
+      StaticValueProvider.of(outputFile), shardTemplate, outputFilenameSuffix, windowedWrites = true)
+    policy.windowedFilename(shardNumber, numShards, window, paneInfo, outputFileHints)
+  }
+
+  /** Always throws an `UnsupportedOperationException`: only windowed files are supported. */
+  override def unwindowedFilename(
+    shardNumber: Int, numShards: Int, outputFileHints: OutputFileHints
+  ): ResourceId = throw new UnsupportedOperationException("This policy only supports windowed files")
+
+  /** Expands the date tokens for an `IntervalWindow`; other windows get the directory as is. */
+  private def resolveWithDateTemplates(outputDirectory: String, window: BoundedWindow): ResourceId = {
+    val outputDir = FileSystems.matchNewResource(outputDirectory, isDirectory = true)
+    window match {
+      case w: IntervalWindow =>
+        // NOTE(review): `toDateTime` presumably uses the JVM default time zone -- confirm on workers.
+        val outputPath = dateFormat(w.end.toDateTime)(outputDir.toString)
+        FileSystems.matchNewResource(outputPath, isDirectory = true)
+      case _ => outputDir
+    }
+  }
+
+  // Each token doubles as its Joda-Time pattern; day is `dd` since `DD` means day of year there.
+  private def dateFormat(t: DateTime): String => String =
+    (s: String) =>
+      Seq("YYYY", "MM", "dd", "HH").foldLeft(s) { (acc, token) =>
+        acc.replace(token, DateTimeFormat.forPattern(token).print(t))
+      }
+}
diff --git
a/src/test/scala/com/snowplowanalytics/WindowedFilenamePolicySpec.scala b/src/test/scala/com/snowplowanalytics/WindowedFilenamePolicySpec.scala
new file mode 100644
index 0000000..20589e0
--- /dev/null
+++ b/src/test/scala/com/snowplowanalytics/WindowedFilenamePolicySpec.scala
@@ -0,0 +1,68 @@
+package com.snowplowanalytics
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints
+import org.apache.beam.sdk.io.LocalResources
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
+import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
+import org.joda.time.DateTime
+
+import org.mockito.Mockito.when
+import org.scalatest.FreeSpec
+import org.scalatest.Matchers._
+import org.scalatest.mockito.MockitoSugar
+
+/** Checks the filenames produced by [[WindowedFilenamePolicy]] under local temp directories. */
+class WindowedFilenamePolicySpec extends FreeSpec with MockitoSugar {
+  /** Output hints returning empty strings for the mime type and the suggested suffix. */
+  object TestOutputFileHints extends OutputFileHints {
+    override def getMimeType: String = ""
+    override def getSuggestedFilenameSuffix: String = ""
+  }
+
+  "the WindowedFilenamePolicy" - {
+    "make a windowedFilename function available" - {
+      "which produces a windowed filename" in {
+        val generated = policyUnder("WindowedFilenamePolicy")
+          .windowedFilename(1, 1, mock[BoundedWindow], onTimePane(), TestOutputFileHints)
+
+        generated.getFilename shouldEqual "out-001-001.txt"
+      }
+      "which produces a dynamic filename" in {
+        val policy = policyUnder("YYYY/MM/dd/HH")
+        val interval = mock[IntervalWindow]
+        val start = new DateTime(2018, 1, 8, 10, 55, 0).toInstant
+        val end = new DateTime(2018, 1, 8, 10, 56, 0).toInstant
+        when(interval.maxTimestamp).thenReturn(end)
+        when(interval.start).thenReturn(start)
+        when(interval.end).thenReturn(end)
+
+        val generated = policy.windowedFilename(1, 1, interval, onTimePane(), TestOutputFileHints)
+
+        generated.getCurrentDirectory.toString should endWith("2018/01/08/10/")
+        generated.getFilename shouldEqual "out-001-001.txt"
+      }
+    }
+    "make a unwindowedFilename function available" - {
+      "which throws an unsupported operation" in {
+        val policy = policyUnder("WindowedFilenamePolicy")
+        an [UnsupportedOperationException] should be thrownBy
+          policy.unwindowedFilename(1, 1, TestOutputFileHints)
+      }
+    }
+  }
+
+  /** Builds the policy under test, rooted at `subDirectory` of a fresh temp directory. */
+  private def policyUnder(subDirectory: String): WindowedFilenamePolicy = {
+    val root: File = Files.createTempDirectory("WindowedFilenamePolicy").toFile
+    val outputDirectory = LocalResources.fromFile(root, isDirectory = true)
+      .resolve(subDirectory, StandardResolveOptions.RESOLVE_DIRECTORY)
+    WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt")
+  }
+
+  /** The on-time pane handed to every `windowedFilename` call in this spec. */
+  private def onTimePane(): PaneInfo =
+    PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0)
+}