Add type partition for bad row partition persistence (closes #16)
aldemirenes authored and Piotr Limanowski committed Dec 11, 2019
1 parent 488c77e commit ff70ba2
Showing 7 changed files with 311 additions and 70 deletions.
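The gist of the change: rows that parse as self-describing JSON are routed to an output sub-directory named after their schema key (vendor.name), while anything else is diverted to a configurable error directory. A minimal self-contained sketch of that routing rule follows; the schema coordinates and error-directory name are illustrative assumptions, not values from the commit.

    import io.circe.parser.parse
    import com.snowplowanalytics.iglu.core.SchemaKey
    import com.snowplowanalytics.iglu.core.circe.implicits._

    // Sketch of the routing rule introduced below (example values are assumptions).
    def partitionKey(row: String, errorDir: String): String =
      parse(row) match {
        case Left(_) => errorDir // not valid JSON
        case Right(json) =>
          SchemaKey.extract(json).fold(
            _ => errorDir, // valid JSON, but not self-describing
            k => s"${k.vendor}.${k.name}"
          )
      }

    partitionKey("""{"schema": "iglu:com.acme/event/jsonschema/1-0-0", "data": {}}""", "bad")
    // "com.acme.event"
    partitionKey("not json", "bad") // "bad"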
5 changes: 5 additions & 0 deletions build.sbt
@@ -7,6 +7,8 @@ val scalaMacrosVersion = "2.1.1"
val slf4jVersion = "1.7.29"
val scalatestVersion = "3.0.8"
val mockitoVersion = "2.28.2"
val circe = "0.11.1"
val igluCore = "0.5.0"

lazy val compilerOptions = Seq(
"-target:jvm-1.8",
@@ -58,6 +60,9 @@ lazy val root: Project = project
"com.spotify" %% "scio-core" % scioVersion,
"com.spotify" %% "scio-test" % scioVersion % Test,
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
"io.circe" %% "circe-core" % circe,
"io.circe" %% "circe-parser" % circe,
"com.snowplowanalytics" %% "iglu-core-circe" % igluCore,
"org.slf4j" % "slf4j-simple" % slf4jVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" % "mockito-core" % mockitoVersion % Test
CloudStorageLoader.scala
@@ -14,18 +14,28 @@
*/
package com.snowplowanalytics.storage.googlecloudstorage.loader

import io.circe.parser.parse

import com.spotify.scio._
import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO}

import org.joda.time.Duration

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.FileIO.Write.FileNaming
import org.apache.beam.sdk.io.{Compression, FileBasedSink, FileIO, TextIO}
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.{ValueProvider, PipelineOptionsFactory}
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.joda.time.Duration

import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.core.circe.implicits._

/** Dataflow job outputting the content from a Pubsub subscription to a Cloud Storage bucket. */
object CloudStorageLoader {

def main(args: Array[String]): Unit = {
PipelineOptionsFactory.register(classOf[Options])
val options = PipelineOptionsFactory
@@ -41,37 +51,87 @@ object CloudStorageLoader {
def run(options: Options): Unit = {
val sc = ScioContext(options)

val outputDirectory = options.getOutputDirectory
val outputFileNamePrefix = options.getOutputFilenamePrefix
val shardTemplate = options.getShardTemplate
val outputFilenameSuffix = options.getOutputFilenameSuffix
val dateFormat = options.getDateFormat
val partitionErrorDirectory = options.getPartitionErrorDirectory()

val inputIO = PubsubIO.readStrings().fromSubscription(options.getInputSubscription)
val outputIO = TextIO.write()

val input = sc
.customInput("input", inputIO)
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong)))
).withName("windowed")

if (options.getPartitionBySchema()) {
// Partition output according to row type
val outputDynamic = FileIO.writeDynamic[String, String]()
.by(getRowType(_, partitionErrorDirectory).getName)
.via(TextIO.sink())
.to(outputDirectory.get())
.withNumShards(options.getNumShards)
.withCompression(getCompression(options.getCompression))
.withDestinationCoder(StringUtf8Coder.of())
.withNaming(new SerializableFunction[String, FileNaming] {
// Create a FileNaming for each row-type partition of the window
override def apply(rowType: String): FileNaming =
WindowedFilenamePolicy(
None,
outputFileNamePrefix,
shardTemplate,
outputFilenameSuffix,
dateFormat,
Some(rowType)
)
})
input.saveAsCustomOutput("output", outputDynamic)
} else {
// Output to the same directory, without partitioning by row type
val outputIO = TextIO.write()
.withWindowedWrites
.withNumShards(options.getNumShards)
.withWritableByteChannelFactory(
FileBasedSink.CompressionType.fromCanonical(getCompression(options.getCompression)))
.withTempDirectory(NestedValueProvider.of(
options.getOutputDirectory,
outputDirectory,
new SerializableFunction[String, ResourceId] {
def apply(input: String): ResourceId =
FileBasedSink.convertToFileResourceIfPossible(input)
}
))
.to(WindowedFilenamePolicy(
options.getOutputDirectory,
options.getOutputFilenamePrefix,
options.getShardTemplate,
options.getOutputFilenameSuffix
Some(outputDirectory),
outputFileNamePrefix,
shardTemplate,
outputFilenameSuffix,
dateFormat,
None
))


sc
.customInput("input", inputIO)
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong)))
).withName("windowed")
.saveAsCustomOutput("output", outputIO)
input.saveAsCustomOutput("output", outputIO)
}

sc.close()
}

/**
* Determine the type of a row from its schema key.
* @param row string whose row type is to be determined
* @param partitionErrorDir directory for rows whose type cannot be determined
* @return the row type of the given string
*/
private[loader] def getRowType(row: String, partitionErrorDir: ValueProvider[String]): RowType =
parse(row) match {
case Left(_) => RowType.PartitionError(partitionErrorDir.get)
case Right(json) => SchemaKey.extract(json).fold(
_ => RowType.PartitionError(partitionErrorDir.get),
k => RowType.SelfDescribing(k)
)
}

/**
* Tries to parse a string as a [[Compression]], falls back to uncompressed.
* @param compression string to parse
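For readers unfamiliar with Beam's dynamic destinations used above: `.by` derives a partition key per element, and `.withNaming` maps each key to a file-naming scheme, so every key gets its own sub-directory. A stripped-down sketch of the same pattern, where the bucket path and key logic are assumptions rather than the commit's values:

    import org.apache.beam.sdk.coders.StringUtf8Coder
    import org.apache.beam.sdk.io.{FileIO, TextIO}
    import org.apache.beam.sdk.io.FileIO.Write.FileNaming
    import org.apache.beam.sdk.transforms.SerializableFunction

    // Stripped-down dynamic-write sketch (bucket and key logic are assumptions).
    val write = FileIO.writeDynamic[String, String]()
      .by(new SerializableFunction[String, String] {
        override def apply(row: String): String =
          if (row.trim.startsWith("{")) "json" else "other" // partition key
      })
      .via(TextIO.sink())
      .to("gs://example-bucket/out/")
      .withDestinationCoder(StringUtf8Coder.of())
      .withNaming(new SerializableFunction[String, FileNaming] {
        override def apply(key: String): FileNaming =
          FileIO.Write.defaultNaming(s"$key/part", ".txt") // one directory per key
      })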
Options.scala
@@ -35,6 +35,11 @@ trait Options extends PipelineOptions with StreamingOptions {
def getOutputFilenamePrefix: ValueProvider[String]
def setOutputFilenamePrefix(value: ValueProvider[String]): Unit

@Description("Date format")
@Default.String("YYYY/MM/dd/HH")
def getDateFormat: ValueProvider[String]
def setDateFormat(value: ValueProvider[String]): Unit

@Description("The shard template which will be part of the filennams")
@Default.String("-W-P-SSSSS-of-NNNNN")
def getShardTemplate: ValueProvider[String]
@@ -59,4 +64,14 @@
@Default.Integer(0)
def getNumShards: Int
def setNumShards(value: Int): Unit

@Description("Partition output according to schema")
@Default.Boolean(false)
def getPartitionBySchema(): Boolean
def setPartitionBySchema(value: Boolean): Unit

@Description("The directory for rows which gives error during type partition")
@Default.String("")
def getPartitionErrorDirectory(): ValueProvider[String]
def setPartitionErrorDirectory(value: ValueProvider[String]): Unit
}
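A hedged sketch of wiring the new options from command-line arguments; every value below is an illustrative assumption:

    import org.apache.beam.sdk.options.PipelineOptionsFactory

    // All argument values are illustrative assumptions.
    val options = PipelineOptionsFactory
      .fromArgs(
        "--inputSubscription=projects/my-project/subscriptions/my-sub",
        "--outputDirectory=gs://example-bucket/out/",
        "--partitionBySchema=true",
        "--partitionErrorDirectory=gs://example-bucket/bad/",
        "--dateFormat=YYYY/MM/dd/HH"
      )
      .withValidation
      .as(classOf[Options])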
RowType.scala
@@ -0,0 +1,26 @@
package com.snowplowanalytics.storage.googlecloudstorage.loader

import com.snowplowanalytics.iglu.core.SchemaKey

/** Type of a row, determined according to the schema of its self-describing data */
sealed trait RowType extends Product with Serializable {
def getName(): String
}

object RowType {

/** Represents cases where the row type could not be determined,
* because the row is either not valid JSON or not
* self-describing JSON
*/
case class PartitionError(errorDir: String) extends RowType {
override def getName(): String = errorDir
}

/** Represents cases where the row type can be determined successfully,
* i.e. the row has a proper schema key
*/
case class SelfDescribing(schemaKey: SchemaKey) extends RowType {
override def getName(): String = s"${schemaKey.vendor}.${schemaKey.name}"
}
}
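A minimal sketch of the two cases in use; the schema coordinates and error directory are assumptions:

    import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

    // Example values are assumptions, not taken from the commit.
    val good: RowType = RowType.SelfDescribing(
      SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(1, 0, 0)))
    val bad: RowType = RowType.PartitionError("gs://example-bucket/bad")

    good.getName() // "com.acme.event", becomes the output sub-directory
    bad.getName()  // "gs://example-bucket/bad", rows are diverted to the error directory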
WindowedFilenamePolicy.scala
@@ -14,32 +14,38 @@
*/
package com.snowplowanalytics.storage.googlecloudstorage.loader

import org.apache.beam.sdk.io.DefaultFilenamePolicy
import org.apache.beam.sdk.io.FileBasedSink.{FilenamePolicy, OutputFileHints}
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileSystems, Compression}
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints
import org.apache.beam.sdk.io.FileIO.Write.FileNaming
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, IntervalWindow, PaneInfo}
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.windowing.{IntervalWindow, PaneInfo, BoundedWindow}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

/**
* Case class providing a policy on how the output files will be named in the output bucket.
* It supports only windowed files.
* @param outputDirectory Cloud Storage directory to output to, must end with a /
* @param outputDirectory Cloud Storage directory to output to
* @param outputFilenamePrefix the prefix with which the filenames will be prepended
* @param shardTemplate the template controlling how shard numbers will be incorporated into
* filenames
* @param outputFilenameSuffix the suffix with which the filenames will be appended
* @param dateTemplate the template controlling how the date directory will be constructed
* @param rowType row type naming the sub-directory under which rows of the same type are put
*/
final case class WindowedFilenamePolicy(
outputDirectory: ValueProvider[String],
outputDirectory: Option[ValueProvider[String]],
outputFilenamePrefix: ValueProvider[String],
shardTemplate: ValueProvider[String],
outputFilenameSuffix: ValueProvider[String]
) extends FilenamePolicy {
outputFilenameSuffix: ValueProvider[String],
dateTemplate: ValueProvider[String],
rowType: Option[String]
) extends FilenamePolicy with FileNaming {

/** Generates a filename from window information, filling possible date templates. */
override def windowedFilename(
shardNumber: Int,
@@ -48,8 +54,13 @@
paneInfo: PaneInfo,
outputFileHints: OutputFileHints
): ResourceId = {
val outputFile = resolveWithDateTemplates(outputDirectory, window)
.resolve(outputFilenamePrefix.get, StandardResolveOptions.RESOLVE_FILE)
val outputFile = resolveWithDateTemplates(
outputDirectory.map(_.get),
rowType,
window,
dateTemplate.get
).resolve(outputFilenamePrefix.get, StandardResolveOptions.RESOLVE_FILE)

val policy = DefaultFilenamePolicy.fromStandardParameters(
StaticValueProvider.of(outputFile),
shardTemplate.get,
@@ -67,23 +78,44 @@ final case class WindowedFilenamePolicy(
): ResourceId =
throw new UnsupportedOperationException("This policy only supports windowed files")

override def getFilename(
window: BoundedWindow,
pane: PaneInfo,
numShards: Int,
shardIndex: Int,
compression: Compression
): String =
windowedFilename(shardIndex, numShards, window, pane, new OutputFileHints {
override def getMimeType: String = null
override def getSuggestedFilenameSuffix: String = ""
}).toString.stripPrefix("/") ++ compression.getSuggestedSuffix

/**
* Fill the date templates with actual time information from the window we are in.
* @param outputDirectory directory possibly containing date templates
* @param outputDirectory Cloud Storage directory to output to
* @param rowType row type naming the sub-directory under which rows of the same type are put
* @param window time window we are currently in
* @param dateTemplate the template controlling how the date directory will be constructed
* @return a resource id with the date templated in
*/
private def resolveWithDateTemplates(
outputDirectory: ValueProvider[String],
window: BoundedWindow
outputDirectory: Option[String],
rowType: Option[String],
window: BoundedWindow,
dateTemplate: String
): ResourceId = {
val outputDir = FileSystems.matchNewResource(outputDirectory.get, isDirectory = true)

val outputDir = (outputDirectory, rowType) match {
case (Some(o), Some(t)) => FileSystems.matchNewResource(o, isDirectory = true)
.resolve(t, StandardResolveOptions.RESOLVE_DIRECTORY)
case (Some(o), None) => FileSystems.matchNewResource(o, isDirectory = true)
case (None, Some(t)) => FileSystems.matchNewResource(t, isDirectory = true)
case (None, None) => FileSystems.matchNewResource("", isDirectory = true)
}
window match {
case w: IntervalWindow =>
val windowEndTime = w.end.toDateTime
val outputPath = dateFormat(windowEndTime, outputDir.toString)
FileSystems.matchNewResource(outputPath, isDirectory = true)
val dateStr = dateFormat(windowEndTime, dateTemplate)
outputDir.resolve(dateStr, StandardResolveOptions.RESOLVE_DIRECTORY)
case _ => outputDir
}
}
@@ -102,17 +134,3 @@ final case class WindowedFilenamePolicy(
}
.fold(identity[String] _)(_ andThen _)(template)
}

object WindowedFilenamePolicy {
def apply(
outputDirectory: String,
outputFilenamePrefix: String,
shardTemplate: String,
outputFilenameSuffix: String
): WindowedFilenamePolicy = WindowedFilenamePolicy(
StaticValueProvider.of(outputDirectory),
StaticValueProvider.of(outputFilenamePrefix),
StaticValueProvider.of(shardTemplate),
StaticValueProvider.of(outputFilenameSuffix)
)
}
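A hedged sketch of constructing the policy with its new signature (the static companion apply above is removed by this commit); all values are illustrative assumptions:

    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider

    // All values are illustrative assumptions.
    val policy = WindowedFilenamePolicy(
      outputDirectory = Some(StaticValueProvider.of("gs://example-bucket/out/")),
      outputFilenamePrefix = StaticValueProvider.of("output"),
      shardTemplate = StaticValueProvider.of("-W-P-SSSSS-of-NNNNN"),
      outputFilenameSuffix = StaticValueProvider.of(".txt"),
      dateTemplate = StaticValueProvider.of("YYYY/MM/dd/HH"),
      rowType = Some("com.acme.event")
    )
    // With rowType set, files resolve under gs://example-bucket/out/com.acme.event/<date>/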
CloudStorageLoaderSpec.scala
@@ -16,16 +16,50 @@ package com.snowplowanalytics.storage.googlecloudstorage.loader

import com.spotify.scio.io.CustomIO
import com.spotify.scio.testing._
import com.spotify.scio.io.CustomIO
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider

class CloudStorageLoaderSpec extends PipelineSpec {
val expected = (1 to 10).map(_.toString)
val errorDir = "err_dir"

"CloudStorageLoader" should "output a file" in {
val sub = "projects/project/subscriptions/sub"
JobTest[CloudStorageLoader.type]
.args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/")
.args(s"--inputSubscription=${sub}", "--outputDirectory=gs://out-dir/", "--partitionBySchema=true")
.input(CustomIO[String]("input"), expected)
.output(CustomIO[String]("output"))(_ should containInAnyOrder(expected))
.run()
}

"getRowType" should "return parsing error when given json is not valid" in {
val invalidJson =
"""
|{
| "key": "value
|}
""".stripMargin
CloudStorageLoader.getRowType(invalidJson, StaticValueProvider.of(errorDir)) should equal(RowType.PartitionError(errorDir))
}

"getRowType" should "return NonSelfDescribing as RowType when given json is not self describing json" in {
val nonSelfDescribingJson =
"""
|{
| "key": "value"
|}
""".stripMargin
CloudStorageLoader.getRowType(nonSelfDescribingJson, StaticValueProvider.of(errorDir)) should equal(RowType.PartitionError(errorDir))
}

"getRowType" should "return SelfDescribing as RowType when given json is self describing json" in {
val json =
"""
|{
| "schema": "iglu:com.acme2/example1/jsonschema/2-0-1",
| "data": "data3"
|}
""".stripMargin
CloudStorageLoader.getRowType(json, StaticValueProvider.of(errorDir)).getName should equal("com.acme2.example1")
}
}