From 47301d64f69206fe9132081d901299d56ac52e99 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 15 Sep 2021 08:52:45 -0700 Subject: [PATCH] KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch fixes several problems with the `ElectLeaders` API in KRaft: - `KafkaApis` did not properly forward this request type to the controller. - `ControllerApis` did not handle the request type. - `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null. - Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election. - Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election. - Controller should use proper error codes to handle cases when the desired leader is unavailable or when no election is needed because a desired leader is already elected. - When election for all partitions is requested (indicated with a null `TopicPartitions` field), the response should not return partitions for which no election was necessary. In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft. (A usage sketch of the `ElectLeaders` admin API follows the diff below.) Reviewers: dengziming, José Armando García Sancio, David Arthur --- .../apache/kafka/common/protocol/ApiKeys.java | 2 +- .../common/requests/ElectLeadersRequest.java | 24 +- .../kafka/server/BrokerLifecycleManager.scala | 8 +- .../scala/kafka/server/BrokerServer.scala | 17 +- .../scala/kafka/server/ControllerApis.scala | 24 +- .../main/scala/kafka/server/KafkaApis.scala | 9 +- .../metadata/BrokerMetadataPublisher.scala | 2 + .../test/java/kafka/test/ClusterInstance.java | 6 + .../junit/RaftClusterInvocationContext.java | 63 +- .../junit/ZkClusterInvocationContext.java | 43 +- .../kafka/server/IntegrationTestUtils.scala | 38 +- .../LeaderElectionCommandErrorTest.scala | 97 +++ .../admin/LeaderElectionCommandTest.scala | 371 +++++------- .../kafka/server/ControllerApisTest.scala | 105 +++- .../unit/kafka/server/KafkaApisTest.scala | 92 ++- .../scala/unit/kafka/utils/TestUtils.scala | 77 ++- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 37 +- .../kafka/zookeeper/ZooKeeperClientTest.scala | 7 +- .../controller/ClusterControlManager.java | 6 +- .../controller/PartitionChangeBuilder.java | 23 + .../controller/ReplicationControlManager.java | 59 +- .../apache/kafka/image/ClientQuotasDelta.java | 7 + .../org/apache/kafka/image/ClusterDelta.java | 7 + .../kafka/image/ConfigurationDelta.java | 9 + .../kafka/image/ConfigurationsDelta.java | 7 + .../org/apache/kafka/image/FeaturesDelta.java | 7 + .../org/apache/kafka/image/MetadataDelta.java | 11 + .../org/apache/kafka/image/TopicDelta.java | 7 + .../org/apache/kafka/image/TopicsDelta.java | 8 + .../PartitionChangeBuilderTest.java | 8 + .../ReplicationControlManagerTest.java | 560 +++++++++++++----- 31 files changed, 1210 insertions(+), 531 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 423093aef9c20..428e4a81b6835 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -84,7 +84,7 @@ public enum ApiKeys { EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN), DELETE_GROUPS(ApiMessageType.DELETE_GROUPS), - ELECT_LEADERS(ApiMessageType.ELECT_LEADERS), + ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true), INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true), ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true), LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS, false, true), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java index 1bc888af6050c..febb0302ed883 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java @@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { ApiError apiError = ApiError.fromThrowable(e); List<ReplicaElectionResult> electionResults = new ArrayList<>(); - for (TopicPartitions topic : data.topicPartitions()) { - ReplicaElectionResult electionResult = new ReplicaElectionResult(); + if (data.topicPartitions() != null) { + for (TopicPartitions topic : data.topicPartitions()) { + ReplicaElectionResult electionResult = new ReplicaElectionResult(); - electionResult.setTopic(topic.topic()); - for (Integer partitionId : topic.partitions()) { - PartitionResult partitionResult = new PartitionResult(); - partitionResult.setPartitionId(partitionId); - partitionResult.setErrorCode(apiError.error().code()); - partitionResult.setErrorMessage(apiError.message()); + electionResult.setTopic(topic.topic()); + for (Integer partitionId : topic.partitions()) { + PartitionResult partitionResult = new PartitionResult(); + partitionResult.setPartitionId(partitionId); + partitionResult.setErrorCode(apiError.error().code()); + partitionResult.setErrorMessage(apiError.message()); - electionResult.partitionResult().add(partitionResult); - } + electionResult.partitionResult().add(partitionResult); + } - electionResults.add(electionResult); + electionResults.add(electionResult); + } } return new ElectLeadersResponse(throttleTimeMs, apiError.error().code(), electionResults, version()); diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index e15a3e695d660..394c353e45c1a 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig, _state = BrokerState.PENDING_CONTROLLED_SHUTDOWN // Send the next heartbeat immediately in order to let the controller // begin processing the controlled shutdown as soon as possible. - scheduleNextCommunication(0) + scheduleNextCommunicationImmediately() case _ => info(s"Skipping controlled shutdown because we are in state ${_state}.") @@ -284,8 +284,8 @@ class BrokerLifecycleManager(val config: KafkaConfig, setIncarnationId(incarnationId). setListeners(_advertisedListeners).
setRack(rack.orNull) - if (isTraceEnabled) { - trace(s"Sending broker registration ${data}") + if (isDebugEnabled) { + debug(s"Sending broker registration ${data}") } _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data), new BrokerRegistrationResponseHandler()) @@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig, scheduleNextCommunicationAfterSuccess() } } else { - info(s"The controlled has asked us to exit controlled shutdown.") + info(s"The controller has asked us to exit controlled shutdown.") beginShutdown() } gotControlledShutdownResponse = true diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index d2079c42b2c56..533036f42a72e 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -17,11 +17,11 @@ package kafka.server +import java.net.InetAddress import java.util import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} -import java.net.InetAddress import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.GroupCoordinator @@ -34,7 +34,6 @@ import kafka.security.CredentialProvider import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder} import kafka.utils.{CoreUtils, KafkaScheduler} -import org.apache.kafka.snapshot.SnapshotWriter import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection} import org.apache.kafka.common.metrics.Metrics @@ -45,14 +44,15 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.metadata.{BrokerState, VersionRange} -import org.apache.kafka.raft.{RaftClient, RaftConfig} import org.apache.kafka.raft.RaftConfig.AddressSpec +import org.apache.kafka.raft.{RaftClient, RaftConfig} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.snapshot.SnapshotWriter import scala.collection.{Map, Seq} -import scala.jdk.CollectionConverters._ import scala.compat.java8.OptionConverters._ +import scala.jdk.CollectionConverters._ class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion]) @@ -92,8 +92,7 @@ class BrokerServer( this.logIdent = logContext.logPrefix - val lifecycleManager: BrokerLifecycleManager = - new BrokerLifecycleManager(config, time, threadNamePrefix) + @volatile private var lifecycleManager: BrokerLifecycleManager = null private val isShuttingDown = new AtomicBoolean(false) @@ -105,7 +104,7 @@ class BrokerServer( var controlPlaneRequestProcessor: KafkaApis = null var authorizer: Option[Authorizer] = None - var socketServer: SocketServer = null + @volatile var socketServer: SocketServer = null var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null var logDirFailureChannel: LogDirFailureChannel = null @@ -162,6 +161,8 @@ class BrokerServer( lock.lock() try { if (status != from) return false + info(s"Transition from $status to $to") + status = to if (to == SHUTTING_DOWN) { 
isShuttingDown.set(true) @@ -182,6 +183,8 @@ class BrokerServer( try { info("Starting broker") + lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix) + /* start scheduler */ kafkaScheduler = new KafkaScheduler(config.backgroundThreads) kafkaScheduler.startup() diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 2f86e1f7cfd03..ed9b55a1a7c39 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -20,8 +20,8 @@ package kafka.server import java.util import java.util.Collections import java.util.Map.Entry -import java.util.concurrent.{CompletableFuture, ExecutionException} import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS} +import java.util.concurrent.{CompletableFuture, ExecutionException} import kafka.network.RequestChannel import kafka.raft.RaftManager @@ -36,11 +36,10 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse} import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse -import org.apache.kafka.common.message._ +import org.apache.kafka.common.message.{CreateTopicsRequestData, _} import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ @@ -108,6 +107,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request) + case ApiKeys.ELECT_LEADERS => handleElectLeaders(request) case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } } catch { @@ -488,6 +488,24 @@ class ControllerApis(val requestChannel: RequestChannel, handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData])) } + def handleElectLeaders(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, ALTER) + + val electLeadersRequest = request.body[ElectLeadersRequest] + val future = controller.electLeaders(electLeadersRequest.data) + future.whenComplete { (responseData, exception) => + if (exception != null) { + requestHelper.sendResponseMaybeThrottle(request, throttleMs => { + electLeadersRequest.getErrorResponse(throttleMs, exception) + }) + } else { + requestHelper.sendResponseMaybeThrottle(request, throttleMs => { + new ElectLeadersResponse(responseData.setThrottleTimeMs(throttleMs)) + }) + } + } + } + def handleAlterIsrRequest(request: RequestChannel.Request): Unit = { val alterIsrRequest = request.body[AlterIsrRequest] authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 
c2e0c15a36991..e20a36a0e3ebd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -209,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal) - case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request) + case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders) case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest) case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest) case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest) @@ -2993,9 +2993,8 @@ class KafkaApis(val requestChannel: RequestChannel, true } - def handleElectReplicaLeader(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request)) - + def handleElectLeaders(request: RequestChannel.Request): Unit = { + val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) val electionRequest = request.body[ElectLeadersRequest] def sendResponseCallback( @@ -3006,7 +3005,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { val adjustedResults = if (electionRequest.data.topicPartitions == null) { /* When performing elections across all of the partitions we should only return - * partitions for which there was an eleciton or resulted in an error. In other + * partitions for which there was an election or resulted in an error. In other * words, partitions that didn't need election because they ready have the correct * leader are not returned to the client. */ diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 4fe49384b1731..cedef47fa5d57 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, delta: MetadataDelta, newImage: MetadataImage): Unit = { try { + trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset") + // Publish the new metadata image to the metadata cache. 
metadataCache.setImage(newImage) diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index 021db5a34381e..23b417e09da20 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -95,5 +95,11 @@ default Admin createAdminClient() { void stop(); + void shutdownBroker(int brokerId); + + void startBroker(int brokerId); + void rollingBrokerRestart(); + + void waitForReadyBrokers() throws InterruptedException; } diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index c60e0ec011272..599bdf02e319d 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.metadata.BrokerState; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; @@ -36,10 +37,14 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this @@ -73,6 +78,7 @@ public String getDisplayName(int invocationIndex) { @Override public List<Extension> getAdditionalExtensions() { + RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, clusterConfig); return Arrays.asList( (BeforeTestExecutionCallback) context -> { TestKitNodes nodes = new TestKitNodes.Builder().
@@ -97,8 +103,8 @@ public List<Extension> getAdditionalExtensions() { org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); }, - (AfterTestExecutionCallback) context -> clusterReference.get().close(), - new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)), + (AfterTestExecutionCallback) context -> clusterInstance.stop(), + new ClusterInstanceParameterResolver(clusterInstance), new GenericParameterResolver<>(clusterConfig, ClusterConfig.class) ); } @@ -109,6 +115,7 @@ public static class RaftClusterInstance implements ClusterInstance { private final ClusterConfig clusterConfig; final AtomicBoolean started = new AtomicBoolean(false); final AtomicBoolean stopped = new AtomicBoolean(false); + private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>(); RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) { this.clusterReference = clusterReference; @@ -122,7 +129,7 @@ public String bootstrapServers() { @Override public Collection<SocketServer> brokerSocketServers() { - return clusterReference.get().brokers().values().stream() + return brokers() .map(BrokerServer::socketServer) .collect(Collectors.toList()); } @@ -134,14 +141,14 @@ public ListenerName clientListener() { @Override public Collection<SocketServer> controllerSocketServers() { - return clusterReference.get().controllers().values().stream() + return controllers() .map(ControllerServer::socketServer) .collect(Collectors.toList()); } @Override public SocketServer anyBrokerSocketServer() { - return clusterReference.get().brokers().values().stream() + return brokers() .map(BrokerServer::socketServer) .findFirst() .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); @@ -149,7 +156,7 @@ public SocketServer anyBrokerSocketServer() { @Override public SocketServer anyControllerSocketServer() { - return clusterReference.get().controllers().values().stream() + return controllers() .map(ControllerServer::socketServer) .findFirst() .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); @@ -172,7 +179,9 @@ public KafkaClusterTestKit getUnderlying() { @Override public Admin createAdminClient(Properties configOverrides) { - return Admin.create(clusterReference.get().clientProperties()); + Admin admin = Admin.create(clusterReference.get().clientProperties()); + admins.add(admin); + return admin; } @Override @@ -189,11 +198,27 @@ public void start() { @Override public void stop() { if (stopped.compareAndSet(false, true)) { - try { - clusterReference.get().close(); - } catch (Exception e) { - throw new RuntimeException("Failed to stop Raft server", e); - } + admins.forEach(admin -> Utils.closeQuietly(admin, "admin")); + Utils.closeQuietly(clusterReference.get(), "cluster"); + } + } + + @Override + public void shutdownBroker(int brokerId) { + findBrokerOrThrow(brokerId).shutdown(); + } + + @Override + public void startBroker(int brokerId) { + findBrokerOrThrow(brokerId).startup(); + } + + @Override + public void waitForReadyBrokers() throws InterruptedException { + try { + clusterReference.get().waitForReadyBrokers(); + } catch (ExecutionException e) { + throw new AssertionError("Failed while waiting for brokers to become ready", e); } } @@ -201,5 +226,19 @@ public void stop() { public void rollingBrokerRestart() { throw new UnsupportedOperationException("Restarting Raft servers is not yet supported."); } + + private BrokerServer findBrokerOrThrow(int brokerId) { + return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) + .orElseThrow(()
-> new IllegalArgumentException("Unknown brokerId " + brokerId)); + } + + private Stream<BrokerServer> brokers() { + return clusterReference.get().brokers().values().stream(); + } + + private Stream<ControllerServer> controllers() { + return clusterReference.get().controllers().values().stream(); + } + } } diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java index cd8cdc11a800d..4d4323f0cdcc0 100644 --- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java @@ -21,12 +21,12 @@ import kafka.network.SocketServer; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; import kafka.utils.TestUtils; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; -import kafka.test.ClusterConfig; -import kafka.test.ClusterInstance; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; import org.junit.jupiter.api.extension.Extension; @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this @@ -193,7 +194,7 @@ public String bootstrapServers() { @Override public Collection<SocketServer> brokerSocketServers() { - return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream() + return servers() .map(KafkaServer::socketServer) .collect(Collectors.toList()); } @@ -205,7 +206,7 @@ public ListenerName clientListener() { @Override public Collection<SocketServer> controllerSocketServers() { - return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream() + return servers() .filter(broker -> broker.kafkaController().isActive()) .map(KafkaServer::socketServer) .collect(Collectors.toList()); @@ -213,7 +214,7 @@ public Collection<SocketServer> controllerSocketServers() { @Override public SocketServer anyBrokerSocketServer() { - return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream() + return servers() .map(KafkaServer::socketServer) .findFirst() .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); @@ -221,7 +222,7 @@ public SocketServer anyBrokerSocketServer() { @Override public SocketServer anyControllerSocketServer() { - return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream() + return servers() .filter(broker -> broker.kafkaController().isActive()) .map(KafkaServer::socketServer) .findFirst() @@ -262,6 +263,16 @@ public void stop() { } } + @Override + public void shutdownBroker(int brokerId) { + findBrokerOrThrow(brokerId).shutdown(); + } + + @Override + public void startBroker(int brokerId) { + findBrokerOrThrow(brokerId).startup(); + } + @Override public void rollingBrokerRestart() { if (!started.get()) { @@ -272,5 +283,25 @@ public void rollingBrokerRestart() { } clusterReference.get().restartDeadBrokers(true); } + + @Override + public void waitForReadyBrokers() throws InterruptedException { + org.apache.kafka.test.TestUtils.waitForCondition(() -> { + int numRegisteredBrokers = clusterReference.get().zkClient().getAllBrokersInCluster().size(); + return numRegisteredBrokers ==
config.numBrokers(); + }, "Timed out while waiting for brokers to become ready"); + } + + private KafkaServer findBrokerOrThrow(int brokerId) { + return servers() + .filter(server -> server.config().brokerId() == brokerId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); + } + + private Stream<KafkaServer> servers() { + return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream(); + } + } } diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala index 9a21e7f8178b6..203e181a89f22 100644 --- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala +++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala @@ -17,19 +17,23 @@ package kafka.server +import java.io.{DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.ByteBuffer +import java.util.{Collections, Properties} + import kafka.network.SocketServer +import kafka.utils.Implicits._ import kafka.utils.{NotNothing, TestUtils} +import org.apache.kafka.clients.admin.{Admin, NewTopic} import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader} import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils -import java.io.{DataInputStream, DataOutputStream} -import java.net.Socket -import java.nio.ByteBuffer -import java.util.Properties import scala.annotation.nowarn +import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag object IntegrationTestUtils { @@ -101,6 +105,32 @@ object IntegrationTestUtils { finally socket.close() } + def createTopic( + admin: Admin, + topic: String, + numPartitions: Int, + replicationFactor: Short + ): Unit = { + val newTopics = Collections.singletonList(new NewTopic(topic, numPartitions, replicationFactor)) + val createTopicResult = admin.createTopics(newTopics) + createTopicResult.all().get() + } + + def createTopic( + admin: Admin, + topic: String, + replicaAssignment: Map[Int, Seq[Int]] + ): Unit = { + val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]() + replicaAssignment.forKeyValue { (partitionId, assignment) => + javaAssignment.put(partitionId, assignment.map(Int.box).asJava) + } + val newTopic = new NewTopic(topic, javaAssignment) + val newTopics = Collections.singletonList(newTopic) + val createTopicResult = admin.createTopics(newTopics) + createTopicResult.all().get() + } + protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT private var correlationId = 0 diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala new file mode 100644 index 0000000000000..eaef9367a1daa --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.common.AdminCommandFailedException +import org.apache.kafka.common.errors.TimeoutException +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import scala.concurrent.duration._ + +/** + * For some error cases, we can save a little build time by avoiding the overhead for + * cluster creation and cleanup because the command is expected to fail immediately. + */ +class LeaderElectionCommandErrorTest { + + @Test + def testTopicWithoutPartition(): Unit = { + val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( + Array( + "--bootstrap-server", "nohost:9092", + "--election-type", "unclean", + "--topic", "some-topic" + ) + )) + assertTrue(e.getMessage.startsWith("Missing required option(s)")) + assertTrue(e.getMessage.contains(" partition")) + } + + @Test + def testPartitionWithoutTopic(): Unit = { + val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( + Array( + "--bootstrap-server", "nohost:9092", + "--election-type", "unclean", + "--all-topic-partitions", + "--partition", "0" + ) + )) + assertEquals("Option partition is only allowed if topic is used", e.getMessage) + } + + @Test + def testMissingElectionType(): Unit = { + val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( + Array( + "--bootstrap-server", "nohost:9092", + "--topic", "some-topic", + "--partition", "0" + ) + )) + assertTrue(e.getMessage.startsWith("Missing required option(s)")) + assertTrue(e.getMessage.contains(" election-type")) + } + + @Test + def testMissingTopicPartitionSelection(): Unit = { + val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( + Array( + "--bootstrap-server", "nohost:9092", + "--election-type", "preferred" + ) + )) + assertTrue(e.getMessage.startsWith("One and only one of the following options is required: ")) + assertTrue(e.getMessage.contains(" all-topic-partitions")) + assertTrue(e.getMessage.contains(" topic")) + assertTrue(e.getMessage.contains(" path-to-json-file")) + } + + @Test + def testInvalidBroker(): Unit = { + val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run( + Array( + "--bootstrap-server", "example.com:1234", + "--election-type", "unclean", + "--all-topic-partitions" + ), + 1.seconds + )) + assertTrue(e.getCause.isInstanceOf[TimeoutException]) + } +} diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index a7b91ea146a2f..b942f6f8ebad7 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -18,214 +18,176 @@ package kafka.admin import java.io.File import java.nio.charset.StandardCharsets -import java.nio.file.Files -import java.nio.file.Path +import java.nio.file.{Files, Path} import kafka.common.AdminCommandFailedException -import kafka.server.KafkaConfig -import kafka.server.KafkaServer +import kafka.server.IntegrationTestUtils.createTopic +import kafka.server.{KafkaConfig, 
KafkaServer} +import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.{ClusterConfig, ClusterInstance} import kafka.utils.TestUtils -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.errors.UnknownTopicOrPartitionException import org.apache.kafka.common.network.ListenerName -import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{BeforeEach, Tag} -import scala.jdk.CollectionConverters._ -import scala.collection.Seq -import scala.concurrent.duration._ - -final class LeaderElectionCommandTest extends ZooKeeperTestHarness { +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3) +@Tag("integration") +final class LeaderElectionCommandTest(cluster: ClusterInstance) { import LeaderElectionCommandTest._ - var servers = Seq.empty[KafkaServer] val broker1 = 0 val broker2 = 1 val broker3 = 2 @BeforeEach - override def setUp(): Unit = { - super.setUp() - - val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) - servers = brokerConfigs.map { config => - config.setProperty("auto.leader.rebalance.enable", "false") - config.setProperty("controlled.shutdown.enable", "true") - config.setProperty("controlled.shutdown.max.retries", "1") - config.setProperty("controlled.shutdown.retry.backoff.ms", "1000") - TestUtils.createServer(KafkaConfig.fromProps(config)) - } + def setup(clusterConfig: ClusterConfig): Unit = { + TestUtils.verifyNoUnexpectedThreads("@BeforeEach") + clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true") + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1") + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000") + clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") } - @AfterEach - override def tearDown(): Unit = { - TestUtils.shutdownServers(servers) - - super.tearDown() - } - - @Test + @ClusterTest def testAllTopicPartition(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) - - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + val client = cluster.createAdminClient() + val topic = "unclean-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - val topicPartition = new TopicPartition(topic, partition) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - TestUtils.assertLeader(client, topicPartition, broker2) + val topicPartition = new TopicPartition(topic, partition) - servers(broker3).shutdown() - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - servers(broker2).shutdown() - TestUtils.assertNoLeader(client, topicPartition) - servers(broker3).startup() + TestUtils.assertLeader(client, topicPartition, broker2) + cluster.shutdownBroker(broker3) 
+ TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) + cluster.shutdownBroker(broker2) + TestUtils.assertNoLeader(client, topicPartition) + cluster.startBroker(broker3) + TestUtils.waitForOnlineBroker(client, broker3) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--all-topic-partitions" - ) + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--all-topic-partitions" ) + ) - TestUtils.assertLeader(client, topicPartition, broker3) - } + TestUtils.assertLeader(client, topicPartition, broker3) } - @Test + @ClusterTest def testTopicPartition(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) + val client = cluster.createAdminClient() + val topic = "unclean-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - val topicPartition = new TopicPartition(topic, partition) + val topicPartition = new TopicPartition(topic, partition) - TestUtils.assertLeader(client, topicPartition, broker2) + TestUtils.assertLeader(client, topicPartition, broker2) - servers(broker3).shutdown() - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - servers(broker2).shutdown() - TestUtils.assertNoLeader(client, topicPartition) - servers(broker3).startup() + cluster.shutdownBroker(broker3) + TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) + cluster.shutdownBroker(broker2) + TestUtils.assertNoLeader(client, topicPartition) + cluster.startBroker(broker3) + TestUtils.waitForOnlineBroker(client, broker3) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--topic", topic, - "--partition", partition.toString - ) + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--topic", topic, + "--partition", partition.toString ) + ) - TestUtils.assertLeader(client, topicPartition, broker3) - } + TestUtils.assertLeader(client, topicPartition, broker3) } - @Test + @ClusterTest def testPathToJsonFile(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) + val client = cluster.createAdminClient() + val topic = "unclean-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - val topicPartition = new TopicPartition(topic, partition) + val topicPartition = new TopicPartition(topic, partition) - TestUtils.assertLeader(client, topicPartition, broker2) + TestUtils.assertLeader(client, topicPartition, broker2) - servers(broker3).shutdown() - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - servers(broker2).shutdown() - TestUtils.assertNoLeader(client, topicPartition) - servers(broker3).startup() + cluster.shutdownBroker(broker3) + TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) + 
cluster.shutdownBroker(broker2) + TestUtils.assertNoLeader(client, topicPartition) + cluster.startBroker(broker3) + TestUtils.waitForOnlineBroker(client, broker3) - val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition)) + val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition)) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--path-to-json-file", topicPartitionPath.toString - ) + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--path-to-json-file", topicPartitionPath.toString ) + ) - TestUtils.assertLeader(client, topicPartition, broker3) - } + TestUtils.assertLeader(client, topicPartition, broker3) } - @Test + @ClusterTest def testPreferredReplicaElection(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) + val client = cluster.createAdminClient() + val topic = "preferred-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - val topicPartition = new TopicPartition(topic, partition) + val topicPartition = new TopicPartition(topic, partition) - TestUtils.assertLeader(client, topicPartition, broker2) + TestUtils.assertLeader(client, topicPartition, broker2) - servers(broker2).shutdown() - TestUtils.assertLeader(client, topicPartition, broker3) - servers(broker2).startup() - TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2)) + cluster.shutdownBroker(broker2) + TestUtils.assertLeader(client, topicPartition, broker3) + cluster.startBroker(broker2) + TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2)) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "preferred", - "--all-topic-partitions" - ) - ) - - TestUtils.assertLeader(client, topicPartition, broker2) - } - } - - @Test - def testTopicWithoutPartition(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( + LeaderElectionCommand.main( Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--topic", "some-topic" + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--all-topic-partitions" ) - )) - assertTrue(e.getMessage.startsWith("Missing required option(s)")) - assertTrue(e.getMessage.contains(" partition")) - } + ) - @Test - def testPartitionWithoutTopic(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--all-topic-partitions", - "--partition", "0" - ) - )) - assertEquals("Option partition is only allowed if topic is used", e.getMessage) + TestUtils.assertLeader(client, topicPartition, broker2) } - @Test + @ClusterTest def testTopicDoesNotExist(): Unit = { val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main( Array( - "--bootstrap-server", bootstrapServers(servers), + "--bootstrap-server", cluster.bootstrapServers(), "--election-type", "preferred", "--topic", "unknown-topic-name", "--partition", "0" @@ -234,86 +196,55 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness 
{ assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException]) } - @Test - def testMissingElectionType(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--topic", "some-topic", - "--partition", "0" - ) + @ClusterTest + def testElectionResultOutput(): Unit = { + val client = cluster.createAdminClient() + val topic = "non-preferred-topic" + val partition0 = 0 + val partition1 = 1 + val assignment0 = Seq(broker2, broker3) + val assignment1 = Seq(broker3, broker2) + + cluster.waitForReadyBrokers() + createTopic(client, topic, Map( + partition0 -> assignment0, + partition1 -> assignment1 )) - assertTrue(e.getMessage.startsWith("Missing required option(s)")) - assertTrue(e.getMessage.contains(" election-type")) - } - @Test - def testMissingTopicPartitionSelection(): Unit = { - val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "preferred" - ) - )) - assertTrue(e.getMessage.startsWith("One and only one of the following options is required: ")) - assertTrue(e.getMessage.contains(" all-topic-partitions")) - assertTrue(e.getMessage.contains(" topic")) - assertTrue(e.getMessage.contains(" path-to-json-file")) - } + val topicPartition0 = new TopicPartition(topic, partition0) + val topicPartition1 = new TopicPartition(topic, partition1) - @Test - def testInvalidBroker(): Unit = { - val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run( - Array( - "--bootstrap-server", "example.com:1234", - "--election-type", "unclean", - "--all-topic-partitions" - ), - 1.seconds - )) - assertTrue(e.getCause.isInstanceOf[TimeoutException]) - } + TestUtils.assertLeader(client, topicPartition0, broker2) + TestUtils.assertLeader(client, topicPartition1, broker3) - @Test - def testElectionResultOutput(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "non-preferred-topic" - val partition0 = 0 - val partition1 = 1 - val assignment0 = Seq(broker2, broker3) - val assignment1 = Seq(broker3, broker2) - - TestUtils.createTopic(zkClient, topic, Map(partition0 -> assignment0, partition1 -> assignment1), servers) - - val topicPartition0 = new TopicPartition(topic, partition0) - val topicPartition1 = new TopicPartition(topic, partition1) - - TestUtils.assertLeader(client, topicPartition0, broker2) - TestUtils.assertLeader(client, topicPartition1, broker3) - - servers(broker2).shutdown() - TestUtils.assertLeader(client, topicPartition0, broker3) - servers(broker2).startup() - TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2)) - TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2)) - - val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1)) - val output = TestUtils.grabConsoleOutput( - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "preferred", - "--path-to-json-file", topicPartitionPath.toString - ) + cluster.shutdownBroker(broker2) + TestUtils.assertLeader(client, topicPartition0, broker3) + cluster.startBroker(broker2) + TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2)) + TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2)) + + val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1)) + val output = TestUtils.grabConsoleOutput( + 
LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--path-to-json-file", topicPartitionPath.toString ) ) + ) + + val electionResultOutputIter = output.split("\n").iterator + + assertTrue(electionResultOutputIter.hasNext) + val firstLine = electionResultOutputIter.next() + assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"), + s"Unexpected output: $firstLine") - val electionResultOutputIter = output.split("\n").iterator - assertTrue(electionResultOutputIter.hasNext) - assertTrue(electionResultOutputIter.next().contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0")) - assertTrue(electionResultOutputIter.hasNext) - assertTrue(electionResultOutputIter.next().contains(s"Valid replica already elected for partitions $topicPartition1")) - } + assertTrue(electionResultOutputIter.hasNext) + val secondLine = electionResultOutputIter.next() + assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"), + s"Unexpected output: $secondLine") } } diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 768c06a9ec3b2..318e17eb0f7f9 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -19,43 +19,40 @@ package kafka.server import java.net.InetAddress import java.util +import java.util.Collections.singletonList import java.util.Properties -import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutionException +import java.util.concurrent.{CompletableFuture, ExecutionException} + import kafka.network.RequestChannel import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers import kafka.test.MockController -import kafka.utils.MockTime +import kafka.utils.{MockTime, NotNothing} import org.apache.kafka.clients.admin.AlterConfigOp -import org.apache.kafka.common.Uuid import org.apache.kafka.common.Uuid.ZERO_UUID +import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors._ import org.apache.kafka.common.memory.MemoryPool +import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource, AlterConfigsResourceCollection => OldAlterConfigsResourceCollection, AlterableConfig => OldAlterableConfig, AlterableConfigCollection => OldAlterableConfigCollection} +import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse} import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.CreateTopicsRequestData -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection +import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult -import org.apache.kafka.common.message._ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection} -import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => OldAlterConfigsResourceCollection} -import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource} -import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => OldAlterableConfigCollection} -import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => OldAlterableConfig} -import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse +import org.apache.kafka.common.message.{CreateTopicsRequestData, _} import org.apache.kafka.common.network.{ClientInformation, ListenerName} -import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.protocol.ApiMessage import org.apache.kafka.common.protocol.Errors._ +import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ +import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.{ElectionType, Uuid} import org.apache.kafka.controller.Controller import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.ApiMessageAndVersion @@ -65,7 +62,9 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag class ControllerApisTest { private val nodeId = 1 @@ -136,12 +135,11 @@ class ControllerApisTest { def createDenyAllAuthorizer(): Authorizer = { val authorizer = mock(classOf[Authorizer]) - mock(classOf[Authorizer]) when(authorizer.authorize( any(classOf[AuthorizableRequestContext]), any(classOf[java.util.List[Action]]) )).thenReturn( - java.util.Collections.singletonList(AuthorizationResult.DENIED) + singletonList(AuthorizationResult.DENIED) ) authorizer } @@ -732,6 +730,79 @@ class ControllerApisTest { controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet) } + @Test + def testElectLeadersAuthorization(): Unit = { + val authorizer = mock(classOf[Authorizer]) + val controller = mock(classOf[Controller]) + val controllerApis = createControllerApis(Some(authorizer), controller) + + val request = new ElectLeadersRequest.Builder( + ElectionType.PREFERRED, + null, + 30000 + ).build() + + val resource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) + val actions = singletonList(new Action(AclOperation.ALTER, resource, 1, true, true)) + + when(authorizer.authorize( + any[RequestContext], + ArgumentMatchers.eq(actions) + )).thenReturn(singletonList(AuthorizationResult.DENIED)) + + val response = handleRequest[ElectLeadersResponse](request, controllerApis) + assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, Errors.forCode(response.data.errorCode)) + } + + @Test + 
def testElectLeadersHandledByController(): Unit = { + val controller = mock(classOf[Controller]) + val controllerApis = createControllerApis(None, controller) + + val request = new ElectLeadersRequest.Builder( + ElectionType.PREFERRED, + null, + 30000 + ).build() + + val responseData = new ElectLeadersResponseData() + .setErrorCode(Errors.NOT_CONTROLLER.code) + + when(controller.electLeaders( + request.data + )).thenReturn(CompletableFuture.completedFuture(responseData)) + + val response = handleRequest[ElectLeadersResponse](request, controllerApis) + assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode)) + } + + private def handleRequest[T <: AbstractResponse]( + request: AbstractRequest, + controllerApis: ControllerApis + )( + implicit classTag: ClassTag[T], + @nowarn("cat=unused") nn: NotNothing[T] + ): T = { + val req = buildRequest(request) + + controllerApis.handle(req, RequestLocal.NoCaching) + + val capturedResponse: ArgumentCaptor[AbstractResponse] = + ArgumentCaptor.forClass(classOf[AbstractResponse]) + verify(requestChannel).sendResponse( + ArgumentMatchers.eq(req), + capturedResponse.capture(), + ArgumentMatchers.eq(None) + ) + + capturedResponse.getValue match { + case response: T => response + case response => + throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, " + + s"but found ${response.getClass}") + } + } + @AfterEach def tearDown(): Unit = { quotas.shutdown() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1ddd63a854c9d..85285116efde8 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -23,6 +23,7 @@ import java.util import java.util.Arrays.asList import java.util.concurrent.TimeUnit import java.util.{Collections, Optional, Properties, Random} + import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr} import kafka.cluster.{Broker, Partition} import kafka.controller.{ControllerContext, KafkaController} @@ -68,7 +69,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} -import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} +import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer} @@ -468,6 +469,12 @@ class KafkaApisTest { testForwardableApi(ApiKeys.ALTER_CONFIGS, requestBuilder) } + @Test + def testElectLeadersForwarding(): Unit = { + val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000) + testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder) + } + @Test def testDescribeQuorumNotAllowedForZkClusters(): Unit = { val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition) @@ -495,6 +502,18 @@ class KafkaApisTest { ) } + private def testKraftForwarding( + apiKey: ApiKeys, + requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest] + ): Unit = { + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + testForwardableApi( + 
createKafkaApis(enableForwarding = true, raftSupport = true), + apiKey, + requestBuilder + ) + } + private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = { testForwardableApi( createKafkaApis(enableForwarding = true), @@ -511,7 +530,8 @@ class KafkaApisTest { val topicHeader = new RequestHeader(apiKey, apiKey.latestVersion, clientId, 0) - val request = buildRequest(requestBuilder.build(topicHeader.apiVersion)) + val apiRequest = requestBuilder.build(topicHeader.apiVersion) + val request = buildRequest(apiRequest) if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) { // The controller check only makes sense for ZK clusters. For KRaft, @@ -520,18 +540,28 @@ class KafkaApisTest { EasyMock.expect(controller.isActive).andReturn(false) } - expectNoThrottling(request) + val capturedResponse = expectNoThrottling(request) + val forwardCallback: Capture[Option[AbstractResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(forwardingManager.forwardRequest( EasyMock.eq(request), - anyObject[Option[AbstractResponse] => Unit]() + EasyMock.capture(forwardCallback) )).once() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager) kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) + assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " + + s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)") + + val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception) + assertTrue(forwardCallback.hasCaptured) + forwardCallback.getValue.apply(Some(expectedResponse)) - EasyMock.verify(controller, forwardingManager) + assertTrue(capturedResponse.hasCaptured) + assertEquals(expectedResponse, capturedResponse.getValue) + + EasyMock.verify(controller, requestChannel, forwardingManager) } private def authorizeResource(authorizer: Authorizer, @@ -3980,13 +4010,13 @@ class KafkaApisTest { request } - private def verifyShouldNeverHandle(handler: RequestChannel.Request => Unit): Unit = { + private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = { val request = createMockRequest() val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request)) assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage) } - private def verifyShouldAlwaysForward(handler: RequestChannel.Request => Unit): Unit = { + private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = { val request = createMockRequest() val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request)) assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage) @@ -3995,126 +4025,132 @@ class KafkaApisTest { @Test def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest) + verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest) } @Test def testRaftShouldNeverHandleStopReplicaRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest) + verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest) } @Test def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = { metadataCache = 
MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching)) + verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching)) } @Test def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest) + verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest) } @Test def testRaftShouldNeverHandleAlterIsrRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest) + verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterIsrRequest) } @Test def testRaftShouldNeverHandleEnvelope(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching)) + verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching)) } @Test def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest) } @Test def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest) } @Test def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest) } @Test def testRaftShouldAlwaysForwardCreateAcls(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls) } @Test def testRaftShouldAlwaysForwardDeleteAcls(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls) } @Test def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterConfigsRequest) } @Test def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest) + 
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest) } @Test def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest) } @Test def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest) } @Test def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequest) } @Test def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequest) } @Test def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest) } @Test def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest) } @Test def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures) + } + + @Test + def testRaftShouldAlwaysForwardElectLeaders(): Unit = { + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders) } @Test def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = { metadataCache = MetadataCache.kRaftMetadataCache(brokerId) - verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest) + verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 64003b79cff2b..06c7569f33dfe 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -17,6 +17,7 @@ package kafka.utils import java.io._ +import java.net.InetAddress import java.nio._ import java.nio.channels._ import java.nio.charset.{Charset, StandardCharsets} @@ -26,12 +27,12 @@ import java.time.Duration import 
java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit} import java.util.{Arrays, Collections, Optional, Properties} -import com.yammer.metrics.core.Meter +import com.yammer.metrics.core.Meter import javax.net.ssl.X509TrustManager import kafka.api._ import kafka.cluster.{Broker, EndPoint, IsrChangeListener} -import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch} import kafka.log._ import kafka.metrics.KafkaYammerMetrics import kafka.network.RequestChannel @@ -44,6 +45,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer._ +import org.apache.kafka.clients.consumer.internals.AbstractCoordinator import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter} import org.apache.kafka.common.config.ConfigResource @@ -53,9 +55,9 @@ import org.apache.kafka.common.header.Header import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode} import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader} @@ -73,14 +75,13 @@ import org.apache.zookeeper.data.ACL import org.junit.jupiter.api.Assertions._ import org.mockito.Mockito -import java.net.InetAddress - import scala.annotation.nowarn import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Map, Seq, mutable} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success, Try} /** * Utility functions to help with testing @@ -1625,19 +1626,37 @@ object TestUtils extends Logging { waitForLeaderToBecome(client, topicPartition, None) } - def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = { + def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = { + waitUntilTrue(() => { + val nodes = client.describeCluster().nodes().get() + nodes.asScala.exists(_.id == brokerId) + }, s"Timed out waiting for brokerId $brokerId to come online") + } + + def waitForLeaderToBecome( + client: Admin, + topicPartition: TopicPartition, + expectedLeaderOpt: Option[Int] + ): Unit = { val topic = topicPartition.topic - val partition = topicPartition.partition + val partitionId = topicPartition.partition + + def currentLeader: Try[Option[Int]] = Try { + val topicDescription = client.describeTopics(List(topic).asJava).allTopicNames.get.get(topic) + topicDescription.partitions.asScala + .find(_.partition == partitionId) + .flatMap(partitionState => Option(partitionState.leader)) + .map(_.id) + } - waitUntilTrue(() => { - try { - val topicResult = client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic) - val 
partitionResult = topicResult.partitions.get(partition) - Option(partitionResult.leader).map(_.id) == leader - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false - } - }, "Timed out waiting for leader metadata") + val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) { + case Success(leaderOpt) => leaderOpt == expectedLeaderOpt + case Failure(e: ExecutionException) if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false + case Failure(e) => throw e + } + + assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " + + s"Last metadata lookup returned leader = ${lastLeaderCheck.getOrElse("unknown")}") } def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = { @@ -1652,7 +1671,7 @@ object TestUtils extends Logging { brokerIds.intersect(isr).isEmpty }, - s"Expected brokers $brokerIds to no longer in the ISR for $partition" + s"Expected brokers $brokerIds to no longer be in the ISR for $partition" ) } @@ -1917,4 +1936,28 @@ object TestUtils extends Logging { ) } + def verifyNoUnexpectedThreads(context: String): Unit = { + // Threads which may cause transient failures in subsequent tests if not shutdown. + // These include threads which make connections to brokers and may cause issues + // when broker ports are reused (e.g. auto-create topics) as well as threads + // which reset static JAAS configuration. + val unexpectedThreadNames = Set( + ControllerEventManager.ControllerEventThreadName, + KafkaProducer.NETWORK_THREAD_PREFIX, + AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), + AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, + ZooKeeperTestHarness.ZkClientEventThreadSuffix + ) + + def unexpectedThreads: Set[String] = { + val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName) + allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet + } + + val (unexpected, _) = TestUtils.computeUntilTrue(unexpectedThreads)(_.isEmpty) + assertTrue(unexpected.isEmpty, + s"Found ${unexpected.size} unexpected threads during $context: " + + s"${unexpected.mkString("`", ",", "`")}") + } + } diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 8b61c0e7a1dba..a81cbb9a435d9 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,19 +19,12 @@ package kafka.zk import javax.security.auth.login.Configuration import kafka.utils.{CoreUtils, Logging, TestUtils} -import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag} -import org.junit.jupiter.api.Assertions._ import org.apache.kafka.common.security.JaasUtils - -import scala.collection.Set -import scala.jdk.CollectionConverters._ -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.consumer.internals.AbstractCoordinator -import kafka.controller.ControllerEventManager -import org.apache.kafka.clients.admin.AdminClientUnitTestEnv import org.apache.kafka.common.utils.Time import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag} @Tag("integration") abstract class ZooKeeperTestHarness extends Logging { @@ -87,16 +80,6 @@ abstract class 
ZooKeeperTestHarness extends Logging { object ZooKeeperTestHarness { val ZkClientEventThreadSuffix = "-EventThread" - // Threads which may cause transient failures in subsequent tests if not shutdown. - // These include threads which make connections to brokers and may cause issues - // when broker ports are reused (e.g. auto-create topics) as well as threads - // which reset static JAAS configuration. - val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName, - KafkaProducer.NETWORK_THREAD_PREFIX, - AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), - AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, - ZkClientEventThreadSuffix) - /** * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread. * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass, @@ -104,7 +87,7 @@ object ZooKeeperTestHarness { */ @BeforeAll def setUpClass(): Unit = { - verifyNoUnexpectedThreads("@BeforeClass") + TestUtils.verifyNoUnexpectedThreads("@BeforeAll") } /** @@ -112,19 +95,7 @@ object ZooKeeperTestHarness { */ @AfterAll def tearDownClass(): Unit = { - verifyNoUnexpectedThreads("@AfterClass") + TestUtils.verifyNoUnexpectedThreads("@AfterAll") } - /** - * Verifies that threads which are known to cause transient failures in subsequent tests - * have been shutdown. - */ - def verifyNoUnexpectedThreads(context: String): Unit = { - def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName) - val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads => - threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s))) - } - assertTrue(noUnexpected, s"Found unexpected threads during $context, allThreads=$threads, " + - s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}") - } } diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index a0eb1eab66d72..5cfb9ee29cb0a 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -25,6 +25,7 @@ import scala.collection.Seq import com.yammer.metrics.core.{Gauge, Meter, MetricName} import kafka.server.KafkaConfig import kafka.metrics.KafkaYammerMetrics +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Time @@ -33,7 +34,7 @@ import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} import org.apache.zookeeper.ZooKeeper.States import org.apache.zookeeper.client.ZKClientConfig import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs} -import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertTrue, assertThrows, fail} +import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertThrows, assertTrue, fail} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import scala.jdk.CollectionConverters._ @@ -46,7 +47,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @BeforeEach override def setUp(): Unit = { - ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach") + TestUtils.verifyNoUnexpectedThreads("@BeforeEach") cleanMetricsRegistry() super.setUp() zooKeeperClient = newZooKeeperClient() @@ -58,7 +59,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { 
zooKeeperClient.close() super.tearDown() System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) - ZooKeeperTestHarness.verifyNoUnexpectedThreads("@AfterEach") + TestUtils.verifyNoUnexpectedThreads("@AfterEach") } @Test diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index c04a8fb27ab05..5a63094cc5dde 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -17,8 +17,6 @@ package org.apache.kafka.controller; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException; import org.apache.kafka.common.errors.StaleBrokerEpochException; @@ -35,11 +33,11 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FeatureMapAndEpoch; import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.slf4j.Logger; @@ -52,7 +50,9 @@ import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD; diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index 5092ba164bae4..cf0f6bfd609af 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -22,6 +22,8 @@ import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.Replicas; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; @@ -38,6 +40,8 @@ * PartitionChangeBuilder handles changing partition registrations. */ public class PartitionChangeBuilder { + private static final Logger log = LoggerFactory.getLogger(PartitionChangeBuilder.class); + public static boolean changeRecordIsNoOp(PartitionChangeRecord record) { if (record.isr() != null) return false; if (record.leader() != NO_LEADER_CHANGE) return false; @@ -141,12 +145,15 @@ class BestLeader { private void tryElection(PartitionChangeRecord record) { BestLeader bestLeader = new BestLeader(); if (bestLeader.node != partition.leader) { + log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node); record.setLeader(bestLeader.node); if (bestLeader.unclean) { // If the election was unclean, we have to forcibly set the ISR to just the // new leader. This can result in data loss! 
record.setIsr(Collections.singletonList(bestLeader.node)); } + } else { + log.debug("Failed to find a new leader with current state: {}", this); } } @@ -240,4 +247,20 @@ public Optional build() { PARTITION_CHANGE_RECORD.highestSupportedVersion())); } } + + @Override + public String toString() { + return "PartitionChangeBuilder(" + + "partition=" + partition + + ", topicId=" + topicId + + ", partitionId=" + partitionId + + ", isAcceptableLeader=" + isAcceptableLeader + + ", uncleanElectionOk=" + uncleanElectionOk + + ", targetIsr=" + targetIsr + + ", targetReplicas=" + targetReplicas + + ", targetRemoving=" + targetRemoving + + ", targetAdding=" + targetAdding + + ", alwaysElectPreferredIfPossible=" + alwaysElectPreferredIfPossible + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 3066f4b21c003..4450e252bd9be 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -132,6 +132,14 @@ static class TopicControlInfo { this.id = id; this.parts = new TimelineHashMap<>(snapshotRegistry, 0); } + + public String name() { + return name; + } + + public Uuid topicId() { + return id; + } } private final SnapshotRegistry snapshotRegistry; @@ -586,6 +594,11 @@ PartitionRegistration getPartition(Uuid topicId, int partitionId) { return topic.parts.get(partitionId); } + // VisibleForTesting + TopicControlInfo getTopic(Uuid topicId) { + return topics.get(topicId); + } + // VisibleForTesting BrokersToIsrs brokersToIsrs() { return brokersToIsrs; @@ -787,7 +800,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List electLeaders(ElectLeadersRequestData request) { - boolean uncleanOk = electionTypeIsUnclean(request.electionType()); + ElectionType electionType = electionType(request.electionType()); List records = new ArrayList<>(); ElectLeadersResponseData response = new ElectLeadersResponseData(); if (request.topicPartitions() == null) { @@ -804,11 +817,16 @@ ControllerResult electLeaders(ElectLeadersRequestData TopicControlInfo topic = topics.get(topicEntry.getValue()); if (topic != null) { for (int partitionId : topic.parts.keySet()) { - ApiError error = electLeader(topicName, partitionId, uncleanOk, records); - topicResults.partitionResult().add(new PartitionResult(). - setPartitionId(partitionId). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); + ApiError error = electLeader(topicName, partitionId, electionType, records); + + // When electing leaders for all partitions, we do not return + // partitions which already have the desired leader. + if (error.error() != Errors.ELECTION_NOT_NEEDED) { + topicResults.partitionResult().add(new PartitionResult(). + setPartitionId(partitionId). + setErrorCode(error.error().code()). + setErrorMessage(error.message())); + } } } } @@ -818,7 +836,7 @@ ControllerResult electLeaders(ElectLeadersRequestData new ReplicaElectionResult().setTopic(topic.topic()); response.replicaElectionResults().add(topicResults); for (int partitionId : topic.partitions()) { - ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records); + ApiError error = electLeader(topic.topic(), partitionId, electionType, records); topicResults.partitionResult().add(new PartitionResult(). setPartitionId(partitionId). setErrorCode(error.error().code()). 
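
For orientation, a minimal client-side sketch (not part of the patch) of the request shape this hunk handles. The builder arguments mirror those used verbatim in the tests earlier in this patch; the 30-second timeout and the class name are illustrative. A null topic-partition set asks the controller to elect leaders for every partition, and with this change any partition whose desired leader is already in place is simply omitted from the response instead of being reported per partition.

    import org.apache.kafka.common.ElectionType;
    import org.apache.kafka.common.requests.ElectLeadersRequest;

    public class ElectAllPartitionsSketch {
        public static void main(String[] args) {
            // null topicPartitions => run the election across all partitions.
            ElectLeadersRequest request = new ElectLeadersRequest.Builder(
                ElectionType.PREFERRED,  // preferred election only, never unclean
                null,                    // all partitions
                30000                    // timeoutMs (illustrative)
            ).build();
            System.out.println(request.data());
        }
    }
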
@@ -829,17 +847,15 @@ ControllerResult electLeaders(ElectLeadersRequestData return ControllerResult.of(records, response); } - static boolean electionTypeIsUnclean(byte electionType) { - ElectionType type; + private static ElectionType electionType(byte electionType) { try { - type = ElectionType.valueOf(electionType); + return ElectionType.valueOf(electionType); } catch (IllegalArgumentException e) { throw new InvalidRequestException("Unknown election type " + (int) electionType); } - return type == ElectionType.UNCLEAN; } - ApiError electLeader(String topic, int partitionId, boolean uncleanOk, + ApiError electLeader(String topic, int partitionId, ElectionType electionType, List records) { Uuid topicId = topicsByName.get(topic); if (topicId == null) { @@ -856,21 +872,24 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk, return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such partition as " + topic + "-" + partitionId); } + if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader()) + || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) { + return new ApiError(Errors.ELECTION_NOT_NEEDED); + } + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, partitionId, r -> clusterControl.unfenced(r), - () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic)); - builder.setAlwaysElectPreferredIfPossible(true); + () -> electionType == ElectionType.UNCLEAN); + + builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED); Optional record = builder.build(); if (!record.isPresent()) { - if (partition.leader == NO_LEADER) { - // If we can't find any leader for the partition, return an error. - return new ApiError(Errors.LEADER_NOT_AVAILABLE, - "Unable to find any leader for the partition."); + if (electionType == ElectionType.PREFERRED) { + return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE); } else { - // There is nothing to do. 
- return ApiError.NONE; + return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE); } } records.add(record.get()); diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java index e159732e3eeea..4b574b3ada5dd 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java @@ -81,4 +81,11 @@ public ClientQuotasImage apply() { } return new ClientQuotasImage(newEntities); } + + @Override + public String toString() { + return "ClientQuotasDelta(" + + "changes=" + changes + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java index 90f61d7888c0a..6c48b8ecde575 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java @@ -126,4 +126,11 @@ public ClusterImage apply() { } return new ClusterImage(newBrokers); } + + @Override + public String toString() { + return "ClusterDelta(" + + "changedBrokers=" + changedBrokers + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java index 4b3aa0b67c7fc..677f764b831c5 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java @@ -74,4 +74,13 @@ public ConfigurationImage apply() { } return new ConfigurationImage(newData); } + + @Override + public String toString() { + // Values are intentionally left out of this so that sensitive configs + // do not end up in logging by mistake. 
+ return "ConfigurationDelta(" + + "changedKeys=" + changes.keySet() + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java index df3b0ff5e8d98..d0f5848770e41 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java @@ -96,4 +96,11 @@ public ConfigurationsImage apply() { } return new ConfigurationsImage(newData); } + + @Override + public String toString() { + return "ConfigurationsDelta(" + + "changes=" + changes + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java index 940697eea602f..781c496f19b6e 100644 --- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java @@ -83,4 +83,11 @@ public FeaturesImage apply() { } return new FeaturesImage(newFinalizedVersions); } + + @Override + public String toString() { + return "FeaturesDelta(" + + "changes=" + changes + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index 6be0dd6cb9d91..4b9451b07a583 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -256,4 +256,15 @@ public MetadataImage apply() { return new MetadataImage(newFeatures, newCluster, newTopics, newConfigs, newClientQuotas); } + + @Override + public String toString() { + return "MetadataDelta(" + + "featuresDelta=" + featuresDelta + + ", clusterDelta=" + clusterDelta + + ", topicsDelta=" + topicsDelta + + ", configsDelta=" + configsDelta + + ", clientQuotasDelta=" + clientQuotasDelta + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java index 88fef3011473f..3214a0cfc6af6 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java @@ -138,4 +138,11 @@ public LocalReplicaChanges localChanges(int brokerId) { return new LocalReplicaChanges(deletes, leaders, followers); } + + @Override + public String toString() { + return "TopicDelta(" + + "partitionChanges=" + partitionChanges + + ')'; + } } diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java index a146bba8cced6..f9d8087879ba8 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java @@ -201,4 +201,12 @@ public LocalReplicaChanges localChanges(int brokerId) { return new LocalReplicaChanges(deletes, leaders, followers); } + + @Override + public String toString() { + return "TopicsDelta(" + + "changedTopics=" + changedTopics + + ", deletedTopicIds=" + deletedTopicIds + + ')'; + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index 1b013f4ee5a95..f935a808f0b69 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ 
-107,6 +107,14 @@ public void testShouldTryElection() { shouldTryElection()); assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)). shouldTryElection()); + + assertTrue(createFooBuilder(true) + .setTargetIsr(Arrays.asList(3)) + .shouldTryElection()); + assertTrue(createFooBuilder(true) + .setTargetIsr(Arrays.asList(4)) + .setTargetReplicas(Arrays.asList(2, 1, 3, 4)) + .shouldTryElection()); } private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder, diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 09449d36d774f..78b0995a24c0c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.controller; import java.util.Optional; +import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; @@ -37,12 +38,12 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment; import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic; import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult; +import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; -import org.apache.kafka.common.message.CreateTopicsRequestData; -import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection; @@ -50,9 +51,10 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics; +import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; -import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; +import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -63,20 +65,21 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerHeartbeatReply; +import org.apache.kafka.metadata.BrokerRegistration; import 
org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.Replicas; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.metadata.BrokerHeartbeatReply; -import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -84,18 +87,26 @@ import java.util.Map; import java.util.OptionalInt; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED; +import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH; import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS; import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT; import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION; import static org.apache.kafka.common.protocol.Errors.NONE; import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS; +import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -117,7 +128,7 @@ private static class ReplicationControlTestContext { final MockTime time = new MockTime(); final MockRandom random = new MockRandom(); final ClusterControlManager clusterControl = new ClusterControlManager( - logContext, time, snapshotRegistry, BROKER_SESSION_TIMEOUT_MS, + logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS), new StripedReplicaPlacer(random)); final ControllerMetrics metrics = new MockControllerMetrics(); final ConfigurationControlManager configurationControl = new ConfigurationControlManager( @@ -200,7 +211,45 @@ void registerBrokers(Integer... brokerIds) throws Exception { } } - void unfenceBrokers(Integer... 
brokerIds) throws Exception { + void alterIsr( + TopicIdPartition topicIdPartition, + int leaderId, + List isr + ) throws Exception { + BrokerRegistration registration = clusterControl.brokerRegistrations().get(leaderId); + assertFalse(registration.fenced()); + + PartitionRegistration partition = replicationControl.getPartition( + topicIdPartition.topicId(), + topicIdPartition.partitionId() + ); + assertNotNull(partition); + assertEquals(leaderId, partition.leader); + + PartitionData partitionData = new PartitionData() + .setPartitionIndex(topicIdPartition.partitionId()) + .setCurrentIsrVersion(partition.partitionEpoch) + .setLeaderEpoch(partition.leaderEpoch) + .setNewIsr(isr); + + String topicName = replicationControl.getTopic(topicIdPartition.topicId()).name(); + TopicData topicData = new TopicData() + .setName(topicName) + .setPartitions(singletonList(partitionData)); + + ControllerResult alterIsr = replicationControl.alterIsr( + new AlterIsrRequestData() + .setBrokerId(leaderId) + .setBrokerEpoch(registration.epoch()) + .setTopics(singletonList(topicData))); + replay(alterIsr.records()); + } + + void unfenceBrokers(Integer... brokerIds) throws Exception { + unfenceBrokers(Utils.mkSet(brokerIds)); + } + + void unfenceBrokers(Set brokerIds) throws Exception { for (int brokerId : brokerIds) { ControllerResult result = replicationControl. processBrokerHeartbeat(new BrokerHeartbeatRequestData(). @@ -213,6 +262,19 @@ void unfenceBrokers(Integer... brokerIds) throws Exception { } } + void alterTopicConfig( + String topic, + String configKey, + String configValue + ) throws Exception { + ConfigRecord configRecord = new ConfigRecord() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName(topic) + .setName(configKey) + .setValue(configValue); + replay(singletonList(new ApiMessageAndVersion(configRecord, (short) 0))); + } + void fenceBrokers(Set brokerIds) throws Exception { time.sleep(BROKER_SESSION_TIMEOUT_MS); @@ -284,10 +346,10 @@ public void testCreateTopics() throws Exception { setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage())); assertEquals(expectedResponse3, result3.response()); Uuid fooId = result2.response().topics().find("foo").topicId(); - RecordTestUtils.assertBatchIteratorContains(Arrays.asList( - Arrays.asList(new ApiMessageAndVersion(new PartitionRecord(). + RecordTestUtils.assertBatchIteratorContains(asList( + asList(new ApiMessageAndVersion(new PartitionRecord(). setPartitionId(0).setTopicId(fooId). - setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)). + setReplicas(asList(1, 2, 0)).setIsr(asList(1, 2, 0)). setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1). setLeaderEpoch(0).setPartitionEpoch(0), (short) 0), new ApiMessageAndVersion(new TopicRecord(). 
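
As a usage note, a self-contained sketch of the request the new alterIsr test helper assembles, using only the generated-message setters that appear verbatim in the hunk above. The broker id, epochs, ISR contents, and class name are illustrative values, not taken from any specific test.

    import java.util.Arrays;
    import org.apache.kafka.common.message.AlterIsrRequestData;
    import org.apache.kafka.common.message.AlterIsrRequestData.PartitionData;
    import org.apache.kafka.common.message.AlterIsrRequestData.TopicData;

    public class AlterIsrRequestSketch {
        public static void main(String[] args) {
            // Shrink the ISR of foo-0 to {0, 1}, as its current leader (broker 0) would.
            PartitionData partitionData = new PartitionData()
                .setPartitionIndex(0)
                .setCurrentIsrVersion(0)          // current partition epoch
                .setLeaderEpoch(0)                // leader epoch known to the sender
                .setNewIsr(Arrays.asList(0, 1));

            TopicData topicData = new TopicData()
                .setName("foo")
                .setPartitions(Arrays.asList(partitionData));

            AlterIsrRequestData request = new AlterIsrRequestData()
                .setBrokerId(0)                   // must be the partition's current leader
                .setBrokerEpoch(100L)             // must match the broker's registration epoch
                .setTopics(Arrays.asList(topicData));

            System.out.println(request);
        }
    }
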
@@ -450,7 +512,7 @@ public void testShrinkAndExpandIsr() throws Exception { assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition)); long brokerEpoch = ctx.currentBrokerEpoch(0); PartitionData shrinkIsrRequest = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1)); + replicationControl, topicIdPartition, asList(0, 1)); ControllerResult shrinkIsrResult = sendAlterIsr( replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest); AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse( @@ -458,7 +520,7 @@ public void testShrinkAndExpandIsr() throws Exception { assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse); PartitionData expandIsrRequest = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1, 2)); + replicationControl, topicIdPartition, asList(0, 1, 2)); ControllerResult expandIsrResult = sendAlterIsr( replicationControl, 0, brokerEpoch, "foo", expandIsrRequest); AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse( @@ -482,7 +544,7 @@ public void testInvalidAlterIsrRequests() throws Exception { // Invalid leader PartitionData invalidLeaderRequest = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1)); + replicationControl, topicIdPartition, asList(0, 1)); ControllerResult invalidLeaderResult = sendAlterIsr( replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidLeaderRequest); @@ -490,13 +552,13 @@ public void testInvalidAlterIsrRequests() throws Exception { // Stale broker epoch PartitionData invalidBrokerEpochRequest = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1)); + replicationControl, topicIdPartition, asList(0, 1)); assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr( replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest)); // Invalid leader epoch PartitionData invalidLeaderEpochRequest = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1)); + replicationControl, topicIdPartition, asList(0, 1)); invalidLeaderEpochRequest.setLeaderEpoch(500); ControllerResult invalidLeaderEpochResult = sendAlterIsr( replicationControl, 1, ctx.currentBrokerEpoch(1), @@ -505,8 +567,8 @@ public void testInvalidAlterIsrRequests() throws Exception { // Invalid ISR (3 is not a valid replica) PartitionData invalidIsrRequest1 = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1)); - invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3)); + replicationControl, topicIdPartition, asList(0, 1)); + invalidIsrRequest1.setNewIsr(asList(0, 1, 3)); ControllerResult invalidIsrResult1 = sendAlterIsr( replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidIsrRequest1); @@ -514,8 +576,8 @@ public void testInvalidAlterIsrRequests() throws Exception { // Invalid ISR (does not include leader 0) PartitionData invalidIsrRequest2 = newAlterIsrPartition( - replicationControl, topicIdPartition, Arrays.asList(0, 1)); - invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2)); + replicationControl, topicIdPartition, asList(0, 1)); + invalidIsrRequest2.setNewIsr(asList(1, 2)); ControllerResult invalidIsrResult2 = sendAlterIsr( replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidIsrRequest2); @@ -647,28 +709,28 @@ public void testDeleteTopics() throws Exception { assertNull(replicationControl.getPartition(topicId, 3)); assertCreatedTopicConfigs(ctx, "foo", requestConfigs); - 
assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")), + assertEquals(singletonMap(topicId, new ResultOrError<>("foo")), replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId))); - assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)), + assertEquals(singletonMap("foo", new ResultOrError<>(topicId)), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo"))); Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1, topicId.getLeastSignificantBits()); - assertEquals(Collections.singletonMap(invalidId, + assertEquals(singletonMap(invalidId, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId))); - assertEquals(Collections.singletonMap("bar", + assertEquals(singletonMap("bar", new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar"))); ControllerResult> invalidDeleteResult = replicationControl. deleteTopics(Collections.singletonList(invalidId)); assertEquals(0, invalidDeleteResult.records().size()); - assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)), + assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)), invalidDeleteResult.response()); ControllerResult> deleteResult = replicationControl. deleteTopics(Collections.singletonList(topicId)); assertTrue(deleteResult.isAtomic()); - assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)), + assertEquals(singletonMap(topicId, new ApiError(NONE, null)), deleteResult.response()); assertEquals(1, deleteResult.records().size()); ctx.replay(deleteResult.records()); @@ -676,10 +738,10 @@ public void testDeleteTopics() throws Exception { assertNull(replicationControl.getPartition(topicId, 1)); assertNull(replicationControl.getPartition(topicId, 2)); assertNull(replicationControl.getPartition(topicId, 3)); - assertEquals(Collections.singletonMap(topicId, new ResultOrError<>( + assertEquals(singletonMap(topicId, new ResultOrError<>( new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames( Long.MAX_VALUE, Collections.singleton(topicId))); - assertEquals(Collections.singletonMap("foo", new ResultOrError<>( + assertEquals(singletonMap("foo", new ResultOrError<>( new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds( Long.MAX_VALUE, Collections.singleton("foo"))); assertEmptyTopicConfigs(ctx, "foo"); @@ -715,7 +777,7 @@ public void testCreatePartitions() throws Exception { setName("quux").setCount(2).setAssignments(null)); ControllerResult> createPartitionsResult = replicationControl.createPartitions(topics); - assertEquals(Arrays.asList(new CreatePartitionsTopicResult(). + assertEquals(asList(new CreatePartitionsTopicResult(). setName("foo"). setErrorCode(NONE.code()). setErrorMessage(null), @@ -735,20 +797,20 @@ public void testCreatePartitions() throws Exception { ctx.replay(createPartitionsResult.records()); List topics2 = new ArrayList<>(); topics2.add(new CreatePartitionsTopic(). - setName("foo").setCount(6).setAssignments(Arrays.asList( - new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0))))); + setName("foo").setCount(6).setAssignments(asList( + new CreatePartitionsAssignment().setBrokerIds(asList(1, 0))))); topics2.add(new CreatePartitionsTopic(). 
- setName("bar").setCount(5).setAssignments(Arrays.asList( - new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1))))); + setName("bar").setCount(5).setAssignments(asList( + new CreatePartitionsAssignment().setBrokerIds(asList(1))))); topics2.add(new CreatePartitionsTopic(). - setName("quux").setCount(4).setAssignments(Arrays.asList( - new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0))))); + setName("quux").setCount(4).setAssignments(asList( + new CreatePartitionsAssignment().setBrokerIds(asList(1, 0))))); topics2.add(new CreatePartitionsTopic(). - setName("foo2").setCount(3).setAssignments(Arrays.asList( - new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0))))); + setName("foo2").setCount(3).setAssignments(asList( + new CreatePartitionsAssignment().setBrokerIds(asList(2, 0))))); ControllerResult> createPartitionsResult2 = replicationControl.createPartitions(topics2); - assertEquals(Arrays.asList(new CreatePartitionsTopicResult(). + assertEquals(asList(new CreatePartitionsTopicResult(). setName("foo"). setErrorCode(NONE.code()). setErrorMessage(null), @@ -775,13 +837,13 @@ public void testCreatePartitions() throws Exception { public void testValidateGoodManualPartitionAssignments() throws Exception { ReplicationControlTestContext ctx = new ReplicationControlTestContext(); ctx.registerBrokers(1, 2, 3); - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1), + ctx.replicationControl.validateManualPartitionAssignment(asList(1), OptionalInt.of(1)); - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1), + ctx.replicationControl.validateManualPartitionAssignment(asList(1), OptionalInt.empty()); - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), + ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3), OptionalInt.of(3)); - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), + ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3), OptionalInt.empty()); } @@ -791,20 +853,20 @@ public void testValidateBadManualPartitionAssignments() throws Exception { ctx.registerBrokers(1, 2); assertEquals("The manual partition assignment includes an empty replica list.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(), + ctx.replicationControl.validateManualPartitionAssignment(asList(), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes broker 3, but no such " + "broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), + ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes the broker 2 more than " + "once.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2), + ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 2), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes a partition with 2 " + "replica(s), but this is not consistent with previous partitions, which have " + "3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2), + 
ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2), OptionalInt.of(3))).getMessage()); } @@ -824,18 +886,18 @@ public void testReassignPartitions() throws Exception { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null)); ControllerResult alterResult = replication.alterPartitionReassignments( - new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( - new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new AlterPartitionReassignmentsRequestData().setTopics(asList( + new ReassignableTopic().setName("foo").setPartitions(asList( new ReassignablePartition().setPartitionIndex(0). - setReplicas(Arrays.asList(3, 2, 1)), + setReplicas(asList(3, 2, 1)), new ReassignablePartition().setPartitionIndex(1). - setReplicas(Arrays.asList(0, 2, 1)), + setReplicas(asList(0, 2, 1)), new ReassignablePartition().setPartitionIndex(2). - setReplicas(Arrays.asList(0, 2, 1)))), + setReplicas(asList(0, 2, 1)))), new ReassignableTopic().setName("bar")))); assertEquals(new AlterPartitionReassignmentsResponseData(). - setErrorMessage(null).setResponses(Arrays.asList( - new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + setErrorMessage(null).setResponses(asList( + new ReassignableTopicResponse().setName("foo").setPartitions(asList( new ReassignablePartitionResponse().setPartitionIndex(0). setErrorMessage(null), new ReassignablePartitionResponse().setPartitionIndex(1). @@ -849,41 +911,41 @@ public void testReassignPartitions() throws Exception { ctx.replay(alterResult.records()); ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null). - setTopics(Arrays.asList(new OngoingTopicReassignment(). - setName("foo").setPartitions(Arrays.asList( + setTopics(asList(new OngoingTopicReassignment(). + setName("foo").setPartitions(asList( new OngoingPartitionReassignment().setPartitionIndex(1). - setRemovingReplicas(Arrays.asList(3)). - setAddingReplicas(Arrays.asList(0)). - setReplicas(Arrays.asList(0, 2, 1, 3)))))); + setRemovingReplicas(asList(3)). + setAddingReplicas(asList(0)). + setReplicas(asList(0, 2, 1, 3)))))); assertEquals(currentReassigning, replication.listPartitionReassignments(null)); - assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList( + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList( new ListPartitionReassignmentsTopics().setName("bar"). - setPartitionIndexes(Arrays.asList(0, 1, 2))))); - assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList( + setPartitionIndexes(asList(0, 1, 2))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(asList( new ListPartitionReassignmentsTopics().setName("foo"). - setPartitionIndexes(Arrays.asList(0, 1, 2))))); + setPartitionIndexes(asList(0, 1, 2))))); ControllerResult cancelResult = replication.alterPartitionReassignments( - new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( - new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new AlterPartitionReassignmentsRequestData().setTopics(asList( + new ReassignableTopic().setName("foo").setPartitions(asList( new ReassignablePartition().setPartitionIndex(0). setReplicas(null), new ReassignablePartition().setPartitionIndex(1). setReplicas(null), new ReassignablePartition().setPartitionIndex(2). 
         ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult = replication.alterPartitionReassignments(
-            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                new ReassignableTopic().setName("foo").setPartitions(asList(
                     new ReassignablePartition().setPartitionIndex(0).
                         setReplicas(null),
                     new ReassignablePartition().setPartitionIndex(1).
                         setReplicas(null),
                     new ReassignablePartition().setPartitionIndex(2).
                         setReplicas(null))),
-                new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                new ReassignableTopic().setName("bar").setPartitions(asList(
                     new ReassignablePartition().setPartitionIndex(0).
                         setReplicas(null))))));
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
             new PartitionChangeRecord().setTopicId(fooId).
                 setPartitionId(1).
-                setReplicas(Arrays.asList(2, 1, 3)).
+                setReplicas(asList(2, 1, 3)).
                 setLeader(3).
                 setRemovingReplicas(Collections.emptyList()).
                 setAddingReplicas(Collections.emptyList()), (short) 0)),
-            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
-            new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
+            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
                 new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -891,7 +953,7 @@ public void testReassignPartitions() throws Exception {
                 new ReassignablePartitionResponse().setPartitionIndex(2).
                     setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
                     setErrorMessage("Unable to find partition foo:2."))),
-            new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+            new ReassignableTopicResponse().setName("bar").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
                     setErrorMessage(null)))))),
@@ -899,11 +961,11 @@ public void testReassignPartitions() throws Exception {
         log.info("running final alterIsr...");
         ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
             new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
-            setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList(
+            setTopics(asList(new TopicData().setName("foo").setPartitions(asList(
                 new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
-                    setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
+                    setLeaderEpoch(0).setNewIsr(asList(3, 0, 2, 1)))))));
-        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
-            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+        assertEquals(new AlterIsrResponseData().setTopics(asList(
+            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
                 new AlterIsrResponseData.PartitionData().
                     setPartitionIndex(1).
                     setErrorCode(FENCED_LEADER_EPOCH.code()))))),
@@ -931,22 +993,22 @@ public void testCancelReassignPartitions() throws Exception {
             new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0));
         ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = replication.alterPartitionReassignments(
-            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                new ReassignableTopic().setName("foo").setPartitions(asList(
                     new ReassignablePartition().setPartitionIndex(0).
-                        setReplicas(Arrays.asList(1, 2, 3)),
+                        setReplicas(asList(1, 2, 3)),
                     new ReassignablePartition().setPartitionIndex(1).
-                        setReplicas(Arrays.asList(1, 2, 3, 0)),
+                        setReplicas(asList(1, 2, 3, 0)),
                     new ReassignablePartition().setPartitionIndex(2).
-                        setReplicas(Arrays.asList(5, 6, 7)),
+                        setReplicas(asList(5, 6, 7)),
                     new ReassignablePartition().setPartitionIndex(3).
-                        setReplicas(Arrays.asList()))),
-                new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                        setReplicas(asList()))),
+                new ReassignableTopic().setName("bar").setPartitions(asList(
                     new ReassignablePartition().setPartitionIndex(0).
-                        setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
+                        setReplicas(asList(1, 2, 3, 4, 0)))))));
         assertEquals(new AlterPartitionReassignmentsResponseData().
-            setErrorMessage(null).setResponses(Arrays.asList(
-            new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+            setErrorMessage(null).setResponses(asList(
+            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorMessage(null),
                 new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -959,7 +1021,7 @@ public void testCancelReassignPartitions() throws Exception {
                     setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
                     setErrorMessage("The manual partition assignment includes an empty " +
                         "replica list."))),
-            new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+            new ReassignableTopicResponse().setName("bar").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorMessage(null))))),
             alterResult.response());
@@ -972,55 +1034,55 @@ public void testCancelReassignPartitions() throws Exception {
             new int[] {}, new int[] {0, 1}, 4, 1, 2), replication.getPartition(barId, 0));
         ListPartitionReassignmentsResponseData currentReassigning =
             new ListPartitionReassignmentsResponseData().setErrorMessage(null).
-            setTopics(Arrays.asList(new OngoingTopicReassignment().
-                setName("bar").setPartitions(Arrays.asList(
+            setTopics(asList(new OngoingTopicReassignment().
+                setName("bar").setPartitions(asList(
                 new OngoingPartitionReassignment().setPartitionIndex(0).
                     setRemovingReplicas(Collections.emptyList()).
-                    setAddingReplicas(Arrays.asList(0, 1)).
-                    setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
+                    setAddingReplicas(asList(0, 1)).
+                    setReplicas(asList(1, 2, 3, 4, 0))))));
         assertEquals(currentReassigning, replication.listPartitionReassignments(null));
-        assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+        assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
             new ListPartitionReassignmentsTopics().setName("foo").
-                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
-        assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+                setPartitionIndexes(asList(0, 1, 2)))));
+        assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
             new ListPartitionReassignmentsTopics().setName("bar").
-                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+                setPartitionIndexes(asList(0, 1, 2)))));
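+        // Leader 4 of bar-0 reports that the adding replicas 0 and 1 have caught up,
+        // expanding the ISR to the full target replica set.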
         ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
             new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104).
-            setTopics(Arrays.asList(new TopicData().setName("bar").setPartitions(Arrays.asList(
+            setTopics(asList(new TopicData().setName("bar").setPartitions(asList(
                 new PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).
-                    setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
+                    setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 3, 0)))))));
-        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
-            new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(
+        assertEquals(new AlterIsrResponseData().setTopics(asList(
+            new AlterIsrResponseData.TopicData().setName("bar").setPartitions(asList(
                 new AlterIsrResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(4).
                     setLeaderEpoch(1).
-                    setIsr(Arrays.asList(4, 1, 2, 3, 0)).
+                    setIsr(asList(4, 1, 2, 3, 0)).
                     setCurrentIsrVersion(3).
                     setErrorCode(NONE.code()))))),
             alterIsrResult.response());
         ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult = replication.alterPartitionReassignments(
-            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
-                new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsRequestData().setTopics(asList(
+                new ReassignableTopic().setName("foo").setPartitions(asList(
                     new ReassignablePartition().setPartitionIndex(0).
                         setReplicas(null))),
-                new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                new ReassignableTopic().setName("bar").setPartitions(asList(
                     new ReassignablePartition().setPartitionIndex(0).
                         setReplicas(null))))));
         assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
             new PartitionChangeRecord().setTopicId(barId).
                 setPartitionId(0).
                 setLeader(4).
-                setReplicas(Arrays.asList(2, 3, 4)).
+                setReplicas(asList(2, 3, 4)).
                 setRemovingReplicas(null).
                 setAddingReplicas(Collections.emptyList()), (short) 0)),
-            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
-            new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
+            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
-            new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+            new ReassignableTopicResponse().setName("bar").setPartitions(asList(
                 new ReassignablePartitionResponse().setPartitionIndex(0).
                     setErrorMessage(null)))))),
             cancelResult);
@@ -1052,6 +1114,160 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        PartitionRegistration registration = replication.getPartition(
+            topicIdPartition.topicId(),
+            topicIdPartition.partitionId()
+        );
+        assertArrayEquals(isr, registration.isr);
+        assertEquals(leaderId, registration.leader);
+    }
+
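+    // Verifies that an UNCLEAN election can restore leadership for a partition whose
+    // entire ISR is offline, and that the unclean request does not trigger preferred
+    // elections as a side effect. Runs with both an explicit partition list and a
+    // null list (elect all partitions).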
null : singletonMap("foo", asList(0, 1, 2)) + ); + + // No election can be done yet because no replicas are available for partition 0 + ControllerResult result1 = replication.electLeaders(request); + assertEquals(Collections.emptyList(), result1.records()); + + ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap( + Utils.mkEntry( + new TopicPartition("foo", 0), + new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE) + ), + Utils.mkEntry( + new TopicPartition("foo", 1), + new ApiError(ELECTION_NOT_NEEDED) + ), + Utils.mkEntry( + new TopicPartition("foo", 2), + new ApiError(ELECTION_NOT_NEEDED) + ) + )); + assertElectLeadersResponse(expectedResponse1, result1.response()); + + // Now we bring 2 back online which should allow the unclean election of partition 0 + ctx.unfenceBrokers(Utils.mkSet(2)); + + // Bring 2 back into the ISR for partition 1. This allows us to verify that + // preferred election does not occur as a result of the unclean election request. + ctx.alterIsr(partition1, 4, asList(2, 4)); + + ControllerResult result = replication.electLeaders(request); + assertEquals(1, result.records().size()); + + ApiMessageAndVersion record = result.records().get(0); + assertTrue(record.message() instanceof PartitionChangeRecord); + + PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord) record.message(); + assertEquals(0, partitionChangeRecord.partitionId()); + assertEquals(2, partitionChangeRecord.leader()); + assertEquals(singletonList(2), partitionChangeRecord.isr()); + ctx.replay(result.records()); + + assertLeaderAndIsr(replication, partition0, 2, new int[]{2}); + assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4}); + assertLeaderAndIsr(replication, partition2, 0, new int[]{0}); + + ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap( + Utils.mkEntry( + new TopicPartition("foo", 0), + ApiError.NONE + ), + Utils.mkEntry( + new TopicPartition("foo", 1), + new ApiError(ELECTION_NOT_NEEDED) + ), + Utils.mkEntry( + new TopicPartition("foo", 2), + new ApiError(ELECTION_NOT_NEEDED) + ) + )); + assertElectLeadersResponse(expectedResponse, result.response()); + } + + @Test + public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(1, 2, 3, 4); + ctx.unfenceBrokers(1, 2, 3, 4); + + Uuid fooId = ctx.createTestTopic("foo", new int[][]{new int[]{1, 2, 3}}).topicId(); + TopicIdPartition partition = new TopicIdPartition(fooId, 0); + + ctx.fenceBrokers(Utils.mkSet(2, 3)); + ctx.fenceBrokers(Utils.mkSet(1, 2, 3)); + ctx.unfenceBrokers(Utils.mkSet(2)); + + assertLeaderAndIsr(replication, partition, NO_LEADER, new int[]{1}); + + ctx.alterTopicConfig("foo", "unclean.leader.election.enable", "true"); + + ElectLeadersRequestData request = buildElectLeadersRequest( + ElectionType.PREFERRED, + singletonMap("foo", singletonList(0)) + ); + + // No election should be done even though unclean election is available + ControllerResult result = replication.electLeaders(request); + assertEquals(Collections.emptyList(), result.records()); + + ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, false, singletonMap( + new TopicPartition("foo", 0), new ApiError(PREFERRED_LEADER_NOT_AVAILABLE) + )); + assertEquals(expectedResponse, result.response()); + } + + private 
+    private ElectLeadersRequestData buildElectLeadersRequest(
+        ElectionType electionType,
+        Map<String, List<Integer>> partitions
+    ) {
+        ElectLeadersRequestData request = new ElectLeadersRequestData().
+            setElectionType(electionType.value);
+
+        if (partitions == null) {
+            request.setTopicPartitions(null);
+        } else {
+            partitions.forEach((topic, partitionIds) -> {
+                request.topicPartitions().add(new TopicPartitions()
+                    .setTopic(topic)
+                    .setPartitions(partitionIds)
+                );
+            });
+        }
+        return request;
+    }
+
     @Test
     public void testFenceMultipleBrokers() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -1083,7 +1299,7 @@ public void testFenceMultipleBrokers() throws Exception {
     }
 
     @Test
-    public void testElectLeaders() throws Exception {
+    public void testElectPreferredLeaders() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -1091,60 +1307,130 @@
         Uuid fooId = ctx.createTestTopic("foo", new int[][]{
             new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
         ElectLeadersRequestData request1 = new ElectLeadersRequestData().
-            setElectionType((byte) 0).
-            setTopicPartitions(new TopicPartitionsCollection(Arrays.asList(
+            setElectionType(ElectionType.PREFERRED.value).
+            setTopicPartitions(new TopicPartitionsCollection(asList(
                 new TopicPartitions().setTopic("foo").
-                    setPartitions(Arrays.asList(0, 1)),
+                    setPartitions(asList(0, 1)),
                 new TopicPartitions().setTopic("bar").
-                    setPartitions(Arrays.asList(0, 1))).iterator()));
+                    setPartitions(asList(0, 1))).iterator()));
         ControllerResult<ElectLeadersResponseData> election1Result = replication.electLeaders(request1);
-        ElectLeadersResponseData expectedResponse1 = new ElectLeadersResponseData().
-            setErrorCode((short) 0).
-            setReplicaElectionResults(Arrays.asList(
-                new ReplicaElectionResult().setTopic("foo").
-                    setPartitionResult(Arrays.asList(
-                        new PartitionResult().setPartitionId(0).
-                            setErrorCode(NONE.code()).
-                            setErrorMessage(null),
-                        new PartitionResult().setPartitionId(1).
-                            setErrorCode(NONE.code()).
-                            setErrorMessage(null))),
-                new ReplicaElectionResult().setTopic("bar").
-                    setPartitionResult(Arrays.asList(
-                        new PartitionResult().setPartitionId(0).
-                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
-                            setErrorMessage("No such topic as bar"),
-                        new PartitionResult().setPartitionId(1).
-                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
-                            setErrorMessage("No such topic as bar")))));
-        assertEquals(expectedResponse1, election1Result.response());
+        ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 0),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 1),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            )
+        ));
+        assertElectLeadersResponse(expectedResponse1, election1Result.response());
         assertEquals(Collections.emptyList(), election1Result.records());
         ctx.unfenceBrokers(0, 1);
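+        // The current leader (broker 2) reports that the preferred leader 1 has
+        // rejoined the ISR of foo-0, making a preferred election possible.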
         ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
             new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102).
-            setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("foo").
-                setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().
+            setTopics(asList(new AlterIsrRequestData.TopicData().setName("foo").
+                setPartitions(asList(new AlterIsrRequestData.PartitionData().
                     setPartitionIndex(0).setCurrentIsrVersion(0).
-                    setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2, 3)))))));
+                    setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
-        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
-            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+        assertEquals(new AlterIsrResponseData().setTopics(asList(
+            new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
                 new AlterIsrResponseData.PartitionData().
                     setPartitionIndex(0).
                     setLeaderId(2).
                     setLeaderEpoch(0).
-                    setIsr(Arrays.asList(1, 2, 3)).
+                    setIsr(asList(1, 2, 3)).
                     setCurrentIsrVersion(1).
                     setErrorCode(NONE.code()))))),
             alterIsrResult.response());
+
+        ElectLeadersResponseData expectedResponse2 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                ApiError.NONE
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 0),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            ),
+            Utils.mkEntry(
+                new TopicPartition("bar", 1),
+                new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+            )
+        ));
+
+        ctx.replay(alterIsrResult.records());
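+        // Re-running the same request should now elect the preferred leader 1 for
+        // foo-0, while the "bar" partitions still fail as unknown.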
         ControllerResult<ElectLeadersResponseData> election2Result = replication.electLeaders(request1);
-        assertEquals(expectedResponse1, election2Result.response());
-        assertEquals(Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
+        assertElectLeadersResponse(expectedResponse2, election2Result.response());
+        assertEquals(asList(new ApiMessageAndVersion(new PartitionChangeRecord().
             setPartitionId(0).
             setTopicId(fooId).
             setLeader(1), (short) 0)), election2Result.records());
     }
+
+    private void assertElectLeadersResponse(
+        ElectLeadersResponseData expected,
+        ElectLeadersResponseData actual
+    ) {
+        assertEquals(Errors.forCode(expected.errorCode()), Errors.forCode(actual.errorCode()));
+        assertEquals(collectElectLeadersErrors(expected), collectElectLeadersErrors(actual));
+    }
+
+    private Map<TopicPartition, PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
+        Map<TopicPartition, PartitionResult> res = new HashMap<>();
+        response.replicaElectionResults().forEach(topicResult -> {
+            String topic = topicResult.topic();
+            topicResult.partitionResult().forEach(partitionResult -> {
+                TopicPartition topicPartition = new TopicPartition(topic, partitionResult.partitionId());
+                res.put(topicPartition, partitionResult);
+            });
+        });
+        return res;
+    }
+
+    private ElectLeadersResponseData buildElectLeadersResponse(
+        Errors topLevelError,
+        boolean electAllPartitions,
+        Map<TopicPartition, ApiError> errors
+    ) {
+        Map<String, List<Map.Entry<TopicPartition, ApiError>>> errorsByTopic = errors.entrySet().stream()
+            .collect(Collectors.groupingBy(entry -> entry.getKey().topic()));
+
+        ElectLeadersResponseData response = new ElectLeadersResponseData()
+            .setErrorCode(topLevelError.code());
+
+        errorsByTopic.forEach((topic, partitionErrors) -> {
+            ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic);
+            electionResult.setPartitionResult(partitionErrors.stream()
+                .filter(entry -> !electAllPartitions || entry.getValue().error() != ELECTION_NOT_NEEDED)
+                .map(entry -> {
+                    TopicPartition topicPartition = entry.getKey();
+                    ApiError error = entry.getValue();
+                    return new PartitionResult()
+                        .setPartitionId(topicPartition.partition())
+                        .setErrorCode(error.error().code())
+                        .setErrorMessage(error.message());
+                })
+                .collect(Collectors.toList()));
+            response.replicaElectionResults().add(electionResult);
+        });

+        return response;
+    }
 }