Support all insert/update/delete operations for dynamic table sink #81
Conversation
Codecov Report
Base: 61.59% // Head: 65.18% // Increases project coverage by +3.59%.
Additional details and impacted files
@@ Coverage Diff @@
## master #81 +/- ##
============================================
+ Coverage 61.59% 65.18% +3.59%
- Complexity 291 308 +17
============================================
Files 52 53 +1
Lines 1786 1873 +87
Branches 166 167 +1
============================================
+ Hits 1100 1221 +121
+ Misses 596 566 -30
+ Partials 90 86 -4
☔ View full report at Codecov.
.setGraphSpace("flinkSink") | ||
.setTag("player") | ||
.setIdIndex(0) | ||
.setFields(Arrays.asList("name", "age")) | ||
.setPositions(Arrays.asList(1, 2)) | ||
.setBatch(2) | ||
.builder(); | ||
.setBatchSize(2) |
We suggest not changing the user interfaces in the current version; they may be updated in the next major version.
vesoft-inc/nebula-java#486 (comment)
Thanks for the suggestion! Yeah, I think it's a good idea to ensure compatibility of user-facing interfaces within the same major version. I've added back the methods such as .setBatch() and .builder() and marked them as @Deprecated, while in README.md I use the new methods. Hopefully this can encourage users to move to the new methods without breaking existing code. Does this approach look fine to you? @Nicole00
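A minimal sketch of what this compatibility approach can look like (class and field names below are made up for illustration, not the connector's actual API): the old setters stay as deprecated wrappers that delegate to the renamed methods, so existing code keeps compiling while README.md shows the new names.

```java
// Illustration only: deprecated old names delegate to the renamed methods.
public class ExecutionOptions {
    private final int batchSize;

    private ExecutionOptions(int batchSize) {
        this.batchSize = batchSize;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public static class Builder {
        private int batchSize = 1;

        // New, preferred setter name.
        public Builder setBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** @deprecated use {@link #setBatchSize(int)} instead. */
        @Deprecated
        public Builder setBatch(int batch) {
            return setBatchSize(batch);
        }

        // Corrected terminal method name.
        public ExecutionOptions build() {
            return new ExecutionOptions(batchSize);
        }

        /** @deprecated use {@link #build()} instead. */
        @Deprecated
        public ExecutionOptions builder() {
            return build();
        }
    }
}
```

With this shape, existing calls like .setBatch(2).builder() keep working (with deprecation warnings), while new code uses .setBatchSize(2).build().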
"Hopefully this can encourage users to move to the new methods"

Great approach, thanks for changing it.
@@ -81,7 +81,7 @@ public VidTypeEnum getVidType(MetaClient metaClient, String space) {
     spaceItem = metaClient.getSpace(space);
 } catch (TException | ExecuteFailedException e) {
     LOG.error("get space info error, ", e);
-    return null;
+    throw new RuntimeException(e);
Good change, thanks~
EdgeExecutionOptions deleteOptions = executionOptions.toBuilder()
        .setWriteMode(WriteModeEnum.DELETE)
        .build();
return new NebulaTableBufferReducedExecutor(dataStructureConverter,
The insert mode is not upsert, and there is no update mode?
Maybe we should change the upsert variable to insert, to avoid conflicts with Nebula's upsert.
Yes, we use Nebula INSERT statements for both insert and update events in the data stream. What I found is that NebulaGraph will overwrite existing vertices/edges during insert, so it works for the update event as well. (UPSERT can be expensive in NebulaGraph, so I tried to avoid it.)

"Maybe we should change the upsert variable to insert"

Sounds good! I've renamed the variables.
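A small sketch of the routing idea discussed here (hedged: WriteModeEnum below only mirrors the enum referenced in the diff above, and the treatment of UPDATE_BEFORE rows is an assumption, not the PR's actual code):

```java
import org.apache.flink.types.RowKind;

// Sketch only: map Flink change-log row kinds to Nebula write modes. Insert and
// update-after rows both use INSERT, since NebulaGraph overwrites existing
// vertices/edges on insert; delete rows use DELETE.
public final class WriteModeRouting {

    /** Mirrors the WriteModeEnum referenced in the diff above (values assumed). */
    public enum WriteModeEnum { INSERT, DELETE }

    public static WriteModeEnum writeModeFor(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return WriteModeEnum.INSERT;  // overwrite semantics also cover updates
            case DELETE:
                return WriteModeEnum.DELETE;
            default:
                // UPDATE_BEFORE handling depends on the actual sink implementation;
                // this sketch simply rejects it.
                throw new UnsupportedOperationException("Unsupported row kind: " + kind);
        }
    }

    private WriteModeRouting() {}
}
```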
It's an excellent PR, thanks very much for your contribution @linhr
public static final ConfigOption<Integer> ID_INDEX = ConfigOptions
        .key("id-index")
        .intType()
        .defaultValue(0)
It's better and more uniform to define the default value in NebulaConstant.
Good suggestion. Fixed. Thanks!
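For illustration, a sketch of the suggestion above: keep the literal default in a shared constants class and reference it from the ConfigOption. The constant name below is an assumption; the actual field added to NebulaConstant may be named differently.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: the ConfigOption references a named constant instead of a literal.
public final class NebulaOptionsSketch {

    public static final class NebulaConstant {
        public static final int DEFAULT_ROW_INFO_INDEX = 0;  // assumed constant name

        private NebulaConstant() {}
    }

    public static final ConfigOption<Integer> ID_INDEX = ConfigOptions
            .key("id-index")
            .intType()
            .defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX);

    private NebulaOptionsSketch() {}
}
```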
    <version>${flink.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
Repeated dependency?
I added this test-jar as a test dependency so that we can use the 'values' table connector when writing Flink integration tests. This connector allows us to build a table from test data in the code. I've seen this used in the official JDBC connector as well.
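A rough sketch of that testing pattern, assuming the flink-table-planner test-jar is on the test classpath (TestValuesTableFactory and the 'values' / 'data-id' options come from Flink's test utilities, not from this PR):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;

// Sketch: register in-memory rows behind a data-id and expose them as a table
// through the 'values' connector.
public class ValuesConnectorSketch {
    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(Row.of("Tom", 25), Row.of("Jerry", 22));
        String dataId = TestValuesTableFactory.registerData(rows);

        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE players (name STRING, age INT) WITH ("
                        + " 'connector' = 'values',"
                        + " 'data-id' = '" + dataId + "'"
                        + ")");
        // In a real test this table would feed the Nebula dynamic table sink under test.
        tEnv.executeSql("SELECT * FROM players").print();
    }
}
```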
I see it, and I've learned a new approach for testing.
@Nicole00 Thanks for your review! I've made some changes to the code according to your feedback. Let me know if they look good.
@Nicole00 Do you think this PR can make it into the 3.4.0 release? Let me know what you think about my changes after your initial review. Thanks a lot!
What type of PR is this?
What problem(s) does this PR solve?
Issue(s) number: #77
Description:
This code change enables the dynamic table sink to process the upstream changelog and perform vertex/edge insert/update/delete operations for each individual row.
How do you solve it?
The solution buffers the rows (and deduplicates them by primary keys) and delegates the execution to three executors based on the row kind (for insert, update, and delete, respectively) when committing the batch.
The primary key for vertices is the vertex ID, and the primary key for edges is the combination of source vertex ID, destination vertex ID, and the rank.
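A loose sketch of this buffering idea (not the connector's actual NebulaTableBufferReducedExecutor; the class, interface, and method names below are made up for illustration): rows are deduplicated by primary key so only the latest change per key survives, and on commit each buffered row is handed to the executor that matches its kind.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Sketch only: buffer change-log rows keyed by primary key, then route by row kind.
public class ReducedBufferSketch {

    /** Minimal stand-in for the per-write-mode executors. */
    public interface BatchExecutor {
        void executeBatch(List<Row> rows);
    }

    private final Map<String, Row> buffer = new LinkedHashMap<>();
    private final BatchExecutor insertExecutor;
    private final BatchExecutor deleteExecutor;

    public ReducedBufferSketch(BatchExecutor insertExecutor, BatchExecutor deleteExecutor) {
        this.insertExecutor = insertExecutor;
        this.deleteExecutor = deleteExecutor;
    }

    /** Buffer a row; a later row with the same key replaces the earlier one. */
    public void addToBuffer(String primaryKey, Row row) {
        buffer.put(primaryKey, row);
    }

    /** Flush the buffer, routing each row to an executor by its kind. */
    public void commit() {
        List<Row> inserts = new ArrayList<>();
        List<Row> deletes = new ArrayList<>();
        for (Row row : buffer.values()) {
            if (row.getKind() == RowKind.DELETE) {
                deletes.add(row);
            } else if (row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER) {
                inserts.add(row);
            }
            // UPDATE_BEFORE rows are ignored in this sketch; the real sink may treat them differently.
        }
        if (!inserts.isEmpty()) {
            insertExecutor.executeBatch(inserts);
        }
        if (!deletes.isEmpty()) {
            deleteExecutor.executeBatch(deletes);
        }
        buffer.clear();
    }
}
```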
New and existing test cases show that the solution works.
Special notes for your reviewer, e.g. impact of this fix, design document, etc.:
To improve code readability, there are some minor (but backward-incompatible) changes to the public interface. The changes can be seen from the diff in README.md and the Java code in the example directory. Specifically, the .builder() method of various execution option builder classes has been corrected to .build(), while the old .builder() remains for compatibility; there are also related changes involving NebulaBatchOutputFormat and NebulaSinkFunction, and the old batch remains for compatibility.
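For readers updating their code, a hypothetical before/after of the renamed builder calls: the chained setters are taken from the README-style snippet reviewed earlier in this thread, while the builder class and package name are assumptions about the connector's API, not confirmed by this PR.

```java
import java.util.Arrays;

import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;

// Illustration only: class and package names are assumed; the setBatchSize()/build()
// renames (replacing setBatch()/builder()) are the part taken from this PR.
public class ExecutionOptionsUsage {
    public static VertexExecutionOptions playerOptions() {
        return new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("player")
                .setIdIndex(0)
                .setFields(Arrays.asList("name", "age"))
                .setPositions(Arrays.asList(1, 2))
                .setBatchSize(2)   // previously .setBatch(2), now deprecated
                .build();          // previously .builder(), now deprecated
    }
}
```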