-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Make null key serialization/deserialization symmetrical #4351
Conversation
1fc4778
to
cf9a3a5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @purplefox
Fix LGTM. Tests need a small change.
@@ -142,7 +142,7 @@ public Struct deserialize(final String topic, final byte[] bytes) { | |||
final Object primitive = delegate.deserialize(topic, bytes); | |||
final Struct struct = new Struct(schema); | |||
struct.put(field, primitive); | |||
return struct; | |||
return primitive == null ? null : struct; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if primitive
is null
, then could return after line 142.
if (value == null) { | ||
assertThat(result, is(nullValue())); | ||
} else { | ||
assertThat(result, is(struct)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if its null, then assert its null... weird asserting! ;)
Also, this means all of the tests will pass if deserialize always returns null
.
I think it would be more explicit if we revert this change and instead have explicit tests that test we:
a) serialize null
as null
, and
b) deserialize null
as null
.
Just played around with this, so may as well share the code.
i.e. we revert this to:
assertThat(result, is(struct));
And replace shouldHandleNulls
with:
@Test
public void shouldSerializeNullAsNull() {
// Given:
final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER);
final Serde<Object> serde = factory.createSerde(schema, ksqlConfig, srClientFactory);
// When:
final byte[] result = serde.serializer().serialize("topic", null);
// Then:
assertThat(result, is(nullValue()));
}
@Test
public void shouldDeserializeNullAsNull() {
// Given:
final PersistenceSchema schema = schemaWithFieldOfType(SqlTypes.INTEGER);
final Serde<Object> serde = factory.createSerde(schema, ksqlConfig, srClientFactory);
// When:
final Object result = serde.deserializer().deserialize("topic", null);
// Then:
assertThat(result, is(nullValue()));
}
Description
Fixes #4109
This fix makes sure that null keys are preserved during serialization followed by deserialization. I.e. the serialization/deserialization for null is now symmetrical.
Previously if a null key was serialized then deserialized it would end up as Struct{} with one null element, not the value null.
This meant that Streams graceful null key checking didn't work as the key wasn't null (it was Struct{}) as it was passed through the streams topology. When an attempt to extract the actual key before inserting into RocksDB occurred it gave an NPE.
Now we actually pass null as the key for the ConsumerRecord that is passed through the topology. This allows the streams null key checking to do it's job and for the null key record to be handled gracefully.
Testing done
Added new QTT test
Reviewer checklist