fix: Remove delete.topic.enable check (#3089)

spena authored Jul 18, 2019
1 parent d87923a commit 71ec1c0

Showing 5 changed files with 61 additions and 94 deletions.
@@ -15,7 +15,6 @@

package io.confluent.ksql.services;

import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.topic.TopicProperties;
@@ -31,9 +30,10 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
@@ -45,6 +45,7 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
@@ -65,22 +66,13 @@ public class KafkaTopicClientImpl implements KafkaTopicClient {

private final AdminClient adminClient;

// This supplier solves two issues:
// 1. It avoids having the constructor check topic.delete.enable unnecessarily. The AdminClient
// might not have access to this config, and every KSQL command would fail if the check were
// done up front.
// 2. It is a memoizing supplier. Once it is called, subsequent calls return the cached
// value.
private final Supplier<Boolean> isTopicDeleteEnabledSupplier;

/**
* Construct a topic client from an existing admin client.
*
* @param adminClient the admin client.
*/
public KafkaTopicClientImpl(final AdminClient adminClient) {
this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
this.isTopicDeleteEnabledSupplier = Suppliers.memoize(this::isTopicDeleteEnabled);
}

@Override
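The removed field relied on Guava's memoizing-supplier pattern. A minimal standalone sketch of that pattern, where expensiveCheck() is a hypothetical stand-in for the broker-config lookup:

import com.google.common.base.Suppliers;
import java.util.function.Supplier;

public class MemoizeExample {
  // Suppliers.memoize wraps the delegate so the first get() computes and caches the value;
  // every later get() returns the cached result without re-invoking the delegate.
  private final Supplier<Boolean> cached = Suppliers.memoize(this::expensiveCheck);

  private boolean expensiveCheck() {
    return true; // hypothetical stand-in for an expensive call such as fetching broker configs
  }

  public boolean isEnabled() {
    return cached.get(); // computed at most once, then served from the cache
  }
}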
@@ -249,10 +241,7 @@ public void deleteTopics(final Collection<String> topicsToDelete) {
if (topicsToDelete.isEmpty()) {
return;
}
if (!isTopicDeleteEnabledSupplier.get()) {
LOG.info("Cannot delete topics since '" + DELETE_TOPIC_ENABLE + "' is false. ");
return;
}

final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
final List<String> failList = Lists.newArrayList();
@@ -261,6 +250,21 @@
try {
entry.getValue().get(30, TimeUnit.SECONDS);
} catch (final Exception e) {
final Throwable rootCause = ExceptionUtils.getRootCause(e);

// Checking whether the Kafka cluster has the DELETE_TOPIC_ENABLE configuration enabled
// requires the user to have DescribeConfigs permissions (or the matching ACL) in Kafka.
// To avoid granting that unnecessary permission, we instead detect disabled deletion by
// catching the TopicDeletionDisabledException.
if (rootCause instanceof TopicDeletionDisabledException) {
// If TopicDeletionDisabledException is detected, throw immediately instead of iterating
// over the remaining topics; it is now up to the caller to handle or ignore the exception.
throw new TopicDeletionDisabledException("Topic deletion is disabled. "
+ "To delete the topic, you must set '" + DELETE_TOPIC_ENABLE + "' to true in "
+ "the Kafka cluster configuration.");
}

LOG.error(String.format("Could not delete topic '%s'", entry.getKey()), e);
failList.add(entry.getKey());
}
}
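In isolation, the detection technique above looks like this; a minimal sketch assuming a plain AdminClient, commons-lang3 on the classpath, and a placeholder topic name:

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;

static boolean isDeletionDisabled(final AdminClient adminClient) {
  try {
    // KafkaFuture wraps broker-side failures in an ExecutionException, so the
    // TopicDeletionDisabledException surfaces only as the root cause.
    adminClient.deleteTopics(Collections.singletonList("placeholder-topic")).all()
        .get(30, TimeUnit.SECONDS);
    return false;
  } catch (final Exception e) {
    return ExceptionUtils.getRootCause(e) instanceof TopicDeletionDisabledException;
  }
}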
@@ -271,10 +275,6 @@ public void deleteTopics(final Collection<String> topicsToDelete) {

@Override
public void deleteInternalTopics(final String applicationId) {
if (!isTopicDeleteEnabledSupplier.get()) {
LOG.warn("Cannot delete topics since '" + DELETE_TOPIC_ENABLE + "' is false. ");
return;
}
try {
final Set<String> topicNames = listTopicNames();
final List<String> internalTopics = Lists.newArrayList();
@@ -286,25 +286,16 @@ public void deleteInternalTopics(final String applicationId) {
if (!internalTopics.isEmpty()) {
deleteTopics(internalTopics);
}
} catch (final TopicDeletionDisabledException e) {
// TopicDeletionDisabledException is expected when deletion is disabled, so it should not
// be logged as an error.
LOG.info("Did not delete any topics: {}", e.getMessage());
} catch (final Exception e) {
LOG.error("Exception while trying to clean up internal topics for application id: {}.",
applicationId, e
);
}
}

private boolean isTopicDeleteEnabled() {
try {
final ConfigEntry configEntry = getConfig().get(DELETE_TOPIC_ENABLE);
// default to true if there is no entry
return configEntry == null || Boolean.valueOf(configEntry.value());
} catch (final Exception e) {
LOG.error("Failed to initialize TopicClient: {}", e.getMessage());
throw new KafkaResponseGetFailedException(
"Could not fetch broker information. KSQL cannot initialize", e);
}
}

private Config getConfig() {
return KafkaClusterUtil.getConfig(adminClient);
}
@@ -31,7 +31,6 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
@@ -95,7 +94,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
try {
ExecutorUtil.executeWithRetries(
() -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())),
RetryBehaviour.ALWAYS);
ExecutorUtil.RetryBehaviour.ALWAYS);
} catch (Exception e) {
throw new KsqlException("Could not delete the corresponding kafka topic: "
+ source.getKafkaTopicName(), e);
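The delete above runs inside ExecutorUtil.executeWithRetries with RetryBehaviour.ALWAYS, i.e. transient failures are retried. A rough, hypothetical sketch of that kind of helper; KSQL's real ExecutorUtil differs in signature and backoff details:

// Hypothetical retry helper in the spirit of ExecutorUtil.executeWithRetries;
// not KSQL's actual implementation.
static void executeWithRetries(final Runnable action, final int maxRetries)
    throws InterruptedException {
  for (int attempt = 0; ; attempt++) {
    try {
      action.run();
      return;
    } catch (final RuntimeException e) {
      if (attempt >= maxRetries) {
        throw e; // out of retries: surface the last failure to the caller
      }
      Thread.sleep(500L * (attempt + 1)); // simple linear backoff between attempts
    }
  }
}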
@@ -65,6 +65,7 @@
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -239,9 +240,6 @@ public void shouldListTopicNames() {

@Test
public void shouldDeleteTopics() {
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.emptyList()));
expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
@@ -264,9 +262,6 @@ public void shouldReturnIfDeleteTopicsIsEmpty() {

@Test
public void shouldDeleteInternalTopics() {
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.emptyList()));
expect(adminClient.listTopics()).andReturn(getListTopicsResultWithInternalTopics());
expect(adminClient.deleteTopics(Arrays.asList(internalTopic2, internalTopic1)))
.andReturn(getDeleteInternalTopicsResult());
@@ -280,47 +275,14 @@ public void shouldDeleteInternalTopics() {
}

@Test
public void shouldDeleteTopicsIfDeleteTopicEnableTrue() {
// Given:
givenDeleteTopicEnableTrue();
expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));

// Then:
verify(adminClient);
}

@Test
public void shouldDeleteTopicsIfBrokerDoesNotReturnValueForDeleteTopicEnable() {
// Given:
givenDeleteTopicEnableNotReturnedByBroker();
expect(adminClient.deleteTopics(anyObject())).andReturn(getDeleteTopicsResult());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));

// Then:
verify(adminClient);
}

@Test
public void shouldNotDeleteTopicIfDeleteTopicEnableFalse() {
public void shouldDeleteTopicThrowOnTopicDeletionDisabledException() {
// Given:
givenDeleteTopicEnableFalse();
expect(adminClient.deleteTopics(anyObject())).andReturn(getTopicDeletionDisableException());
replay(adminClient);
final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);

// When:
kafkaTopicClient.deleteTopics(Collections.singletonList(topicName2));

// Then:
verify(adminClient);
}

@Test
@@ -597,29 +559,23 @@ private Collection<ConfigResource> describeBrokerRequest() {
return Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, node.idString()));
}

private void givenDeleteTopicEnableTrue() {
reset(adminClient);
expect(adminClient.describeCluster()).andReturn(describeClusterResult());

final ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.singletonList(configEntryDeleteEnable)));
}
private DeleteTopicsResult getTopicDeletionDisableException() {
final DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
final KafkaFuture<Void> kafkaFuture = mock(KafkaFuture.class);

private void givenDeleteTopicEnableFalse() {
reset(adminClient);
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
try {
expect(kafkaFuture.get()).andThrow(
new TopicDeletionDisabledException("Topic deletion is disabled")
);
} catch (final Exception e) {
// this should not happen in the test
}

final ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "false");
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.singletonList(configEntryDeleteEnable)));
}
expect(deleteTopicsResult.values())
.andReturn(Collections.singletonMap(topicName1, kafkaFuture));

private void givenDeleteTopicEnableNotReturnedByBroker() {
reset(adminClient);
expect(adminClient.describeCluster()).andReturn(describeClusterResult());
expect(adminClient.describeConfigs(describeBrokerRequest()))
.andReturn(describeBrokerResult(Collections.emptyList()));
replay(deleteTopicsResult);
return deleteTopicsResult;
}

private DescribeConfigsResult describeBrokerResult(final List<ConfigEntry> brokerConfigs) {
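The helper above hands the renamed test a DeleteTopicsResult whose future fails with TopicDeletionDisabledException. One way such a test could assert the propagated exception is JUnit 4's ExpectedException rule; this is a hedged sketch reusing the adminClient mock and helper from the class above, not necessarily what the real test does:

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Test
public void shouldThrowWhenTopicDeletionIsDisabled() {
  // Given: an AdminClient whose DeleteTopicsResult future fails (helper defined above)
  expect(adminClient.deleteTopics(anyObject())).andReturn(getTopicDeletionDisableException());
  replay(adminClient);

  // Then: with ExpectedException, the expectation is declared before the call
  expectedException.expect(TopicDeletionDisabledException.class);

  // When:
  new KafkaTopicClientImpl(adminClient).deleteTopics(Collections.singletonList("t1"));
}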
@@ -36,6 +36,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -98,6 +99,9 @@ private void deleteTopics(final Collection<String> topicsToBeDeleted) {
() -> serviceContext.getTopicClient().deleteTopics(
filterNonExistingTopics(topicsToBeDeleted)),
ExecutorUtil.RetryBehaviour.ALWAYS);
} catch (final TopicDeletionDisabledException e) {
// Ignore TopicDeletionDisabledException when cluster termination is requested.
LOGGER.info("Did not delete any topics: {}", e.getMessage());
} catch (final Exception e) {
throw new KsqlException(
"Exception while deleting topics: " + StringUtils.join(topicsToBeDeleted, ", "));
@@ -55,6 +55,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -374,6 +376,21 @@ public void shouldThrowIfCannotDeleteManagedTopic() {

}

@Test
public void shouldNotThrowOnTopicDeletionDisabledException() throws Exception {
// Given:
givenTopicsExistInKafka("K_Foo");
givenSinkTopicsExistInMetastore(Format.AVRO, "K_Foo");
givenSchemasForTopicsExistInSchemaRegistry("K_Foo");
doThrow(TopicDeletionDisabledException.class).when(kafkaTopicClient).deleteTopics(any());

// When:
clusterTerminator.terminateCluster(ImmutableList.of("K_Foo"));

// Then:
verifySchemaDeletedForTopics("K_Foo");
}

@Test
public void shouldNotThrowIfCannotCleanUpSchema() throws Exception {
// Given:
