Skip to content

Commit

Permalink
Fix the UTO upgrade issue reported in #9470. (#9474)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri authored Dec 18, 2023
1 parent 12b27a8 commit 391179f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.io.InterruptedIOException;
Expand Down Expand Up @@ -283,7 +284,12 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
values.get(reconcilableTopic.topicName()).get();
return pair(reconcilableTopic, Either.ofRight((null)));
} catch (ExecutionException e) {
return pair(reconcilableTopic, Either.ofLeft(handleAdminException(e)));
if (e.getCause() != null && e.getCause() instanceof TopicExistsException) {
// we treat this as a success, the next reconciliation checks the configuration
return pair(reconcilableTopic, Either.ofRight((null)));
} else {
return pair(reconcilableTopic, Either.ofLeft(handleAdminException(e)));
}
} catch (InterruptedException e) {
throw new UncheckedInterruptedException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.logging.log4j.Level;
Expand Down Expand Up @@ -1883,7 +1884,7 @@ public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name
"Decreasing partitions not supported"));
}

private static <T> KafkaFuture<T> failedFuture(Throwable error) throws ExecutionException, InterruptedException {
private static <T> KafkaFuture<T> failedFuture(Throwable error) {
var future = new KafkaFutureImpl<T>();
future.completeExceptionally(error);
return future;
Expand Down Expand Up @@ -2080,4 +2081,22 @@ public void shouldNotReconcileKafkaTopicWithMissingSpec(

assertNotExistsInKafka(expectedTopicName(created));
}

@Test
public void shouldReconcileOnTopicExistsException(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
var config = topicOperatorConfig(NAMESPACE, kafkaCluster);
var topicName = randomTopicName();

var creteTopicResult = mock(CreateTopicsResult.class);
var existsException = new TopicExistsException(format("Topic '%s' already exists.", topicName));
Mockito.doReturn(failedFuture(existsException)).when(creteTopicResult).all();
Mockito.doReturn(Map.of(topicName, failedFuture(existsException))).when(creteTopicResult).values();
operatorAdmin = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))};
Mockito.doReturn(creteTopicResult).when(operatorAdmin[0]).createTopics(any());

KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
assertTrue(readyIsTrue().test(kafkaTopic));
}
}

0 comments on commit 391179f

Please sign in to comment.