Skip to content

Commit

Permalink
ElasticSearch connector with date in index and custom document type (#…
Browse files Browse the repository at this point in the history
…173)

* ElasticSearch connector supports date in index name and custom document type

* Fixed Jackson version problem between Kafka Streams 0.10.2.0 and ES libraries

* Testing with date in ES index name

* Default config values in ElasticSearch connector set to null
  • Loading branch information
robvadai authored and stheppi committed May 4, 2017
1 parent 6a300b0 commit 484d385
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 16 deletions.
6 changes: 3 additions & 3 deletions kafka-connect-elastic/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
project(":kafka-connect-elastic") {

ext {
elastic4sVersion = '2.1.0'
elasticVersion = '2.2.0'
elastic4sVersion = '2.4.0'
elasticVersion = '2.4.5'
jnaVersion = '3.0.9'
}

Expand All @@ -30,4 +30,4 @@ project(":kafka-connect-elastic") {
compile "com.sun.jna:jna:$jnaVersion"
testCompile "org.elasticsearch:elasticsearch:$elasticVersion"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,23 @@ import com.sksamuel.elastic4s.source.Indexable
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord
import org.elasticsearch.action.bulk.BulkResponse
import com.sksamuel.elastic4s.BulkResult

import scala.collection.immutable.Iterable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import com.datamountaineer.streamreactor.connect.elastic.indexname.CustomIndexName

class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extends StrictLogging with ConverterUtil {
logger.info("Initialising Elastic Json writer")
createIndexes()

import ElasticJsonWriter.createIndexName

val createIndexNameWithSuffix = createIndexName(settings.indexNameSuffix) _

if (settings.indexAutoCreate) {
createIndexes()
}

implicit object SinkRecordIndexable extends Indexable[SinkRecord] {
override def json(t: SinkRecord): String = convertValueToJson(t).toString
Expand All @@ -44,7 +52,12 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend
*
* */
private def createIndexes() : Unit = {
settings.tableMap.map({ case(_,v) => client.execute( { create index v.toLowerCase() })})
settings.tableMap.map({ case(topicName, indexName) => client.execute( {
settings.documentType match {
case Some(documentType) => create index createIndexNameWithSuffix(indexName) mappings documentType
case _ => create index createIndexNameWithSuffix(indexName)
}
})})
}

/**
Expand Down Expand Up @@ -74,26 +87,27 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend
*
* @param records A list of SinkRecords
* */
def insert(records: Map[String, Set[SinkRecord]]) : Iterable[Future[BulkResponse]] = {
def insert(records: Map[String, Set[SinkRecord]]) : Iterable[Future[BulkResult]] = {
val ret = records.map({
case (topic, sinkRecords) =>
val fields = settings.fields(topic)
val ignoreFields = settings.ignoreFields(topic)
val i = settings.tableMap(topic)
val i = createIndexNameWithSuffix(settings.tableMap(topic))
val documentType = settings.documentType.getOrElse(i)

val indexes = sinkRecords
.map(r => convert(r, fields, ignoreFields))
.map { r =>
configMap(r.topic).getWriteMode match {
case WriteModeEnum.INSERT => index into i.toLowerCase / i source r
case WriteModeEnum.INSERT => index into i / documentType source r
case WriteModeEnum.UPSERT =>
// Build a Struct field extractor to get the value from the PK field
val pkField = settings.pks(r.topic)
// Extractor includes all since we already converted the records to have only needed fields
val extractor = StructFieldsExtractor(includeAllFields = true, Map(pkField -> pkField))
val fieldsAndValues = extractor.get(r.value.asInstanceOf[Struct]).toMap
val pkValue = fieldsAndValues(pkField).toString
update id pkValue in i.toLowerCase / i docAsUpsert fieldsAndValues
update id pkValue in i / documentType docAsUpsert fieldsAndValues
}
}

Expand All @@ -111,3 +125,8 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend
ret
}
}

private object ElasticJsonWriter {
def createIndexName(maybeIndexNameSuffix: Option[String])(indexName: String): String =
maybeIndexNameSuffix.fold(indexName) { indexNameSuffix => s"$indexName${CustomIndexName.parseIndexName(indexNameSuffix)}" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
import org.apache.kafka.connect.sink.SinkTaskContext
import org.elasticsearch.common.settings.Settings

object ElasticWriter {
object ElasticWriter {
/**
* Construct a JSONWriter.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ case class ElasticSettings(routes: List[Config],
fields : Map[String, Map[String, String]],
ignoreFields: Map[String, Set[String]],
pks: Map[String, String],
tableMap: Map[String, String])
tableMap: Map[String, String],
indexNameSuffix: Option[String],
indexAutoCreate: Boolean,
documentType: Option[String])

object ElasticSettings {

def apply(config: ElasticSinkConfig): ElasticSettings = {
val raw = config.getString(ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY)
require(!raw.isEmpty,s"Empty ${ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY_DOC}")
Expand All @@ -51,6 +55,14 @@ object ElasticSettings {
(r.getSource, keys.head)
}.toMap

ElasticSettings(routes = routes, fields = fields,ignoreFields = ignoreFields, pks = pks, tableMap = tableMap)
ElasticSettings(routes = routes,
fields = fields,
ignoreFields = ignoreFields,
pks = pks,
tableMap = tableMap,
indexNameSuffix = Option(config.getString(ElasticSinkConfigConstants.INDEX_NAME_SUFFIX)),
indexAutoCreate = config.getBoolean(ElasticSinkConfigConstants.AUTO_CREATE_INDEX),
documentType = Option(config.getString(ElasticSinkConfigConstants.DOCUMENT_TYPE))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ object ElasticSinkConfig {
.define(ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY, Type.STRING, Importance.HIGH,
ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY_DOC, "Target", 1, ConfigDef.Width.LONG,
ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY)
.define(ElasticSinkConfigConstants.INDEX_NAME_SUFFIX, Type.STRING, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX_DEFAULT,
Importance.LOW, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX_DOC, "Target", 2, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX)
.define(ElasticSinkConfigConstants.AUTO_CREATE_INDEX, Type.BOOLEAN, ElasticSinkConfigConstants.AUTO_CREATE_INDEX_DEFAULT,
Importance.LOW, ElasticSinkConfigConstants.AUTO_CREATE_INDEX_DOC, "Target", 3, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.AUTO_CREATE_INDEX)
.define(ElasticSinkConfigConstants.DOCUMENT_TYPE, Type.STRING, ElasticSinkConfigConstants.DOCUMENT_TYPE_DEFAULT, Importance.LOW, ElasticSinkConfigConstants.DOCUMENT_TYPE_DOC, "Target", 4, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.DOCUMENT_TYPE)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,16 @@ object ElasticSinkConfigConstants {
val URL_PREFIX_DEFAULT = "elasticsearch"
val EXPORT_ROUTE_QUERY = "connect.elastic.sink.kcql"
val EXPORT_ROUTE_QUERY_DOC = "KCQL expression describing field selection and routes."

val INDEX_NAME_SUFFIX = "connect.elastic.index.suffix"
val INDEX_NAME_SUFFIX_DOC = "Suffix to append to the index name. Supports date time notation inside curly brackets. E.g. 'abc_{YYYY-MM-dd}_def'"
val INDEX_NAME_SUFFIX_DEFAULT: String = null

val AUTO_CREATE_INDEX = "connect.elastic.index.auto.create"
val AUTO_CREATE_INDEX_DOC = "The flag enables/disables auto creating the ElasticSearch index. Boolean value required. Defaults to TRUE."
val AUTO_CREATE_INDEX_DEFAULT = true

val DOCUMENT_TYPE = "connect.elastic.document.type"
val DOCUMENT_TYPE_DOC = "Sets the ElasticSearch document type. See https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-type-field.html for more info."
val DOCUMENT_TYPE_DEFAULT: String = null
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datamountaineer.streamreactor.connect.elastic.indexname

import scala.annotation.tailrec

class InvalidCustomIndexNameException(message: String) extends RuntimeException(message)

case class CustomIndexName(fragments: Vector[IndexNameFragment]) {
override def toString = fragments.map(_.getFragment).mkString
}

object CustomIndexName {

@tailrec
private def parseIndexName(remainingChars: Vector[Char], currentFragment: StringBuilder, results: Vector[Option[IndexNameFragment]]): Vector[IndexNameFragment] =
remainingChars match {
case head +: rest => head match {
case DateTimeFragment.OpeningChar =>
val (dateTimeFormat, afterDateTimeFormatIncludingClosingChar) = rest.span { _ != DateTimeFragment.ClosingChar }
val afterDateTimeFormat = afterDateTimeFormatIncludingClosingChar.tail

val maybeCurrentFragment = currentFragment.mkString.toOption
val maybeDateTimeFormat = dateTimeFormat.mkString.toOption

val newResultsWithDateTimeFragment = results :+ maybeCurrentFragment.map(TextFragment.apply) :+ maybeDateTimeFormat.map(DateTimeFragment(_))

parseIndexName(afterDateTimeFormat, new StringBuilder, newResultsWithDateTimeFragment)
case DateTimeFragment.ClosingChar => throw new InvalidCustomIndexNameException(s"Found closing '${DateTimeFragment.ClosingChar}' but no opening character")
case anyOtherChar => parseIndexName(rest, currentFragment.append(anyOtherChar), results)
}
case Vector() =>
val maybeCurrentFragment = currentFragment.mkString.toOption
(results :+ maybeCurrentFragment.map(TextFragment.apply)).flatten
}

def parseIndexName(indexName: String): CustomIndexName =
CustomIndexName(parseIndexName(indexName.toVector, new StringBuilder, Vector.empty))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.datamountaineer.streamreactor.connect.elastic.indexname

import java.time.Clock
import java.time.LocalDateTime._
import java.time.format.DateTimeFormatter._

object ClockProvider {
val ClockInstance = Clock.systemUTC()
}

sealed trait IndexNameFragment {
def getFragment: String
}

case class TextFragment(text: String) extends IndexNameFragment {
override def getFragment: String = text
}

case class DateTimeFragment(dateTimeFormat: String, clock: Clock = ClockProvider.ClockInstance) extends IndexNameFragment {
override def getFragment: String = s"${now(clock).format(ofPattern(dateTimeFormat))}"
}
object DateTimeFragment {
val OpeningChar = '{'
val ClosingChar = '}'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.datamountaineer.streamreactor.connect.elastic

package object indexname {

implicit class StringToOption(text: String) {
def toOption: Option[String] = if (text.nonEmpty) Some(text) else None
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.datamountaineer.streamreactor.connect.elastic

import java.time.LocalDateTime
import java.util

import com.datamountaineer.streamreactor.connect.elastic.config.{ElasticSinkConfig, ElasticSinkConfigConstants}
import com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfigConstants
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
Expand All @@ -27,11 +28,13 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}

import scala.collection.JavaConverters._
import scala.collection.mutable
import java.time.format.DateTimeFormatter._

trait TestElasticBase extends WordSpec with Matchers with BeforeAndAfter {
val ELASTIC_SEARCH_HOSTNAMES = "localhost:9300"
val TOPIC = "sink_test"
val INDEX = "index_andrew"
val INDEX_WITH_DATE = s"${INDEX}_${LocalDateTime.now.format(ofPattern("YYYY-MM-dd"))}"
//var TMP : File = _
val QUERY = s"INSERT INTO $INDEX SELECT * FROM $TOPIC"
val QUERY_SELECTION = s"INSERT INTO $INDEX SELECT id, string_field FROM $TOPIC"
Expand Down Expand Up @@ -131,9 +134,20 @@ trait TestElasticBase extends WordSpec with Matchers with BeforeAndAfter {
).asJava
}

def getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate: Boolean) = {
Map (
ElasticSinkConfigConstants.URL->ELASTIC_SEARCH_HOSTNAMES,
ElasticSinkConfigConstants.ES_CLUSTER_NAME->ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT,
ElasticSinkConfigConstants.URL_PREFIX->ElasticSinkConfigConstants.URL_PREFIX_DEFAULT,
ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY->QUERY,
ElasticSinkConfigConstants.INDEX_NAME_SUFFIX->"_{YYYY-MM-dd}",
ElasticSinkConfigConstants.AUTO_CREATE_INDEX->s"$autoCreate"
).asJava
}

def getElasticSinkConfigPropsDefaults = {
Map (
ElasticSinkConfigConstants.URL->ELASTIC_SEARCH_HOSTNAMES
).asJava
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.datamountaineer.streamreactor.connect.elastic

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.scalatest.{FlatSpec, Matchers}

class TestElasticJsonWriter extends FlatSpec with Matchers {
"ElasticJsonWriter" should "create an index name without suffix when suffix not set" in {
ElasticJsonWriter.createIndexName(None)("index_name") shouldBe "index_name"
}

it should "create an index name with suffix when suffix is set" in {
val formattedDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY-MM-dd"))
ElasticJsonWriter.createIndexName(Some("_suffix_{YYYY-MM-dd}"))("index_name") shouldBe s"index_name_suffix_$formattedDateTime"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.apache.kafka.connect.sink.SinkTaskContext
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexNotFoundException
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar

Expand Down Expand Up @@ -103,4 +104,73 @@ class TestElasticWriter extends TestElasticBase with MockitoSugar {
client.close()
TMP.deleteRecursively()
}
}

"A ElasticWriter should update a number of records in Elastic Search with index suffix defined" in {
val TMP = File(System.getProperty("java.io.tmpdir") + "/elastic-" + UUID.randomUUID())
TMP.createDirectory()
//mock the context to return our assignment when called
val context = mock[SinkTaskContext]
when(context.assignment()).thenReturn(getAssignment)
//get test records
val testRecords = getTestRecords
//get config
val config = new ElasticSinkConfig(getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate = true))

val essettings = Settings
.settingsBuilder().put(ElasticSinkConfigConstants.ES_CLUSTER_NAME, ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT)
.put("path.home", TMP.toString).build()
val client = ElasticClient.local(essettings)

//get writer

val settings = ElasticSettings(config)
val writer = new ElasticJsonWriter(client = client, settings = settings)
//write records to elastic
writer.write(testRecords)

Thread.sleep(2000)
//check counts
val res = client.execute { search in INDEX_WITH_DATE }.await
res.totalHits shouldBe testRecords.size
//close writer
writer.close()
client.close()
TMP.deleteRecursively()
}

"it should fail writing to a non-existent index when auto creation is disabled" ignore {
val TMP = File(System.getProperty("java.io.tmpdir") + "/elastic-" + UUID.randomUUID())
TMP.createDirectory()
//mock the context to return our assignment when called
val context = mock[SinkTaskContext]
when(context.assignment()).thenReturn(getAssignment)
//get test records
val testRecords = getTestRecords
//get config
val config = new ElasticSinkConfig(getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate = false))

val essettings = Settings
.settingsBuilder().put(ElasticSinkConfigConstants.ES_CLUSTER_NAME, ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT)
.put("path.home", TMP.toString).put("action.auto_create_index", "false").build()
val client = ElasticClient.local(essettings)

//get writer

val settings = ElasticSettings(config)
val writer = new ElasticJsonWriter(client = client, settings = settings)
//write records to elastic
writer.write(testRecords)

Thread.sleep(2000)
//check counts
intercept[IndexNotFoundException] {
val res = client.execute {
search in INDEX_WITH_DATE
}.await
}
//close writer
writer.close()
client.close()
TMP.deleteRecursively()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.datamountaineer.streamreactor.connect.elastic.indexname

import java.time.{Clock, Instant, ZoneOffset}

trait ClockFixture {
val TestClock = Clock.fixed(Instant.parse("2016-10-02T14:00:00.00Z"), ZoneOffset.UTC)
}
Loading

0 comments on commit 484d385

Please sign in to comment.