Skip to content

Commit

Permalink
fix: address review feedback from Andy and Jim
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jan 21, 2020
1 parent 583f611 commit 11f97a1
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 98 deletions.
12 changes: 6 additions & 6 deletions docs-md/developer-guide/ksqldb-reference/show-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ configured to connect to (default setting for `bootstrap.servers`:
and their active consumer counts.

SHOW TOPICS does not display topics considered internal, such as:
* KSQL internal topics (i.e. the KSQL command topic)
* Topics found in the `system.internal.topics` configuration
* KSQL internal topics, like the KSQL command topic
* Topics found in the `ksql.internal.hidden.topics` configuration

SHOW ALL TOPICS will list all topics including those considered as
internal or found in the `system.internal.topics` configuration.
SHOW ALL TOPICS lists all topics, including those considered to be
internal or found in the `ksql.internal.hidden.topics` configuration.

Example
-------

```sql
ksql> SHOW TOPICS
ksql> SHOW TOPICS;

Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
Expand All @@ -47,7 +47,7 @@ ksql> SHOW TOPICS


```sql
ksql> SHOW ALL TOPICS
ksql> SHOW ALL TOPICS;

Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
Expand Down
38 changes: 26 additions & 12 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,23 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_AUTH_CACHE_MAX_ENTRIES_DOC = "Controls the size of the cache "
+ "to a maximum number of KSQL authorization responses entries.";

public static final String SYSTEM_INTERNAL_TOPICS_CONFIG = "system.internal.topics";
public static final String SYSTEM_INTERNAL_TOPICS_DEFAULT = "_confluent.*,__confluent.*,_schemas,"
+ "__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,"
+ "connect-statuses";
public static final String SYSTEM_INTERNAL_TOPICS_DOC = "List of topics considered part of "
+ "system internals which KSQL should not allow users to write data on them. This list will "
+ "not be displayed from the SHOW TOPICS command unless SHOW ALL TOPICS is used. The list "
+ "is separated by comma and may use regular expressions based on Java Patterns "
+ "(i.e. _confluent.* accepts any topic that starts with _confluent prefix).";
public static final String KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG = "ksql.internal.hidden.topics";
public static final String KSQL_INTERNAL_HIDDEN_TOPICS_DEFAULT = "_confluent.*,__confluent.*"
+ ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,"
+ "connect-status,connect-statuses";
public static final String KSQL_INTERNAL_HIDDEN_TOPICS_DOC = "List of topics that will not be "
+ "visible when running the SHOW TOPICS command unless SHOW ALL TOPICS is used. This list "
+ "is comma separated and may use Java regular expressions to specify each topic (i.e. "
+ " _confluent.* accepts any topic that starts with the _confluent prefix).";

public static final String KSQL_INTERNAL_READONLY_TOPICS_CONFIG = "ksql.internal.readonly.topics";
public static final String KSQL_INTERNAL_READONLY_TOPICS_DEFAULT = "_confluent.*,__confluent.*"
+ ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,"
+ "connect-status,connect-statuses";
public static final String KSQL_INTERNAL_READONLY_TOPICS_DOC = "List of topics that KSQL will "
+ " handle as read-only. These topics cannot be modified by any KSQL command. This list "
+ "is comma separated and may use Java regular expressions to specify each topic (i.e. "
+ " _confluent.* accepts any topic that starts with the _confluent prefix).";

private enum ConfigGeneration {
LEGACY,
Expand Down Expand Up @@ -543,11 +551,17 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_AUTH_CACHE_MAX_ENTRIES_DOC
).define(
SYSTEM_INTERNAL_TOPICS_CONFIG,
KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG,
Type.LIST,
SYSTEM_INTERNAL_TOPICS_DEFAULT,
KSQL_INTERNAL_HIDDEN_TOPICS_DEFAULT,
Importance.LOW,
SYSTEM_INTERNAL_TOPICS_DOC
KSQL_INTERNAL_HIDDEN_TOPICS_DOC
).define(
KSQL_INTERNAL_READONLY_TOPICS_CONFIG,
Type.LIST,
KSQL_INTERNAL_READONLY_TOPICS_DEFAULT,
Importance.LOW,
KSQL_INTERNAL_READONLY_TOPICS_DOC
)
.withClientSslSupport();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

package io.confluent.ksql.util;

import com.google.common.collect.Streams;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ReservedInternalTopics {
private static final Logger LOG = LoggerFactory.getLogger(ReservedInternalTopics.class);
Expand Down Expand Up @@ -97,33 +98,56 @@ private static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final Str
);
}

private final List<Pattern> systemInternalTopics;
private final Pattern hiddenTopicsPattern;
private final Pattern readOnlyTopicsPattern;

public ReservedInternalTopics(final KsqlConfig ksqlConfig) {
final ProcessingLogConfig processingLogConfig =
new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated());

try {
this.hiddenTopicsPattern = Pattern.compile(
Streams.concat(
Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"),
ksqlConfig.getList(KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG).stream()
).collect(Collectors.joining("|"))
);
} catch (final Exception e) {
final String message = "Invalid pattern list in '"
+ KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG + "'";

LOG.error(message + ": " + e.getMessage());
throw new KsqlException(message, e);
}

try {
this.systemInternalTopics = ksqlConfig.getList(KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG)
.stream()
.map(Pattern::compile)
.collect(Collectors.toList());
this.readOnlyTopicsPattern = Pattern.compile(
Streams.concat(
Stream.of(processingLogTopic(processingLogConfig, ksqlConfig)),
Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"),
ksqlConfig.getList(KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG).stream()
).collect(Collectors.joining("|"))
);
} catch (final Exception e) {
final String message = "Cannot get a list of system internal topics due to an invalid " +
"configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'";
final String message = "Invalid pattern list in '"
+ KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG + "'";

LOG.error(message + ": " + e.getMessage());
throw new KsqlException(message, e);
}
}

public Set<String> filterInternalTopics(final Set<String> topicNames) {
public Set<String> removeHiddenTopics(final Set<String> topicNames) {
return topicNames.stream()
.filter(t -> !isInternalTopic(t))
.filter(t -> !isHidden(t))
.collect(Collectors.toSet());
}

public boolean isInternalTopic(final String topicName) {
return topicName.startsWith(KSQL_INTERNAL_TOPIC_PREFIX) || systemInternalTopics.stream()
.filter(p -> p.matcher(topicName).matches())
.findAny()
.isPresent();
public boolean isHidden(final String topicName) {
return hiddenTopicsPattern.matcher(topicName).matches();
}

public boolean isReadOnly(final String topicName) {
return readOnlyTopicsPattern.matcher(topicName).matches();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import java.util.List;
import java.util.Set;
import java.util.regex.PatternSyntaxException;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -32,6 +32,8 @@
import static org.junit.Assert.assertThat;

public class ReservedInternalTopicsTest {
private static final String KSQL_PROCESSING_LOG_TOPIC = "default_ksql_processing_log";

@Rule
public final ExpectedException expectedException = ExpectedException.none();

Expand All @@ -41,86 +43,129 @@ public class ReservedInternalTopicsTest {
@Before
public void setUp() {
ksqlConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix"
KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix",
KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG, "ro_prefix_.*,ro_literal,.*_suffix_ro"
));

internalTopics = new ReservedInternalTopics(ksqlConfig);
}


@Test
public void shouldReturnTrueOnAllInternalTopics() {
public void shouldReturnTrueOnAllHiddenTopics() {
// Given
final List<String> topicNames = ImmutableList.of(
ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test",
"prefix_", "_suffix", "prefix_topic", "topic_suffix", "literal"
);

topicNames.forEach(topic -> {
// When
final boolean isReserved = internalTopics.isInternalTopic(topic);
final boolean isHidden = internalTopics.isHidden(topic);

// Then
assertThat("Should return true on internal topic: " + topic,
isReserved, is(true));
assertThat("Should return true on hidden topic: " + topic,
isHidden, is(true));
});
}

@Test
public void shouldReturnFalseOnNonInternalTopics() {
public void shouldReturnTrueOnAllReadOnlyTopics() {
// Given
final List<String> topicNames = ImmutableList.of(
KSQL_PROCESSING_LOG_TOPIC,
ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test",
"ro_prefix_", "_suffix_ro", "ro_prefix_topic", "topic_suffix_ro", "ro_literal"
);

topicNames.forEach(topic -> {
// When
final boolean isReadOnly = internalTopics.isReadOnly(topic);

// Then
assertThat("Should return true on read-only topic: " + topic,
isReadOnly, is(true));
});
}

@Test
public void shouldReturnFalseOnNonHiddenTopics() {
// Given
final List<String> topicNames = ImmutableList.of(
KSQL_PROCESSING_LOG_TOPIC,
"topic_prefix_", "_suffix_topic"
);

// Given
topicNames.forEach(topic -> {
// When
final boolean isReserved = internalTopics.isInternalTopic(topic);
final boolean isHidden = internalTopics.isHidden(topic);

// Then
assertThat("Should return false on non-internal topic: " + topic,
isReserved, is(false));
assertThat("Should return false on non-hidden topic: " + topic,
isHidden, is(false));
});
}

@Test
public void shouldReturnTrueOnKsqlInternalTopics() {
public void shouldReturnFalseOnNonReadOnlyTopics() {
// Given
final String ksqlInternalTopic = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test";
final List<String> topicNames = ImmutableList.of(
"topic_prefix_", "_suffix_topic"
);

// When
final boolean isReserved =
internalTopics.isInternalTopic(ksqlInternalTopic);
// Given
topicNames.forEach(topic -> {
// When
final boolean isReadOnly = internalTopics.isReadOnly(topic);

// Then
assertThat(isReserved, is(true));
// Then
assertThat("Should return false on non read-only topic: " + topic,
isReadOnly, is(false));
});
}

@Test
public void shouldFilterAllInternalTopics() {
public void shouldRemoveAllHiddenTopics() {
// Given
final Set<String> topics = ImmutableSet.of(
"prefix_name", "literal", "tt", "name1", "suffix", "p_suffix"
);

// When
final Set<String> filteredTopics = internalTopics.filterInternalTopics(topics);
final Set<String> filteredTopics = internalTopics.removeHiddenTopics(topics);

// Then
assertThat(filteredTopics, is(ImmutableSet.of("tt", "name1", "suffix")));
}

@Test
public void shouldThrowWhenInvalidSystemTopicsListIsUsed() {
public void shouldThrowWhenInvalidInternalHiddenTopicsListIsUsed() {
// Given
final KsqlConfig givenConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG, "*_suffix"
));

// Then
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Invalid pattern list in '"
+ KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG + "'");

// When
new ReservedInternalTopics(givenConfig);
}

@Test
public void shouldThrowWhenInvalidInternalReadOnlyTopicsListIsUsed() {
// Given
final KsqlConfig givenConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "*_suffix"
KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG, "*_suffix"
));

// Then
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Cannot get a list of system internal topics due to" +
" an invalid configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'");
expectedException.expectMessage("Invalid pattern list in '"
+ KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG + "'");

// When
new ReservedInternalTopics(givenConfig);
Expand All @@ -132,7 +177,7 @@ public void shouldReturnCommandTopic() {
final String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);

// Then
assertThat("_confluent-ksql-default__command_topic", is(commandTopic));
assertThat(commandTopic, is("_confluent-ksql-default__command_topic"));
}

@Test
Expand All @@ -141,6 +186,17 @@ public void shouldReturnConfigsTopic() {
final String commandTopic = ReservedInternalTopics.configsTopic(ksqlConfig);

// Then
assertThat("_confluent-ksql-default__configs", is(commandTopic));
assertThat(commandTopic, is("_confluent-ksql-default__configs"));
}

@Test
public void shouldReturnProcessingLogTopic() {
// Given/When
final ProcessingLogConfig processingLogConfig = new ProcessingLogConfig(ImmutableMap.of());
final String processingLogTopic = ReservedInternalTopics.processingLogTopic(
processingLogConfig, ksqlConfig);

// Then
assertThat(processingLogTopic, is("default_ksql_processing_log"));
}
}
Loading

0 comments on commit 11f97a1

Please sign in to comment.