[HELLO!!] Adds routing support, configurable via 'routing.field.name' #558

Open
wants to merge 4 commits into
base: master

Conversation

hartmut-co-uk

@hartmut-co-uk hartmut-co-uk commented Jun 24, 2021

Refs:

Problem

The Elasticsearch _routing field is currently not supported.

Solution

Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

Parent/Child relationships -> join field type

Test Strategy

DataConverterTest has been added.
Integration test might require refactoring of existing tests.
ElasticsearchConnectorIT / ElasticsearchConnectorBaseIT seems to use a very basic schema / payload.
Also ElasticsearchHelperClient would need to be improved to also support _routing.

Please advise how to approach.

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

TODOs / open topics

  • 'routing.field.name' config
  • read field value, populate bulk
  • Unit tests
  • ? Advanced support for parent-join field
    • ? add SMT for parent-join field
    • ? add support directly to connector
  • Integration tests
  • update documentation

Release Plan

  • New feature
  • Non-breaking change
  • New config fields are optional
  • Does not require any change or migration upon upgrade

@hartmut-co-uk hartmut-co-uk requested a review from a team as a code owner June 24, 2021 00:56
@ghost

ghost commented Jun 24, 2021

@confluentinc It looks like @hartmut-co-uk just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@hartmut-co-uk
Author

hartmut-co-uk commented Jun 24, 2021

I noticed that for the parent-join use case, in addition to adding the routing, the payload potentially also needs to be enriched with my_join_field.name and my_join_field.parent (for children).
https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html

PUT my-index-000001/_doc/3?routing=1&refresh 
{
  "my_id": "3",
  "parent_id": "1",
  "text": "This is an answer",
  "my_join_field": {
    "name": "answer", 
    "parent": "1" 
  }
}

I wonder whether it would be worth building this natively into the connector, instead of forcing users to enrich the data upfront or to build and require a custom SMT.

Note: I tried the InsertField SMT, but since it only supports flat fields, it cannot enrich the struct for children.
https://docs.confluent.io/platform/current/connect/transforms/insertfield.html

So one solution I could see is to add the following config options:

  • join.field.name in above example would be 'my_join_field'
  • join.relation.name in above example would be 'answer'
  • join.parent.id.field.name would essentially be the value field (path), -> e.g. same as would be configured for routing.field.name
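
To make the three proposed options concrete, here is a minimal sketch of the enrichment they describe. It works on plain `Map` values rather than Kafka Connect `Struct`s, reads the parent ID from a top-level field only, and the class and method names (`ParentJoinSketch`, `addJoinField`) are hypothetical; none of this is taken from the PR's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the connector or of this PR. */
final class ParentJoinSketch {

    /**
     * Returns a copy of the document with a nested join field added.
     *
     * @param joinFieldName corresponds to the proposed 'join.field.name'
     * @param relationName  corresponds to the proposed 'join.relation.name'
     * @param parentIdField corresponds to the proposed 'join.parent.id.field.name'
     *                      (assumed to be a top-level field in this sketch)
     */
    static Map<String, Object> addJoinField(Map<String, Object> doc,
                                            String joinFieldName,
                                            String relationName,
                                            String parentIdField) {
        Object parentId = doc.get(parentIdField);
        if (parentId == null) {
            throw new IllegalArgumentException("Missing parent id field: " + parentIdField);
        }
        // The join field is a nested object, which a flat-field insert cannot produce.
        Map<String, Object> join = new LinkedHashMap<>();
        join.put("name", relationName);
        join.put("parent", String.valueOf(parentId));

        // Copy instead of mutating the input document.
        Map<String, Object> enriched = new LinkedHashMap<>(doc);
        enriched.put(joinFieldName, join);
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, Object> answer = new LinkedHashMap<>();
        answer.put("my_id", "3");
        answer.put("parent_id", "1");
        answer.put("text", "This is an answer");
        System.out.println(addJoinField(answer, "my_join_field", "answer", "parent_id"));
    }
}
```

With the example document above, the enriched copy carries a `my_join_field` object whose `name` is `answer` and whose `parent` is `1`, matching the PUT request body.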

@hartmut-co-uk
Author

A custom SMT might be the best fit to create the ‘join-field struct’ for ES parent/child relationship on the fly.
I wonder if it's possible and would be an acceptable pattern to just add + ship a custom SMT for this along with the connector jar?

@hartmut-co-uk
Author

Further room for improvement: allow using data from $key (not only from a $value field).

@hartmut-co-uk
Author

I've implemented a simple SMT (also works just fine having it embedded in the connector jar).

Sticking with the ES example, it is configured like this:

  "transforms.insertParentJoin.join.field.name": "my_join_field",
  "transforms.insertParentJoin.relation.type.name": "answer",
  "transforms.insertParentJoin.parent.id.field.path": "questionId"

(with the actual msg value having a field questionId as the 'question' parent ID of the answer doc - this would also be used for routing...)
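
As a rough illustration of how the same value field could feed the routing, the sketch below resolves a dot-separated field path from a nested map and places the result in the action line of an Elasticsearch bulk `index` operation. The helper names (`RoutingSketch`, `resolvePath`, `bulkIndexAction`) are hypothetical, the JSON is built by hand without escaping, and the PR itself presumably goes through the Elasticsearch client rather than string building, so treat this as a sketch of the idea only.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not the PR's implementation. */
final class RoutingSketch {

    /** Resolves a dot-separated path (e.g. "question.id") against nested maps; null if absent. */
    static Object resolvePath(Map<String, Object> value, String path) {
        Object current = value;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // path goes deeper than the data does
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }

    /**
     * Builds the action line of a bulk "index" operation.
     * The "routing" entry is only emitted when a routing value is available.
     * Note: values are not JSON-escaped in this sketch.
     */
    static String bulkIndexAction(String index, String id, Object routing) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"index\":{\"_index\":\"").append(index)
          .append("\",\"_id\":\"").append(id).append('"');
        if (routing != null) {
            sb.append(",\"routing\":\"").append(routing).append('"');
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of("questionId", "1", "text", "This is an answer");
        Object routing = resolvePath(value, "questionId");
        System.out.println(bulkIndexAction("my-index-000001", "3", routing));
    }
}
```

Resolving the routing from a path rather than a flat field name keeps the door open for nested values, at the cost of having to decide what happens when the path is missing; here the routing is simply omitted.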

Please provide some feedback on how to proceed further.

@frankkoornstra

Amazing work @hartmut-co-uk 😍 This is going to help us a lot!

@hartmut-co-uk
Author

@levzem could you please provide initial feedback?

Also

  • advise how to proceed with integration tests?

    Integration test might require refactoring of existing tests.
    ElasticsearchConnectorIT / ElasticsearchConnectorBaseIT seems to use a very basic schema / payload.
    Also ElasticsearchHelperClient would need to be improved to also support _routing.

  • Do you think it would make sense to add an SMT to the connector? I have an implementation of a
    public class InsertParentJoin<R extends ConnectRecord<R>> implements Transformation<R> {
    (I've had to add dependency org.apache.kafka:connect-transforms:${kafka.version})

    • Or would/should it make more sense to embed the functionality directly into the connector?
    • Or do not add but maybe create + provide as a separate maven artifact?

@hartmut-co-uk
Author

I'm still keen to help wrap this up, write more tests, .. if we can agree how to proceed.

@frankkoornstra

frankkoornstra commented Aug 19, 2021

@yanglei99 @kkonstantine @levzem @dosvath anyone? We so desperately need this and the work is done but it's dead in the water for ~2 months now without any apparent reason 😕

@hartmut-co-uk
Author

ping

@hartmut-co-uk hartmut-co-uk changed the title Adds routing support, configurable via 'routing.field.name' [FEEDBACK REQUIRED] Adds routing support, configurable via 'routing.field.name' Sep 16, 2021
@hartmut-co-uk hartmut-co-uk changed the title [FEEDBACK REQUIRED] Adds routing support, configurable via 'routing.field.name' [FEEDBACK NEEDED] Adds routing support, configurable via 'routing.field.name' Sep 16, 2021
@frankkoornstra

# Conflicts:
#	src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.java
@hartmut-co-uk hartmut-co-uk changed the title [FEEDBACK NEEDED] Adds routing support, configurable via 'routing.field.name' [HELLO!!] Adds routing support, configurable via 'routing.field.name' Dec 20, 2021
@frankkoornstra

frankkoornstra commented Feb 1, 2022

Went on a sabbatical for 3 months and still 😂 I thought for a moment that this project was abandoned altogether, but stuff was still merged a few days ago.

@kkonstantine 🙏 🙏 🙏 what needs to happen to get some eyes on this? Over half a year open now...

@hartmut-co-uk
Author

I don't think it's been abandoned in general. I'm currently busy with other things.
(I've not yet come to use the proposed changes beyond a POC)

Still happy to contribute if someone confirms that the feature/changes are welcome and will be merged + how to proceed with adding tests / SMT.

zigam pushed a commit to fentik/kafka-connect-elasticsearch that referenced this pull request May 17, 2022
zigam pushed a commit to fentik/kafka-connect-elasticsearch that referenced this pull request May 18, 2022
zigam pushed a commit to fentik/kafka-connect-elasticsearch that referenced this pull request May 18, 2022
@dania-ru

Any plans to merge this PR?

@hartmut-co-uk
Author

Dear maintainers, is this feature/PR welcome, and are further changes required?
Is the config option fine as is, what about the new private methods added (are there better alternatives/libraries to use)?

I'm still keen to help wrap this up and write tests, ... if we can agree on how to proceed.

@hartmut-co-uk
Author

...

@frankkoornstra

If you're still searching for a platform-type of solution, take a look at Apache Flink. You get pretty much access to the bare-bones Elasticsearch API so you can do whatever you want, including routing, timeouts per bulk action, etc. Plus an actual Web UI 🤩

@hartmut-co-uk
Author

> If you're still searching for a platform-type of solution, take a look at Apache Flink. You get pretty much access to the bare-bones Elasticsearch API so you can do whatever you want, including routing, timeouts per bulk action, etc. Plus an actual Web UI 🤩

Use flink kafka source -> flink elasticsearch sink -- instead of kafka connect? 🤔
This doesn't unblock getting this PR over the line ^^ but it's a great suggestion of an alternative solution that is way easier to customise. 👍 👍

Many thanks for bringing this up, given I've been considering Apache Flink for other use cases in my project, too...

zigam pushed a commit to fentik/kafka-connect-elasticsearch that referenced this pull request Oct 18, 2022
zigam pushed a commit to fentik/kafka-connect-elasticsearch that referenced this pull request Oct 18, 2022
@cla-assistant

cla-assistant bot commented Sep 11, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@sandyplace

I am no expert, but I think this approach can be difficult: routing based on a value in a record is tricky when it comes to deleting. Typically, a delete is triggered by a null record. We noticed that in that case the delete request is not routed to the correct shard, because the routing key is missing from the record. We had to take a different approach to get this to work.
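
The problem can be illustrated with a small sketch. When the record value is null, a routing field configured on the value cannot be read, so the delete goes out without routing. One conceivable mitigation, in line with the earlier idea in this thread of also reading from the key, is to fall back to a field of the record key. This is an assumption made for illustration only: it is not the "different approach" mentioned above (which is not described), and the names `DeleteRoutingSketch` and `routingFor` are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of the connector. */
final class DeleteRoutingSketch {

    /**
     * Picks a routing value for a record.
     * A record that triggers a delete has a null value, so the value-based
     * lookup yields nothing; this sketch then tries the same field on the key.
     */
    static Optional<String> routingFor(Map<String, Object> key,
                                       Map<String, Object> value,
                                       String routingField) {
        if (value != null && value.get(routingField) != null) {
            return Optional.of(String.valueOf(value.get(routingField)));
        }
        if (key != null && key.get(routingField) != null) {
            return Optional.of(String.valueOf(key.get(routingField)));
        }
        // No routing available: a delete sent like this may reach the wrong shard.
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> key = Map.of("id", "3", "questionId", "1");
        // Null value simulates the delete case described above.
        System.out.println(routingFor(key, null, "questionId"));
    }
}
```

The fallback only helps if producers put the routing field into the record key, which is a constraint on the topic's key schema rather than something the connector can enforce.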

4 participants