diff --git a/kafka-connect-elastic/build.gradle b/kafka-connect-elastic/build.gradle index ff844d8fa..48890b52c 100644 --- a/kafka-connect-elastic/build.gradle +++ b/kafka-connect-elastic/build.gradle @@ -17,8 +17,8 @@ project(":kafka-connect-elastic") { ext { - elastic4sVersion = '2.1.0' - elasticVersion = '2.2.0' + elastic4sVersion = '2.4.0' + elasticVersion = '2.4.5' jnaVersion = '3.0.9' } @@ -30,4 +30,4 @@ project(":kafka-connect-elastic") { compile "com.sun.jna:jna:$jnaVersion" testCompile "org.elasticsearch:elasticsearch:$elasticVersion" } -} \ No newline at end of file +} diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticJsonWriter.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticJsonWriter.scala index 4c2a13710..9298e7912 100644 --- a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticJsonWriter.scala +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticJsonWriter.scala @@ -25,15 +25,23 @@ import com.sksamuel.elastic4s.source.Indexable import com.typesafe.scalalogging.slf4j.StrictLogging import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.sink.SinkRecord -import org.elasticsearch.action.bulk.BulkResponse +import com.sksamuel.elastic4s.BulkResult import scala.collection.immutable.Iterable import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import com.datamountaineer.streamreactor.connect.elastic.indexname.CustomIndexName class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extends StrictLogging with ConverterUtil { logger.info("Initialising Elastic Json writer") - createIndexes() + + import ElasticJsonWriter.createIndexName + + val createIndexNameWithSuffix = createIndexName(settings.indexNameSuffix) _ + + if (settings.indexAutoCreate) { + createIndexes() + } implicit object SinkRecordIndexable extends Indexable[SinkRecord] { override def json(t: SinkRecord): String = convertValueToJson(t).toString @@ -44,7 +52,12 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend * * */ private def createIndexes() : Unit = { - settings.tableMap.map({ case(_,v) => client.execute( { create index v.toLowerCase() })}) + settings.tableMap.map({ case(topicName, indexName) => client.execute( { + settings.documentType match { + case Some(documentType) => create index createIndexNameWithSuffix(indexName) mappings documentType + case _ => create index createIndexNameWithSuffix(indexName) + } + })}) } /** @@ -74,18 +87,19 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend * * @param records A list of SinkRecords * */ - def insert(records: Map[String, Set[SinkRecord]]) : Iterable[Future[BulkResponse]] = { + def insert(records: Map[String, Set[SinkRecord]]) : Iterable[Future[BulkResult]] = { val ret = records.map({ case (topic, sinkRecords) => val fields = settings.fields(topic) val ignoreFields = settings.ignoreFields(topic) - val i = settings.tableMap(topic) + val i = createIndexNameWithSuffix(settings.tableMap(topic)) + val documentType = settings.documentType.getOrElse(i) val indexes = sinkRecords .map(r => convert(r, fields, ignoreFields)) .map { r => configMap(r.topic).getWriteMode match { - case WriteModeEnum.INSERT => index into i.toLowerCase / i source r + case WriteModeEnum.INSERT => index into i / documentType source r case WriteModeEnum.UPSERT => // Build a Struct field extractor to get the value from the PK field val pkField = settings.pks(r.topic) @@ -93,7 +107,7 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend val extractor = StructFieldsExtractor(includeAllFields = true, Map(pkField -> pkField)) val fieldsAndValues = extractor.get(r.value.asInstanceOf[Struct]).toMap val pkValue = fieldsAndValues(pkField).toString - update id pkValue in i.toLowerCase / i docAsUpsert fieldsAndValues + update id pkValue in i / documentType docAsUpsert fieldsAndValues } } @@ -111,3 +125,8 @@ class ElasticJsonWriter(client: ElasticClient, settings: ElasticSettings) extend ret } } + +private object ElasticJsonWriter { + def createIndexName(maybeIndexNameSuffix: Option[String])(indexName: String): String = + maybeIndexNameSuffix.fold(indexName) { indexNameSuffix => s"$indexName${CustomIndexName.parseIndexName(indexNameSuffix)}" } +} diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticWriter.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticWriter.scala index 883b0a571..0cf28703e 100644 --- a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticWriter.scala +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticWriter.scala @@ -21,7 +21,7 @@ import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri} import org.apache.kafka.connect.sink.SinkTaskContext import org.elasticsearch.common.settings.Settings -object ElasticWriter { +object ElasticWriter { /** * Construct a JSONWriter. * diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSettings.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSettings.scala index 2cddf6b62..2ebab86e5 100644 --- a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSettings.scala +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSettings.scala @@ -28,9 +28,13 @@ case class ElasticSettings(routes: List[Config], fields : Map[String, Map[String, String]], ignoreFields: Map[String, Set[String]], pks: Map[String, String], - tableMap: Map[String, String]) + tableMap: Map[String, String], + indexNameSuffix: Option[String], + indexAutoCreate: Boolean, + documentType: Option[String]) object ElasticSettings { + def apply(config: ElasticSinkConfig): ElasticSettings = { val raw = config.getString(ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY) require(!raw.isEmpty,s"Empty ${ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY_DOC}") @@ -51,6 +55,14 @@ object ElasticSettings { (r.getSource, keys.head) }.toMap - ElasticSettings(routes = routes, fields = fields,ignoreFields = ignoreFields, pks = pks, tableMap = tableMap) + ElasticSettings(routes = routes, + fields = fields, + ignoreFields = ignoreFields, + pks = pks, + tableMap = tableMap, + indexNameSuffix = Option(config.getString(ElasticSinkConfigConstants.INDEX_NAME_SUFFIX)), + indexAutoCreate = config.getBoolean(ElasticSinkConfigConstants.AUTO_CREATE_INDEX), + documentType = Option(config.getString(ElasticSinkConfigConstants.DOCUMENT_TYPE)) + ) } } diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfig.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfig.scala index dadcaa8a0..50c1f2e52 100644 --- a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfig.scala +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfig.scala @@ -36,6 +36,11 @@ object ElasticSinkConfig { .define(ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY, Type.STRING, Importance.HIGH, ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY_DOC, "Target", 1, ConfigDef.Width.LONG, ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY) + .define(ElasticSinkConfigConstants.INDEX_NAME_SUFFIX, Type.STRING, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX_DEFAULT, + Importance.LOW, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX_DOC, "Target", 2, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.INDEX_NAME_SUFFIX) + .define(ElasticSinkConfigConstants.AUTO_CREATE_INDEX, Type.BOOLEAN, ElasticSinkConfigConstants.AUTO_CREATE_INDEX_DEFAULT, + Importance.LOW, ElasticSinkConfigConstants.AUTO_CREATE_INDEX_DOC, "Target", 3, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.AUTO_CREATE_INDEX) + .define(ElasticSinkConfigConstants.DOCUMENT_TYPE, Type.STRING, ElasticSinkConfigConstants.DOCUMENT_TYPE_DEFAULT, Importance.LOW, ElasticSinkConfigConstants.DOCUMENT_TYPE_DOC, "Target", 4, ConfigDef.Width.MEDIUM, ElasticSinkConfigConstants.DOCUMENT_TYPE) } /** diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfigConstants.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfigConstants.scala index bafae6e17..d484bea34 100644 --- a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfigConstants.scala +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/config/ElasticSinkConfigConstants.scala @@ -29,4 +29,16 @@ object ElasticSinkConfigConstants { val URL_PREFIX_DEFAULT = "elasticsearch" val EXPORT_ROUTE_QUERY = "connect.elastic.sink.kcql" val EXPORT_ROUTE_QUERY_DOC = "KCQL expression describing field selection and routes." + + val INDEX_NAME_SUFFIX = "connect.elastic.index.suffix" + val INDEX_NAME_SUFFIX_DOC = "Suffix to append to the index name. Supports date time notation inside curly brackets. E.g. 'abc_{YYYY-MM-dd}_def'" + val INDEX_NAME_SUFFIX_DEFAULT: String = null + + val AUTO_CREATE_INDEX = "connect.elastic.index.auto.create" + val AUTO_CREATE_INDEX_DOC = "The flag enables/disables auto creating the ElasticSearch index. Boolean value required. Defaults to TRUE." + val AUTO_CREATE_INDEX_DEFAULT = true + + val DOCUMENT_TYPE = "connect.elastic.document.type" + val DOCUMENT_TYPE_DOC = "Sets the ElasticSearch document type. See https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-type-field.html for more info." + val DOCUMENT_TYPE_DEFAULT: String = null } diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/CustomIndexName.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/CustomIndexName.scala new file mode 100644 index 000000000..bc8f4b350 --- /dev/null +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/CustomIndexName.scala @@ -0,0 +1,37 @@ +package com.datamountaineer.streamreactor.connect.elastic.indexname + +import scala.annotation.tailrec + +class InvalidCustomIndexNameException(message: String) extends RuntimeException(message) + +case class CustomIndexName(fragments: Vector[IndexNameFragment]) { + override def toString = fragments.map(_.getFragment).mkString +} + +object CustomIndexName { + + @tailrec + private def parseIndexName(remainingChars: Vector[Char], currentFragment: StringBuilder, results: Vector[Option[IndexNameFragment]]): Vector[IndexNameFragment] = + remainingChars match { + case head +: rest => head match { + case DateTimeFragment.OpeningChar => + val (dateTimeFormat, afterDateTimeFormatIncludingClosingChar) = rest.span { _ != DateTimeFragment.ClosingChar } + val afterDateTimeFormat = afterDateTimeFormatIncludingClosingChar.tail + + val maybeCurrentFragment = currentFragment.mkString.toOption + val maybeDateTimeFormat = dateTimeFormat.mkString.toOption + + val newResultsWithDateTimeFragment = results :+ maybeCurrentFragment.map(TextFragment.apply) :+ maybeDateTimeFormat.map(DateTimeFragment(_)) + + parseIndexName(afterDateTimeFormat, new StringBuilder, newResultsWithDateTimeFragment) + case DateTimeFragment.ClosingChar => throw new InvalidCustomIndexNameException(s"Found closing '${DateTimeFragment.ClosingChar}' but no opening character") + case anyOtherChar => parseIndexName(rest, currentFragment.append(anyOtherChar), results) + } + case Vector() => + val maybeCurrentFragment = currentFragment.mkString.toOption + (results :+ maybeCurrentFragment.map(TextFragment.apply)).flatten + } + + def parseIndexName(indexName: String): CustomIndexName = + CustomIndexName(parseIndexName(indexName.toVector, new StringBuilder, Vector.empty)) +} diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/IndexNameFragment.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/IndexNameFragment.scala new file mode 100644 index 000000000..d33868073 --- /dev/null +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/IndexNameFragment.scala @@ -0,0 +1,25 @@ +package com.datamountaineer.streamreactor.connect.elastic.indexname + +import java.time.Clock +import java.time.LocalDateTime._ +import java.time.format.DateTimeFormatter._ + +object ClockProvider { + val ClockInstance = Clock.systemUTC() +} + +sealed trait IndexNameFragment { + def getFragment: String +} + +case class TextFragment(text: String) extends IndexNameFragment { + override def getFragment: String = text +} + +case class DateTimeFragment(dateTimeFormat: String, clock: Clock = ClockProvider.ClockInstance) extends IndexNameFragment { + override def getFragment: String = s"${now(clock).format(ofPattern(dateTimeFormat))}" +} +object DateTimeFragment { + val OpeningChar = '{' + val ClosingChar = '}' +} diff --git a/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/package.scala b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/package.scala new file mode 100644 index 000000000..82ac07f6e --- /dev/null +++ b/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/package.scala @@ -0,0 +1,8 @@ +package com.datamountaineer.streamreactor.connect.elastic + +package object indexname { + + implicit class StringToOption(text: String) { + def toOption: Option[String] = if (text.nonEmpty) Some(text) else None + } +} diff --git a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticBase.scala b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticBase.scala index 0f585f9ef..e695205c3 100644 --- a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticBase.scala +++ b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticBase.scala @@ -16,9 +16,10 @@ package com.datamountaineer.streamreactor.connect.elastic +import java.time.LocalDateTime import java.util -import com.datamountaineer.streamreactor.connect.elastic.config.{ElasticSinkConfig, ElasticSinkConfigConstants} +import com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfigConstants import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.TimestampType import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct} @@ -27,11 +28,13 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec} import scala.collection.JavaConverters._ import scala.collection.mutable +import java.time.format.DateTimeFormatter._ trait TestElasticBase extends WordSpec with Matchers with BeforeAndAfter { val ELASTIC_SEARCH_HOSTNAMES = "localhost:9300" val TOPIC = "sink_test" val INDEX = "index_andrew" + val INDEX_WITH_DATE = s"${INDEX}_${LocalDateTime.now.format(ofPattern("YYYY-MM-dd"))}" //var TMP : File = _ val QUERY = s"INSERT INTO $INDEX SELECT * FROM $TOPIC" val QUERY_SELECTION = s"INSERT INTO $INDEX SELECT id, string_field FROM $TOPIC" @@ -131,9 +134,20 @@ trait TestElasticBase extends WordSpec with Matchers with BeforeAndAfter { ).asJava } + def getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate: Boolean) = { + Map ( + ElasticSinkConfigConstants.URL->ELASTIC_SEARCH_HOSTNAMES, + ElasticSinkConfigConstants.ES_CLUSTER_NAME->ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT, + ElasticSinkConfigConstants.URL_PREFIX->ElasticSinkConfigConstants.URL_PREFIX_DEFAULT, + ElasticSinkConfigConstants.EXPORT_ROUTE_QUERY->QUERY, + ElasticSinkConfigConstants.INDEX_NAME_SUFFIX->"_{YYYY-MM-dd}", + ElasticSinkConfigConstants.AUTO_CREATE_INDEX->s"$autoCreate" + ).asJava + } + def getElasticSinkConfigPropsDefaults = { Map ( ElasticSinkConfigConstants.URL->ELASTIC_SEARCH_HOSTNAMES ).asJava } -} \ No newline at end of file +} diff --git a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticJsonWriter.scala b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticJsonWriter.scala new file mode 100644 index 000000000..8c50df2eb --- /dev/null +++ b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticJsonWriter.scala @@ -0,0 +1,17 @@ +package com.datamountaineer.streamreactor.connect.elastic + +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter + +import org.scalatest.{FlatSpec, Matchers} + +class TestElasticJsonWriter extends FlatSpec with Matchers { + "ElasticJsonWriter" should "create an index name without suffix when suffix not set" in { + ElasticJsonWriter.createIndexName(None)("index_name") shouldBe "index_name" + } + + it should "create an index name with suffix when suffix is set" in { + val formattedDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("YYYY-MM-dd")) + ElasticJsonWriter.createIndexName(Some("_suffix_{YYYY-MM-dd}"))("index_name") shouldBe s"index_name_suffix_$formattedDateTime" + } +} diff --git a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticWriter.scala b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticWriter.scala index 94c2270ef..0e0c58b19 100644 --- a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticWriter.scala +++ b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/TestElasticWriter.scala @@ -23,6 +23,7 @@ import com.sksamuel.elastic4s.ElasticClient import com.sksamuel.elastic4s.ElasticDsl._ import org.apache.kafka.connect.sink.SinkTaskContext import org.elasticsearch.common.settings.Settings +import org.elasticsearch.index.IndexNotFoundException import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar @@ -103,4 +104,73 @@ class TestElasticWriter extends TestElasticBase with MockitoSugar { client.close() TMP.deleteRecursively() } -} \ No newline at end of file + + "A ElasticWriter should update a number of records in Elastic Search with index suffix defined" in { + val TMP = File(System.getProperty("java.io.tmpdir") + "/elastic-" + UUID.randomUUID()) + TMP.createDirectory() + //mock the context to return our assignment when called + val context = mock[SinkTaskContext] + when(context.assignment()).thenReturn(getAssignment) + //get test records + val testRecords = getTestRecords + //get config + val config = new ElasticSinkConfig(getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate = true)) + + val essettings = Settings + .settingsBuilder().put(ElasticSinkConfigConstants.ES_CLUSTER_NAME, ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT) + .put("path.home", TMP.toString).build() + val client = ElasticClient.local(essettings) + + //get writer + + val settings = ElasticSettings(config) + val writer = new ElasticJsonWriter(client = client, settings = settings) + //write records to elastic + writer.write(testRecords) + + Thread.sleep(2000) + //check counts + val res = client.execute { search in INDEX_WITH_DATE }.await + res.totalHits shouldBe testRecords.size + //close writer + writer.close() + client.close() + TMP.deleteRecursively() + } + + "it should fail writing to a non-existent index when auto creation is disabled" ignore { + val TMP = File(System.getProperty("java.io.tmpdir") + "/elastic-" + UUID.randomUUID()) + TMP.createDirectory() + //mock the context to return our assignment when called + val context = mock[SinkTaskContext] + when(context.assignment()).thenReturn(getAssignment) + //get test records + val testRecords = getTestRecords + //get config + val config = new ElasticSinkConfig(getElasticSinkConfigPropsWithDateSuffixAndIndexAutoCreation(autoCreate = false)) + + val essettings = Settings + .settingsBuilder().put(ElasticSinkConfigConstants.ES_CLUSTER_NAME, ElasticSinkConfigConstants.ES_CLUSTER_NAME_DEFAULT) + .put("path.home", TMP.toString).put("action.auto_create_index", "false").build() + val client = ElasticClient.local(essettings) + + //get writer + + val settings = ElasticSettings(config) + val writer = new ElasticJsonWriter(client = client, settings = settings) + //write records to elastic + writer.write(testRecords) + + Thread.sleep(2000) + //check counts + intercept[IndexNotFoundException] { + val res = client.execute { + search in INDEX_WITH_DATE + }.await + } + //close writer + writer.close() + client.close() + TMP.deleteRecursively() + } +} diff --git a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/ClockFixture.scala b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/ClockFixture.scala new file mode 100644 index 000000000..957f3b44d --- /dev/null +++ b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/ClockFixture.scala @@ -0,0 +1,7 @@ +package com.datamountaineer.streamreactor.connect.elastic.indexname + +import java.time.{Clock, Instant, ZoneOffset} + +trait ClockFixture { + val TestClock = Clock.fixed(Instant.parse("2016-10-02T14:00:00.00Z"), ZoneOffset.UTC) +} diff --git a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/TestCustomIndexName.scala b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/TestCustomIndexName.scala new file mode 100644 index 000000000..9ef2f78d3 --- /dev/null +++ b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/TestCustomIndexName.scala @@ -0,0 +1,48 @@ +package com.datamountaineer.streamreactor.connect.elastic.indexname + +import org.scalatest.prop.{GeneratorDrivenPropertyChecks, TableDrivenPropertyChecks} +import org.scalatest.{FlatSpec, Matchers} + +class TestCustomIndexName extends FlatSpec with TableDrivenPropertyChecks with GeneratorDrivenPropertyChecks with Matchers { + + val ValidIndexNames = Table( + ("Valid index name", "Expectations"), + ("", Vector()), + ("abc", Vector(TextFragment("abc"))), + ("abc{YYYY-MM-dd}", Vector(TextFragment("abc"), DateTimeFragment("YYYY-MM-dd"))), + ("{YYYY-MM-dd}abc", Vector(DateTimeFragment("YYYY-MM-dd"), TextFragment("abc"))), + ("{YYYY-MM-dd}abc{HH-MM-ss}", Vector(DateTimeFragment("YYYY-MM-dd"), TextFragment("abc"), DateTimeFragment("HH-MM-ss"))), + ("{YYYY-MM-dd}{HH-MM-ss}", Vector(DateTimeFragment("YYYY-MM-dd"), DateTimeFragment("HH-MM-ss"))), + ("abc{}", Vector(TextFragment("abc"))), + ("{}abc", Vector(TextFragment("abc"))) + ) + + val InvalidIndexNames = Table( + ("Invalid index name"), + ("}abc"), + ("abc}"), + ("abc}def") + ) + + "Custom index name" should "parse a valid String with date time formatting options" in { + forAll (ValidIndexNames) { case (validIndexName, expectations) => + CustomIndexName.parseIndexName(validIndexName) shouldBe CustomIndexName(expectations) + } + } + + it should "throw an exception when using invalid index name" in { + forAll (InvalidIndexNames) { case (invalidIndexName) => + intercept[InvalidCustomIndexNameException] { + CustomIndexName.parseIndexName(invalidIndexName) + } + } + } + + it should "return a valid String from a list of fragments" in new ClockFixture { + CustomIndexName( + Vector(DateTimeFragment("YYYY-MM-dd", TestClock), + TextFragment("ABC"), + DateTimeFragment("HH:mm:ss", TestClock)) + ).toString shouldBe "2016-10-02ABC14:00:00" + } +} diff --git a/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/TestIndexNameFragment.scala b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/TestIndexNameFragment.scala new file mode 100644 index 000000000..d4f49de47 --- /dev/null +++ b/kafka-connect-elastic/src/test/scala/com/datamountaineer/streamreactor/connect/elastic/indexname/TestIndexNameFragment.scala @@ -0,0 +1,20 @@ +package com.datamountaineer.streamreactor.connect.elastic.indexname + +import org.scalacheck.Gen +import org.scalatest.prop.GeneratorDrivenPropertyChecks +import org.scalatest.{FlatSpec, Matchers} + +class TestIndexNameFragment extends FlatSpec with Matchers with GeneratorDrivenPropertyChecks { + + "TextFragment" should "return the original text when using getFragment()" in { + forAll(Gen.alphaStr) { someString => + TextFragment(someString).getFragment shouldBe someString + } + } + + "DateTimeFragment" should "return the formatted date when using getFragment()" in new ClockFixture { + val dateTimeFormat = "YYYY-MM-dd HH:mm:ss" + val expectedResult = "2016-10-02 14:00:00" + DateTimeFragment(dateTimeFormat, TestClock).getFragment shouldBe expectedResult + } +}