diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala index 2bbc74416..4b34ea82d 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfig.scala @@ -327,6 +327,15 @@ object CassandraConfigSink { ConfigDef.Width.MEDIUM, CassandraConfigConstants.SINK_THREAD_POOL_DISPLAY ) + .define(CassandraConfigConstants.PROGRESS_COUNTER_ENABLED, + Type.BOOLEAN, + CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DEFAULT, + Importance.MEDIUM, + CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DOC, + "Metrics", + 1, + ConfigDef.Width.MEDIUM, + CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DISPLAY) } case class CassandraConfigSink(props: util.Map[String, String]) diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala index 2f0295c51..26b258318 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraConfigConstants.scala @@ -154,4 +154,8 @@ object CassandraConfigConstants { val TIMESTAMP_TYPE_DOC = "The Cassandra data type of the timestamp column, either timeuuid (default) or timestamp." val TIMESTAMP_TYPE_DEFAULT = "timeUUID" + val PROGRESS_COUNTER_ENABLED = "connect.cassandra.progress.enabled" + val PROGRESS_COUNTER_ENABLED_DOC = "Enables the output for how many records have been processed" + val PROGRESS_COUNTER_ENABLED_DEFAULT = false + val PROGRESS_COUNTER_ENABLED_DISPLAY = "Enable progress counter" } diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala index 25a624212..277bc0f7e 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/config/CassandraSettings.scala @@ -57,8 +57,8 @@ case class CassandraSinkSetting(keySpace: String, errorPolicy: ErrorPolicy, threadPoolSize: Int, consistencyLevel: Option[ConsistencyLevel], - taskRetries: Int = CassandraConfigConstants.NBR_OF_RETIRES_DEFAULT - ) extends CassandraSetting + taskRetries: Int = CassandraConfigConstants.NBR_OF_RETIRES_DEFAULT, + enableProgress: Boolean = CassandraConfigConstants.PROGRESS_COUNTER_ENABLED_DEFAULT) extends CassandraSetting /** * Cassandra Setting used for both Readers and writers @@ -119,6 +119,15 @@ object CassandraSettings extends StrictLogging { val threadPoolSize = config.getThreadPoolSize val consistencyLevel = config.getConsistencyLevel - CassandraSinkSetting(keySpace, routes, fields, ignoreFields, errorPolicy, threadPoolSize, consistencyLevel, retries) + val enableCounter = config.getBoolean(CassandraConfigConstants.PROGRESS_COUNTER_ENABLED) + CassandraSinkSetting(keySpace, + routes, + fields, + ignoreFields, + errorPolicy, + threadPoolSize, + consistencyLevel, + retries, + enableCounter) } } diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala index 33a017378..97798eb18 100644 --- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala +++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala @@ -18,7 +18,7 @@ package com.datamountaineer.streamreactor.connect.cassandra.sink import java.util -import com.datamountaineer.streamreactor.connect.cassandra.config.CassandraConfigSink +import com.datamountaineer.streamreactor.connect.cassandra.config.{CassandraConfigSink, CassandraSettings} import com.datamountaineer.streamreactor.connect.utils.ProgressCounter import com.typesafe.scalalogging.slf4j.StrictLogging import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.{SinkRecord, SinkTask} import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} @@ -40,6 +39,7 @@ import scala.util.{Failure, Success, Try} class CassandraSinkTask extends SinkTask with StrictLogging { private var writer: Option[CassandraJsonWriter] = None private val progressCounter = new ProgressCounter + private var enableProgress: Boolean = false logger.info("Task initialising") @@ -52,6 +52,8 @@ class CassandraSinkTask extends SinkTask with StrictLogging { case Success(s) => s } + val sinkSettings = CassandraSettings.configureSink(taskConfig) + enableProgress = sinkSettings.enableProgress logger.info( """ | ____ __ __ ___ __ _ @@ -74,8 +76,11 @@ class CassandraSinkTask extends SinkTask with StrictLogging { **/ override def put(records: util.Collection[SinkRecord]): Unit = { require(writer.nonEmpty, "Writer is not set!") - writer.foreach(w => w.write(records.toVector)) - //progressCounter.update(records.asScala.toSeq) + val seq = records.toVector + writer.foreach(w => w.write(seq)) + if (enableProgress) { + progressCounter.update(seq) + } } /** @@ -84,7 +89,9 @@ class CassandraSinkTask extends SinkTask with StrictLogging { override def stop(): Unit = { logger.info("Stopping Cassandra sink.") writer.foreach(w => w.close()) - progressCounter.empty + if (enableProgress) { + progressCounter.empty + } } override def flush(map: util.Map[TopicPartition, OffsetAndMetadata]): Unit = {}