Skip to content

Commit

Permalink
fix: patch KafkaStreamsInternalTopicsAccessor as KS internals changed (
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Feb 25, 2020
1 parent e40d6c9 commit eb07370
Showing 1 changed file with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
package io.confluent.ksql.test.tools;

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

/**
* Hack to get around the fact that the {@link TopologyTestDriver} class does not expose its set of
* internal topics, which is needed to determine if any unexpected topics have been created.
*
* <p>Note: We should find a better way of doing this - this approach is very brittle
*/
final class KafkaStreamsInternalTopicsAccessor {

private static final Field INTERNAL_TOPICS_FIELD = getInternalTopicsField();
private static final Field INTERNAL_TOPOLOGY_BUILDER_FIELD = getInternalTopologyBuilderField();
private static final Field INTERNAL_TOPIC_NAMES_FIELD = getInternalTopicNamesField();

private KafkaStreamsInternalTopicsAccessor() {
}
Expand All @@ -35,20 +40,32 @@ static Set<String> getInternalTopics(
final TopologyTestDriver topologyTestDriver
) {
try {
return (Set<String>) INTERNAL_TOPICS_FIELD.get(topologyTestDriver);
final Object internalTopologyBuilder = INTERNAL_TOPOLOGY_BUILDER_FIELD
.get(topologyTestDriver);
// Note - there is no memory barrier here so we could end up reading stale data if
// the internal topics are updated
return new HashSet<>((Set<String>) INTERNAL_TOPIC_NAMES_FIELD.get(internalTopologyBuilder));
} catch (final IllegalAccessException e) {
throw new AssertionError("Failed to get internal topic names", e);
}
}

private static Field getInternalTopicsField() {
private static Field getInternalTopologyBuilderField() {
return getField("internalTopologyBuilder", TopologyTestDriver.class);
}

private static Field getInternalTopicNamesField() {
return getField("internalTopicNames", InternalTopologyBuilder.class);
}

private static Field getField(final String fieldName, final Class<?> clazz) {
try {
final Field field = TopologyTestDriver.class.getDeclaredField("internalTopics");
final Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (final NoSuchFieldException e) {
throw new AssertionError(
"Kafka Streams's TopologyTestDriver class has changed its internals", e);
"Kafka Streams's has changed its internals", e);
}
}
}

0 comments on commit eb07370

Please sign in to comment.