Add type partition for bad row partition persistence (closes #16)
aldemirenes authored and Piotr Limanowski committed Jan 14, 2020
1 parent 488c77e commit 2c4ef8a
Showing 8 changed files with 326 additions and 117 deletions.
53 changes: 6 additions & 47 deletions README.md
@@ -7,23 +7,6 @@ Cloud Storage Loader is a [Dataflow][dataflow] job which dumps events from an in

## Building

### Cloud Dataflow template

Cloud Storage Loader is compatible with [Dataflow templates][templates], which give you
additional flexibility when running your pipeline.

To upload the template to your own bucket, run:

```bash
sbt "runMain com.snowplowanalytics.storage.googlecloudstorage.loader.CloudStorageLoader \
--project=[PROJECT] \
--templateLocation=gs://[BUCKET]/SnowplowGoogleCloudStorageLoaderTemplate \
--stagingLocation=gs://[BUCKET]/staging \
--runner=DataflowRunner \
--tempLocation=gs://[BUCKET]/tmp" \
--numShards=1
```

### Zip archive

To build the zip archive, run:
@@ -42,32 +25,6 @@ sbt docker:publishLocal

## Running

### Through the template

You can run Dataflow templates using a variety of means:

- Using the GCP console
- Using `gcloud`
- Using the REST API

Refer to [the documentation on executing templates][executing-templates] to know more.

Here, we provide an example using `gcloud`:

```bash
gcloud dataflow jobs run [JOB-NAME] \
--gcs-location gs://sp-hosted-assets/4-storage/snowplow-google-cloud-storage-loader/0.2.0/SnowplowGoogleCloudStorageLoaderTemplate-0.2.0 \
--parameters \
inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION],\
outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/,\ # partitions by date
outputFilenamePrefix=output,\ # optional
shardTemplate=-W-P-SSSSS-of-NNNNN,\ # optional
outputFilenameSuffix=.txt,\ # optional
windowDuration=5,\ # optional, in minutes
compression=none,\ # optional, gzip, bz2 or none
numShards=1 # optional
```

### Through the zip archive

You can find the archive hosted on [our Bintray][bintray].
@@ -87,7 +44,9 @@ Once unzipped the artifact can be run as follows:
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
--numShards=1 \ # optional
--dateFormat=YYYY/MM/dd/HH/ \ # optional
--partitionErrorDirectory=gs://[BUCKET]/[SUBDIR] # optional
```

To display the help message:
@@ -125,7 +84,9 @@ docker run \
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
--numShards=1 \ # optional
--dateFormat=YYYY/MM/dd/HH/ \ # optional
--partitionErrorDirectory=gs://[BUCKET]/[SUBDIR] # optional
```

To display the help message:
@@ -187,8 +148,6 @@ limitations under the License.
[pubsub]: https://cloud.google.com/pubsub/
[storage]: https://cloud.google.com/storage/
[dataflow]: https://cloud.google.com/dataflow/
[templates]: https://cloud.google.com/dataflow/docs/templates/overview
[executing-templates]: https://cloud.google.com/dataflow/docs/templates/executing-templates

[bintray]: https://bintray.com/snowplow/snowplow-generic/snowplow-google-cloud-storage-loader
[bintray-docker]: https://bintray.com/snowplow/registry/snowplow%3Asnowplow-google-cloud-storage-loader
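Taken together, the two new options exercise the row-type partitioning added in this commit: rows that parse as self-describing JSON are grouped under a sub-directory named after their schema (`vendor.name`), while rows that do not are written to `partitionErrorDirectory`. With the defaults shown above, a valid event might land under a path shaped like `gs://[BUCKET]/com.acme.my_event/2020/01/14/10/output-...txt`; the exact layout is produced by `WindowedFilenamePolicy`, whose changes are not part of this view, so the path is illustrative only.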
5 changes: 5 additions & 0 deletions build.sbt
@@ -7,6 +7,8 @@ val scalaMacrosVersion = "2.1.1"
val slf4jVersion = "1.7.29"
val scalatestVersion = "3.0.8"
val mockitoVersion = "2.28.2"
val circe = "0.11.1"
val igluCore = "0.5.0"

lazy val compilerOptions = Seq(
"-target:jvm-1.8",
@@ -58,6 +60,9 @@ lazy val root: Project = project
"com.spotify" %% "scio-core" % scioVersion,
"com.spotify" %% "scio-test" % scioVersion % Test,
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
"io.circe" %% "circe-core" % circe,
"io.circe" %% "circe-parser" % circe,
"com.snowplowanalytics" %% "iglu-core-circe" % igluCore,
"org.slf4j" % "slf4j-simple" % slf4jVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" % "mockito-core" % mockitoVersion % Test
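The new dependencies support the partitioning logic below: circe parses each row as JSON, and iglu-core-circe extracts the `SchemaKey` from self-describing rows so the loader can name the partition after the schema's vendor and name.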
CloudStorageLoader.scala
@@ -14,18 +14,28 @@
*/
package com.snowplowanalytics.storage.googlecloudstorage.loader

import io.circe.parser.parse

import com.spotify.scio._
import org.apache.beam.sdk.io.{Compression, FileBasedSink, TextIO}

import org.joda.time.Duration

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.FileIO.Write.FileNaming
import org.apache.beam.sdk.io.{Compression, FileBasedSink, FileIO, TextIO}
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.{ValueProvider, PipelineOptionsFactory}
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.joda.time.Duration

import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.core.circe.implicits._

/** Dataflow job outputting the content from a Pubsub subscription to a Cloud Storage bucket. */
object CloudStorageLoader {

def main(args: Array[String]): Unit = {
PipelineOptionsFactory.register(classOf[Options])
val options = PipelineOptionsFactory
@@ -41,37 +51,87 @@ object CloudStorageLoader {
def run(options: Options): Unit = {
val sc = ScioContext(options)

val outputDirectory = options.getOutputDirectory
val outputFileNamePrefix = options.getOutputFilenamePrefix
val shardTemplate = options.getShardTemplate
val outputFilenameSuffix = options.getOutputFilenameSuffix
val dateFormat = options.getDateFormat
val partitionErrorDirectory = options.getPartitionErrorDirectory()

val inputIO = PubsubIO.readStrings().fromSubscription(options.getInputSubscription)
val outputIO = TextIO.write()

val input = sc
.customInput("input", inputIO)
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong)))
).withName("windowed")

if (partitionErrorDirectory.isAccessible()) {
// Partition output according to row type
val outputDynamic = FileIO.writeDynamic[String, String]()
.by(getRowType(_, partitionErrorDirectory).name)
.via(TextIO.sink())
.to(outputDirectory.get())
.withNumShards(options.getNumShards)
.withCompression(getCompression(options.getCompression))
.withDestinationCoder(StringUtf8Coder.of())
.withNaming(new SerializableFunction[String, FileNaming] {
// Create the FileNaming for each row-type partition
// of the window
override def apply(rowType: String): FileNaming =
WindowedFilenamePolicy(
None,
outputFileNamePrefix,
shardTemplate,
outputFilenameSuffix,
dateFormat,
Some(rowType)
)
})
input.saveAsCustomOutput("output", outputDynamic)
} else {
// Output to the same directory without partitioning
// by row type
val outputIO = TextIO.write()
.withWindowedWrites
.withNumShards(options.getNumShards)
.withWritableByteChannelFactory(
FileBasedSink.CompressionType.fromCanonical(getCompression(options.getCompression)))
.withTempDirectory(NestedValueProvider.of(
options.getOutputDirectory,
outputDirectory,
new SerializableFunction[String, ResourceId] {
def apply(input: String): ResourceId =
FileBasedSink.convertToFileResourceIfPossible(input)
}
))
.to(WindowedFilenamePolicy(
options.getOutputDirectory,
options.getOutputFilenamePrefix,
options.getShardTemplate,
options.getOutputFilenameSuffix
Some(outputDirectory),
outputFileNamePrefix,
shardTemplate,
outputFilenameSuffix,
dateFormat,
None
))


sc
.customInput("input", inputIO)
.applyTransform(
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong)))
).withName("windowed")
.saveAsCustomOutput("output", outputIO)
input.saveAsCustomOutput("output", outputIO)
}

sc.close()
}

/**
* Find the type of a row according to its schema key
* @param row string to determine the type of
* @param partitionErrorDir directory used for rows whose type cannot be determined
* @return row type of the given string
*/
private[loader] def getRowType(row: String, partitionErrorDir: ValueProvider[String]): RowType =
parse(row) match {
case Left(_) => RowType.PartitionError(partitionErrorDir.get)
case Right(json) => SchemaKey.extract(json).fold(
_ => RowType.PartitionError(partitionErrorDir.get),
k => RowType.SelfDescribing(k)
)
}

/**
* Tries to parse a string as a [[Compression]], falls back to uncompressed.
* @param compression string to parse
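The new `getRowType` helper above is what drives the dynamic destinations: the partitioned branch uses Beam's `FileIO.writeDynamic`, keyed by `getRowType(row, partitionErrorDirectory).name`, so each row type gets its own destination. The sketch below is a minimal standalone illustration of that classification, not the job itself: the schema URI and bucket path are invented for the example, and the real code works with a `ValueProvider[String]` rather than a plain string.

```scala
import io.circe.parser.parse
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.circe.implicits._

object RowTypeDemo extends App {
  // Hypothetical inputs: one self-describing row, one garbage row
  val goodRow =
    """{"schema":"iglu:com.acme/checkout/jsonschema/1-0-0","data":{"total":42}}"""
  val badRow = "not json at all"

  // Stand-in for the partitionErrorDirectory option
  val errorDir = "gs://my-bucket/partition-errors/"

  // Mirrors CloudStorageLoader.getRowType: invalid JSON, or JSON without a
  // schema key, falls back to the error-directory partition
  def classify(row: String): String =
    parse(row) match {
      case Left(_) => errorDir
      case Right(json) =>
        SchemaKey.extract(json).fold(_ => errorDir, k => s"${k.vendor}.${k.name}")
    }

  println(classify(goodRow)) // com.acme.checkout
  println(classify(badRow))  // gs://my-bucket/partition-errors/
}
```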
Options.scala
@@ -35,6 +35,11 @@ trait Options extends PipelineOptions with StreamingOptions {
def getOutputFilenamePrefix: ValueProvider[String]
def setOutputFilenamePrefix(value: ValueProvider[String]): Unit

@Description("Date format")
@Default.String("YYYY/MM/dd/HH")
def getDateFormat: ValueProvider[String]
def setDateFormat(value: ValueProvider[String]): Unit

@Description("The shard template which will be part of the filennams")
@Default.String("-W-P-SSSSS-of-NNNNN")
def getShardTemplate: ValueProvider[String]
@@ -59,4 +64,9 @@ trait Options extends PipelineOptions with StreamingOptions {
@Default.Integer(0)
def getNumShards: Int
def setNumShards(value: Int): Unit

@Description("The directory for rows which gives error during type partition")
@Default.String("")
def getPartitionErrorDirectory(): ValueProvider[String]
def setPartitionErrorDirectory(value: ValueProvider[String]): Unit
}
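For completeness, a sketch of how the two new options surface when a pipeline is constructed from command-line arguments; the argument values are placeholders. Both are `ValueProvider`s, so they can also be supplied at template execution time, and whether `partitionErrorDirectory` is accessible decides between the partitioned and non-partitioned branches in `run` above.

```scala
import org.apache.beam.sdk.options.PipelineOptionsFactory

object OptionsDemo extends App {
  val args = Array(
    "--inputSubscription=projects/my-project/subscriptions/my-sub", // placeholder
    "--outputDirectory=gs://my-bucket/output/",                     // placeholder
    "--dateFormat=YYYY/MM/dd/HH/",
    "--partitionErrorDirectory=gs://my-bucket/partition-errors/"
  )

  PipelineOptionsFactory.register(classOf[Options])
  val options = PipelineOptionsFactory.fromArgs(args: _*).as(classOf[Options])

  println(options.getDateFormat.get())               // YYYY/MM/dd/HH/
  println(options.getPartitionErrorDirectory.get())  // gs://my-bucket/partition-errors/
}
```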
RowType.scala
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2018-2019 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.storage.googlecloudstorage.loader

import com.snowplowanalytics.iglu.core.SchemaKey

/** Type of a row, determined according to the schema of self-describing data */
sealed trait RowType extends Product with Serializable {
def name: String
}

object RowType {

/** Represents cases where the row type could not be determined,
* because the row is either not valid JSON or not
* self-describing JSON
*/
case class PartitionError(errorDir: String) extends RowType {
override def name: String = errorDir
}

/** Represents cases where the type of the row can be determined
* successfully, i.e. it has a proper schema key
*/
case class SelfDescribing(schemaKey: SchemaKey) extends RowType {
override def name: String = s"${schemaKey.vendor}.${schemaKey.name}"
}
}
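A quick sketch of how the two `RowType` cases name their partitions; the schema key and bucket path here are invented for illustration.

```scala
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

object RowTypeNames extends App {
  // Invented schema key, purely for illustration
  val key = SchemaKey("com.acme", "checkout", "jsonschema", SchemaVer.Full(1, 0, 0))

  // Self-describing rows are grouped by "<vendor>.<name>"
  println(RowType.SelfDescribing(key).name) // com.acme.checkout

  // Rows that could not be classified are named after the error directory itself
  println(RowType.PartitionError("gs://my-bucket/partition-errors/").name)
}
```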