Skip to content

Commit

Permalink
Make use of ValueProvider for the configuration options (closes #7)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Nov 2, 2018
1 parent f355c37 commit 8ff4b40
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 34 deletions.
25 changes: 14 additions & 11 deletions src/main/scala/com/snowplowanalytics/CloudStorageLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO}
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.ValueProvider.{NestedValueProvider, StaticValueProvider}
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.joda.time.{Duration, Instant}
import org.joda.time.Duration

object CloudStorageLoader {
def main(args: Array[String]): Unit = {
PipelineOptionsFactory.register(classOf[Options])
val options = PipelineOptionsFactory
.fromArgs(args: _*)
.withValidation
Expand All @@ -25,19 +26,14 @@ object CloudStorageLoader {
def run(options: Options): Unit = {
val sc = ScioContext(options)

val input = sc.pubsubSubscription[String](options.getInputSubscription).withName("input")
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration)))
)

input
.saveAsCustomOutput("output", TextIO.write()
val inputIO = PubsubIO.readStrings().fromSubscription(options.getInputSubscription)
val outputIO = TextIO.write()
.withWindowedWrites
.withNumShards(options.getNumShards)
.withWritableByteChannelFactory(
FileBasedSink.CompressionType.fromCanonical(getCompression(options.getCompression)))
.withTempDirectory(NestedValueProvider.of(
StaticValueProvider.of(options.getOutputDirectory),
options.getOutputDirectory,
new SerializableFunction[String, ResourceId] {
def apply(input: String): ResourceId =
FileBasedSink.convertToFileResourceIfPossible(input)
Expand All @@ -49,7 +45,14 @@ object CloudStorageLoader {
options.getShardTemplate,
options.getOutputFilenameSuffix
))
)


sc
.customInput("input", inputIO)
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong)))
).withName("windowed")
.saveAsCustomOutput("output", outputIO)

sc.close()
}
Expand Down
27 changes: 14 additions & 13 deletions src/main/scala/com/snowplowanalytics/Options.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@ import org.apache.beam.sdk.options.Validation.Required

trait Options extends PipelineOptions with StreamingOptions {
@Description("The Cloud Pub/Sub subscription to read from")
@Default.String("projects/project/subscriptions/subscription")
def getInputSubscription: String
def setInputSubscription(value: String): Unit
@Required
def getInputSubscription: ValueProvider[String]
def setInputSubscription(value: ValueProvider[String]): Unit

@Description("The Cloud Storage directory to output files to, ends with /")
@Default.String("gs://tmp/")
def getOutputDirectory: String
def setOutputDirectory(value: String): Unit
@Required
def getOutputDirectory: ValueProvider[String]
def setOutputDirectory(value: ValueProvider[String]): Unit

@Description("The Cloud Storage prefix to output files to")
@Default.String("subscription-")
def getOutputFilenamePrefix: String
def setOutputFilenamePrefix(value: String): Unit
@Default.String("output")
@Required
def getOutputFilenamePrefix: ValueProvider[String]
def setOutputFilenamePrefix(value: ValueProvider[String]): Unit

@Description("The shard template which will be part of the filenames")
@Default.String("-W-P-SSSSS-of-NNNNN")
def getShardTemplate: String
def setShardTemplate(value: String): Unit
def getShardTemplate: ValueProvider[String]
def setShardTemplate(value: ValueProvider[String]): Unit

@Description("The suffix of the filenames written out")
@Default.String(".txt")
def getOutputFilenameSuffix: String
def setOutputFilenameSuffix(value: String): Unit
def getOutputFilenameSuffix: ValueProvider[String]
def setOutputFilenameSuffix(value: ValueProvider[String]): Unit

@Description("The window duration in minutes, defaults to 5")
@Default.Integer(5)
Expand Down
31 changes: 23 additions & 8 deletions src/main/scala/com/snowplowanalytics/WindowedFilenamePolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import org.apache.beam.sdk.io.FileBasedSink.{FilenamePolicy, OutputFileHints}
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

final case class WindowedFilenamePolicy(
outputDirectory: String,
outputFilenamePrefix: String,
shardTemplate: String,
outputFilenameSuffix: String
outputDirectory: ValueProvider[String],
outputFilenamePrefix: ValueProvider[String],
shardTemplate: ValueProvider[String],
outputFilenameSuffix: ValueProvider[String]
) extends FilenamePolicy {
override def windowedFilename(
shardNumber: Int,
Expand All @@ -24,9 +25,9 @@ final case class WindowedFilenamePolicy(
outputFileHints: OutputFileHints
): ResourceId = {
val outputFile = resolveWithDateTemplates(outputDirectory, window)
.resolve(outputFilenamePrefix, StandardResolveOptions.RESOLVE_FILE)
.resolve(outputFilenamePrefix.get, StandardResolveOptions.RESOLVE_FILE)
val policy = DefaultFilenamePolicy.fromStandardParameters(
StaticValueProvider.of(outputFile), shardTemplate, outputFilenameSuffix, windowedWrites = true)
StaticValueProvider.of(outputFile), shardTemplate.get, outputFilenameSuffix.get, windowedWrites = true)
policy.windowedFilename(shardNumber, numShards, window, paneInfo, outputFileHints)
}

Expand All @@ -37,10 +38,10 @@ final case class WindowedFilenamePolicy(
): ResourceId = throw new UnsupportedOperationException("This policy only supports windowed files")

private def resolveWithDateTemplates(
outputDirectory: String,
outputDirectory: ValueProvider[String],
window: BoundedWindow
): ResourceId = {
val outputDir = FileSystems.matchNewResource(outputDirectory, isDirectory = true)
val outputDir = FileSystems.matchNewResource(outputDirectory.get, isDirectory = true)

window match {
case w: IntervalWindow =>
Expand All @@ -57,3 +58,17 @@ final case class WindowedFilenamePolicy(
((s: String) => s.replace("dd", DateTimeFormat.forPattern("dd").print(t))) andThen
((s: String) => s.replace("HH", DateTimeFormat.forPattern("HH").print(t)))
}

/**
 * Companion providing a convenience constructor that accepts plain strings
 * and lifts each one into a [[org.apache.beam.sdk.options.ValueProvider]]
 * via `StaticValueProvider`, so callers with already-known values need not
 * wrap them manually.
 */
object WindowedFilenamePolicy {
  def apply(
    outputDirectory: String,
    outputFilenamePrefix: String,
    shardTemplate: String,
    outputFilenameSuffix: String
  ): WindowedFilenamePolicy = {
    // Lift a concrete value into Beam's static ValueProvider wrapper.
    def static(value: String) = StaticValueProvider.of(value)

    WindowedFilenamePolicy(
      outputDirectory = static(outputDirectory),
      outputFilenamePrefix = static(outputFilenamePrefix),
      shardTemplate = static(shardTemplate),
      outputFilenameSuffix = static(outputFilenameSuffix)
    )
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ class CloudStorageLoaderSpec extends PipelineSpec {
val expected = (1 to 10).map(_.toString)

"CloudStorageLoader" should "output a file" in {
val sub = "projects/project/subscriptions/sub"
JobTest[CloudStorageLoader.type]
.args("--inputSubscription=in", "--outputDirectory=gs://out-dir/")
.input(PubsubIO("in"), expected)
.args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/")
.input(CustomIO[String]("input"), expected)
.output(CustomIO[String]("output"))(_ should containInAnyOrder(expected))
.run()
}
Expand Down

0 comments on commit 8ff4b40

Please sign in to comment.