ConcurrentModificationException in Cluster status code? #4639

Open
big-andy-coates opened this issue Feb 26, 2020 · 3 comments · Fixed by #4650
Labels
bug test-stability All our flakey tests.

Comments

@big-andy-coates
Contributor

Had a build failure that might suggest we've got a race condition in the new cluster status code:

Here's the job that failed:

https://jenkins.confluent.io/job/confluentinc-pr/job/ksql/job/PR-4638/1/testReport/junit/io.confluent.ksql.rest.integration/HeartbeatAgentFunctionalTest/shouldMarkRemoteServerAsUpThenDownThenUp/

@big-andy-coates
Contributor Author

And here's the output:

Failed
io.confluent.ksql.rest.integration.HeartbeatAgentFunctionalTest.shouldMarkRemoteServerAsUpThenDownThenUp

Error Message
Erroneous result: java.util.ConcurrentModificationException
java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
java.util.ArrayList$Itr.next(ArrayList.java:859)
io.confluent.ksql.rest.server.resources.ClusterStatusResource.getActiveStandbyInformation(ClusterStatusResource.java:107)
io.confluent.ksql.rest.server.resources.ClusterStatusResource.lambda$getResponse$1(ClusterStatusResource.java:88)
java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Collections.java:1575)
java.util.concurrent.ConcurrentHashMap$EntrySpliterator.forEachRemaining(ConcurrentHashMap.java:3606)
java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Collections.java:1600)
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
io.confluent.ksql.rest.server.resources.ClusterStatusResource.getResponse(ClusterStatusResource.java:84)
io.confluent.ksql.rest.server.resources.ClusterStatusResource.checkClusterStatus(ClusterStatusResource.java:74)
sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
org.glassfish.jersey.internal.Errors.process(Errors.java:292)
org.glassfish.jersey.internal.Errors.process(Errors.java:274)
org.glassfish.jersey.internal.Errors.process(Errors.java:244)
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:386)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:561)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:502)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:439)
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1604)
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:545)
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607)
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297)
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1577)
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:767)
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
org.eclipse.jetty.server.Server.handle(Server.java:500)
org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:388)
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
java.lang.Thread.run(Thread.java:748)
Stacktrace
java.lang.AssertionError: 
Erroneous result: java.util.ConcurrentModificationException
[frames identical to the trace under "Error Message" above]

	at io.confluent.ksql.rest.integration.HeartbeatAgentFunctionalTest.shouldMarkRemoteServerAsUpThenDownThenUp(HeartbeatAgentFunctionalTest.java:141)

@purplefox
Contributor

From the code, this appears to happen because Kafka Streams (KS) exposes internal state without copying it first (`kafkaStreams.allMetadata()`).
The copy would have to be made under the `StreamsMetadataState` monitor (`streamsMetadataState.getAllMetadata()`), so I think this needs to be fixed in KS.
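
To illustrate the failure mode described above, here is a minimal sketch. `MetadataHolder` and its methods are hypothetical stand-ins, not the Kafka Streams classes; the sketch only assumes a list that writers mutate under a monitor and that is handed to readers as a view of the live data. The mutation happens on the same thread during iteration, so the `ConcurrentModificationException` is reproduced deterministically rather than through a real race.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical stand-in for a component that owns mutable metadata.
final class MetadataHolder {
    private final List<String> hosts = new ArrayList<>();

    // Writers mutate the list while holding the holder's monitor.
    synchronized void add(String host) {
        hosts.add(host);
    }

    // Unsafe: a read-only view that is still backed by the live list.
    List<String> liveView() {
        return Collections.unmodifiableList(hosts);
    }

    // Safer: the copy is taken under the same monitor the writers use.
    synchronized List<String> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(hosts));
    }

    // Iterates the given list while the holder is mutated mid-iteration;
    // returns true if that triggers a ConcurrentModificationException.
    static boolean failsWhenMutatedDuringIteration(List<String> view, MetadataHolder holder) {
        try {
            for (String host : view) {
                holder.add(host + "-replica");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        MetadataHolder holder = new MetadataHolder();
        holder.add("host-1");
        holder.add("host-2");
        System.out.println("live view fails: "
            + failsWhenMutatedDuringIteration(holder.liveView(), holder));
        System.out.println("snapshot fails: "
            + failsWhenMutatedDuringIteration(holder.snapshot(), holder));
    }
}
```

The point of the sketch is that the copy and the writes must share one lock; copying without that lock would only move the exception into the copy itself.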

@big-andy-coates big-andy-coates added the test-stability All our flakey tests. label Feb 26, 2020
big-andy-coates added a commit to big-andy-coates/ksql that referenced this issue Mar 5, 2020
fixes: confluentinc#4639

Until the Streams bug https://issues.apache.org/jira/browse/KAFKA-9668 is fixed, ksql needs to protect itself from `ConcurrentModificationException`s when accessing `KafkaStreams.allMetadata`.

This change accesses the internals of `KafkaStreams` to acquire a reference to the field that needs to be synchronised to protect against the concurrent modification.
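
A rough sketch of the kind of workaround this commit message describes, under stated assumptions: `ThirdPartyClient` is a hypothetical stand-in for the library class, and the field name `stateLock` is invented for illustration. It is not the real `KafkaStreams` layout and not the code from the actual change. The sketch uses reflection to obtain the private object that the library's writers synchronise on, then copies the exposed list while holding it.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a third-party class; NOT the real KafkaStreams.
final class ThirdPartyClient {
    // Writers inside the library synchronise on this private object.
    private final Object stateLock = new Object();
    private final List<String> metadata = new ArrayList<>();

    void onRebalance(String host) {
        synchronized (stateLock) {
            metadata.add(host);
        }
    }

    // Leaks a view of the live list without holding stateLock.
    List<String> allMetadata() {
        return Collections.unmodifiableList(metadata);
    }
}

// Caller-side guard: borrows the library's lock via reflection.
final class SafeMetadataReader {
    private final ThirdPartyClient client;
    private final Object lock;

    SafeMetadataReader(ThirdPartyClient client) {
        this.client = client;
        try {
            // "stateLock" is an assumed field name for this sketch only.
            Field field = ThirdPartyClient.class.getDeclaredField("stateLock");
            field.setAccessible(true);
            this.lock = field.get(client);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not access the lock field", e);
        }
    }

    // Copies the metadata while holding the same lock the writers use.
    List<String> allMetadata() {
        synchronized (lock) {
            return new ArrayList<>(client.allMetadata());
        }
    }
}
```

Reaching into private fields like this is fragile: it breaks if the library renames the field or changes its locking, which is presumably why the commit frames it as a stopgap until the upstream fix lands.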
@big-andy-coates big-andy-coates mentioned this issue Mar 5, 2020
2 tasks
@big-andy-coates
Contributor Author

big-andy-coates commented Mar 5, 2020

This is still happening on the 5.5 branch and on master.

See https://confluentinc.atlassian.net/browse/KSQL-4296 for an example.
