-
Notifications
You must be signed in to change notification settings - Fork 367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ElasticSearch connector with date in index and custom document type #173
ElasticSearch connector with date in index and custom document type #173
Conversation
@@ -29,4 +29,16 @@ object ElasticSinkConfigConstants { | |||
val URL_PREFIX_DEFAULT = "elasticsearch" | |||
val EXPORT_ROUTE_QUERY = "connect.elastic.sink.kcql" | |||
val EXPORT_ROUTE_QUERY_DOC = "KCQL expression describing field selection and routes." | |||
|
|||
val INDEX_NAME_SUFFIX = "connect.elastic.index.suffix" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@robvadai
Shouldn't we add support for this in KCQL? The reason i am asking is for scenarios where you route messages from topic1 to index1 and topic2 to index2 and maybe you want one of the index to have suffix. Furthermore you might use different document type for messages coming from two different topics.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KCQL example: INSERT INTO $THE_INDEX SELECT * FROM $THE_TOPIC [WITHDOCTYPE($docType)] [WITHINDEXSUFFIX($suffix)] [AUTOCREATE]
Autocreate - already exists in the KCQL grammar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Yes this could be part of KCQL, good idea, obviously updating KCQL was out of scope for me, we needed a way to support date string in ES index names
- We had a requirement to make sure indexes are never auto created because we create indexes with custom policies and document types. The Connector was automatically creating missing indexes which is what we wanted to avoid. Hence the conditional index creation was added here: https://github.com/ConnectedHomes/stream-reactor/blob/9bc01bd61034037cacb14f5154bbfa01ae8e4b47/kafka-connect-elastic/src/main/scala/com/datamountaineer/streamreactor/connect/elastic/ElasticJsonWriter.scala#L42
|
||
val DOCUMENT_TYPE = "connect.elastic.document.type" | ||
val DOCUMENT_TYPE_DOC = "Sets the ElasticSearch document type. See https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-type-field.html for more info." | ||
val DOCUMENT_TYPE_DEFAULT = "" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably use null and avoid toOption over an empty string
@robvadai thank you for the improvement |
👍 |
The ElasticSearch connector is updated with: