Skip to content

Commit

Permalink
KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in C…
Browse files Browse the repository at this point in the history
…oncurrentModificationException (#8233)

`KafkaStreams.getAllMetadata()` returns `StreamsMetadataState.getAllMetadata()`. All the latter methods is `synchronized` it returns a reference to internal mutable state.  Not only does this break encapsulation, but it means any thread iterating over the returned collection when the metadata gets rebuilt will encounter a `ConcurrentModificationException`.

This change:
 * switches from clearing and rebuild `allMetadata` when `onChange` is called to building a new list and swapping this in. This is thread safe and has the benefit that the returned list is not empty during a rebuild: you either get the old or the new list.
 * removes synchronisation from `getAllMetadata` and `getLocalMetadata`. These are returning member variables. Synchronisation adds nothing.
 * changes `getAllMetadata` to wrap its return value in an unmodifiable wrapper to avoid breaking encapsulation.
 * changes the getters in `StreamsMetadata` to wrap their return values in unmodifiable wrapper to avoid breaking encapsulation.

Co-authored-by: Andy Coates <[email protected]>

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
big-andy-coates authored Mar 6, 2020
1 parent 6b41993 commit ca90a84
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -47,9 +46,9 @@
public class StreamsMetadataState {
public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
private final InternalTopologyBuilder builder;
private final List<StreamsMetadata> allMetadata = new ArrayList<>();
private final Set<String> globalStores;
private final HostInfo thisHost;
private List<StreamsMetadata> allMetadata = Collections.emptyList();
private Cluster clusterMetadata;
private StreamsMetadata localMetadata;

Expand Down Expand Up @@ -79,7 +78,7 @@ public String toString(final String indent) {
*
* @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application
*/
public synchronized StreamsMetadata getLocalMetadata() {
public StreamsMetadata getLocalMetadata() {
return localMetadata;
}

Expand All @@ -89,8 +88,8 @@ public synchronized StreamsMetadata getLocalMetadata() {
*
* @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
*/
public synchronized Collection<StreamsMetadata> getAllMetadata() {
return allMetadata;
public Collection<StreamsMetadata> getAllMetadata() {
return Collections.unmodifiableList(allMetadata);
}

/**
Expand Down Expand Up @@ -316,10 +315,12 @@ private Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourc

private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
allMetadata.clear();
if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
allMetadata = Collections.emptyList();
return;
}

final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
final Map<String, List<String>> storeToSourceTopics = builder.stateStoreNameToSourceTopics();
Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
.distinct()
Expand All @@ -344,11 +345,13 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
activePartitionsOnHost,
standbyStoresOnHost,
standbyPartitionsOnHost);
allMetadata.add(metadata);
rebuiltMetadata.add(metadata);
if (hostInfo.equals(thisHost)) {
localMetadata = metadata;
}
});

allMetadata = rebuiltMetadata;
}

private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
Expand Down Expand Up @@ -409,8 +412,8 @@ private <K> StreamsMetadata getStreamsMetadataForKey(final String storeName,
}

private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
final List<String> sourceTopics = builder.sourceTopicsForStore(storeName).stream().collect(Collectors.toList());
if (sourceTopics == null || sourceTopics.isEmpty()) {
final List<String> sourceTopics = new ArrayList<>(builder.sourceTopicsForStore(storeName));
if (sourceTopics.isEmpty()) {
return null;
}
return new SourceTopicsInfo(sourceTopics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public HostInfo hostInfo() {
* @return set of active state store names
*/
public Set<String> stateStoreNames() {
return stateStoreNames;
return Collections.unmodifiableSet(stateStoreNames);
}

/**
Expand All @@ -89,7 +89,7 @@ public Set<String> stateStoreNames() {
* @return set of active topic partitions
*/
public Set<TopicPartition> topicPartitions() {
return topicPartitions;
return Collections.unmodifiableSet(topicPartitions);
}

/**
Expand All @@ -98,7 +98,7 @@ public Set<TopicPartition> topicPartitions() {
* @return set of standby topic partitions
*/
public Set<TopicPartition> standbyTopicPartitions() {
return standbyTopicPartitions;
return Collections.unmodifiableSet(standbyTopicPartitions);
}

/**
Expand All @@ -107,7 +107,7 @@ public Set<TopicPartition> standbyTopicPartitions() {
* @return set of standby state store names
*/
public Set<String> standbyStateStoreNames() {
return standbyStateStoreNames;
return Collections.unmodifiableSet(standbyStateStoreNames);
}

public String host() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -49,6 +50,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class StreamsMetadataStateTest {

Expand Down Expand Up @@ -339,4 +341,28 @@ public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner));
}

@Test
public void shouldReturnAllMetadataThatRemainsValidAfterChange() {
final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
assertFalse("invalid test", allMetadata.isEmpty());
metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), cluster);
assertEquals("encapsulation broken", allMetadata, copy);
}

@Test
public void shouldNotReturnMutableReferenceToInternalAllMetadataCollection() {
final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
assertFalse("invalid test", allMetadata.isEmpty());

try {
// Either this should not affect internal state of 'metadataState'
allMetadata.clear();
} catch (final UnsupportedOperationException e) {
// Or should fail.
}

assertFalse("encapsulation broken", metadataState.getAllMetadata().isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.state;

import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;

import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.Assert.assertTrue;

public class StreamsMetadataTest {

private static final HostInfo HOST_INFO = new HostInfo("local", 12);
private static final TopicPartition TP_0 = new TopicPartition("t", 0);
private static final TopicPartition TP_1 = new TopicPartition("t", 1);

private StreamsMetadata streamsMetadata;

@Before
public void setUp() {
streamsMetadata = new StreamsMetadata(
HOST_INFO,
mkSet("store1", "store2"),
mkSet(TP_0, TP_1),
mkSet("store2"),
mkSet(TP_1)
);
}

@Test
public void shouldNotAllowModificationOfInternalStateViaGetters() {
assertTrue(isUnmodifiable(streamsMetadata.stateStoreNames()));
assertTrue(isUnmodifiable(streamsMetadata.topicPartitions()));
assertTrue(isUnmodifiable(streamsMetadata.standbyTopicPartitions()));
assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
}

private static boolean isUnmodifiable(final Collection<?> collection) {
try {
collection.clear();
return false;
} catch (final UnsupportedOperationException e) {
return true;
}
}
}

0 comments on commit ca90a84

Please sign in to comment.