From 11f97a145ca52f00983b49c57195ceaf6b142d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Tue, 21 Jan 2020 16:57:55 -0600 Subject: [PATCH] fix: address review feedback from Andy and Jim --- .../ksqldb-reference/show-topics.md | 12 +- .../io/confluent/ksql/util/KsqlConfig.java | 38 +++++-- .../ksql/util/ReservedInternalTopics.java | 60 +++++++--- .../ksql/util/ReservedInternalTopicsTest.java | 106 +++++++++++++----- .../ksql/engine/InsertValuesExecutor.java | 14 +-- .../ksql/engine/InsertValuesExecutorTest.java | 4 +- .../services/KafkaTopicClientImplTest.java | 5 +- .../computation/DistributingExecutor.java | 26 ++--- .../server/execution/ListTopicsExecutor.java | 2 +- .../rest/server/resources/KsqlResource.java | 1 + .../computation/DistributingExecutorTest.java | 7 +- 11 files changed, 177 insertions(+), 98 deletions(-) diff --git a/docs-md/developer-guide/ksqldb-reference/show-topics.md b/docs-md/developer-guide/ksqldb-reference/show-topics.md index 44e7ba02309..ee7494248b8 100644 --- a/docs-md/developer-guide/ksqldb-reference/show-topics.md +++ b/docs-md/developer-guide/ksqldb-reference/show-topics.md @@ -25,17 +25,17 @@ configured to connect to (default setting for `bootstrap.servers`: and their active consumer counts. SHOW TOPICS does not display topics considered internal, such as: -* KSQL internal topics (i.e. the KSQL command topic) -* Topics found in the `system.internal.topics` configuration +* KSQL internal topics, like the KSQL command topic +* Topics found in the `ksql.internal.hidden.topics` configuration -SHOW ALL TOPICS will list all topics including those considered as -internal or found in the `system.internal.topics` configuration. +SHOW ALL TOPICS lists all topics, including those considered to be +internal or found in the `ksql.internal.hidden.topics` configuration. Example ------- ```sql -ksql> SHOW TOPICS +ksql> SHOW TOPICS; Kafka Topic | Partitions | Partition Replicas ------------------------------------------------------------------------- @@ -47,7 +47,7 @@ ksql> SHOW TOPICS ```sql -ksql> SHOW ALL TOPICS +ksql> SHOW ALL TOPICS; Kafka Topic | Partitions | Partition Replicas ------------------------------------------------------------------------- diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index c2bd4ba2039..09ad44cc7e0 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -202,15 +202,23 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_AUTH_CACHE_MAX_ENTRIES_DOC = "Controls the size of the cache " + "to a maximum number of KSQL authorization responses entries."; - public static final String SYSTEM_INTERNAL_TOPICS_CONFIG = "system.internal.topics"; - public static final String SYSTEM_INTERNAL_TOPICS_DEFAULT = "_confluent.*,__confluent.*,_schemas," - + "__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status," - + "connect-statuses"; - public static final String SYSTEM_INTERNAL_TOPICS_DOC = "List of topics considered part of " - + "system internals which KSQL should not allow users to write data on them. This list will " - + "not be displayed from the SHOW TOPICS command unless SHOW ALL TOPICS is used. The list " - + "is separated by comma and may use regular expressions based on Java Patterns " - + "(i.e. _confluent.* accepts any topic that starts with _confluent prefix)."; + public static final String KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG = "ksql.internal.hidden.topics"; + public static final String KSQL_INTERNAL_HIDDEN_TOPICS_DEFAULT = "_confluent.*,__confluent.*" + + ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets," + + "connect-status,connect-statuses"; + public static final String KSQL_INTERNAL_HIDDEN_TOPICS_DOC = "List of topics that will not be " + + "visible when running the SHOW TOPICS command unless SHOW ALL TOPICS is used. This list " + + "is comma separated and may use Java regular expressions to specify each topic (i.e. " + + " _confluent.* accepts any topic that starts with the _confluent prefix)."; + + public static final String KSQL_INTERNAL_READONLY_TOPICS_CONFIG = "ksql.internal.readonly.topics"; + public static final String KSQL_INTERNAL_READONLY_TOPICS_DEFAULT = "_confluent.*,__confluent.*" + + ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets," + + "connect-status,connect-statuses"; + public static final String KSQL_INTERNAL_READONLY_TOPICS_DOC = "List of topics that KSQL will " + + " handle as read-only. These topics cannot be modified by any KSQL command. This list " + + "is comma separated and may use Java regular expressions to specify each topic (i.e. " + + " _confluent.* accepts any topic that starts with the _confluent prefix)."; private enum ConfigGeneration { LEGACY, @@ -543,11 +551,17 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_AUTH_CACHE_MAX_ENTRIES_DOC ).define( - SYSTEM_INTERNAL_TOPICS_CONFIG, + KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG, Type.LIST, - SYSTEM_INTERNAL_TOPICS_DEFAULT, + KSQL_INTERNAL_HIDDEN_TOPICS_DEFAULT, Importance.LOW, - SYSTEM_INTERNAL_TOPICS_DOC + KSQL_INTERNAL_HIDDEN_TOPICS_DOC + ).define( + KSQL_INTERNAL_READONLY_TOPICS_CONFIG, + Type.LIST, + KSQL_INTERNAL_READONLY_TOPICS_DEFAULT, + Importance.LOW, + KSQL_INTERNAL_READONLY_TOPICS_DOC ) .withClientSslSupport(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java index 8c754b536ff..21e5ed61e1d 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java @@ -15,14 +15,15 @@ package io.confluent.ksql.util; +import com.google.common.collect.Streams; import io.confluent.ksql.logging.processing.ProcessingLogConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class ReservedInternalTopics { private static final Logger LOG = LoggerFactory.getLogger(ReservedInternalTopics.class); @@ -97,33 +98,56 @@ private static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final Str ); } - private final List systemInternalTopics; + private final Pattern hiddenTopicsPattern; + private final Pattern readOnlyTopicsPattern; public ReservedInternalTopics(final KsqlConfig ksqlConfig) { + final ProcessingLogConfig processingLogConfig = + new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); + + try { + this.hiddenTopicsPattern = Pattern.compile( + Streams.concat( + Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"), + ksqlConfig.getList(KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG).stream() + ).collect(Collectors.joining("|")) + ); + } catch (final Exception e) { + final String message = "Invalid pattern list in '" + + KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG + "'"; + + LOG.error(message + ": " + e.getMessage()); + throw new KsqlException(message, e); + } + try { - this.systemInternalTopics = ksqlConfig.getList(KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG) - .stream() - .map(Pattern::compile) - .collect(Collectors.toList()); + this.readOnlyTopicsPattern = Pattern.compile( + Streams.concat( + Stream.of(processingLogTopic(processingLogConfig, ksqlConfig)), + Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"), + ksqlConfig.getList(KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG).stream() + ).collect(Collectors.joining("|")) + ); } catch (final Exception e) { - final String message = "Cannot get a list of system internal topics due to an invalid " + - "configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'"; + final String message = "Invalid pattern list in '" + + KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG + "'"; LOG.error(message + ": " + e.getMessage()); throw new KsqlException(message, e); } } - public Set filterInternalTopics(final Set topicNames) { + public Set removeHiddenTopics(final Set topicNames) { return topicNames.stream() - .filter(t -> !isInternalTopic(t)) + .filter(t -> !isHidden(t)) .collect(Collectors.toSet()); } - public boolean isInternalTopic(final String topicName) { - return topicName.startsWith(KSQL_INTERNAL_TOPIC_PREFIX) || systemInternalTopics.stream() - .filter(p -> p.matcher(topicName).matches()) - .findAny() - .isPresent(); + public boolean isHidden(final String topicName) { + return hiddenTopicsPattern.matcher(topicName).matches(); + } + + public boolean isReadOnly(final String topicName) { + return readOnlyTopicsPattern.matcher(topicName).matches(); } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java index 0046a2550bf..940679dd40f 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java @@ -20,9 +20,9 @@ import java.util.List; import java.util.Set; -import java.util.regex.PatternSyntaxException; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.logging.processing.ProcessingLogConfig; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -32,6 +32,8 @@ import static org.junit.Assert.assertThat; public class ReservedInternalTopicsTest { + private static final String KSQL_PROCESSING_LOG_TOPIC = "default_ksql_processing_log"; + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -41,7 +43,8 @@ public class ReservedInternalTopicsTest { @Before public void setUp() { ksqlConfig = new KsqlConfig(ImmutableMap.of( - KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix" + KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix", + KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG, "ro_prefix_.*,ro_literal,.*_suffix_ro" )); internalTopics = new ReservedInternalTopics(ksqlConfig); @@ -49,78 +52,120 @@ public void setUp() { @Test - public void shouldReturnTrueOnAllInternalTopics() { + public void shouldReturnTrueOnAllHiddenTopics() { // Given final List topicNames = ImmutableList.of( + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test", "prefix_", "_suffix", "prefix_topic", "topic_suffix", "literal" ); topicNames.forEach(topic -> { // When - final boolean isReserved = internalTopics.isInternalTopic(topic); + final boolean isHidden = internalTopics.isHidden(topic); // Then - assertThat("Should return true on internal topic: " + topic, - isReserved, is(true)); + assertThat("Should return true on hidden topic: " + topic, + isHidden, is(true)); }); } @Test - public void shouldReturnFalseOnNonInternalTopics() { + public void shouldReturnTrueOnAllReadOnlyTopics() { // Given final List topicNames = ImmutableList.of( + KSQL_PROCESSING_LOG_TOPIC, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test", + "ro_prefix_", "_suffix_ro", "ro_prefix_topic", "topic_suffix_ro", "ro_literal" + ); + + topicNames.forEach(topic -> { + // When + final boolean isReadOnly = internalTopics.isReadOnly(topic); + + // Then + assertThat("Should return true on read-only topic: " + topic, + isReadOnly, is(true)); + }); + } + + @Test + public void shouldReturnFalseOnNonHiddenTopics() { + // Given + final List topicNames = ImmutableList.of( + KSQL_PROCESSING_LOG_TOPIC, "topic_prefix_", "_suffix_topic" ); // Given topicNames.forEach(topic -> { // When - final boolean isReserved = internalTopics.isInternalTopic(topic); + final boolean isHidden = internalTopics.isHidden(topic); // Then - assertThat("Should return false on non-internal topic: " + topic, - isReserved, is(false)); + assertThat("Should return false on non-hidden topic: " + topic, + isHidden, is(false)); }); } @Test - public void shouldReturnTrueOnKsqlInternalTopics() { + public void shouldReturnFalseOnNonReadOnlyTopics() { // Given - final String ksqlInternalTopic = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test"; + final List topicNames = ImmutableList.of( + "topic_prefix_", "_suffix_topic" + ); - // When - final boolean isReserved = - internalTopics.isInternalTopic(ksqlInternalTopic); + // Given + topicNames.forEach(topic -> { + // When + final boolean isReadOnly = internalTopics.isReadOnly(topic); - // Then - assertThat(isReserved, is(true)); + // Then + assertThat("Should return false on non read-only topic: " + topic, + isReadOnly, is(false)); + }); } @Test - public void shouldFilterAllInternalTopics() { + public void shouldRemoveAllHiddenTopics() { // Given final Set topics = ImmutableSet.of( "prefix_name", "literal", "tt", "name1", "suffix", "p_suffix" ); // When - final Set filteredTopics = internalTopics.filterInternalTopics(topics); + final Set filteredTopics = internalTopics.removeHiddenTopics(topics); // Then assertThat(filteredTopics, is(ImmutableSet.of("tt", "name1", "suffix"))); } @Test - public void shouldThrowWhenInvalidSystemTopicsListIsUsed() { + public void shouldThrowWhenInvalidInternalHiddenTopicsListIsUsed() { + // Given + final KsqlConfig givenConfig = new KsqlConfig(ImmutableMap.of( + KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG, "*_suffix" + )); + + // Then + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Invalid pattern list in '" + + KsqlConfig.KSQL_INTERNAL_HIDDEN_TOPICS_CONFIG + "'"); + + // When + new ReservedInternalTopics(givenConfig); + } + + @Test + public void shouldThrowWhenInvalidInternalReadOnlyTopicsListIsUsed() { // Given final KsqlConfig givenConfig = new KsqlConfig(ImmutableMap.of( - KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG, "*_suffix" + KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG, "*_suffix" )); // Then expectedException.expect(KsqlException.class); - expectedException.expectMessage("Cannot get a list of system internal topics due to" + - " an invalid configuration in '" + KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG + "'"); + expectedException.expectMessage("Invalid pattern list in '" + + KsqlConfig.KSQL_INTERNAL_READONLY_TOPICS_CONFIG + "'"); // When new ReservedInternalTopics(givenConfig); @@ -132,7 +177,7 @@ public void shouldReturnCommandTopic() { final String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig); // Then - assertThat("_confluent-ksql-default__command_topic", is(commandTopic)); + assertThat(commandTopic, is("_confluent-ksql-default__command_topic")); } @Test @@ -141,6 +186,17 @@ public void shouldReturnConfigsTopic() { final String commandTopic = ReservedInternalTopics.configsTopic(ksqlConfig); // Then - assertThat("_confluent-ksql-default__configs", is(commandTopic)); + assertThat(commandTopic, is("_confluent-ksql-default__configs")); + } + + @Test + public void shouldReturnProcessingLogTopic() { + // Given/When + final ProcessingLogConfig processingLogConfig = new ProcessingLogConfig(ImmutableMap.of()); + final String processingLogTopic = ReservedInternalTopics.processingLogTopic( + processingLogConfig, ksqlConfig); + + // Then + assertThat(processingLogTopic, is("default_ksql_processing_log")); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 63dbc30dd23..31dfb7a03f8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -28,7 +28,6 @@ import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.NoopProcessingLogContext; -import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; @@ -190,17 +189,8 @@ private DataSource getDataSource( } final ReservedInternalTopics internalTopics = new ReservedInternalTopics(ksqlConfig); - if (internalTopics.isInternalTopic(dataSource.getKafkaTopicName())) { - throw new KsqlException("Cannot insert values into the reserved internal topic: " - + dataSource.getKafkaTopicName()); - } - - final ProcessingLogConfig processingLogConfig = - new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); - final String processingLogTopic = - ReservedInternalTopics.processingLogTopic(processingLogConfig, ksqlConfig); - if (dataSource.getKafkaTopicName().equals(processingLogTopic)) { - throw new KsqlException("Cannot insert into the processing log topic: " + if (internalTopics.isReadOnly(dataSource.getKafkaTopicName())) { + throw new KsqlException("Cannot insert values into read-only topic: " + dataSource.getKafkaTopicName()); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index 5bf5da4ebba..e1c1d7233cc 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -562,7 +562,7 @@ public void shouldThrowWhenInsertValuesOnReservedInternalTopic() { // Expect: expectedException.expect(KsqlException.class); expectedException.expectMessage( - "Cannot insert values into the reserved internal topic: _confluent-ksql-default__command-topic"); + "Cannot insert values into read-only topic: _confluent-ksql-default__command-topic"); // When: executor.execute(statement, ImmutableMap.of(), engine, serviceContext); @@ -592,7 +592,7 @@ public void shouldThrowWhenInsertValuesOnProcessingLogTopic() { // Expect: expectedException.expect(KsqlException.class); expectedException.expectMessage( - "Cannot insert into the processing log topic: default_ksql_processing_log"); + "Cannot insert values into read-only topic: default_ksql_processing_log"); // When: executor.execute(statement, ImmutableMap.of(), engine, serviceContext); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 26d181169dd..0daf095db77 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -105,7 +105,8 @@ public class KafkaTopicClientImplTest { "default", "query_CTAS_USERS_BY_CITY-KSTREAM-AGGREGATE" + "-STATE-STORE-0000000006-changelog"); - private static final String confluentInternalTopic = "_confluent-confluent-control-center"; + private static final String internalTopic = String.format("%s_internal", + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX); private Node node; @Mock private AdminClient adminClient; @@ -645,7 +646,7 @@ private static ListTopicsResult getListTopicsResultWithInternalTopics() { final ListTopicsResult listTopicsResult = mock(ListTopicsResult.class); final List topicNamesList = Arrays.asList(topicName1, topicName2, topicName3, internalTopic1, internalTopic2, - confluentInternalTopic); + internalTopic); expect(listTopicsResult.names()) .andReturn(KafkaFuture.completedFuture(new HashSet<>(topicNamesList))); replay(listTopicsResult); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 06022043f8b..82f44211970 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -15,7 +15,6 @@ package io.confluent.ksql.rest.server.computation; import io.confluent.ksql.KsqlExecutionContext; -import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.parser.tree.InsertInto; @@ -49,20 +48,24 @@ * {@code distributedCmdResponseTimeout}. */ public class DistributingExecutor { + private final KsqlConfig ksqlConfig; private final CommandQueue commandQueue; private final Duration distributedCmdResponseTimeout; private final BiFunction injectorFactory; private final Optional authorizationValidator; private final ValidatedCommandFactory validatedCommandFactory; private final CommandIdAssigner commandIdAssigner; + private final ReservedInternalTopics internalTopics; public DistributingExecutor( + final KsqlConfig ksqlConfig, final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final BiFunction injectorFactory, final Optional authorizationValidator, final ValidatedCommandFactory validatedCommandFactory ) { + this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); this.distributedCmdResponseTimeout = Objects.requireNonNull(distributedCmdResponseTimeout, "distributedCmdResponseTimeout"); @@ -74,6 +77,7 @@ public DistributingExecutor( "validatedCommandFactory" ); this.commandIdAssigner = new CommandIdAssigner(); + this.internalTopics = new ReservedInternalTopics(ksqlConfig); } /** @@ -96,8 +100,7 @@ public Optional execute( .inject(statement); if (injected.getStatement() instanceof InsertInto) { - throwIfInsertOnInternalTopic( - statement.getConfig(), + throwIfInsertOnReadOnlyTopic( executionContext.getMetaStore(), (InsertInto)injected.getStatement() ); @@ -174,8 +177,7 @@ private void checkAuthorization( } } - private void throwIfInsertOnInternalTopic( - final KsqlConfig ksqlConfig, + private void throwIfInsertOnReadOnlyTopic( final MetaStore metaStore, final InsertInto insertInto ) { @@ -185,18 +187,8 @@ private void throwIfInsertOnInternalTopic( + insertInto.getTarget()); } - final ReservedInternalTopics internalTopics = new ReservedInternalTopics(ksqlConfig); - if (internalTopics.isInternalTopic(dataSource.getKafkaTopicName())) { - throw new KsqlException("Cannot insert into the reserved internal topic: " - + dataSource.getKafkaTopicName()); - } - - final ProcessingLogConfig processingLogConfig = - new ProcessingLogConfig(ksqlConfig.getAllConfigPropsWithSecretsObfuscated()); - final String processingLogTopic = - ReservedInternalTopics.processingLogTopic(processingLogConfig, ksqlConfig); - if (dataSource.getKafkaTopicName().equals(processingLogTopic)) { - throw new KsqlException("Cannot insert into the processing log topic: " + if (internalTopics.isReadOnly(dataSource.getKafkaTopicName())) { + throw new KsqlException("Cannot insert into read-only topic: " + dataSource.getKafkaTopicName()); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java index 8fd6513a128..93e01b18e61 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java @@ -90,7 +90,7 @@ private static Map listTopics( final Set topics = statement.getStatement().getShowAll() ? topicClient.listTopicNames() - : internalTopics.filterInternalTopics(topicClient.listTopicNames()); + : internalTopics.removeHiddenTopics(topicClient.listTopicNames()); return new TreeMap<>( filterKsqlInternalTopics(topicClient.describeTopics(topics), statement.getConfig()) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index f7d0044b99a..3f36e789ca6 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -162,6 +162,7 @@ public void configure(final KsqlConfig config) { this.handler = new RequestHandler( CustomExecutors.EXECUTOR_MAP, new DistributingExecutor( + config, commandQueue, distributedCmdResponseTimeout, injectorFactory, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 206e548f9f2..8e127d6c095 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -153,6 +153,7 @@ public void setUp() throws InterruptedException { securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext); distributor = new DistributingExecutor( + KSQL_CONFIG, queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), @@ -307,7 +308,7 @@ public void shouldThrowExceptionWhenInsertIntoUnknownStream() { } @Test - public void shouldThrowExceptionWhenInsertIntoReservedInternalTopic() { + public void shouldThrowExceptionWhenInsertIntoReadOnlyTopic() { // Given final PreparedStatement preparedStatement = PreparedStatement.of("", new InsertInto(SourceName.of("s1"), mock(Query.class))); @@ -319,7 +320,7 @@ public void shouldThrowExceptionWhenInsertIntoReservedInternalTopic() { // Expect: expectedException.expect(KsqlException.class); - expectedException.expectMessage("Cannot insert into the reserved internal topic: " + expectedException.expectMessage("Cannot insert into read-only topic: " + "_confluent-ksql-default__command-topic"); // When: @@ -339,7 +340,7 @@ public void shouldThrowExceptionWhenInsertIntoProcessingLogTopic() { // Expect: expectedException.expect(KsqlException.class); - expectedException.expectMessage("Cannot insert into the processing log topic: " + expectedException.expectMessage("Cannot insert into read-only topic: " + "default_ksql_processing_log"); // When: