Configuration decoding with pureconfig (closes #105)
aldemirenes committed Sep 14, 2017
1 parent 51b7c38 commit febfb97
Showing 13 changed files with 282 additions and 224 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -38,6 +38,7 @@ lazy val root = project.in(file("."))
Dependencies.Libraries.scalaz7,
Dependencies.Libraries.json4sJackson,
Dependencies.Libraries.snowplowTracker,
Dependencies.Libraries.pureconfig,
// Scala (test only)
Dependencies.Libraries.specs2,
// Thrift (test only)
82 changes: 39 additions & 43 deletions examples/config.hocon.sample
@@ -1,106 +1,102 @@
# Default configuration for kinesis-lzo-s3-sink
# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source: "{{source}}"
source = "{{source}}"

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink: "{{sink}}"
sink = "{{sink}}"

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
#
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
access-key: "iam"
secret-key: "iam"
accessKey = "iam"
secretKey = "iam"
}

# Config for NSQ
nsq {
# Channel name for NSQ source
channel-name: "{{NsqSourceChannelName}}"
# If more than one application is reading from the same NSQ topic at the same time,
# each must have a unique channel name to receive all the data from that topic
channelName = "{{nsqSourceChannelName}}"

# Host name for NSQ tools
host: "{{NsqHost}}"
host = "{{nsqHost}}"

# Port for nsqd
port: "{{NsqdPort}}"
# HTTP port for nsqd
port = {{nsqdPort}}

# Port for nsqlookupd
lookup-port: {{NsqlookupdPort}}
# HTTP port for nsqlookupd
lookupPort = {{nsqlookupdPort}}
}

kinesis {
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# Note: This only affects the first run of this application
# on a stream.
initial-position: "TRIM_HORIZON"
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
initialPosition = "{{kinesisInitialPosition}}"

# Maximum number of records to read per GetRecords call
max-records: {{sinkKinesisMaxRecords}}
maxRecords = {{kinesisMaxRecords}}

region: "{{sinkKinesisRegion}}"
region = "{{kinesisRegion}}"

# "app-name" is used for a DynamoDB table to maintain stream state.
# You can set it automatically using: "SnowplowLzoS3Sink-$\\{sink.kinesis.in.stream-name\\}"
app-name: "{{sinkKinesisAppName}}"
# "appName" is used for a DynamoDB table to maintain stream state.
appName = "{{appName}}"
}

streams {
# Input stream name
stream-name-in = "{{InStreamName}}"
inStreamName = "{{inStreamName}}"

# Stream for events for which the storage process fails
stream-name-out = "{{OutStreamName}}"
outStreamName = "{{outStreamName}}"

# Events are accumulated in a buffer before being sent to S3.
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byte-limit or
# - the number of stored records exceeds record-limit or
# - the time in milliseconds since it was last emptied exceeds time-limit
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byte-limit: {{sinkLzoBufferByteThreshold}} # Not supported by NSQ; will be ignored
record-limit: {{sinkLzoBufferRecordThreshold}}
time-limit: {{sinkLzoBufferTimeThreshold}} # Not supported by NSQ; will be ignored
byteLimit = {{bufferByteThreshold}} # Not supported by NSQ; will be ignored
recordLimit = {{bufferRecordThreshold}}
timeLimit = {{bufferTimeThreshold}} # Not supported by NSQ; will be ignored
}
}

s3 {
# If using us-east-1, then endpoint should be "http://s3.amazonaws.com".
# Otherwise "http://s3-<<region>>.amazonaws.com", e.g.
# http://s3-eu-west-1.amazonaws.com
region: "{{sinkKinesisS3Region}}"
bucket: "{{sinkKinesisS3Bucket}}"
region = "{{s3Region}}"
bucket = "{{s3bucket}}"

# Format is one of lzo or gzip
# Note: gzip can only be used for the enriched data stream.
format: "{{sinkKinesisFormat}}"
format = "{{format}}"

# Maximum Timeout that the application is allowed to fail for
max-timeout: {{sinkKinesisMaxTimeout}}
maxTimeout = {{maxTimeout}}
}

# Set the Logging Level for the S3 Sink
# Options: ERROR, WARN, INFO, DEBUG, TRACE
logging {
level: "{{sinkLzoLogLevel}}"
level = "{{LogLevel}}"
}

# Optional section for tracking endpoints
monitoring {
snowplow {
collector-uri: "{{collectorUri}}"
collector-port: 80
app-id: "{{sinkLzoAppName}}"
method: "GET"
snowplow {
collectorUri = "{{collectorUri}}"
collectorPort = 80
appId = "{{appName}}"
method = "{{method}}"
}
}
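
The camelCase keys above are decoded by pureconfig into a typed configuration, which is why the Scala sources below import model._ and take an S3LoaderConfig. The model itself is not part of this excerpt, so the following is only an editor's sketch of what those case classes might look like: apart from S3LoaderConfig and S3Config, which do appear in the changed sources, every name here is inferred from the HOCON keys rather than confirmed by the commit.

// Editor's sketch of the configuration model behind the sample above.
// Only S3LoaderConfig and S3Config are confirmed by this diff; the other
// case classes and fields are inferred from the camelCase HOCON keys.
case class AWSConfig(accessKey: String, secretKey: String)
case class NSQConfig(channelName: String, host: String, port: Int, lookupPort: Int)
case class KinesisConfig(initialPosition: String, maxRecords: Int, region: String, appName: String)
case class BufferConfig(byteLimit: Long, recordLimit: Long, timeLimit: Long)
case class StreamsConfig(inStreamName: String, outStreamName: String, buffer: BufferConfig)
case class S3Config(region: String, bucket: String, format: String, maxTimeout: Long) {
  // S3Emitter below also reads config.endpoint; deriving it from the region,
  // as the comment in the sample describes, is one plausible implementation.
  def endpoint: String =
    if (region == "us-east-1") "http://s3.amazonaws.com"
    else s"http://s3-$region.amazonaws.com"
}
case class LoggingConfig(level: String)
case class SnowplowMonitoringConfig(collectorUri: String, collectorPort: Int, appId: String, method: String)
case class MonitoringConfig(snowplow: SnowplowMonitoringConfig)
case class S3LoaderConfig(
  source: String,
  sink: String,
  aws: AWSConfig,
  nsq: NSQConfig,
  kinesis: KinesisConfig,
  streams: StreamsConfig,
  s3: S3Config,
  logging: LoggingConfig,
  monitoring: Option[MonitoringConfig] // the monitoring section is described as optional
)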
1 change: 0 additions & 1 deletion project/BuildSettings.scala
@@ -67,7 +67,6 @@ object BuildSettings {
import sbtassembly.AssemblyPlugin.autoImport._
lazy val sbtAssemblySettings = Seq(
assemblyJarName in assembly := { s"${name.value}-${version.value}.jar" },

assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList("org", "objectweb", "asm", xs @ _*) => MergeStrategy.first
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -38,6 +38,7 @@ object Dependencies {
val json4s = "3.2.11"
val scalaz7 = "7.2.13"
val snowplowTracker = "0.3.0"
val pureconfig = "0.8.0"
// Scala (test only)
val specs2 = "3.9.1"
}
@@ -72,6 +73,7 @@ object Dependencies {
val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s
val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7
val snowplowTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.snowplowTracker
val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig
// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test"
}
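
With pureconfig 0.8.0 on the classpath, the sample HOCON can be decoded straight into a case-class model such as the one sketched above. The loader's entry point is not shown in this diff, so the snippet below is only a sketch of the usual wiring; in particular, the camelCase field mapping is an assumption prompted by the camelCase keys in the new sample, since pureconfig's default mapping expects kebab-case keys.

// Editor's sketch: decoding the sample config with pureconfig 0.8.0.
import java.io.File
import com.typesafe.config.ConfigFactory
import pureconfig._
import pureconfig.error.ConfigReaderFailures

object ConfigDecodingSketch {
  // Map camelCase case-class fields to camelCase HOCON keys (the default is kebab-case).
  implicit def hint[T]: ProductHint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))

  def load(path: String): Either[ConfigReaderFailures, S3LoaderConfig] =
    loadConfig[S3LoaderConfig](ConfigFactory.parseFile(new File(path)).resolve())
}

Decoding returns an Either, so a caller can surface every bad or missing key at once instead of failing on the first one.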
@@ -18,6 +18,9 @@ import scala.collection.JavaConverters._
import java.util.Calendar
import java.text.SimpleDateFormat

// AWS libs
import com.amazonaws.auth.AWSCredentialsProvider

// AWS Kinesis connector libs
import com.amazonaws.services.kinesis.connectors.{
UnmodifiableBuffer,
@@ -38,21 +41,23 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
// This project
import sinks._
import serializers._
import model._

/**
* Emitter for flushing Kinesis event data to S3.
*
* Once the buffer is full, the emit function is called.
*/
class KinesisS3Emitter(
config: KinesisConnectorConfiguration,
config: S3LoaderConfig,
provider: AWSCredentialsProvider,
badSink: ISink,
serializer: ISerializer,
maxConnectionTime: Long,
tracker: Option[Tracker]
) extends IEmitter[EmitterInput] {

val s3Emitter = new S3Emitter(config, badSink, maxConnectionTime, tracker)
val s3Emitter = new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker)
val dateFormat = new SimpleDateFormat("yyyy-MM-dd");

/**
@@ -29,13 +29,14 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
// This project
import sinks._
import serializers._
import model._

/**
* S3Pipeline class sets up the Emitter/Buffer/Transformer/Filter
*/
class KinesisS3Pipeline(badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends IKinesisConnectorPipeline[ValidatedRecord, EmitterInput] {
class KinesisS3Pipeline(s3LoaderConfig: S3LoaderConfig, badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends IKinesisConnectorPipeline[ValidatedRecord, EmitterInput] {

override def getEmitter(configuration: KinesisConnectorConfiguration) = new KinesisS3Emitter(configuration, badSink, serializer, maxConnectionTime, tracker)
override def getEmitter(configuration: KinesisConnectorConfiguration) = new KinesisS3Emitter(s3LoaderConfig, configuration.AWS_CREDENTIALS_PROVIDER, badSink, serializer, maxConnectionTime, tracker)

override def getBuffer(configuration: KinesisConnectorConfiguration) = new BasicMemoryBuffer[ValidatedRecord](configuration)

@@ -25,14 +25,15 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
// This project
import sinks._
import serializers._
import model._

/**
* Boilerplate class for Kinesis Connector
*/
class KinesisSourceExecutor(config: KinesisConnectorConfiguration, badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends KinesisConnectorExecutorBase[ValidatedRecord, EmitterInput] {
class KinesisSourceExecutor(config: KinesisConnectorConfiguration, s3LoaderConfig: S3LoaderConfig, badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends KinesisConnectorExecutorBase[ValidatedRecord, EmitterInput] {
super.initialize(config)

override def getKinesisConnectorRecordProcessorFactory = {
new KinesisConnectorRecordProcessorFactory[ValidatedRecord, EmitterInput](new KinesisS3Pipeline(badSink, serializer, maxConnectionTime, tracker), config)
new KinesisConnectorRecordProcessorFactory[ValidatedRecord, EmitterInput](new KinesisS3Pipeline(s3LoaderConfig, badSink, serializer, maxConnectionTime, tracker), config)
}
}
@@ -39,6 +39,9 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
import scalaz._
import Scalaz._

// AWS libs
import com.amazonaws.auth.AWSCredentialsProvider

// Joda-Time
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat
@@ -49,19 +52,20 @@ import org.slf4j.LoggerFactory
// This project
import sinks._
import serializers._
import model._

/**
* Executor for NSQSource
*
* @param config the kinesis config for getting informations for S3
* @param nsqConfig the NSQ configuration
* @param badSink the configured BadSink
* @param serializer the instance of one of the serializer
* @param maxConnectionTime max time for trying to connect S3 instance
* @param config S3Loader configuration
* @param provider AWSCredentialsProvider
* @param badSink Configured BadSink
* @param serializer Serializer instance
* @param maxConnectionTime Max time for trying to connect to the S3 instance
*/
class NsqSourceExecutor(
config: KinesisConnectorConfiguration,
nsqConfig: S3LoaderNsqConfig,
config: S3LoaderConfig,
provider: AWSCredentialsProvider,
badSink: ISink,
serializer: ISerializer,
maxConnectionTime: Long,
@@ -73,7 +77,7 @@ class NsqSourceExecutor(
// NSQ messages will be buffered in msgBuffer until the buffer size becomes equal to nsqBufferSize
val msgBuffer = new ListBuffer[EmitterInput]()

val s3Emitter = new S3Emitter(config, badSink, maxConnectionTime, tracker)
val s3Emitter = new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker)
private val TimeFormat = DateTimeFormat.forPattern("HH:mm:ss.SSS").withZone(DateTimeZone.UTC)
private val DateFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)

@@ -133,15 +137,15 @@ class NsqSourceExecutor(

val errorCallback = new NSQErrorCallback {
override def error(e: NSQException) =
log.error(s"Exception while consuming topic $nsqConfig.nsqGoodSourceTopicName", e)
log.error(s"Exception while consuming topic $config.streams.inStreamName", e)
}

val lookup = new DefaultNSQLookup
// use NSQLookupd
lookup.addLookupAddress(nsqConfig.nsqHost, nsqConfig.nsqlookupPort)
lookup.addLookupAddress(config.nsq.host, config.nsq.lookupPort)
val consumer = new NSQConsumer(lookup,
nsqConfig.nsqSourceTopicName,
nsqConfig.nsqSourceChannelName,
config.streams.inStreamName,
config.nsq.channelName,
nsqCallback,
new NSQConfig(),
errorCallback)
@@ -42,6 +42,7 @@ import com.amazonaws.AmazonServiceException
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.auth.AWSCredentialsProvider

// json4s
import org.json4s.jackson.JsonMethods._
@@ -54,29 +55,32 @@ import org.joda.time.format.DateTimeFormat
// This project
import sinks._
import serializers._
import model._

/**
* Emitter for flushing data to S3.
*
* @param config Configuration for S3 Client
* @param config S3Loader configuration
* @param provider AWSCredentialsProvider
* @param badSink Sink instance for not sent data
* @param maxConnectionTime Max time for attempting to send to S3
* @param tracker Tracker instance
*/
class S3Emitter(
config: KinesisConnectorConfiguration,
config: S3Config,
provider: AWSCredentialsProvider,
badSink: ISink,
maxConnectionTime: Long,
tracker: Option[Tracker]
) {

// create Amazon S3 Client
private val bucket = config.S3_BUCKET
private val bucket = config.bucket
val log = LoggerFactory.getLogger(getClass)
val client = AmazonS3ClientBuilder
.standard()
.withCredentials(config.AWS_CREDENTIALS_PROVIDER)
.withEndpointConfiguration(new EndpointConfiguration(config.S3_ENDPOINT, config.REGION_NAME))
.withCredentials(provider)
.withEndpointConfiguration(new EndpointConfiguration(config.endpoint, config.region))
.build()

/**
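
S3Emitter and KinesisS3Emitter now receive an AWSCredentialsProvider instead of reading credentials out of KinesisConnectorConfiguration. How that provider is built from the aws.accessKey / aws.secretKey settings is not shown in this excerpt; the sketch below illustrates one plausible mapping of the 'default', 'iam' and 'env' values documented in the sample config onto the AWS SDK providers, and should not be read as the loader's actual code.

// Editor's sketch: resolving an AWSCredentialsProvider from the aws.accessKey /
// aws.secretKey settings described in examples/config.hocon.sample.
import com.amazonaws.auth._

object CredentialsSketch {
  def resolve(accessKey: String, secretKey: String): AWSCredentialsProvider =
    (accessKey, secretKey) match {
      case ("default", "default") => new DefaultAWSCredentialsProviderChain()         // default provider chain
      case ("iam", "iam")         => InstanceProfileCredentialsProvider.getInstance() // EC2/ECS instance roles
      case ("env", "env")         => new EnvironmentVariableCredentialsProvider()     // AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
      case (a, s)                 => new AWSStaticCredentialsProvider(new BasicAWSCredentials(a, s))
    }
}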