Skip to content

Commit

Permalink
feat: hide internal/system topics from SHOW TOPICS (#4322)
Browse files Browse the repository at this point in the history
Hide all KSQL, Confluent, Kafka, SR and Connect internal or system topics from the SHOW TOPICS command and prevents writing on streams/tables that are based on an internal/system topic.

Also, it adds the SHOW ALL TOPICS syntax in case users want to list all topics including internal/system topics.
  • Loading branch information
spena authored Feb 3, 2020
1 parent 2335e76 commit 075fed3
Show file tree
Hide file tree
Showing 38 changed files with 769 additions and 174 deletions.
33 changes: 30 additions & 3 deletions docs-md/developer-guide/ksqldb-reference/show-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Synopsis
--------

```sql
SHOW | LIST TOPICS [EXTENDED];
SHOW | LIST [ALL] TOPICS [EXTENDED];
```

Description
Expand All @@ -24,10 +24,37 @@ configured to connect to (default setting for `bootstrap.servers`:
`localhost:9092`). SHOW TOPICS EXTENDED also displays consumer groups
and their active consumer counts.

`SHOW TOPICS` does not display hidden topics by default, such as:
* KSQL internal topics, like the KSQL command topic or changelog & repartition topics, or
topics that match any pattern in the `ksql.hidden.topics` configuration.

`SHOW ALL TOPICS` lists all topics, including hidden topics.

Example
-------

TODO: example
```sql
ksql> SHOW TOPICS;

Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------------------------------------------------
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------------------------------------------------
```


Page last revised on: {{ git_revision_date }}
```sql
ksql> SHOW ALL TOPICS;

Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------------------------------------------------
_confluent-ksql-default__command_topic | 1 | 1
_confluent-ksql-default_query_CTAS_USERS_0-Aggregate-Aggregate-Materialize-changelog | 1 | 1
_confluent-ksql-default_query_CTAS_USERS_0-Aggregate-GroupBy-repartition | 1 | 1
default_ksql_processing_log | 1 | 1
pageviews | 1 | 1
users | 1 | 1
--------------------------------------------------------------------------------------------------------
```
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public final class ImmutableProperties {
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_HIDDEN_TOPICS_CONFIG)
.add(KsqlConfig.KSQL_READONLY_TOPICS_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -92,6 +93,35 @@ public static Validator validUrl() {
};
}

public static Validator validRegex() {
return (name, val) -> {
if (!(val instanceof List)) {
throw new IllegalArgumentException("validator should only be used with "
+ "LIST of STRING defs");
}

final StringBuilder regexBuilder = new StringBuilder();
for (Object item : (List)val) {
if (!(item instanceof String)) {
throw new IllegalArgumentException("validator should only be used with "
+ "LIST of STRING defs");
}

if (regexBuilder.length() > 0) {
regexBuilder.append("|");
}

regexBuilder.append((String)item);
}

try {
Pattern.compile(regexBuilder.toString());
} catch (final Exception e) {
throw new ConfigException(name, val, "Not valid regular expression: " + e.getMessage());
}
};
}

public static final class ValidCaseInsensitiveString implements Validator {

private final List<String> validStrings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -125,6 +127,10 @@ private static String propertyName(final String name) {
INCLUDE_ROWS_DOC
);

public static Set<String> configNames() {
return CONFIG_DEF.names();
}

public ProcessingLogConfig(final Map<?, ?> properties) {
super(CONFIG_DEF, properties);
}
Expand Down
50 changes: 50 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.ConfigItem;
import io.confluent.ksql.config.KsqlConfigResolver;
import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.model.SemanticVersion;
import io.confluent.ksql.testing.EffectivelyImmutable;
import java.util.Collection;
Expand Down Expand Up @@ -202,6 +204,36 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_AUTH_CACHE_MAX_ENTRIES_DOC = "Controls the size of the cache "
+ "to a maximum number of KSQL authorization responses entries.";

public static final String KSQL_HIDDEN_TOPICS_CONFIG = "ksql.hidden.topics";
public static final String KSQL_HIDDEN_TOPICS_DEFAULT = "_confluent.*,__confluent.*"
+ ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,"
+ "connect-status,connect-statuses";
public static final String KSQL_HIDDEN_TOPICS_DOC = "Comma-separated list of topics that will "
+ "be hidden. Entries in the list may be literal topic names or "
+ "[Java regular expressions](https://docs.oracle.com/javase/8/docs/api/java/util/regex/"
+ "Pattern.html). "
+ "For example, `_confluent.*` will match any topic whose name starts with the `_confluent`)."
+ "\nHidden topics will not visible when running the `SHOW TOPICS` command unless "
+ "`SHOW ALL TOPICS` is used."
+ "\nThe default value hides known system topics from Kafka and Confluent products."
+ "\nKSQL also marks its own internal topics as hidden. This is not controlled by this "
+ "config.";

public static final String KSQL_READONLY_TOPICS_CONFIG = "ksql.readonly.topics";
public static final String KSQL_READONLY_TOPICS_DEFAULT = "_confluent.*,__confluent.*"
+ ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,"
+ "connect-status,connect-statuses";
public static final String KSQL_READONLY_TOPICS_DOC = "Comma-separated list of topics that "
+ "should be marked as read-only. Entries in the list may be literal topic names or "
+ "[Java regular expressions](https://docs.oracle.com/javase/8/docs/api/java/util/regex/"
+ "Pattern.html). "
+ "For example, `_confluent.*` will match any topic whose name starts with the `_confluent`)."
+ "\nRead-only topics cannot be modified by any KSQL command."
+ "\nThe default value marks known system topics from Kafka and Confluent products as "
+ "read-only."
+ "\nKSQL also marks its own internal topics as read-only. This is not controlled by this "
+ "config.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -532,6 +564,20 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_AUTH_CACHE_MAX_ENTRIES_DEFAULT,
Importance.LOW,
KSQL_AUTH_CACHE_MAX_ENTRIES_DOC
).define(
KSQL_HIDDEN_TOPICS_CONFIG,
Type.LIST,
KSQL_HIDDEN_TOPICS_DEFAULT,
ConfigValidators.validRegex(),
Importance.LOW,
KSQL_HIDDEN_TOPICS_DOC
).define(
KSQL_READONLY_TOPICS_CONFIG,
Type.LIST,
KSQL_READONLY_TOPICS_DEFAULT,
ConfigValidators.validRegex(),
Importance.LOW,
KSQL_READONLY_TOPICS_DOC
)
.withClientSslSupport();

Expand Down Expand Up @@ -669,6 +715,10 @@ public Map<String, Object> getProducerClientConfigProps() {
return getConfigsFor(ProducerConfig.configNames());
}

public Map<String, Object> getProcessingLogConfigProps() {
return getConfigsFor(ProcessingLogConfig.configNames());
}

private Map<String, Object> getConfigsFor(final Set<String> configs) {
final Map<String, Object> props = new HashMap<>();
ksqlStreamConfigProps.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ private KsqlConstants() {

public static final String CONFLUENT_AUTHOR = "Confluent";

public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
public static final String CONFLUENT_INTERNAL_TOPIC_PREFIX = "__confluent";

public static final String STREAMS_CHANGELOG_TOPIC_SUFFIX = "-changelog";
public static final String STREAMS_REPARTITION_TOPIC_SUFFIX = "-repartition";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import com.google.common.collect.Streams;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;

import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ReservedInternalTopics {
private static final Logger LOG = LoggerFactory.getLogger(ReservedInternalTopics.class);

// These constant should not be part of KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG because they're
// not configurable.
public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
public static final String KSQL_COMMAND_TOPIC_SUFFIX = "command_topic";
public static final String KSQL_CONFIGS_TOPIC_SUFFIX = "configs";

/**
* Returns the internal KSQL command topic.
*
* @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
* @return The command topic name.
*/
public static String commandTopic(final KsqlConfig ksqlConfig) {
return toKsqlInternalTopic(ksqlConfig, KSQL_COMMAND_TOPIC_SUFFIX);
}

/**
* Returns the internal KSQL configs topic (used for KSQL standalone)
*
* @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
* @return The configurations topic name.
*/
public static String configsTopic(final KsqlConfig ksqlConfig) {
return toKsqlInternalTopic(ksqlConfig, KSQL_CONFIGS_TOPIC_SUFFIX);
}

/**
* Returns the KSQL processing log topic.
* <p/>
* This is not an internal topic in the sense that users are intentionally meant to read from
* this topic to identify deserialization and other processing errors, define a KSQL stream on
* it, and potentially issue queries to filter from it, etc. This is why it is not prefixed in
* the way KSQL internal topics are.
*
* @param config The Processing log config, which is used to extract the processing topic suffix
* @param ksqlConfig The KSQL config, which is used to extract the KSQL service id.
* @return The processing log topic name.
*/
public static String processingLogTopic(
final ProcessingLogConfig config,
final KsqlConfig ksqlConfig
) {
final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
return String.format(
"%s%s",
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
);
} else {
return topicNameConfig;
}
}

/**
* Compute a name for a KSQL internal topic.
*
* @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
* @param topicSuffix A suffix that is appended to the topic name.
* @return The computed topic name.
*/
private static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final String topicSuffix) {
return String.format(
"%s%s_%s",
KSQL_INTERNAL_TOPIC_PREFIX,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
topicSuffix
);
}

private final Pattern hiddenTopicsPattern;
private final Pattern readOnlyTopicsPattern;

public ReservedInternalTopics(final KsqlConfig ksqlConfig) {
final ProcessingLogConfig processingLogConfig =
new ProcessingLogConfig(ksqlConfig.getProcessingLogConfigProps());

this.hiddenTopicsPattern = Pattern.compile(
Streams.concat(
Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"),
ksqlConfig.getList(KsqlConfig.KSQL_HIDDEN_TOPICS_CONFIG).stream()
).collect(Collectors.joining("|"))
);

this.readOnlyTopicsPattern = Pattern.compile(
Streams.concat(
Stream.of(processingLogTopic(processingLogConfig, ksqlConfig)),
Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"),
ksqlConfig.getList(KsqlConfig.KSQL_READONLY_TOPICS_CONFIG).stream()
).collect(Collectors.joining("|"))
);
}

public Set<String> removeHiddenTopics(final Set<String> topicNames) {
return topicNames.stream()
.filter(t -> !isHidden(t))
.collect(Collectors.toSet());
}

public boolean isHidden(final String topicName) {
return hiddenTopicsPattern.matcher(topicName).matches();
}

public boolean isReadOnly(final String topicName) {
return readOnlyTopicsPattern.matcher(topicName).matches();
}
}
Loading

0 comments on commit 075fed3

Please sign in to comment.