Skip to content

Commit

Permalink
Merge pull request #172 from datamountaineer/feature/renable_progress…
Browse files Browse the repository at this point in the history
…_cassandra

readding the progress in cassandra sink (customer requirement)
  • Loading branch information
Andrew Stevenson authored May 3, 2017
2 parents 9a102a5 + 55062a7 commit 6a300b0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@ object CassandraConfigSink {
ConfigDef.Width.MEDIUM,
CassandraConfigConstants.SINK_THREAD_POOL_DISPLAY
)
.define(CassandraConfigConstants.PROGRESS_COUNTER_ENABLED,
Type.BOOLEAN,
CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DEFAULT,
Importance.MEDIUM,
CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DOC,
"Metrics",
1,
ConfigDef.Width.MEDIUM,
CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DISPLAY)
}

case class CassandraConfigSink(props: util.Map[String, String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,8 @@ object CassandraConfigConstants {
val TIMESTAMP_TYPE_DOC = "The Cassandra data type of the timestamp column, either timeuuid (default) or timestamp."
val TIMESTAMP_TYPE_DEFAULT = "timeUUID"

val PROGRESS_COUNTER_ENABLED = "connect.cassandra.progress.enabled"
val PROGRESS_COUNTER_ENABLED_DOC = "Enables the output for how many records have been processed"
val PROGRESS_COUNTER_ENABLED_DEFAULT = false
val PROGRESS_COUNTER_ENABLED_DISPLAY = "Enable progress counter"
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ case class CassandraSinkSetting(keySpace: String,
errorPolicy: ErrorPolicy,
threadPoolSize: Int,
consistencyLevel: Option[ConsistencyLevel],
taskRetries: Int = CassandraConfigConstants.NBR_OF_RETIRES_DEFAULT
) extends CassandraSetting
taskRetries: Int = CassandraConfigConstants.NBR_OF_RETIRES_DEFAULT,
enableProgress: Boolean = CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DEFAULT) extends CassandraSetting

/**
* Cassandra Setting used for both Readers and writers
Expand Down Expand Up @@ -119,6 +119,15 @@ object CassandraSettings extends StrictLogging {
val threadPoolSize = config.getThreadPoolSize
val consistencyLevel = config.getConsistencyLevel

CassandraSinkSetting(keySpace, routes, fields, ignoreFields, errorPolicy, threadPoolSize, consistencyLevel, retries)
val enableCounter = config.getBoolean(CassandraConfigConstants.PROGRESS_COUNTER_ENABLED)
CassandraSinkSetting(keySpace,
routes,
fields,
ignoreFields,
errorPolicy,
threadPoolSize,
consistencyLevel,
retries,
enableCounter)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.datamountaineer.streamreactor.connect.cassandra.sink

import java.util

import com.datamountaineer.streamreactor.connect.cassandra.config.CassandraConfigSink
import com.datamountaineer.streamreactor.connect.cassandra.config.{CassandraConfigSink, CassandraSettings}
import com.datamountaineer.streamreactor.connect.utils.ProgressCounter
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.clients.consumer.OffsetAndMetadata
Expand All @@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}


Expand All @@ -40,6 +39,7 @@ import scala.util.{Failure, Success, Try}
class CassandraSinkTask extends SinkTask with StrictLogging {
private var writer: Option[CassandraJsonWriter] = None
private val progressCounter = new ProgressCounter
private var enableProgress: Boolean = false
logger.info("Task initialising")


Expand All @@ -52,6 +52,8 @@ class CassandraSinkTask extends SinkTask with StrictLogging {
case Success(s) => s
}

val sinkSettings = CassandraSettings.configureSink(taskConfig)
enableProgress = sinkSettings.enableProgress
logger.info(
"""
| ____ __ __ ___ __ _
Expand All @@ -74,8 +76,11 @@ class CassandraSinkTask extends SinkTask with StrictLogging {
**/
override def put(records: util.Collection[SinkRecord]): Unit = {
require(writer.nonEmpty, "Writer is not set!")
writer.foreach(w => w.write(records.toVector))
//progressCounter.update(records.asScala.toSeq)
val seq = records.toVector
writer.foreach(w => w.write(seq))
if (enableProgress) {
progressCounter.update(seq)
}
}

/**
Expand All @@ -84,7 +89,9 @@ class CassandraSinkTask extends SinkTask with StrictLogging {
override def stop(): Unit = {
logger.info("Stopping Cassandra sink.")
writer.foreach(w => w.close())
progressCounter.empty
if (enableProgress) {
progressCounter.empty
}
}

override def flush(map: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {}
Expand Down

0 comments on commit 6a300b0

Please sign in to comment.