Skip to content

Commit

Permalink
Add windowed filename policy (closes #5)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Nov 2, 2018
1 parent b16539c commit 12c3b36
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 16 deletions.
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import Keys._
val scioVersion = "0.6.1"
val beamVersion = "2.6.0"
val scalaMacrosVersion = "2.1.1"
val slf4jVersion = "1.7.25"
val scalatestVersion = "3.0.5"
val mockitoVersion = "2.23.0"

lazy val compilerOptions = Seq(
"-target:jvm-1.8",
Expand Down Expand Up @@ -49,7 +52,9 @@ lazy val root: Project = project
"com.spotify" %% "scio-core" % scioVersion,
"com.spotify" %% "scio-test" % scioVersion % Test,
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
"org.slf4j" % "slf4j-simple" % "1.7.25"
"org.slf4j" % "slf4j-simple" % slf4jVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" % "mockito-core" % mockitoVersion % Test
)
)
.enablePlugins(PackPlugin)
Expand Down
31 changes: 22 additions & 9 deletions src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package com.snowplowanalytics

import com.spotify.scio._
import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO}
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.values.PDone
import org.joda.time.Duration
import org.apache.beam.sdk.options.ValueProvider.{NestedValueProvider, StaticValueProvider}
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.joda.time.{Duration, Instant}

object CloudStorageLoader {
def main(args: Array[String]): Unit = {
Expand All @@ -23,19 +26,29 @@ object CloudStorageLoader {
val sc = ScioContext(options)

val input = sc.pubsubSubscription[String](options.getInputSubscription).withName("input")
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration)))
)

val windowed = input
.withFixedWindows(Duration.standardMinutes(options.getWindowDuration)).withName("windowed")

windowed
input
.saveAsCustomOutput("output", TextIO.write()
.withWindowedWrites
.withNumShards(options.getNumShards)
.withSuffix(options.getOutputFilenameSuffix)
.withShardNameTemplate("SSSS-NNNN")
.withWritableByteChannelFactory(
FileBasedSink.CompressionType.fromCanonical(Compression.BZIP2))
.to(options.getOutputPath)
.withTempDirectory(NestedValueProvider.of(
StaticValueProvider.of(options.getOutputDirectory),
new SerializableFunction[String, ResourceId] {
def apply(input: String): ResourceId =
FileBasedSink.convertToFileResourceIfPossible(input)
}
))
.to(WindowedFilenamePolicy(
options.getOutputDirectory,
options.getOutputFilenamePrefix,
options.getShardTemplate,
options.getOutputFilenameSuffix
))
)

sc.close()
Expand Down
21 changes: 15 additions & 6 deletions src/main/scala/com/snowplowanalytics/Options.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,30 @@ package com.snowplowanalytics

import org.apache.beam.sdk.options._
import org.apache.beam.sdk.options.Validation.Required
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider

trait Options extends PipelineOptions with StreamingOptions {
/**
 * Pipeline option naming the Cloud Pub/Sub subscription to read from.
 * Falls back to the `@Default.String` value below when the option is not supplied.
 */
@Description("The Cloud Pub/Sub subscription to read from")
@Default.String("projects/project/subscriptions/subscription")
def getInputSubscription: String
def setInputSubscription(value: String): Unit

@Description("The Cloud Storage path to output files to, ends with the filenames suffix")
@Default.String("gs://tmp/subscription-")
def getOutputPath: String
def setOutputPath(value: String): Unit
/**
 * Pipeline option holding the Cloud Storage directory that output files are written under.
 * Per the description, the value is expected to end with a trailing `/`.
 */
@Description("The Cloud Storage directory to output files to, ends with /")
@Default.String("gs://tmp/")
def getOutputDirectory: String
def setOutputDirectory(value: String): Unit

/**
 * Pipeline option holding the prefix of the output files.
 * Defaults to `subscription-` when the option is not supplied.
 */
@Description("The Cloud Storage prefix to output files to")
@Default.String("subscription-")
def getOutputFilenamePrefix: String
def setOutputFilenamePrefix(value: String): Unit

/**
 * Pipeline option holding the shard template that becomes part of each output filename.
 * Defaults to `W-P-SSSSS-of-NNNNN` when the option is not supplied.
 */
@Description("The shard template which will be part of the filenames")
@Default.String("W-P-SSSSS-of-NNNNN")
def getShardTemplate: String
def setShardTemplate(value: String): Unit

@Description("The suffix of the filenames written out")
@Default.String(".bz2")
@Default.String(".txt")
def getOutputFilenameSuffix: String
def setOutputFilenameSuffix(value: String): Unit

Expand Down
58 changes: 58 additions & 0 deletions src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.snowplowanalytics

import org.apache.beam.sdk.io.DefaultFilenamePolicy
import org.apache.beam.sdk.io.FileBasedSink.{FilenamePolicy, OutputFileHints}
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

/**
 * Filename policy for windowed writes which can place files under a dated directory.
 *
 * The output directory may contain the date tokens `YYYY`, `MM`, `DD` (or `dd`) and `HH`.
 * For an [[IntervalWindow]] they are replaced by the year, month, day of month and hour of
 * the window's end. For any other window type the directory is used as is.
 * The final filename itself is produced by Beam's [[DefaultFilenamePolicy]].
 *
 * @param outputDirectory directory to write to, possibly containing date tokens
 * @param outputFilenamePrefix prefix of every file written
 * @param shardTemplate shard template handed to the default filename policy
 * @param outputFilenameSuffix suffix of every file written
 */
final case class WindowedFilenamePolicy(
  outputDirectory: String,
  outputFilenamePrefix: String,
  shardTemplate: String,
  outputFilenameSuffix: String
) extends FilenamePolicy {

  /**
   * Builds the resource a given shard of a given window is written to.
   *
   * @return the prefix resolved against the (date-expanded) output directory, completed by
   *         the default filename policy with the shard template and suffix
   */
  override def windowedFilename(
    shardNumber: Int,
    numShards: Int,
    window: BoundedWindow,
    paneInfo: PaneInfo,
    outputFileHints: OutputFileHints
  ): ResourceId = {
    val outputFile = resolveWithDateTemplates(outputDirectory, window)
      .resolve(outputFilenamePrefix, StandardResolveOptions.RESOLVE_FILE)
    val policy = DefaultFilenamePolicy.fromStandardParameters(
      StaticValueProvider.of(outputFile), shardTemplate, outputFilenameSuffix, windowedWrites = true)
    policy.windowedFilename(shardNumber, numShards, window, paneInfo, outputFileHints)
  }

  /**
   * Unwindowed writes are not supported by this policy.
   *
   * @throws UnsupportedOperationException always
   */
  override def unwindowedFilename(
    shardNumber: Int,
    numShards: Int,
    outputFileHints: OutputFileHints
  ): ResourceId = throw new UnsupportedOperationException("This policy only supports windowed files")

  /**
   * Turns the output directory into a directory resource, expanding the date tokens with the
   * end of the window when the window is an [[IntervalWindow]].
   */
  private def resolveWithDateTemplates(
    outputDirectory: String,
    window: BoundedWindow
  ): ResourceId = {
    val outputDir = FileSystems.matchNewResource(outputDirectory, isDirectory = true)

    window match {
      case w: IntervalWindow =>
        // Instant#toDateTime converts using the JVM's default time zone.
        val windowEndTime = w.end.toDateTime
        val outputPath = dateFormat(windowEndTime)(outputDir.toString)
        FileSystems.matchNewResource(outputPath, isDirectory = true)
      case _ => outputDir
    }
  }

  /**
   * Builds a function replacing the date tokens of a string by the matching fields of `t`.
   *
   * Each pair is (token looked for in the path, Joda-Time pattern used to print it).
   * `DD` is printed with the `dd` pattern on purpose: in Joda-Time `DD` is the day of year,
   * which would turn February 8th into `39` instead of `08`.
   */
  private def dateFormat(t: DateTime): String => String = {
    val substitutions = Seq(
      "YYYY" -> "YYYY",
      "MM" -> "MM",
      "DD" -> "dd",
      "dd" -> "dd",
      "HH" -> "HH"
    ).map { case (token, pattern) => token -> DateTimeFormat.forPattern(pattern).print(t) }

    // The printed values are purely numeric, so an earlier substitution can never
    // introduce a token picked up by a later one.
    (s: String) => substitutions.foldLeft(s) { case (acc, (token, value)) =>
      acc.replace(token, value)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.snowplowanalytics

import java.io.File
import java.nio.file.Files

import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints
import org.apache.beam.sdk.io.LocalResources
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
import org.joda.time.DateTime

import org.mockito.Mockito.when
import org.scalatest.FreeSpec
import org.scalatest.Matchers._
import org.scalatest.mockito.MockitoSugar

/**
 * Spec for [[WindowedFilenamePolicy]]: windowed filenames with and without date tokens in
 * the output directory, and the rejection of unwindowed filenames.
 */
class WindowedFilenamePolicySpec extends FreeSpec with MockitoSugar {
  /** Minimal hints with neither a mime type nor a suggested suffix. */
  object TestOutputFileHints extends OutputFileHints {
    override def getMimeType: String = ""
    override def getSuggestedFilenameSuffix: String = ""
  }

  "the WindowedFilenamePolicy" - {
    "make a windowedFilename function available" - {
      "which produces a windowed filename" in {
        val outputDirectory = LocalResources.fromFile(tempDir(), isDirectory = true)
          .resolve("WindowedFilenamePolicy", StandardResolveOptions.RESOLVE_DIRECTORY)
        // A plain BoundedWindow mock: the policy must leave the directory untouched.
        val window = mock[BoundedWindow]
        val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0)
        val policy = WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt")

        val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints)

        filename.getFilename shouldEqual "out-001-001.txt"
      }
      "which produces a dynamic filename" in {
        val outputDirectory = LocalResources.fromFile(tempDir(), isDirectory = true)
          .resolve("YYYY/MM/dd/HH", StandardResolveOptions.RESOLVE_DIRECTORY)
        val paneInfo = PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 0, 0)
        // An IntervalWindow mock whose end drives the expansion of the date tokens.
        val window = mock[IntervalWindow]
        val windowBegin = new DateTime(2018, 1, 8, 10, 55, 0).toInstant
        val windowEnd = new DateTime(2018, 1, 8, 10, 56, 0).toInstant
        when(window.maxTimestamp).thenReturn(windowEnd)
        when(window.start).thenReturn(windowBegin)
        when(window.end).thenReturn(windowEnd)
        val policy = WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt")

        val filename = policy.windowedFilename(1, 1, window, paneInfo, TestOutputFileHints)

        filename.getCurrentDirectory.toString should endWith("2018/01/08/10/")
        filename.getFilename shouldEqual "out-001-001.txt"
      }
    }
    "make a unwindowedFilename function available" - {
      "which throws an unsupported operation" in {
        val outputDirectory = LocalResources.fromFile(tempDir(), isDirectory = true)
          .resolve("WindowedFilenamePolicy", StandardResolveOptions.RESOLVE_DIRECTORY)
        val policy = WindowedFilenamePolicy(outputDirectory.toString, "out", "-SSS-NNN", ".txt")

        an [UnsupportedOperationException] should be thrownBy
          policy.unwindowedFilename(1, 1, TestOutputFileHints)
      }
    }
  }

  /**
   * Creates a fresh temporary directory for a test.
   * The directory is registered for deletion at JVM exit so test runs do not pile up
   * empty directories; the policy only computes names and never writes into it.
   */
  private def tempDir(): File = {
    val dir = Files.createTempDirectory("WindowedFilenamePolicy").toFile
    dir.deleteOnExit()
    dir
  }
}

0 comments on commit 12c3b36

Please sign in to comment.