diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java
index 56d74e75e918..088cc345f59d 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java
@@ -15,7 +15,6 @@
 
 package io.confluent.ksql.services;
 
-import com.google.common.base.Suppliers;
 import com.google.common.collect.Lists;
 import io.confluent.ksql.exception.KafkaResponseGetFailedException;
 import io.confluent.ksql.topic.TopicProperties;
@@ -31,9 +30,10 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
@@ -45,6 +45,7 @@
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.slf4j.Logger;
@@ -65,14 +66,6 @@ public class KafkaTopicClientImpl implements KafkaTopicClient {
 
   private final AdminClient adminClient;
 
-  // This supplier solves two issues:
-  // 1. Avoids the constructor to check for the topic.delete.enable unnecessary. The AdminClient
-  //    might not have access to this config, and it would fail for every Ksql command if it does
-  //    the check initially.
-  // 2. It is a memoize supplier. Once this is call, the subsequent calls will return the cached
-  //    value.
-  private final Supplier<Boolean> isTopicDeleteEnabledSupplier;
-
   /**
    * Construct a topic client from an existing admin client.
    *
@@ -80,7 +73,6 @@ public class KafkaTopicClientImpl implements KafkaTopicClient {
    */
   public KafkaTopicClientImpl(final AdminClient adminClient) {
     this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
-    this.isTopicDeleteEnabledSupplier = Suppliers.memoize(this::isTopicDeleteEnabled);
   }
 
   @Override
@@ -249,10 +241,7 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
     if (topicsToDelete.isEmpty()) {
       return;
     }
-    if (!isTopicDeleteEnabledSupplier.get()) {
-      LOG.info("Cannot delete topics since '" + DELETE_TOPIC_ENABLE + "' is false. ");
-      return;
-    }
+
     final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
     final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
     final List<String> failList = Lists.newArrayList();
@@ -261,6 +250,21 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
       try {
        entry.getValue().get(30, TimeUnit.SECONDS);
       } catch (final Exception e) {
+        final Throwable rootCause = ExceptionUtils.getRootCause(e);
+
+        // Checking whether the cluster has DELETE_TOPIC_ENABLE turned on would require the
+        // user to hold DescribeConfigs permissions (or the equivalent ACL) in Kafka. To avoid
+        // that extra permission, detect the condition by catching TopicDeletionDisabledException.
+        if (rootCause instanceof TopicDeletionDisabledException) {
+          // Throw immediately instead of working through the remaining topics, since topic
+          // deletion is disabled cluster-wide and every delete would fail the same way.
+          // It is up to the caller to decide whether to ignore this exception.
+          throw new TopicDeletionDisabledException("Topic deletion is disabled. "
+              + "To delete the topic, you must set '" + DELETE_TOPIC_ENABLE + "' to true in "
+              + "the Kafka cluster configuration.");
+        }
+
+        LOG.error(String.format("Could not delete topic '%s'", entry.getKey()), e);
         failList.add(entry.getKey());
       }
     }
@@ -271,10 +275,6 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
 
   @Override
   public void deleteInternalTopics(final String applicationId) {
-    if (!isTopicDeleteEnabledSupplier.get()) {
-      LOG.warn("Cannot delete topics since '" + DELETE_TOPIC_ENABLE + "' is false. ");
-      return;
-    }
     try {
       final Set<String> topicNames = listTopicNames();
       final List<String> internalTopics = Lists.newArrayList();
@@ -286,6 +286,9 @@ public void deleteInternalTopics(final String applicationId) {
       if (!internalTopics.isEmpty()) {
         deleteTopics(internalTopics);
       }
+    } catch (final TopicDeletionDisabledException e) {
+      // Expected when topic deletion is disabled; log as info rather than error.
+      LOG.info("Did not delete any topics: {}", e.getMessage());
     } catch (final Exception e) {
       LOG.error("Exception while trying to clean up internal topics for application id: {}.",
           applicationId, e
@@ -293,18 +296,6 @@ public void deleteInternalTopics(final String applicationId) {
     }
   }
 
-  private boolean isTopicDeleteEnabled() {
-    try {
-      final ConfigEntry configEntry = getConfig().get(DELETE_TOPIC_ENABLE);
-      // default to true if there is no entry
-      return configEntry == null || Boolean.valueOf(configEntry.value());
-    } catch (final Exception e) {
-      LOG.error("Failed to initialize TopicClient: {}", e.getMessage());
-      throw new KafkaResponseGetFailedException(
-          "Could not fetch broker information. KSQL cannot initialize", e);
-    }
-  }
-
   private Config getConfig() {
     return KafkaClusterUtil.getConfig(adminClient);
   }
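[Reviewer note] The root-cause unwrap above is needed because the AdminClient surfaces broker-side errors wrapped in an ExecutionException when the delete future is awaited. A minimal, self-contained sketch of the mechanism (the class name and scenario are illustrative only, not part of the patch):

```java
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;

public final class RootCauseDemo {

  public static void main(String[] args) {
    // AdminClient futures wrap broker errors in an ExecutionException, so the
    // broker-side TopicDeletionDisabledException is only visible as the root cause.
    final Exception fromFuture = new ExecutionException(
        new TopicDeletionDisabledException("Topic deletion is disabled"));

    final Throwable rootCause = ExceptionUtils.getRootCause(fromFuture);

    // Same check as the one added in KafkaTopicClientImpl#deleteTopics.
    System.out.println(rootCause instanceof TopicDeletionDisabledException); // true
  }
}
```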
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
index 2684325e22e0..d219dc3b7df1 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java
@@ -31,7 +31,6 @@
 import io.confluent.ksql.statement.ConfiguredStatement;
 import io.confluent.ksql.statement.Injector;
 import io.confluent.ksql.util.ExecutorUtil;
-import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour;
 import io.confluent.ksql.util.KsqlConstants;
 import io.confluent.ksql.util.KsqlException;
 import java.util.Objects;
@@ -95,7 +94,7 @@ public ConfiguredStatement<T> inject(
     try {
       ExecutorUtil.executeWithRetries(
           () -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())),
-          RetryBehaviour.ALWAYS);
+          ExecutorUtil.RetryBehaviour.ALWAYS);
     } catch (Exception e) {
       throw new KsqlException("Could not delete the corresponding kafka topic: "
           + source.getKafkaTopicName(), e);
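[Reviewer note] The injector retries the delete via ExecutorUtil.executeWithRetries(..., RetryBehaviour.ALWAYS). As a rough mental model only — this is a hypothetical stand-in, not KSQL's actual ExecutorUtil, whose signature and backoff policy may differ:

```java
// Hypothetical stand-in for io.confluent.ksql.util.ExecutorUtil: retry the action
// a fixed number of times and rethrow the last failure, which the injector then
// wraps in a KsqlException.
public final class RetryDemo {

  @FunctionalInterface
  interface Action {
    void run() throws Exception;
  }

  static void executeWithRetries(final Action action, final int maxRetries) throws Exception {
    Exception last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        action.run();
        return;
      } catch (final Exception e) {
        last = e; // retry on any exception, akin to RetryBehaviour.ALWAYS
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    executeWithRetries(() -> System.out.println("delete topics here"), 3);
  }
}
```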
KSQL cannot initialize", e); - } - } - private Config getConfig() { return KafkaClusterUtil.getConfig(adminClient); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java index 2684325e22e0..d219dc3b7df1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java @@ -31,7 +31,6 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.statement.Injector; import io.confluent.ksql.util.ExecutorUtil; -import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import java.util.Objects; @@ -95,7 +94,7 @@ public ConfiguredStatement inject( try { ExecutorUtil.executeWithRetries( () -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())), - RetryBehaviour.ALWAYS); + ExecutorUtil.RetryBehaviour.ALWAYS); } catch (Exception e) { throw new KsqlException("Could not delete the corresponding kafka topic: " + source.getKafkaTopicName(), e); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index a32be56bbd42..768a78861da3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -65,6 +65,7 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -239,9 +240,6 @@ public void shouldListTopicNames() { @Test public void shouldDeleteTopics() { - expect(adminClient.describeCluster()).andReturn(describeClusterResult()); - expect(adminClient.describeConfigs(describeBrokerRequest())) - .andReturn(describeBrokerResult(Collections.emptyList())); expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult()); replay(adminClient); final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient); @@ -264,9 +262,6 @@ public void shouldReturnIfDeleteTopicsIsEmpty() { @Test public void shouldDeleteInternalTopics() { - expect(adminClient.describeCluster()).andReturn(describeClusterResult()); - expect(adminClient.describeConfigs(describeBrokerRequest())) - .andReturn(describeBrokerResult(Collections.emptyList())); expect(adminClient.listTopics()).andReturn(getListTopicsResultWithInternalTopics()); expect(adminClient.deleteTopics(Arrays.asList(internalTopic2, internalTopic1))) .andReturn(getDeleteInternalTopicsResult()); @@ -280,47 +275,14 @@ public void shouldDeleteInternalTopics() { } @Test - public void shouldDeleteTopicsIfDeleteTopicEnableTrue() { - // Given: - givenDeleteTopicEnableTrue(); - expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult()); - replay(adminClient); - final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient); - - // When: - kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2)); - - // Then: - verify(adminClient); - } 
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java
index 79f6ef9390b9..27afacc768b7 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java
@@ -36,6 +36,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,6 +99,9 @@ private void deleteTopics(final Collection<String> topicsToBeDeleted) {
           () -> serviceContext.getTopicClient().deleteTopics(
               filterNonExistingTopics(topicsToBeDeleted)),
           ExecutorUtil.RetryBehaviour.ALWAYS);
+    } catch (final TopicDeletionDisabledException e) {
+      // Ignore TopicDeletionDisabledException when cluster termination is requested.
+      LOGGER.info("Did not delete any topics: {}", e.getMessage());
     } catch (final Exception e) {
       throw new KsqlException(
           "Exception while deleting topics: " + StringUtils.join(topicsToBeDeleted, ", "));
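[Reviewer note] Two details in the catch block above are easy to miss: the more specific TopicDeletionDisabledException must be caught before the generic Exception, and the '{}' placeholder is required for SLF4J to actually render the message argument. A standalone sketch (class name illustrative):

```java
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TerminatorCatchDemo {

  private static final Logger LOGGER = LoggerFactory.getLogger(TerminatorCatchDemo.class);

  public static void main(String[] args) {
    try {
      throw new TopicDeletionDisabledException("Topic deletion is disabled");
    } catch (final TopicDeletionDisabledException e) {
      // Specific catch first: termination proceeds, and the '{}' placeholder lets
      // SLF4J substitute the message instead of silently dropping the argument.
      LOGGER.info("Did not delete any topics: {}", e.getMessage());
    } catch (final Exception e) {
      LOGGER.error("Unexpected failure while deleting topics", e);
    }
  }
}
```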
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java
index f55c16dec5a6..f96cd88286a1 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java
@@ -55,6 +55,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -374,6 +376,21 @@ public void shouldThrowIfCannotDeleteManagedTopic() {
 
   }
 
+  @Test
+  public void shouldNotThrowOnTopicDeletionDisabledException() throws Exception {
+    // Given:
+    givenTopicsExistInKafka("K_Foo");
+    givenSinkTopicsExistInMetastore(Format.AVRO, "K_Foo");
+    givenSchemasForTopicsExistInSchemaRegistry("K_Foo");
+    doThrow(TopicDeletionDisabledException.class).when(kafkaTopicClient).deleteTopics(any());
+
+    // When:
+    clusterTerminator.terminateCluster(ImmutableList.of("K_Foo"));
+
+    // Then:
+    verifySchemaDeletedForTopics("K_Foo");
+  }
+
   @Test
   public void shouldNotThrowIfCannotCleanUpSchema() throws Exception {
     // Given:
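[Reviewer note] Unlike KafkaTopicClientImplTest, this test uses Mockito, where stubbing a void method such as deleteTopics requires the doThrow(...).when(...) form. A self-contained sketch of that pattern (the TopicClient interface here is a hypothetical stand-in for KafkaTopicClient):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;

public final class DoThrowDemo {

  // Minimal stand-in for the KafkaTopicClient dependency mocked in the test.
  interface TopicClient {
    void deleteTopics(Collection<String> topics);
  }

  public static void main(String[] args) {
    // For void methods, Mockito requires the doThrow(..).when(..) form used in
    // shouldNotThrowOnTopicDeletionDisabledException above.
    final TopicClient client = mock(TopicClient.class);
    doThrow(TopicDeletionDisabledException.class).when(client).deleteTopics(any());

    try {
      client.deleteTopics(Collections.singletonList("K_Foo"));
    } catch (final TopicDeletionDisabledException e) {
      System.out.println("stubbed exception thrown as expected");
    }
  }
}
```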