From ca90a8480183bf287d07baf6b3794bc51f76b89e Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Fri, 6 Mar 2020 21:08:52 +0000 Subject: [PATCH] KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException (#8233) `KafkaStreams.getAllMetadata()` returns `StreamsMetadataState.getAllMetadata()`. Although the latter method is `synchronized`, it returns a reference to internal mutable state. Not only does this break encapsulation, but it means any thread iterating over the returned collection when the metadata gets rebuilt will encounter a `ConcurrentModificationException`. This change: * switches from clearing and rebuilding `allMetadata` when `onChange` is called to building a new list and swapping this in. This is thread safe and has the benefit that the returned list is not empty during a rebuild: you either get the old or the new list. * removes synchronisation from `getAllMetadata` and `getLocalMetadata`. These are returning member variables. Synchronisation adds nothing. * changes `getAllMetadata` to wrap its return value in an unmodifiable wrapper to avoid breaking encapsulation. * changes the getters in `StreamsMetadata` to wrap their return values in unmodifiable wrappers to avoid breaking encapsulation. 
Co-authored-by: Andy Coates Reviewers: Guozhang Wang --- .../internals/StreamsMetadataState.java | 21 +++--- .../kafka/streams/state/StreamsMetadata.java | 8 +-- .../internals/StreamsMetadataStateTest.java | 26 ++++++++ .../streams/state/StreamsMetadataTest.java | 64 +++++++++++++++++++ 4 files changed, 106 insertions(+), 13 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index c74605923159a..154d795362a50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; @@ -47,9 +46,9 @@ public class StreamsMetadataState { public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1); private final InternalTopologyBuilder builder; - private final List allMetadata = new ArrayList<>(); private final Set globalStores; private final HostInfo thisHost; + private List allMetadata = Collections.emptyList(); private Cluster clusterMetadata; private StreamsMetadata localMetadata; @@ -79,7 +78,7 @@ public String toString(final String indent) { * * @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application */ - public synchronized StreamsMetadata getLocalMetadata() { + public StreamsMetadata getLocalMetadata() { return localMetadata; } @@ -89,8 +88,8 @@ public synchronized StreamsMetadata getLocalMetadata() { * * @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application */ - public 
synchronized Collection getAllMetadata() { - return allMetadata; + public Collection getAllMetadata() { + return Collections.unmodifiableList(allMetadata); } /** @@ -316,10 +315,12 @@ private Set getStoresOnHost(final Map> storeToSourc private void rebuildMetadata(final Map> activePartitionHostMap, final Map> standbyPartitionHostMap) { - allMetadata.clear(); if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) { + allMetadata = Collections.emptyList(); return; } + + final List rebuiltMetadata = new ArrayList<>(); final Map> storeToSourceTopics = builder.stateStoreNameToSourceTopics(); Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream()) .distinct() @@ -344,11 +345,13 @@ private void rebuildMetadata(final Map> activePart activePartitionsOnHost, standbyStoresOnHost, standbyPartitionsOnHost); - allMetadata.add(metadata); + rebuiltMetadata.add(metadata); if (hostInfo.equals(thisHost)) { localMetadata = metadata; } }); + + allMetadata = rebuiltMetadata; } private KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, @@ -409,8 +412,8 @@ private StreamsMetadata getStreamsMetadataForKey(final String storeName, } private SourceTopicsInfo getSourceTopicsInfo(final String storeName) { - final List sourceTopics = builder.sourceTopicsForStore(storeName).stream().collect(Collectors.toList()); - if (sourceTopics == null || sourceTopics.isEmpty()) { + final List sourceTopics = new ArrayList<>(builder.sourceTopicsForStore(storeName)); + if (sourceTopics.isEmpty()) { return null; } return new SourceTopicsInfo(sourceTopics); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java index 50c2d6837fd2d..b1d3f4aa13e62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java @@ -80,7 +80,7 @@ 
public HostInfo hostInfo() { * @return set of active state store names */ public Set stateStoreNames() { - return stateStoreNames; + return Collections.unmodifiableSet(stateStoreNames); } /** @@ -89,7 +89,7 @@ public Set stateStoreNames() { * @return set of active topic partitions */ public Set topicPartitions() { - return topicPartitions; + return Collections.unmodifiableSet(topicPartitions); } /** @@ -98,7 +98,7 @@ public Set topicPartitions() { * @return set of standby topic partitions */ public Set standbyTopicPartitions() { - return standbyTopicPartitions; + return Collections.unmodifiableSet(standbyTopicPartitions); } /** @@ -107,7 +107,7 @@ public Set standbyTopicPartitions() { * @return set of standby state store names */ public Set standbyStateStoreNames() { - return standbyStateStoreNames; + return Collections.unmodifiableSet(standbyStateStoreNames); } public String host() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index e1656229c01b9..d38b6c7064d27 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -49,6 +50,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; public class StreamsMetadataStateTest { @@ -339,4 +341,28 @@ public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() { streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster); 
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner)); } + + @Test + public void shouldReturnAllMetadataThatRemainsValidAfterChange() { + final Collection allMetadata = metadataState.getAllMetadata(); + final Collection copy = new ArrayList<>(allMetadata); + assertFalse("invalid test", allMetadata.isEmpty()); + metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), cluster); + assertEquals("encapsulation broken", allMetadata, copy); + } + + @Test + public void shouldNotReturnMutableReferenceToInternalAllMetadataCollection() { + final Collection allMetadata = metadataState.getAllMetadata(); + assertFalse("invalid test", allMetadata.isEmpty()); + + try { + // Either this should not affect internal state of 'metadataState' + allMetadata.clear(); + } catch (final UnsupportedOperationException e) { + // Or should fail. + } + + assertFalse("encapsulation broken", metadataState.getAllMetadata().isEmpty()); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java new file mode 100644 index 0000000000000..98022bbdbaa87 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.Assert.assertTrue; + +public class StreamsMetadataTest { + + private static final HostInfo HOST_INFO = new HostInfo("local", 12); + private static final TopicPartition TP_0 = new TopicPartition("t", 0); + private static final TopicPartition TP_1 = new TopicPartition("t", 1); + + private StreamsMetadata streamsMetadata; + + @Before + public void setUp() { + streamsMetadata = new StreamsMetadata( + HOST_INFO, + mkSet("store1", "store2"), + mkSet(TP_0, TP_1), + mkSet("store2"), + mkSet(TP_1) + ); + } + + @Test + public void shouldNotAllowModificationOfInternalStateViaGetters() { + assertTrue(isUnmodifiable(streamsMetadata.stateStoreNames())); + assertTrue(isUnmodifiable(streamsMetadata.topicPartitions())); + assertTrue(isUnmodifiable(streamsMetadata.standbyTopicPartitions())); + assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames())); + } + + private static boolean isUnmodifiable(final Collection collection) { + try { + collection.clear(); + return false; + } catch (final UnsupportedOperationException e) { + return true; + } + } +} \ No newline at end of file