Full insert/update/delete support for dynamic table sinks #77
Comments
Ping @Nicole00 BTW. Have you checked this project? https://github.com/nebula-contrib/nebula-real-time-exchange
Thanks for the prompt reply! Let me take a look at the real time exchange project as well.
I took a brief look at the real time exchange project, and it seems that it reads the MySQL binlog directly (using the Flink CDC connector, which runs Debezium embedded inside the connector). It is definitely a viable solution for replicating MySQL databases to NebulaGraph. (We were aware of the Flink CDC project, and it is really exciting to learn that the NebulaGraph ecosystem already has something built on top of it!) However, in our current production architecture we already have Debezium running separately, writing database change events to Kafka, so our Flink applications do not access the MySQL binlog directly. In the future we may also have other Kafka-based streaming events (not derived from MySQL data changes) that we'd like to store in NebulaGraph, so full dynamic table support would be very beneficial.
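For context, a source table like our table A could be declared roughly as below. This is just a minimal sketch via the Table API; the topic, broker address, and column names are placeholders for our actual setup:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumKafkaSource {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Dynamic table backed by Kafka; Debezium change events are
        // interpreted as a changelog stream by the debezium-json format.
        // Topic, brokers, and columns are placeholders.
        tEnv.executeSql(
                "CREATE TABLE users_cdc (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'dbserver1.inventory.users',\n"
                        + "  'properties.bootstrap.servers' = 'kafka:9092',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'debezium-json'\n"
                        + ")");
    }
}
```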
Sure. Also, nebula-exchange can be wired to Kafka. As for the Flink SQL part, I am not familiar with it, so @Nicole00 or others could help there.
Thanks @wey-gu for sharing the nebula-exchange project as well. It seems to support Spark use cases. Right now I'm working on Flink applications, but I'll definitely take a look at it when I need to work with Kafka and NebulaGraph in Spark.
I looked into the source code in more detail, and it seems writes go through a single per-sink executor, so only one kind of operation is handled at a time. I think the proper way is to buffer the rows (deduplicating them by primary key) and delegate the execution to three executors (for insert, update, and delete, respectively) when committing the batch. The official JDBC connector can serve as an example of such an implementation (link). I'm happy to contribute a PR if the above makes sense.
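As a rough sketch of what I mean (not the connector's actual code; `KeyExtractor` and `BatchExecutor` are hypothetical interfaces introduced just for illustration, loosely in the spirit of the JDBC connector's buffered approach):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

/**
 * Minimal sketch of the buffering idea: keep only the latest change per
 * primary key, then route each buffered row to a dedicated executor on flush.
 */
public class BufferedChangelogSink {

    /** Hypothetical: extracts the primary key from a row. */
    public interface KeyExtractor {
        Object key(RowData row);
    }

    /** Hypothetical: executes a batch of rows as one statement kind. */
    public interface BatchExecutor {
        void executeBatch(List<RowData> rows);
    }

    private final Map<Object, RowData> buffer = new HashMap<>();
    private final KeyExtractor keyExtractor;
    private final BatchExecutor insertExecutor;
    private final BatchExecutor updateExecutor;
    private final BatchExecutor deleteExecutor;

    public BufferedChangelogSink(KeyExtractor keyExtractor,
                                 BatchExecutor insertExecutor,
                                 BatchExecutor updateExecutor,
                                 BatchExecutor deleteExecutor) {
        this.keyExtractor = keyExtractor;
        this.insertExecutor = insertExecutor;
        this.updateExecutor = updateExecutor;
        this.deleteExecutor = deleteExecutor;
    }

    /** Deduplicate by primary key: the latest change for a key wins. */
    public void addToBatch(RowData row) {
        if (row.getRowKind() == RowKind.UPDATE_BEFORE) {
            return; // UPDATE_AFTER carries the new row image; drop the before-image.
        }
        buffer.put(keyExtractor.key(row), row);
    }

    /** On commit, delegate each buffered row to the executor for its change kind. */
    public void flush() {
        List<RowData> inserts = new ArrayList<>();
        List<RowData> updates = new ArrayList<>();
        List<RowData> deletes = new ArrayList<>();
        for (RowData row : buffer.values()) {
            switch (row.getRowKind()) {
                case INSERT:       inserts.add(row); break;
                case UPDATE_AFTER: updates.add(row); break;
                case DELETE:       deletes.add(row); break;
                default:           break; // UPDATE_BEFORE already filtered out
            }
        }
        insertExecutor.executeBatch(inserts);
        updateExecutor.executeBatch(updates);
        deleteExecutor.executeBatch(deletes);
        buffer.clear();
    }
}
```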
Hi @linhr, you can find examples for both sink and source now, and I will try to enrich the documentation later.
Yes, simultaneous operations are not supported yet. Your suggestion sounds good to me. Looking forward to your contribution and to improving the Nebula Flink Connector with you. And what's your idea, @Nicole00?
Hi @linhr, sorry for the late reply. I have carefully considered your requirements. Yes, the Flink connector does not handle insert, update, and delete operations simultaneously. One alternative would be to split the change stream with side outputs in the DataStream API and attach each branch to its own sink.
@Nicole00 Thanks for your reply! Side output seems like a reasonable alternative for this issue (assuming we switch to the DataStream API). Actually, I have been working on this issue over the past few days and I've got something working locally. I'll send out a PR once my code change is in good shape (probably next week). (cc @liuxiaocs7)
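For illustration, splitting a changelog stream with side outputs could look roughly like the sketch below. It assumes a `DataStream<Row>` of change events (e.g. from `StreamTableEnvironment#toChangelogStream`); the downstream sinks are left out:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ChangelogSplitter {

    // Inserts stay on the main output; updates and deletes go to the
    // tagged side outputs, so each branch can get its own sink.
    public static SingleOutputStreamOperator<Row> split(
            DataStream<Row> changelog,
            OutputTag<Row> updateTag,
            OutputTag<Row> deleteTag) {
        return changelog.process(new ProcessFunction<Row, Row>() {
            @Override
            public void processElement(Row row, Context ctx, Collector<Row> out) {
                switch (row.getKind()) {
                    case INSERT:       out.collect(row);           break;
                    case UPDATE_AFTER: ctx.output(updateTag, row); break;
                    case DELETE:       ctx.output(deleteTag, row); break;
                    default:           break; // drop UPDATE_BEFORE
                }
            }
        });
    }
}
```

Usage would be along the lines of creating `new OutputTag<Row>("updates") {}` and `new OutputTag<Row>("deletes") {}`, calling `split(...)`, and retrieving the branches with `getSideOutput` before attaching the insert/update/delete sinks.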
Wow, looking forward to your PR 🎉. Don't feel pressured and do it on your own time. |
My pleasure, I'll think about it carefully. :) |
I have an upstream data source and I'd like to replicate the data to NebulaGraph in real time. In my current setup I have one dynamic table A powered by the Kafka connector with the Debezium format, which captures data change events of the upstream MySQL database. I'd like to use Flink SQL INSERT statements to write to another dynamic table B powered by the Nebula Flink connector. If everything works, any insert/update/delete operation on dynamic table A will trigger a write to dynamic table B, where the Nebula Flink connector is responsible for understanding the changelog and performing the corresponding insert/update/delete operations on the underlying graph database. Does the Nebula Flink connector support such a use case?

I was looking at the source code and it seems to me that the Nebula Flink connector does support Flink SQL, but I cannot find examples of how this could be used for streaming applications. Any guidance would be much appreciated.
Thanks a lot!
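To make the use case concrete, the intended pipeline could look roughly like this. This is only a sketch: the Nebula table options below are my reading of the connector's examples and may not match the exact option names in a given version, and `users_cdc` is the Kafka/Debezium table A sketched earlier in the thread:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NebulaSinkPipeline {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Dynamic table B backed by the Nebula Flink connector. The WITH
        // options are assumptions; check the connector docs for the actual
        // option set.
        tEnv.executeSql(
                "CREATE TABLE users_nebula (\n"
                        + "  vid BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'nebula',\n"
                        + "  'meta-address' = '127.0.0.1:9559',\n"
                        + "  'graph-address' = '127.0.0.1:9669',\n"
                        + "  'username' = 'root',\n"
                        + "  'password' = 'nebula',\n"
                        + "  'graph-space' = 'flinkSink',\n"
                        + "  'label-name' = 'person'\n"
                        + ")");

        // Continuous replication: every insert/update/delete on users_cdc
        // (table A, assumed defined as in the earlier sketch) should become
        // the corresponding write to NebulaGraph once the sink fully
        // supports the changelog.
        tEnv.executeSql("INSERT INTO users_nebula SELECT id, name FROM users_cdc");
    }
}
```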