[issue-180] Provide method for deserialization with metadata #305
Signed-off-by: Brian Zhou <[email protected]>
Codecov Report
@@            Coverage Diff            @@
##             master     #305   +/-   ##
=========================================
  Coverage          ?   84.53%
  Complexity        ?      345
=========================================
  Files             ?       36
  Lines             ?     1623
  Branches          ?      177
=========================================
  Hits              ?     1372
  Misses            ?      139
  Partials          ?      112
Good job!
…e example Signed-off-by: Brian Zhou <[email protected]>
Signed-off-by: Brian Zhou <[email protected]>
*
* @return the deserialized event with metadata
*/
public T deserializeWithMetadata(EventRead<T> eventRead) {
This name suggests that it replaces deserialize, which is not the case. This method does not actually deserialize. I recommend renaming it to insertMetadata or something similar.
It's true that this one does not do deserialization, but it really does "get" the data. It has a default implementation, eventRead.getEvent(), and gets called in the FlinkPravegaReader here: https://github.com/pravega/flink-connectors/pull/305/files#diff-005207bb16199a05351084fea79fd9acR271
I'm also not a fan of the current name, but I don't think insertMetadata is correct, because if it's not overridden, which is most cases, it won't insert metadata. Maybe getData or getEvent is better? Any suggestions for a better name?
When reviewing this code I also got confused by the name of this method, because it's not actually obvious that both deserialize methods will get called, one at the Pravega level and one at the Flink connector level.
It's almost like the method should be called extractEvent, since it's extracting the event from the Pravega-level API (and providing the chance to inject metadata into the event). I considered that perhaps it should be called getEvent or retrieveEvent, but this would get confusing with the Pravega semantics.
extractEvent is a good one; I can buy into this name.
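For readers following the naming thread, here is a minimal sketch of the hook being discussed, using the extractEvent name proposed above. It is not the connector's code: EventRead below is a hypothetical stand-in for Pravega's interface (only two accessors, with the pointer simplified to a String), and SchemaSketch, Reading and ReadingWithPointerSchema are made-up names for illustration. It shows the point raised in the thread: the default simply returns eventRead.getEvent(), and metadata only appears when a subclass overrides the method.

```java
// Hypothetical stand-in for Pravega's EventRead<T>. Only the accessors used
// below are modelled, and the pointer is simplified to a String (assumption).
interface EventRead<T> {
    T getEvent();
    String getEventPointer();
}

// Hypothetical, trimmed-down schema: just the hook discussed in this thread.
class SchemaSketch<T> {
    // Default behaviour: hand back the already-deserialized event unchanged.
    public T extractEvent(EventRead<T> eventRead) {
        return eventRead.getEvent();
    }
}

// Example event type with a slot for metadata (an assumption for illustration).
class Reading {
    final int value;
    String pointer; // stays null unless a schema fills it in

    Reading(int value) {
        this.value = value;
    }
}

// An override that copies the event pointer into the event.
class ReadingWithPointerSchema extends SchemaSketch<Reading> {
    @Override
    public Reading extractEvent(EventRead<Reading> eventRead) {
        Reading reading = eventRead.getEvent();
        reading.pointer = eventRead.getEventPointer();
        return reading;
    }
}

public class ExtractEventSketch {
    // Builds a fake EventRead so the sketch runs without a Pravega cluster.
    static EventRead<Reading> fakeRead(int value, String pointer) {
        Reading reading = new Reading(value);
        return new EventRead<Reading>() {
            @Override public Reading getEvent() { return reading; }
            @Override public String getEventPointer() { return pointer; }
        };
    }

    public static void main(String[] args) {
        Reading plain = new SchemaSketch<Reading>().extractEvent(fakeRead(7, "ptr-1"));
        Reading enriched = new ReadingWithPointerSchema().extractEvent(fakeRead(7, "ptr-1"));
        System.out.println(plain.pointer);    // default hook adds no metadata
        System.out.println(enriched.pointer); // override copied the pointer in
    }
}
```

Note that in this shape the stored type has to carry the metadata field itself, which is the trade-off the review goes on to discuss.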
Signed-off-by: Brian Zhou <[email protected]>
Signed-off-by: Brian Zhou <[email protected]>
@@ -29,6 +30,9 @@
* <p>This adapter exposes the Pravega serializer as a Flink Deserialization schema and
* exposes the produced type (TypeInformation) to allow Flink to configure its internal
* serialization and persistence stack.
*
* <p>An additional method {@link #deserializeWithMetadata(EventRead)} is provided for
* applying the metadata in the deserialization. This method can be overriden in the extended class. </p>
*/
public class PravegaDeserializationSchema<T>
Type T here is overloaded in that it expresses both the type of object expected from the Pravega stream (used by the Pravega API) and the type of object expected from the connector (with possible metadata injected). This means that the object being stored in the stream MUST serialize to a Java class that expects that metadata. For some formats (such as JSON) this may be easy, but for other formats such as Java it's difficult and messy. In fact this can be seen in the unit tests, where IntegerWithEventPointer is the type being "stored" in the stream.
I'm not sure if it's possible, but it feels like there should be two types: a type T to represent the type from the stream and a type C to represent the type from the connector. In most cases these could be the same, but in other cases where metadata needs to be injected these could be different.
Or maybe (just thinking aloud) we're talking about two completely different interfaces, i.e. a PravegaMetaDataSchema<T, C> that extends PravegaDeserializationSchema and just has the method
public C deserializeWithMetadata(EventRead<T> eventRead) {
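To make the two-type suggestion concrete, here is a rough sketch of what such a schema could look like. It is only an illustration of the idea in this comment, not code from the PR: EventRead is a hypothetical stand-in for Pravega's interface (pointer simplified to a String), MetadataSchemaSketch plays the role of the proposed PravegaMetaDataSchema<T, C> without extending the real PravegaDeserializationSchema, and IntegerWithPointer is a made-up output type.

```java
// Hypothetical stand-in for Pravega's EventRead<T>; only two accessors are
// modelled and the pointer is simplified to a String (assumption).
interface EventRead<T> {
    T getEvent();
    String getEventPointer();
}

// Hypothetical two-parameter schema: T is what the stream stores,
// C is what the connector hands to Flink.
abstract class MetadataSchemaSketch<T, C> {
    public abstract C deserializeWithMetadata(EventRead<T> eventRead);
}

// Connector-side type that carries metadata; it is never written to the stream.
final class IntegerWithPointer {
    final int value;
    final String pointer;

    IntegerWithPointer(int value, String pointer) {
        this.value = value;
        this.pointer = pointer;
    }
}

// The stream stores plain Integers; the metadata is attached on the way out.
class IntegerPointerSchema extends MetadataSchemaSketch<Integer, IntegerWithPointer> {
    @Override
    public IntegerWithPointer deserializeWithMetadata(EventRead<Integer> eventRead) {
        return new IntegerWithPointer(eventRead.getEvent(), eventRead.getEventPointer());
    }
}

public class TwoTypeSchemaSketch {
    // Builds a fake EventRead so the sketch runs without a Pravega cluster.
    static EventRead<Integer> fakeRead(int value, String pointer) {
        return new EventRead<Integer>() {
            @Override public Integer getEvent() { return value; }
            @Override public String getEventPointer() { return pointer; }
        };
    }

    public static void main(String[] args) {
        IntegerWithPointer out =
                new IntegerPointerSchema().deserializeWithMetadata(fakeRead(42, "ptr-9"));
        System.out.println(out.value + " @ " + out.pointer);
    }
}
```

With this split the stored type stays a plain Integer, at the cost of a second type parameter that every user of the schema has to spell out.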
Yes, this was my initial thought, and I also tried it, but finally chose this approach. This is also Kafka's approach, which keeps the same format for the event: https://github.com/apache/flink/blob/c6d9b37a8362317cc364c132ced5ac10dd3e10f2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java.
I wrote up the investigation and the reasoning here: https://docs.google.com/document/d/1Dp8KYA6N81869dFjhD7w3z2hXd2QkU-vS1DNkz5aRC4/edit#
Signed-off-by: Brian Zhou <[email protected]>
Signed-off-by: Brian Zhou <[email protected]>
Signed-off-by: Brian Zhou <[email protected]>
LGTM
* [issue-180] Provide method for deserialization with metadata Signed-off-by: Brian Zhou <[email protected]>
* [issue-180] Provide method for deserialization with metadata Signed-off-by: Brian Zhou <[email protected]>
Signed-off-by: Brian Zhou [email protected]
Change log description
Purpose of the change
Fixes #180
What the code does
Provide a method deserializeWithMetadata for users to override that can leverage EventRead in the deserialization.
Added a unit test for this case.
How to verify it
./gradlew clean build passes
Target to master, should cherry-pick to all 0.7 branch