Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ElasticSearch connector with date in index and custom document type #173

Merged
merged 4 commits into from
May 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kafka-connect-elastic/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
project(":kafka-connect-elastic") {

ext {
elastic4sVersion = '2.1.0'
elasticVersion = '2.2.0'
elastic4sVersion = '2.4.0'
elasticVersion = '2.4.5'
jnaVersion = '3.0.9'
}

Expand All @@ -30,4 +30,4 @@ project(":kafka-connect-elastic") {
compile "com.sun.jna:jna:$jnaVersion"
testCompile "org.elasticsearch:elasticsearch:$elasticVersion"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,23 @@ import com.sksamuel.elastic4s.source.Indexable
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord
import org.elasticsearch.action.bulk.BulkResponse
import com.sksamuel.elastic4s.BulkResult

import scala.collection.immutable.Iterable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import com.datamountaineer.streamreactor.connect.elastic.indexname.CustomIndexName

class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extends StrictLogging with ConverterUtil {
logger.info("Initialising Elastic Json writer")
createIndexes()

import ElasticJsonWriter.createIndexName

val createIndexNameWithSuffix = createIndexName(settings.indexNameSuffix) _

if (settings.indexAutoCreate) {
createIndexes()
}

implicit object SinkRecordIndexable extends Indexable[SinkRecord] {
override def json(t: SinkRecord): String = convertValueToJson(t).toString
Expand All @@ -44,7 +52,12 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend
*
* */
private def createIndexes() : Unit = {
settings.tableMap.map({ case(_,v) => client.execute( { create index v.toLowerCase() })})
settings.tableMap.map({ case(topicName, indexName) => client.execute( {
settings.documentType match {
case Some(documentType) => create index createIndexNameWithSuffix(indexName) mappings documentType
case _ => create index createIndexNameWithSuffix(indexName)
}
})})
}

/**
Expand Down Expand Up @@ -74,26 +87,27 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend
*
* @param records A list of SinkRecords
* */
def insert(records: Map[String, Set[SinkRecord]]) : Iterable[Future[BulkResponse]] = {
def insert(records: Map[String, Set[SinkRecord]]) : Iterable[Future[BulkResult]] = {
val ret = records.map({
case (topic, sinkRecords) =>
val fields = settings.fields(topic)
val ignoreFields = settings.ignoreFields(topic)
val i = settings.tableMap(topic)
val i = createIndexNameWithSuffix(settings.tableMap(topic))
val documentType = settings.documentType.getOrElse(i)

val indexes = sinkRecords
.map(r => convert(r, fields, ignoreFields))
.map { r =>
configMap(r.topic).getWriteMode match {
case WriteModeEnum.INSERT => index into i.toLowerCase / i source r
case WriteModeEnum.INSERT => index into i / documentType source r
case WriteModeEnum.UPSERT =>
// Build a Struct field extractor to get the value from the PK field
val pkField = settings.pks(r.topic)
// Extractor includes all since we already converted the records to have only needed fields
val extractor = StructFieldsExtractor(includeAllFields = true, Map(pkField -> pkField))
val fieldsAndValues = extractor.get(r.value.asInstanceOf[Struct]).toMap
val pkValue = fieldsAndValues(pkField).toString
update id pkValue in i.toLowerCase / i docAsUpsert fieldsAndValues
update id pkValue in i / documentType docAsUpsert fieldsAndValues
}
}

Expand All @@ -111,3 +125,8 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend
ret
}
}

private object ElasticJsonWriter {
  /**
    * Builds the final index name for a topic's target index.
    *
    * @param maybeIndexNameSuffix optional raw suffix pattern; date-time parts inside
    *                             curly brackets are rendered by [[CustomIndexName]]
    * @param indexName            the base index name from the topic->table mapping
    * @return the base name, with the parsed suffix appended when one is configured
    */
  def createIndexName(maybeIndexNameSuffix: Option[String])(indexName: String): String =
    maybeIndexNameSuffix match {
      case Some(indexNameSuffix) => s"$indexName${CustomIndexName.parseIndexName(indexNameSuffix)}"
      case None                  => indexName
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
import org.apache.kafka.connect.sink.SinkTaskContext
import org.elasticsearch.common.settings.Settings

object ElasticWriter {
object ElasticWriter {
/**
* Construct a JSONWriter.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ case class ElasticSettings(routes: List[Config],
fields : Map[String, Map[String, String]],
ignoreFields: Map[String, Set[String]],
pks: Map[String, String],
tableMap: Map[String, String])
tableMap: Map[String, String],
indexNameSuffix: Option[String],
indexAutoCreate: Boolean,
documentType: Option[String])

object ElasticSettings {

def apply(config: ElasticSinkConfig): ElasticSettings = {
val raw = config.getString(ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY)
require(!raw.isEmpty,s"Empty ${ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY_DOC}")
Expand All @@ -51,6 +55,14 @@ object ElasticSettings {
(r.getSource, keys.head)
}.toMap

ElasticSettings(routes = routes, fields = fields,ignoreFields = ignoreFields, pks = pks, tableMap = tableMap)
ElasticSettings(routes = routes,
fields = fields,
ignoreFields = ignoreFields,
pks = pks,
tableMap = tableMap,
indexNameSuffix = Option(config.getString(ElasticSinkConfigConstants.INDEX_NAME_SUFFIX)),
indexAutoCreate = config.getBoolean(ElasticSinkConfigConstants.AUTO_CREATE_INDEX),
documentType = Option(config.getString(ElasticSinkConfigConstants.DOCUMENT_TYPE))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ object ElasticSinkConfig {
.define(ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY, Type.STRING, Importance.HIGH,
ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY_DOC, "Target", 1, ConfigDef.Width.LONG,
ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY)
.define(ElasticSinkConfigConstants.INDEX_NAME_SUFFIX, Type.STRING, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX_DEFAULT,
Importance.LOW, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX_DOC, "Target", 2, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX)
.define(ElasticSinkConfigConstants.AUTO_CREATE_INDEX, Type.BOOLEAN, ElasticSinkConfigConstants.AUTO_CREATE_INDEX_DEFAULT,
Importance.LOW, ElasticSinkConfigConstants.AUTO_CREATE_INDEX_DOC, "Target", 3, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.AUTO_CREATE_INDEX)
.define(ElasticSinkConfigConstants.DOCUMENT_TYPE, Type.STRING, ElasticSinkConfigConstants.DOCUMENT_TYPE_DEFAULT, Importance.LOW, ElasticSinkConfigConstants.DOCUMENT_TYPE_DOC, "Target", 4, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.DOCUMENT_TYPE)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,16 @@ object ElasticSinkConfigConstants {
val URL_PREFIX_DEFAULT = "elasticsearch"
val EXPORT_ROUTE_QUERY = "connect.elastic.sink.kcql"
val EXPORT_ROUTE_QUERY_DOC = "KCQL expression describing field selection and routes."

val INDEX_NAME_SUFFIX = "connect.elastic.index.suffix"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robvadai
Shouldn't we add support for this in KCQL? The reason i am asking is for scenarios where you route messages from topic1 to index1 and topic2 to index2 and maybe you want one of the index to have suffix. Furthermore you might use different document type for messages coming from two different topics.
Thoughts?

Copy link
Contributor

@stheppi stheppi May 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KCQL example: INSERT INTO $THE_INDEX SELECT * FROM $THE_TOPIC [WITHDOCTYPE($docType)] [WITHINDEXSUFFIX($suffix)] [AUTOCREATE]

Autocreate - already exists in the KCQL grammar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes this could be part of KCQL, good idea, obviously updating KCQL was out of scope for me, we needed a way to support date string in ES index names
  2. We had a requirement to make sure indexes are never auto created because we create indexes with custom policies and document types. The Connector was automatically creating missing indexes which is what we wanted to avoid. Hence the conditional index creation was added here: https://github.com/ConnectedHomes/stream-reactor/blob/9bc01bd61034037cacb14f5154bbfa01ae8e4b47/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticJsonWriter.scala#L42

val INDEX_NAME_SUFFIX_DOC = "Suffix to append to the index name. Supports date time notation inside curly brackets. E.g. 'abc_{YYYY-MM-dd}_def'"
val INDEX_NAME_SUFFIX_DEFAULT: String = null

val AUTO_CREATE_INDEX = "connect.elastic.index.auto.create"
val AUTO_CREATE_INDEX_DOC = "The flag enables/disables auto creating the ElasticSearch index. Boolean value required. Defaults to TRUE."
val AUTO_CREATE_INDEX_DEFAULT = true

val DOCUMENT_TYPE = "connect.elastic.document.type"
val DOCUMENT_TYPE_DOC = "Sets the ElasticSearch document type. See https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-type-field.html for more info."
val DOCUMENT_TYPE_DEFAULT: String = null
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datamountaineer.streamreactor.connect.elastic.indexname

import scala.annotation.tailrec

class InvalidCustomIndexNameException(message: String) extends RuntimeException(message)

/** A parsed custom index name: the ordered fragments it is assembled from. */
case class CustomIndexName(fragments: Vector[IndexNameFragment]) {
  /** Renders each fragment in order and concatenates them into the final name. */
  override def toString: String = fragments.iterator.map(_.getFragment).mkString
}

object CustomIndexName {

  /**
    * Tail-recursive single pass over the raw pattern characters.
    *
    * Accumulates plain text in `currentFragment` until an opening '{' is seen, then
    * captures everything up to the matching '}' as a date-time format. Empty strings
    * are dropped via `toOption` so no zero-length fragments are produced.
    *
    * @throws InvalidCustomIndexNameException on an unopened '}' or an unclosed '{'
    */
  @tailrec
  private def parseIndexName(remainingChars: Vector[Char], currentFragment: StringBuilder, results: Vector[Option[IndexNameFragment]]): Vector[IndexNameFragment] =
    remainingChars match {
      case head +: rest => head match {
        case DateTimeFragment.OpeningChar =>
          val (dateTimeFormat, afterDateTimeFormatIncludingClosingChar) = rest.span { _ != DateTimeFragment.ClosingChar }
          // Fail fast when the '{' is never closed: span consumed everything, so the
          // .tail below would otherwise surface as an opaque UnsupportedOperationException
          // on an empty Vector instead of a meaningful parse error.
          if (afterDateTimeFormatIncludingClosingChar.isEmpty)
            throw new InvalidCustomIndexNameException(s"Found opening '${DateTimeFragment.OpeningChar}' but no closing '${DateTimeFragment.ClosingChar}' character")
          // Drop the closing '}' itself before continuing.
          val afterDateTimeFormat = afterDateTimeFormatIncludingClosingChar.tail

          val maybeCurrentFragment = currentFragment.mkString.toOption
          val maybeDateTimeFormat = dateTimeFormat.mkString.toOption

          val newResultsWithDateTimeFragment = results :+ maybeCurrentFragment.map(TextFragment.apply) :+ maybeDateTimeFormat.map(DateTimeFragment(_))

          parseIndexName(afterDateTimeFormat, new StringBuilder, newResultsWithDateTimeFragment)
        case DateTimeFragment.ClosingChar => throw new InvalidCustomIndexNameException(s"Found closing '${DateTimeFragment.ClosingChar}' but no opening character")
        case anyOtherChar => parseIndexName(rest, currentFragment.append(anyOtherChar), results)
      }
      case Vector() =>
        // End of input: flush any trailing literal text and discard the empty slots.
        val maybeCurrentFragment = currentFragment.mkString.toOption
        (results :+ maybeCurrentFragment.map(TextFragment.apply)).flatten
    }

  /**
    * Parses a raw index-name pattern such as "logs_{YYYY-MM-dd}" into a [[CustomIndexName]].
    *
    * @throws InvalidCustomIndexNameException when curly brackets are unbalanced
    */
  def parseIndexName(indexName: String): CustomIndexName =
    CustomIndexName(parseIndexName(indexName.toVector, new StringBuilder, Vector.empty))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.datamountaineer.streamreactor.connect.elastic.indexname

import java.time.Clock
import java.time.LocalDateTime._
import java.time.format.DateTimeFormatter._

/** Shared clock used when rendering date-time fragments; pinned to UTC. */
object ClockProvider {
  val ClockInstance = Clock.systemUTC()
}

/** One piece of a parsed custom index name. */
sealed trait IndexNameFragment {
  /** The text this piece contributes to the final index name. */
  def getFragment: String
}

/** Literal text copied into the index name unchanged. */
case class TextFragment(text: String) extends IndexNameFragment {
  override def getFragment: String = text
}

/**
  * A date-time pattern (java.time `DateTimeFormatter` syntax) rendered with the
  * current time of `clock` — UTC by default via [[ClockProvider]].
  */
case class DateTimeFragment(dateTimeFormat: String, clock: Clock = ClockProvider.ClockInstance) extends IndexNameFragment {
  override def getFragment: String = now(clock).format(ofPattern(dateTimeFormat))
}

/** Delimiters that mark a date-time pattern inside a raw index-name string. */
object DateTimeFragment {
  val OpeningChar = '{'
  val ClosingChar = '}'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.datamountaineer.streamreactor.connect.elastic

package object indexname {

  /** Enriches [[String]] with a conversion to [[Option]] that treats "" as absent. */
  implicit class StringToOption(text: String) {
    /** Some(text) when non-empty, None otherwise. */
    def toOption: Option[String] = Some(text).filter(_.nonEmpty)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.datamountaineer.streamreactor.connect.elastic

import java.time.LocalDateTime
import java.util

import com.datamountaineer.streamreactor.connect.elastic.config.{ElasticSinkConfig, ElasticSinkConfigConstants}
import com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfigConstants
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
Expand All @@ -27,11 +28,13 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}

import scala.collection.JavaConverters._
import scala.collection.mutable
import java.time.format.DateTimeFormatter._

trait TestElasticBase extends WordSpec with Matchers with BeforeAndAfter {
val ELASTIC_SEARCH_HOSTNAMES = "localhost:9300"
val TOPIC = "sink_test"
val INDEX = "index_andrew"
val INDEX_WITH_DATE = s"${INDEX}_${LocalDateTime.now.format(ofPattern("YYYY-MM-dd"))}"
//var TMP : File = _
val QUERY = s"INSERT INTO $INDEX SELECT * FROM $TOPIC"
val QUERY_SELECTION = s"INSERT INTO $INDEX SELECT id, string_field FROM $TOPIC"
Expand Down Expand Up @@ -131,9 +134,20 @@ trait TestElasticBase extends WordSpec with Matchers with BeforeAndAfter {
).asJava
}

/** Connector props with a date-pattern index suffix; index auto-creation toggled by `autoCreate`. */
def getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate: Boolean) = {
  Map(
    ElasticSinkConfigConstants.URL -> ELASTIC_SEARCH_HOSTNAMES,
    ElasticSinkConfigConstants.ES_CLUSTER_NAME -> ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT,
    ElasticSinkConfigConstants.URL_PREFIX -> ElasticSinkConfigConstants.URL_PREFIX_DEFAULT,
    ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY -> QUERY,
    ElasticSinkConfigConstants.INDEX_NAME_SUFFIX -> "_{YYYY-MM-dd}",
    ElasticSinkConfigConstants.AUTO_CREATE_INDEX -> autoCreate.toString
  ).asJava
}

/** Minimal connector props: only the Elasticsearch URL, everything else defaulted. */
def getElasticSinkConfigPropsDefaults = {
  Map(ElasticSinkConfigConstants.URL -> ELASTIC_SEARCH_HOSTNAMES).asJava
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.datamountaineer.streamreactor.connect.elastic

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.scalatest.{FlatSpec, Matchers}

class TestElasticJsonWriter extends FlatSpec with Matchers {

  "ElasticJsonWriter" should "create an index name without suffix when suffix not set" in {
    ElasticJsonWriter.createIndexName(None)("index_name") shouldBe "index_name"
  }

  it should "create an index name with suffix when suffix is set" in {
    // The writer renders the date with ClockProvider.ClockInstance, which is
    // Clock.systemUTC(); compute the expected value in UTC too, otherwise this
    // test is flaky around midnight on machines whose default zone is not UTC.
    val formattedDateTime = LocalDateTime.now(java.time.Clock.systemUTC()).format(DateTimeFormatter.ofPattern("YYYY-MM-dd"))
    ElasticJsonWriter.createIndexName(Some("_suffix_{YYYY-MM-dd}"))("index_name") shouldBe s"index_name_suffix_$formattedDateTime"
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.apache.kafka.connect.sink.SinkTaskContext
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexNotFoundException
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar

Expand Down Expand Up @@ -103,4 +104,73 @@ class TestElasticWriter extends TestElasticBase with MockitoSugar {
client.close()
TMP.deleteRecursively()
}
}

"A ElasticWriter should update a number of records in Elastic Search with index suffix defined" in {
val TMP = File(System.getProperty("java.io.tmpdir") + "/elastic-" + UUID.randomUUID())
TMP.createDirectory()
//mock the context to return our assignment when called
val context = mock[SinkTaskContext]
when(context.assignment()).thenReturn(getAssignment)
//get test records
val testRecords = getTestRecords
//get config
val config = new ElasticSinkConfig(getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate = true))

val essettings = Settings
.settingsBuilder().put(ElasticSinkConfigConstants.ES_CLUSTER_NAME, ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT)
.put("path.home", TMP.toString).build()
val client = ElasticClient.local(essettings)

//get writer

val settings = ElasticSettings(config)
val writer = new ElasticJsonWriter(client = client, settings = settings)
//write records to elastic
writer.write(testRecords)

Thread.sleep(2000)
//check counts
val res = client.execute { search in INDEX_WITH_DATE }.await
res.totalHits shouldBe testRecords.size
//close writer
writer.close()
client.close()
TMP.deleteRecursively()
}

"it should fail writing to a non-existent index when auto creation is disabled" ignore {
val TMP = File(System.getProperty("java.io.tmpdir") + "/elastic-" + UUID.randomUUID())
TMP.createDirectory()
//mock the context to return our assignment when called
val context = mock[SinkTaskContext]
when(context.assignment()).thenReturn(getAssignment)
//get test records
val testRecords = getTestRecords
//get config
val config = new ElasticSinkConfig(getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate = false))

val essettings = Settings
.settingsBuilder().put(ElasticSinkConfigConstants.ES_CLUSTER_NAME, ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT)
.put("path.home", TMP.toString).put("action.auto_create_index", "false").build()
val client = ElasticClient.local(essettings)

//get writer

val settings = ElasticSettings(config)
val writer = new ElasticJsonWriter(client = client, settings = settings)
//write records to elastic
writer.write(testRecords)

Thread.sleep(2000)
//check counts
intercept[IndexNotFoundException] {
val res = client.execute {
search in INDEX_WITH_DATE
}.await
}
//close writer
writer.close()
client.close()
TMP.deleteRecursively()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.datamountaineer.streamreactor.connect.elastic.indexname

import java.time.{Clock, Instant, ZoneOffset}

/** Mix-in providing a deterministic clock so date-dependent assertions are stable. */
trait ClockFixture {
  // Arbitrary fixed instant (2016-10-02T14:00:00Z, UTC) — never advances during a test run.
  val TestClock = Clock.fixed(Instant.parse("2016-10-02T14:00:00.00Z"), ZoneOffset.UTC)
}
Loading