Skip to content

Commit

Permalink
Reduce INFO logging (closes #50)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Jun 29, 2017
1 parent 5ac5b9f commit 76243bd
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.amazonaws.services.kinesis.model.Record
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer

// Thrift libs
import org.apache.thrift.{TSerializer,TDeserializer}
import org.apache.thrift.{TSerializer, TDeserializer}

// Loggings
import org.apache.commons.logging.LogFactory
Expand All @@ -31,15 +31,15 @@ import Scalaz._
/**
* Thrift serializer/deserializer class
*/
class RawEventTransformer extends ITransformer[ ValidatedRecord, EmitterInput ] {
class RawEventTransformer extends ITransformer[ValidatedRecord, EmitterInput] {

val log = LogFactory.getLog(getClass)

lazy val serializer = new TSerializer()
lazy val deserializer = new TDeserializer()

override def toClass(record: Record): ValidatedRecord = {
log.info("Converting one record to EmitterInput before adding it to the buffer")
log.debug("Converting one record to EmitterInput before adding it to the buffer")
record.getData.array.success
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.amazonaws.services.kinesis.connectors.interfaces.{
IKinesisConnectorPipeline
}
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration
import com.amazonaws.services.kinesis.connectors.impl.{BasicMemoryBuffer,AllPassFilter}
import com.amazonaws.services.kinesis.connectors.impl.{BasicMemoryBuffer, AllPassFilter}

// Tracker
import com.snowplowanalytics.snowplow.scalatracker.Tracker
Expand All @@ -33,7 +33,7 @@ import serializers._
/**
* S3Pipeline class sets up the Emitter/Buffer/Transformer/Filter
*/
class S3Pipeline(badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends IKinesisConnectorPipeline[ ValidatedRecord, EmitterInput ] {
class S3Pipeline(badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends IKinesisConnectorPipeline[ValidatedRecord, EmitterInput] {

override def getEmitter(configuration: KinesisConnectorConfiguration) = new S3Emitter(configuration, badSink, serializer, maxConnectionTime, tracker)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import serializers._
/**
* Boilerplate class for Kinessis Conenector
*/
class S3SinkExecutor(config: KinesisConnectorConfiguration, badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends KinesisConnectorExecutorBase[ ValidatedRecord, EmitterInput ] {
class S3SinkExecutor(config: KinesisConnectorConfiguration, badSink: ISink, serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker]) extends KinesisConnectorExecutorBase[ValidatedRecord, EmitterInput] {
super.initialize(config)

override def getKinesisConnectorRecordProcessorFactory = {
new KinesisConnectorRecordProcessorFactory[ ValidatedRecord, EmitterInput ](new S3Pipeline(badSink, serializer, maxConnectionTime, tracker), config)
new KinesisConnectorRecordProcessorFactory[ValidatedRecord, EmitterInput](new S3Pipeline(badSink, serializer, maxConnectionTime, tracker), config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ import serializers._
*/
object SinkApp extends App {

val log = LogFactory.getLog(getClass)

case class FileConfig(config: File = new File("."))
val parser = new scopt.OptionParser[FileConfig](generated.Settings.name) {
head(generated.Settings.name, generated.Settings.version)
Expand Down Expand Up @@ -86,6 +84,7 @@ object SinkApp extends App {

val logLevel = conf.getConfig("sink").getConfig("logging").getString("level")
System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, logLevel)
val log = LogFactory.getLog(getClass)

val credentialConfig = conf.getConfig("sink").getConfig("aws")

Expand Down

0 comments on commit 76243bd

Please sign in to comment.