fix: fixes ConcurrentMod exception when accessing Streams metadata #4719
Conversation
fixes: confluentinc#4639 Until the Streams bug https://issues.apache.org/jira/browse/KAFKA-9668 is fixed, ksql needs to protect itself from ConcurrentMod exceptions when accessing `KafkaStreams.allMetadata`. This change accesses the internals of `KafkaStreams` to acquire a reference to the field that must be synchronised on to protect against the concurrent modification.
Nice hack!
LGTM
I don't think this workaround will work. Synchronizing from our side is not sufficient: updates to the internal collection from the Kafka Streams side also need to be synchronized (or have a memory barrier through some other mechanism). Otherwise there is no happens-before relationship and we might see stale results. I don't think there is much we can do here as a workaround.
As per comment
The Streams side already has the required synchronisation. All methods that change the state are synchronised on the same instance that we are synchronising on.
If you're sure then fine :) It wasn't obvious from a quick inspection. Simply checking that the methods that change state in StreamsMetadataState are synchronized is not sufficient, as the state is leaked by two methods (allMetadata and allMetadataForStore). We would also need to check that nothing that calls those methods amends that state, or, if it does, that the change is also protected by synchronizing on the StreamsMetadataState monitor.
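To illustrate why leaked state matters even when every method is synchronized, here is a minimal single-threaded sketch. `LeakyMetadataState` and `LeakDemo` are hypothetical stand-ins written for this example, not the real `StreamsMetadataState`. The accessor hands out the internal list, so a later update invalidates any iterator a caller still holds.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for StreamsMetadataState; not the real Kafka class.
final class LeakyMetadataState {
  private final List<String> hosts = new ArrayList<>();

  // The mutator is synchronized on this instance.
  synchronized void onChange(final String host) {
    hosts.add(host);
  }

  // The accessor is synchronized too, but it leaks the internal list:
  // the caller keeps using it after the monitor has been released.
  synchronized List<String> allMetadata() {
    return hosts;
  }
}

final class LeakDemo {
  // Returns true if iterating the leaked list fails after a later update.
  static boolean iterationFailsAfterUpdate() {
    final LeakyMetadataState state = new LeakyMetadataState();
    state.onChange("host-1");

    final Iterator<String> it = state.allMetadata().iterator();
    state.onChange("host-2"); // structural change after the iterator was created

    try {
      it.next();
      return false;
    } catch (final ConcurrentModificationException e) {
      return true;
    }
  }
}
```

With two threads the same failure is intermittent rather than deterministic, which is why the analysis has to cover every caller that touches the leaked collection.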
Approved assuming analysis of callers of leaked state is done.
Nothing outside of the class itself changes the state. My fix in Streams proves this, as it changes the class to return unmodifiable collections: apache/kafka#8233
I think it's sufficient to release this as a patch in 0.7.2, or go directly to 0.8.0 if the timing lines up. I don't think we're at a point where we need to backport patches.
- return kafkaStreams.allMetadata();
+ // Synchronized block needed until https://issues.apache.org/jira/browse/KAFKA-9668 is fixed.
+ synchronized (streamsMetadataState) {
+   return kafkaStreams.allMetadata();
+ }
Isn't it good practice to return an immutable list here?
Normally, no, as we'd expect `kafkaStreams.allMetadata()` to be doing the right thing. However, in this case... Yes! As `kafkaStreams.allMetadata()` is not doing the right thing.
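One way to return an immutable list here is to copy the collection while the lock is held and wrap the copy as read-only. The helper below is only a sketch: `SnapshotReader` and its `snapshot` method are hypothetical names, and `lock` stands for whatever monitor the owning code uses for its updates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of ksql or Kafka Streams.
final class SnapshotReader {
  // Copies `source` while holding `lock`, then wraps the copy as read-only.
  // The copy is only consistent if writers to `source` synchronize on the
  // same `lock`; that is an assumption of this sketch.
  static <T> List<T> snapshot(final Object lock, final List<T> source) {
    synchronized (lock) {
      return Collections.unmodifiableList(new ArrayList<>(source));
    }
  }
}
```

The caller then holds a private snapshot, so later updates to the source neither show up in it nor invalidate its iterators.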
With apache/kafka#8233 merged into Kafka 2.5 we shouldn't need this fix. The only reason we'd need it is if we wanted to patch 0.7.2.
Description
fixes: #4639
Until the Streams bug https://issues.apache.org/jira/browse/KAFKA-9668 is fixed, ksql needs to protect itself from ConcurrentMod exceptions when accessing `KafkaStreams.allMetadata`. This change accesses the internals of `KafkaStreams` to acquire a reference to the field that must be synchronised on to protect against the concurrent modification.
All this reflection nastiness can be removed once the Streams fix is in: apache/kafka#8233. Though we'd need to backport that fix if we want to remove this nastiness from 5.5 as well as master.
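The reflection approach described above can be sketched roughly as follows. This is not the code from the PR: `FakeStreams`, its `metadataState` field, and `GuardedAccess` are hypothetical stand-ins so the example runs without Kafka on the classpath, and the real field name inside `KafkaStreams` is not assumed here.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaStreams: it guards its metadata with a
// private monitor object that callers cannot normally reach.
final class FakeStreams {
  private final Object metadataState = new Object();
  private final List<String> hosts = new ArrayList<>();

  void addHost(final String host) {
    synchronized (metadataState) {
      hosts.add(host);
    }
  }

  // Leaks the internal list without taking the lock, like the buggy accessor.
  List<String> allMetadata() {
    return hosts;
  }
}

final class GuardedAccess {
  // Looks up the private monitor via reflection (field name is an assumption).
  static Object lockOf(final FakeStreams streams) {
    try {
      final Field field = FakeStreams.class.getDeclaredField("metadataState");
      field.setAccessible(true);
      return field.get(streams);
    } catch (final ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to access metadata state field", e);
    }
  }

  // Reads the metadata while holding the same monitor the writers use.
  static List<String> allMetadata(final FakeStreams streams) {
    synchronized (lockOf(streams)) {
      return new ArrayList<>(streams.allMetadata());
    }
  }
}
```

Because the lookup depends on a private field, it breaks if that field is renamed, which is one reason to drop it once the upstream fix is available.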
NOTE: This issue exists in 0.7.x as well. We should consider back-porting the fix there too. cc @apurvam @colinhicks @MichaelDrogalis : thoughts?
Testing done
Unit tests added to test fix viability, plus manual testing.
Reviewer checklist