-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make KStream health result serializable (#1026) #1088
Make KStream health result serializable (#1026) #1088
Conversation
* Additional Serdes for Kafka streams health types returned as part of the {@link io.micronaut.management.health.indicator.HealthResult}. | ||
*/ | ||
@SerdeImport(TaskId.class) | ||
public class KafkaStreamHealthSerdes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks can you make this non-public and add @Internal
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, a test would be good if possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had written one, but wasn't sure if it was too specific a test to use a JsonMapper
to check if a TaskId
is serializable. If you think its appropriate here I'll add it.
The kafka-streams instances injected into the tests have 0 active or standby tasks which is why this was never caught.
Lmk your thoughts on this. If you want the concrete test I'll add it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a check that the type is serialisable is fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While testing I've realised that the TaskId
isn't serializable out of the box with Jackson, and if it were it would yield:
"taskId": {
"topicGroupId": 0,
"partition": 0
}
but I'm not sure this is helpful as part of the health report however.
The TaskId
object has a custom .toString()
which formats it in a readable manner. I think its more appropriate here.
"taskId": "0_0"
After all, the task id is meant to be an identifier, showing its component internal fields in this manner doesn't seem helpful.
This would be the change, adding the toString()
. I'd make it null safe of course
details.put("taskId", taskMetadata.taskId().toString());
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, for that case you probably need to implement a Serializer
and remove the @SerdeImport
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well I was just going to call .toString()
on the task object that gets put into the Map<String, Object>
.
Or would you rather a custom Serializer to allow users to override the serializer if they want?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went with the custom Serde
...streams/src/main/java/io/micronaut/configuration/kafka/streams/health/serde/TaskIdSerde.java
Outdated
Show resolved
Hide resolved
b556691
to
0688c17
Compare
sorry would you mind rebasing and then I will merge |
0688c17
to
6f42fd2
Compare
Rebased @graemerocher |
Thanks for the contribution! |
Summary
This pull request addresses issue #1026 by implementing a custom
Serde<TaskId>
to kafka-streams module. This avoid a serialization error when calling the/health
endpoint, which throws in turn a:Changes Made
annotationProcessor
, and serde api ascompileOnly
to load in the annotation.Related Issues