Skip to content

Commit

Permalink
Split lzo serializers into a separate sbt project (close #261)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed May 5, 2022
1 parent 9d32ebb commit 6bc601b
Show file tree
Hide file tree
Showing 35 changed files with 137 additions and 51 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/lacework.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,10 @@ jobs:
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-s3-loader ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull

- name: Scan snowplow-s3-loader lzo
env:
LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-s3-loader ${{ steps.ver.outputs.tag }}-lzo --build-id ${{ github.run_id }} --no-pull
25 changes: 12 additions & 13 deletions .github/workflows/test_and_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,21 @@ jobs:
- name: Check formatting
run: sbt scalafmtCheck

- name: Publish to Docker Hub
- name: Login to Docker Hub
if: startsWith(github.ref, 'refs/tags/')
run: |
docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD
project_version=$(sbt version -Dsbt.log.noformat=true | perl -ne 'print "$1\n" if /info.*(\d+\.\d+\.\d+[^\r\n]*)/' | tail -n 1 | tr -d '\n')
if [[ "${{ github.ref }}" = "refs/tags/${project_version}" ]]
then
echo Publishing to Docker Hub
sbt docker:publish
else
echo "${{ github.ref }} does not match project version $project_version => not publishing"
exit 1
fi
run: docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD
env:
DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}

- name: Publish to Docker Hub
if: startsWith(github.ref, 'refs/tags/')
run: sbt 'project main' docker:publish

- name: Publish to Docker Hub lzo
if: startsWith(github.ref, 'refs/tags/')
run: sbt 'project lzo' docker:publish

- name: Build artifacts
run: |
sbt assembly
Expand All @@ -57,7 +55,8 @@ jobs:
name: Version ${{ steps.ver.outputs.project_version }}
tag_name: ${{ steps.ver.outputs.project_version }}
files: |
target/scala-2.13/snowplow-s3-loader-${{ steps.ver.outputs.project_version }}.jar
modules/main/target/scala-2.13/snowplow-s3-loader-${{ steps.ver.outputs.project_version }}.jar
modules/lzo/target/scala-2.13/snowplow-s3-loader-lzo-${{ steps.ver.outputs.project_version }}.jar
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

Expand Down
40 changes: 26 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,25 @@
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/

lazy val root = project.in(file("."))
.aggregate(main, lzo)

lazy val main = project.in(file("modules/main"))
.settings(
name := "snowplow-s3-loader",
description := "Load the contents of a Kinesis stream topic to S3"
name := "snowplow-s3-loader",
)
.settings(BuildSettings.basicSettings)
.settings(BuildSettings.scalifySettings)
.settings(BuildSettings.sbtAssemblySettings)
.settings(BuildSettings.dockerSettings)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(BuildSettings.commonSettings)
.settings(
libraryDependencies ++= Seq(
// Java
Dependencies.Libraries.kinesisClient,
Dependencies.Libraries.kinesisConnector,
Dependencies.Libraries.slf4j,
Dependencies.Libraries.jclOverSlf4j,
Dependencies.Libraries.hadoop,
Dependencies.Libraries.elephantbird,
Dependencies.Libraries.hadoopLZO,
Dependencies.Libraries.apacheCommons,
Dependencies.Libraries.jackson,
Dependencies.Libraries.jacksonCbor,
Dependencies.Libraries.thrift,
Dependencies.Libraries.sentry,
Dependencies.Libraries.collections,
Dependencies.Libraries.jaxbApi,
Dependencies.Libraries.protobuf,
// Scala
Expand All @@ -50,10 +43,29 @@ lazy val root = project.in(file("."))
// Scala (test only)
Dependencies.Libraries.specs2,
// Thrift (test only)
Dependencies.Libraries.collectorPayload
Dependencies.Libraries.collectorPayload,
Dependencies.Libraries.thrift % Test,
),
excludeDependencies += "commons-logging" % "commons-logging"
)
.enablePlugins(JavaAppPackaging, DockerPlugin)

lazy val lzo = project.in(file("modules/lzo"))
.settings(
name := "snowplow-s3-loader-lzo",
)
.settings(BuildSettings.commonSettings)
.settings(BuildSettings.lzoSettings)
.settings(
libraryDependencies ++= Seq(
Dependencies.Libraries.hadoop,
Dependencies.Libraries.elephantbird,
Dependencies.Libraries.hadoopLZO,
Dependencies.Libraries.thrift,
Dependencies.Libraries.collections,
)
)
.dependsOn(main % "compile->compile; test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin)

shellPrompt := { _ => "s3-loader> " }
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.s3.loader
package serializers
package com.snowplowanalytics.s3.loader.lzo

// Java libs
import java.io.{ByteArrayOutputStream, DataOutputStream}
Expand All @@ -23,6 +22,9 @@ import com.hadoop.compression.lzo.LzopCodec
// Elephant bird
import com.twitter.elephantbird.mapreduce.io.RawBlockWriter

import com.snowplowanalytics.s3.loader.Result
import com.snowplowanalytics.s3.loader.serializers.ISerializer

/**
* Object to handle LZO compression of raw events
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2014-2022 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.s3.loader.lzo

import com.snowplowanalytics.s3.loader.MainPlatform

object Main extends MainPlatform {
def main(args: Array[String]): Unit =
withConfig(args)(S3LoaderWithLzo.run)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2014-2022 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.s3.loader.lzo

import com.snowplowanalytics.s3.loader.{Config, S3Loader}
import com.snowplowanalytics.s3.loader.serializers.{GZipSerializer, ISerializer}

object S3LoaderWithLzo extends S3Loader {

override def serializer(config: Config): ISerializer =
config.output.s3.compression match {
case Config.Compression.Lzo => LzoSerializer
case Config.Compression.Gzip => GZipSerializer
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.s3.loader.serializers
package com.snowplowanalytics.s3.loader.lzo

// Java
import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ import cats.syntax.show._
/**
* The entrypoint class for the Kinesis-S3 Sink application.
*/
object Main {
trait MainPlatform {

val config = Opts
.option[Path]("config", "Path to configuration HOCON file", "c", "filename")
val parser =
Command(s"${generated.Settings.name}-${generated.Settings.version}", "Streaming sink app for S3")(config)

def main(args: Array[String]): Unit =
def withConfig(args: Array[String])(f: Config => Unit): Unit =
parser.parse(args.toList) match {
case Right(c) =>
Config.load(c) match {
case Right(config) =>
S3Loader.run(config)
f(config)
case Left(e) =>
System.err.println(s"Configuration error: $e")
System.exit(1)
Expand All @@ -45,3 +45,8 @@ object Main {
System.exit(1)
}
}

object Main extends MainPlatform {
def main(args: Array[String]): Unit =
withConfig(args)(S3Loader.run)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,26 @@ import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.s3.loader.Config.Compression
import com.snowplowanalytics.s3.loader.connector.KinesisSourceExecutor
import com.snowplowanalytics.s3.loader.monitoring.Monitoring
import com.snowplowanalytics.s3.loader.serializers.{GZipSerializer, LzoSerializer}
import com.snowplowanalytics.s3.loader.serializers.{GZipSerializer, ISerializer}

object S3Loader {
trait S3Loader {

val logger = LoggerFactory.getLogger(getClass)

val processor = Processor(generated.Settings.name, generated.Settings.version)

def serializer(config: Config): ISerializer =
config.output.s3.compression match {
case Compression.Gzip => GZipSerializer
case Compression.Lzo => throw new IllegalArgumentException("This build of S3 loader does not support LZO compression")
}

def run(config: Config): Unit = {
val monitoring = Monitoring.build(config.monitoring)

// A sink for records that could not be emitted to S3
val badSink = KinesisSink.build(config, monitoring)

val serializer = config.output.s3.compression match {
case Compression.Lzo => LzoSerializer
case Compression.Gzip => GZipSerializer
}

val executor =
new KinesisSourceExecutor(
config.region,
Expand All @@ -51,7 +52,7 @@ object S3Loader {
config.purpose,
config.output,
badSink,
serializer,
serializer(config),
monitoring,
config.monitoring
.flatMap(_.metrics.flatMap(_.cloudWatch))
Expand Down Expand Up @@ -122,3 +123,5 @@ object S3Loader {
new KinesisConnectorConfiguration(props, credentialsProvider)
}
}

object S3Loader extends S3Loader
File renamed without changes.
24 changes: 19 additions & 5 deletions project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ object BuildSettings {

// Basic settings for our app
lazy val basicSettings = Seq(
organization := "com.snowplowanalytics",
scalaVersion := "2.13.6",
organization := "com.snowplowanalytics",
scalaVersion := "2.13.6",
description := "Load the contents of a Kinesis stream topic to S3",
resolvers ++= Dependencies.resolvers,
ThisBuild / dynverVTagPrefix := false,
ThisBuild / dynverSeparator := "-"
Expand All @@ -42,7 +43,7 @@ object BuildSettings {
/** Add example config for integration tests */
lazy val addExampleConfToTestCp = Seq(
Test / unmanagedClasspath += {
baseDirectory.value / "config"
baseDirectory.value / "../../config"
}
)

Expand All @@ -52,11 +53,15 @@ object BuildSettings {
Docker / packageName := "snowplow/snowplow-s3-loader",
dockerBaseImage := "eclipse-temurin:11-jre-focal",
dockerUpdateLatest := true,
)

lazy val lzoDockerSettings = Seq(
dockerCommands := {
val installLzo = Seq(Cmd("RUN", "mkdir -p /var/lib/apt/lists/partial && apt-get update && apt-get install -y lzop && apt-get purge -y"))
val (h, t) = dockerCommands.value.splitAt(dockerCommands.value.size-4)
h ++ installLzo ++ t
}
},
dockerAlias := dockerAlias.value.withTag(Some(version.value + "-lzo"))
)

// Makes our SBT app settings available from within the app
Expand All @@ -82,10 +87,12 @@ object BuildSettings {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList("org", "objectweb", "asm", xs @ _*) => MergeStrategy.first
case PathList("org", "objectweb", "asm", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "log4j", _*) => MergeStrategy.last // handled by log4j-over-slf4j
case PathList("org", "apache", "log4j", _*) => MergeStrategy.last
case PathList("org", "apache", "commons", _*) => MergeStrategy.last
case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "module-info.class" => MergeStrategy.discard
case PathList("com", "snowplowanalytics", "s3", "loader", "generated", _*) => MergeStrategy.last
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
Expand All @@ -104,4 +111,11 @@ object BuildSettings {
scalafmtConfig := file(".scalafmt.conf"),
scalafmtOnCompile := false
)

lazy val commonSettings = basicSettings ++ scalifySettings ++ sbtAssemblySettings ++ dockerSettings ++ addExampleConfToTestCp

lazy val lzoSettings = lzoDockerSettings ++ Seq(
Compile / discoveredMainClasses := Seq(),
Compile / mainClass := Some("com.snowplowanalytics.s3.loader.lzo.Main")
)
}
6 changes: 2 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ object Dependencies {
val log4j = "2.14.0"
val kinesisClient = "1.14.7"
val kinesisConnector = "1.3.0"
val hadoop = "2.7.7"
val hadoop = "2.10.1"
val elephantbird = "4.17"
val hadoopLZO = "0.4.20"
val apacheCommons = "3.2.1"
val jackson = "2.12.6"
val sentry = "1.7.30"
val collections = "3.2.2" // Address vulnerability
Expand Down Expand Up @@ -77,11 +76,10 @@ object Dependencies {
.exclude("org.mortbay.jetty", "jetty-util")
.exclude("org.mortbay.jetty", "jetty")
val collections = "commons-collections" % "commons-collections" % V.collections
val jaxbApi = "javax.xml.bind" % "jaxb-api" % V.jaxbApi
val jaxbApi = "javax.xml.bind" % "jaxb-api" % V.jaxbApi % Runtime
val elephantbird = ("com.twitter.elephantbird" % "elephant-bird-core" % V.elephantbird)
.exclude("com.hadoop.gplcompression", "hadoop-lzo")
val hadoopLZO = "com.hadoop.gplcompression" % "hadoop-lzo" % V.hadoopLZO
val apacheCommons = "org.apache.directory.studio" % "org.apache.commons.collections" % V.apacheCommons
val sentry = "io.sentry" % "sentry" % V.sentry
val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.2
sbt.version=1.5.5

0 comments on commit 6bc601b

Please sign in to comment.