From 436271ff841d03b9c62ddaead630679378312fc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Tue, 30 Jul 2024 18:31:01 +0000 Subject: [PATCH 1/6] KAFKA-16533; Update voter handling --- .../message/UpdateRaftVoterRequest.json | 2 + .../message/UpdateRaftVoterResponse.json | 12 +- .../scala/kafka/server/ControllerApis.scala | 2 +- .../java/org/apache/kafka/raft/Endpoints.java | 28 +++ .../apache/kafka/raft/KafkaRaftClient.java | 99 +++++++- .../org/apache/kafka/raft/LeaderState.java | 54 +++- .../java/org/apache/kafka/raft/RaftUtil.java | 57 +++++ .../kafka/raft/internals/AddVoterHandler.java | 4 +- .../raft/internals/AddVoterHandlerState.java | 9 +- .../raft/internals/RemoveVoterHandler.java | 4 +- .../internals/RemoveVoterHandlerState.java | 9 +- .../raft/internals/UpdateVoterHandler.java | 231 ++++++++++++++++++ .../internals/UpdateVoterHandlerState.java | 72 ++++++ .../apache/kafka/raft/internals/VoterSet.java | 14 ++ .../raft/KafkaRaftClientReconfigTest.java | 67 +++++ .../kafka/raft/RaftClientTestContext.java | 48 +++- 16 files changed, 678 insertions(+), 34 deletions(-) create mode 100644 raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java create mode 100644 raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java diff --git a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json index 80ee58a43a3d6..5dde41cff56bd 100644 --- a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json +++ b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json @@ -22,6 +22,8 @@ "flexibleVersions": "0+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "0+" }, + { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+", + "about": "The current leader epoch of the partition, -1 for unknown leader epoch" }, { "name": "VoterId", "type": "int32", "versions": "0+", "about": "The replica id of the voter getting updated in the topic partition" }, { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+", diff --git a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json index 64816406c7426..33b49c37198bb 100644 --- a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json +++ b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json @@ -23,6 +23,16 @@ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The error code, or 0 if there was no error" } + "about": "The error code, or 0 if there was no error" }, + { "name": "CurrentLeader", "type": "CurrentLeader", "versions": "0+", + "taggedVersions": "0+", "tag": 0, "fields": [ + { "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId", + "about": "The replica id of the current leader or -1 if the leader is unknown" }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", + "about": "The latest known leader epoch" }, + { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" }, + { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" } + ] + } ] } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 8a99cd9eed9a7..ae4980f435464 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -1095,6 +1095,6 @@ class ControllerApis( def handleUpdateRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) - throw new UnsupportedVersionException("handleUpdateRaftVoter is not supported yet.") + handleRaftRequest(request, response => new UpdateRaftVoterResponse(response.asInstanceOf[UpdateRaftVoterResponseData])) } } diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java index 5e979022ec322..8dd05a5ae854e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java +++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.message.EndQuorumEpochResponseData; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FetchSnapshotResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.message.VotersRecord; import org.apache.kafka.common.network.ListenerName; @@ -131,6 +132,21 @@ public DescribeQuorumResponseData.ListenerCollection toDescribeQuorumResponseLis return listeners; } + public UpdateRaftVoterRequestData.ListenerCollection toUpdateVoterRequest() { + UpdateRaftVoterRequestData.ListenerCollection listeners = + new UpdateRaftVoterRequestData.ListenerCollection(endpoints.size()); + for (Map.Entry entry : endpoints.entrySet()) { + listeners.add( + new UpdateRaftVoterRequestData.Listener() + .setName(entry.getKey().value()) + .setHost(entry.getValue().getHostString()) + .setPort(entry.getValue().getPort()) + ); + } + + return listeners; + } + private static final Endpoints EMPTY = new Endpoints(Collections.emptyMap()); public static Endpoints empty() { return EMPTY; @@ -272,4 +288,16 @@ public static Endpoints fromAddVoterRequest(AddRaftVoterRequestData.ListenerColl return new Endpoints(listeners); } + + public static Endpoints fromUpdateVoterRequest(UpdateRaftVoterRequestData.ListenerCollection endpoints) { + Map listeners = new HashMap<>(endpoints.size()); + for (UpdateRaftVoterRequestData.Listener endpoint : endpoints) { + listeners.put( + ListenerName.normalised(endpoint.name()), + InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port()) + ); + } + + return new Endpoints(listeners); + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 0b454666db3cb..8ae2b013dee56 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -76,6 +78,7 @@ import org.apache.kafka.raft.internals.RemoveVoterHandler; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.ThresholdPurgatory; +import org.apache.kafka.raft.internals.UpdateVoterHandler; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; @@ -207,6 +210,7 @@ public final class KafkaRaftClient implements RaftClient { // Specialized handlers private volatile AddVoterHandler addVoterHandler; private volatile RemoveVoterHandler removeVoterHandler; + private volatile UpdateVoterHandler updateVoterHandler; /** * Create a new instance. @@ -349,6 +353,7 @@ private void onUpdateLeaderHighWatermark( // add or remove voter request that need to be completed addVoterHandler.highWatermarkUpdated(state); removeVoterHandler.highWatermarkUpdated(state); + updateVoterHandler.highWatermarkUpdated(state); // After updating the high watermark, we first clear the append // purgatory so that we have an opportunity to route the pending @@ -543,6 +548,15 @@ public void initialize( quorumConfig.requestTimeoutMs(), logContext ); + + // Specialized remove voter handler + this.updateVoterHandler = new UpdateVoterHandler( + nodeId, + partitionState, + channel.listenerName(), + time, + quorumConfig.requestTimeoutMs() + ); } @Override @@ -2071,7 +2085,8 @@ private CompletableFuture handleAddVoterRequest( .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage( String.format( - "Add voter request didn't include the default listener: %s", + "Add voter request didn't include the endpoint (%s) for the default listener %s", + newVoterEndpoints, channel.listenerName() ) ) @@ -2153,6 +2168,84 @@ private CompletableFuture handleRemoveVoterRequest( ); } + private CompletableFuture handleUpdateVoterRequest( + RaftRequest.Inbound requestMetadata, + long currentTimeMs + ) { + UpdateRaftVoterRequestData data = (UpdateRaftVoterRequestData) requestMetadata.data(); + + if (!hasValidClusterId(data.clusterId())) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INCONSISTENT_CLUSTER_ID, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + Optional leaderValidation = validateLeaderOnlyRequest(data.currentLeaderEpoch()); + if (leaderValidation.isPresent()) { + return completedFuture( + RaftUtil.updateVoterResponse( + leaderValidation.get(), + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + Optional voter = RaftUtil.updateVoterRequestVoterKey(data); + if (!voter.isPresent() || !voter.get().directoryId().isPresent()) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners()); + if (!voterEndpoints.address(channel.listenerName()).isPresent()) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions = data.kRaftVersionFeature(); + if (supportedKraftVersions.minSupportedVersion() < 0 || + supportedKraftVersions.maxSupportedVersion() < 0 || + supportedKraftVersions.maxSupportedVersion() < supportedKraftVersions.minSupportedVersion() + ) { + return completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestMetadata.listenerName(), + quorum.leaderAndEpoch(), + quorum.leaderEndpoints() + ) + ); + } + + return updateVoterHandler.handleUpdateVoterRequest( + quorum.leaderStateOrThrow(), + requestMetadata.listenerName(), + voter.get(), + voterEndpoints, + supportedKraftVersions, + currentTimeMs + ); + } + private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) { // Only elected leaders are sent in the request/response header, so if we have an elected // leaderId, it should be consistent with what is in the message. @@ -2419,6 +2512,10 @@ private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) { responseFuture = handleRemoveVoterRequest(request, currentTimeMs); break; + case UPDATE_RAFT_VOTER: + responseFuture = handleUpdateVoterRequest(request, currentTimeMs); + break; + default: throw new IllegalArgumentException("Unexpected request type " + apiKey); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 06deb28c69ca4..e1ad8e184e73b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -31,6 +31,7 @@ import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.RemoveVoterHandlerState; import org.apache.kafka.raft.internals.ReplicaKey; +import org.apache.kafka.raft.internals.UpdateVoterHandlerState; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.server.common.KRaftVersion; @@ -44,12 +45,14 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +// TODO: the leader needs to update itself /** * In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests. @@ -74,6 +77,8 @@ public class LeaderState implements EpochState { private Map voterStates = new HashMap<>(); private Optional addVoterHandlerState = Optional.empty(); private Optional removeVoterHandlerState = Optional.empty(); + private Optional updateVoterHandlerState = Optional.empty(); + private final Map observerStates = new HashMap<>(); private final Logger log; @@ -239,8 +244,26 @@ public void resetRemoveVoterHandlerState( removeVoterHandlerState = state; } + public Optional updateVoterHandlerState() { + return updateVoterHandlerState; + } + + public void resetUpdateVoterHandlerState( + Errors error, + Optional state + ) { + updateVoterHandlerState.ifPresent( + handlerState -> handlerState.completeFuture( + error, + new LeaderAndEpoch(OptionalInt.of(localReplicaKey.id()), epoch), + endpoints + ) + ); + updateVoterHandlerState = state; + } + public long maybeExpirePendingOperation(long currentTimeMs) { - // First abort any expired operation + // First abort any expired operations long timeUntilAddVoterExpiration = addVoterHandlerState() .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) .orElse(Long.MAX_VALUE); @@ -257,14 +280,27 @@ public long maybeExpirePendingOperation(long currentTimeMs) { resetRemoveVoterHandlerState(Errors.REQUEST_TIMED_OUT, null, Optional.empty()); } - // Reread the timeouts and return the smaller of the two + long timeUntilUpdateVoterExpiration = updateVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE); + + if (timeUntilUpdateVoterExpiration == 0) { + resetUpdateVoterHandlerState(Errors.REQUEST_TIMED_OUT, Optional.empty()); + } + + // Reread the timeouts and return the smaller of them return Math.min( addVoterHandlerState() .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) .orElse(Long.MAX_VALUE), - removeVoterHandlerState() - .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) - .orElse(Long.MAX_VALUE) + Math.min( + removeVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE), + updateVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE) + ) ); } @@ -826,11 +862,9 @@ public String name() { @Override public void close() { - addVoterHandlerState.ifPresent(AddVoterHandlerState::close); - addVoterHandlerState = Optional.empty(); - - removeVoterHandlerState.ifPresent(RemoveVoterHandlerState::close); - removeVoterHandlerState = Optional.empty(); + resetAddVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, Optional.empty()); + resetRemoveVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, Optional.empty()); + resetUpdateVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty()); accumulator.close(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 4d2f3bc06e821..a9d3a430856d7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; @@ -32,6 +33,8 @@ import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.network.ListenerName; @@ -564,6 +567,52 @@ public static RemoveRaftVoterResponseData removeVoterResponse( .setErrorMessage(errorMessage); } + // TODO: add tests + public static UpdateRaftVoterRequestData updateVoterRequest( + String clusterId, + ReplicaKey voter, + int epoch, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + UpdateRaftVoterRequestData request = new UpdateRaftVoterRequestData() + .setClusterId(clusterId) + .setCurrentLeaderEpoch(epoch) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(endpoints.toUpdateVoterRequest()); + + request.kRaftVersionFeature() + .setMinSupportedVersion(supportedVersions.min()) + .setMaxSupportedVersion(supportedVersions.max()); + + return request; + } + + // TODO: add tests + public static UpdateRaftVoterResponseData updateVoterResponse( + Errors error, + ListenerName listenerName, + LeaderAndEpoch leaderAndEpoch, + Endpoints endpoints + ) { + UpdateRaftVoterResponseData response = new UpdateRaftVoterResponseData() + .setErrorCode(error.code()); + + response.currentLeader() + .setLeaderId(leaderAndEpoch.leaderId().orElse(-1)) + .setLeaderEpoch(leaderAndEpoch.epoch()); + + Optional address = endpoints.address(listenerName); + if (address.isPresent()) { + response.currentLeader() + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()); + } + + return response; + } + private static List toReplicaStates( short apiVersion, int leaderId, @@ -641,6 +690,14 @@ public static Optional removeVoterRequestVoterKey(RemoveRaftVoterReq } } + public static Optional updateVoterRequestVoterKey(UpdateRaftVoterRequestData request) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); + } + } + static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { return data.topics().size() == 1 && data.topics().get(0).topicId().equals(topicId) && diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java index a504e2ec9d498..444e747b1e594 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java @@ -87,12 +87,12 @@ public CompletableFuture handleAddVoterRequest( Endpoints voterEndpoints, long currentTimeMs ) { - // Check if there are any pending add or remove voter requests + // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( RaftUtil.addVoterResponse( Errors.REQUEST_TIMED_OUT, - "Request timed out waiting for leader to handle previous add or remove voter request" + "Request timed out waiting for leader to handle previous voter change request" ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java index 278fd4ecd62d1..5bef0afa709a8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java @@ -18,15 +18,13 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.message.AddRaftVoterResponseData; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.raft.Endpoints; -import org.apache.kafka.raft.RaftUtil; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; -public final class AddVoterHandlerState implements AutoCloseable { +public final class AddVoterHandlerState { private final ReplicaKey voterKey; private final Endpoints voterEndpoints; private final Timer timeout; @@ -84,9 +82,4 @@ public OptionalLong lastOffset() { public CompletableFuture future() { return future; } - - @Override - public void close() { - future.complete(RaftUtil.addVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null)); - } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java index b7f4ffb759eee..13aaf478808ec 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java @@ -77,12 +77,12 @@ public CompletableFuture handleRemoveVoterRequest( ReplicaKey voterKey, long currentTimeMs ) { - // Check if there are any pending add or remove voter requests + // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( RaftUtil.removeVoterResponse( Errors.REQUEST_TIMED_OUT, - "Request timed out waiting for leader to handle previous add or remove voter request" + "Request timed out waiting for leader to handle previous voter change request" ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java index 670103b4346a4..bb9ef4cb2cc89 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java @@ -17,13 +17,11 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.raft.RaftUtil; import java.util.concurrent.CompletableFuture; -public final class RemoveVoterHandlerState implements AutoCloseable { +public final class RemoveVoterHandlerState { private final long lastOffset; private final Timer timeout; private final CompletableFuture future = new CompletableFuture<>(); @@ -45,9 +43,4 @@ public CompletableFuture future() { public long lastOffset() { return lastOffset; } - - @Override - public void close() { - future.complete(RaftUtil.removeVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null)); - } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java new file mode 100644 index 0000000000000..f5b45c2c0d45e --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.LeaderState; +import org.apache.kafka.raft.LogOffsetMetadata; +import org.apache.kafka.raft.RaftUtil; +import org.apache.kafka.server.common.KRaftVersion; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.concurrent.CompletableFuture; + +/** + * TODO: document this. + * + * 1. Wait until there are no uncommitted add or remove voter records. Note that the implementation + * may just return a REQUEST_TIMED_OUT error if there are pending operations. + * 2. Wait for the LeaderChangeMessage control record from the current epoch to get committed. Note + * that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations. + * 3. Check that the updated voter supports the current kraft.version. + * 4. If the replica id tracked doesn't have a replica directory id, update it with the replica + * directory id provided in the request. + * 5. Append the updated VotersRecord to the log if the finalized kraft.version is greater than 0. + * 6. The KRaft internal listener will read this record from the log and update the voter's + * information. This includes updating the endpoint used by the KRaft NetworkClient. + * 7. Wait for the VotersRecord to commit using the majority of the new set of voters. + * 8. Send the UpdateVoter response to the client. + */ +public final class UpdateVoterHandler { + private final OptionalInt localId; + private final KRaftControlRecordStateMachine partitionState; + private final ListenerName defaultListenerName; + private final Time time; + private final long requestTimeoutMs; + + public UpdateVoterHandler( + OptionalInt localId, + KRaftControlRecordStateMachine partitionState, + ListenerName defaultListenerName, + Time time, + long requestTimeoutMs + ) { + this.localId = localId; + this.partitionState = partitionState; + this.defaultListenerName = defaultListenerName; + this.time = time; + this.requestTimeoutMs = requestTimeoutMs; + } + + public CompletableFuture handleUpdateVoterRequest( + LeaderState leaderState, + ListenerName requestListenerName, + ReplicaKey voterKey, + Endpoints voterEndpoints, + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions, + long currentTimeMs + ) { + // Check if there are any pending voter change requests + if (leaderState.isOperationPending(currentTimeMs)) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that the leader has established a HWM and committed the current epoch + Optional highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); + if (!highWatermark.isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that the cluster supports kraft.version >= 1 + // TODO: File a jira to handle the kraft.version == 0 + KRaftVersion kraftVersion = partitionState.lastKraftVersion(); + if (!kraftVersion.isReconfigSupported()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.UNSUPPORTED_VERSION, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that there are no uncommitted VotersRecord + Optional> votersEntry = partitionState.lastVoterSetEntry(); + if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.REQUEST_TIMED_OUT, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that the supported version range is valid + if (!validVersionRange(kraftVersion, supportedKraftVersions)) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Check that endpoinds includes the default listener + if (!voterEndpoints.address(defaultListenerName).isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.INVALID_REQUEST, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + // Update the voter + Optional updatedVoters = votersEntry + .get() + .value() + .updateVoter( + VoterSet.VoterNode.of( + voterKey, + voterEndpoints, + new SupportedVersionRange( + supportedKraftVersions.minSupportedVersion(), + supportedKraftVersions.maxSupportedVersion() + ) + ) + ); + if (!updatedVoters.isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.updateVoterResponse( + Errors.VOTER_NOT_FOUND, + requestListenerName, + new LeaderAndEpoch( + localId, + leaderState.epoch() + ), + leaderState.leaderEndpoints() + ) + ); + } + + UpdateVoterHandlerState state = new UpdateVoterHandlerState( + leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs), + requestListenerName, + time.timer(requestTimeoutMs) + ); + leaderState.resetUpdateVoterHandlerState(Errors.UNKNOWN_SERVER_ERROR, Optional.of(state)); + + return state.future(); + } + + public void highWatermarkUpdated(LeaderState leaderState) { + leaderState.updateVoterHandlerState().ifPresent(current -> { + leaderState.highWatermark().ifPresent(highWatermark -> { + if (highWatermark.offset() > current.lastOffset()) { + // VotersRecord with the updated voter was committed; complete the RPC + leaderState.resetUpdateVoterHandlerState(Errors.NONE, Optional.empty()); + } + }); + }); + } + + private boolean validVersionRange( + KRaftVersion finalizedVersion, + UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions + ) { + return supportedKraftVersions.minSupportedVersion() <= finalizedVersion.featureLevel() && + supportedKraftVersions.maxSupportedVersion() >= finalizedVersion.featureLevel(); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java new file mode 100644 index 0000000000000..c0ac6c5189983 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.RaftUtil; + +import java.util.concurrent.CompletableFuture; + +public final class UpdateVoterHandlerState { + private final long lastOffset; + private final ListenerName requestListenerName; + private final Timer timeout; + private final CompletableFuture future = new CompletableFuture<>(); + + UpdateVoterHandlerState( + long lastOffset, + ListenerName requestListenerName, + Timer timeout + ) { + this.lastOffset = lastOffset; + this.requestListenerName = requestListenerName; + this.timeout = timeout; + } + + public long timeUntilOperationExpiration(long currentTimeMs) { + timeout.update(currentTimeMs); + return timeout.remainingMs(); + } + + public CompletableFuture future() { + return future; + } + + public void completeFuture( + Errors error, + LeaderAndEpoch leaderAndEpoch, + Endpoints leaderEndpoints + ) { + future.complete( + RaftUtil.updateVoterResponse( + error, + requestListenerName, + leaderAndEpoch, + leaderEndpoints + ) + ); + } + + public long lastOffset() { + return lastOffset; + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index 447b44a93d9e2..0f9d83ae31aac 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -209,6 +209,20 @@ public Optional removeVoter(ReplicaKey voterKey) { return Optional.empty(); } + // TODO: write tests + // TODO: write documentation + public Optional updateVoter(VoterNode voter) { + VoterNode oldVoter = voters.get(voter.voterKey().id()); + if (oldVoter != null && oldVoter.isVoter(voter.voterKey())) { + HashMap newVoters = new HashMap<>(voters); + newVoters.put(voter.voterKey().id(), voter); + + return Optional.of(new VoterSet(newVoters)); + } + + return Optional.empty(); + } + /** * Converts a voter set to a voters record for a given version. * diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 2ca0e45720848..bc571550d486f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -50,6 +50,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -1547,6 +1548,72 @@ void testRemoveVoterWithPendingAddVoter() throws Exception { context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT); } + @Test + void testUpdateVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + assertTrue(context.client.quorum().isVoter(follower)); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + new SupportedVersionRange((short) 0, (short) 1), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // follower should not be a voter in the latest voter set + assertTrue(context.client.quorum().isVoter(follower)); + + // Send a FETCH to increase the HWM and commit the new voter set + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Expect reply for RemoveVoter request + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.NONE); + } + private static void verifyVotersRecord( VoterSet expectedVoterSet, ByteBuffer recordKey, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 695af19f7b155..91cffc6eb2242 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterResponseData; @@ -39,6 +40,8 @@ import org.apache.kafka.common.message.LeaderChangeMessage.Voter; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -1148,10 +1151,23 @@ RemoveRaftVoterResponseData assertSentRemoveVoterResponse(Errors error) { return removeVoterResponse; } - // TODO: preferredSuccessors should be a list of replica keys + UpdateRaftVoterResponseData assertSentUpdateVoterResponse(Errors error) { + List sentResponses = drainSentResponses(ApiKeys.UPDATE_RAFT_VOTER); + assertEquals(1, sentResponses.size()); + + RaftResponse.Outbound response = sentResponses.get(0); + assertInstanceOf(UpdateRaftVoterResponseData.class, response.data()); + + UpdateRaftVoterResponseData updateVoterResponse = (UpdateRaftVoterResponseData) response.data(); + assertEquals(error, Errors.forCode(updateVoterResponse.errorCode())); + + return updateVoterResponse; + } + List collectEndQuorumRequests( int epoch, Set destinationIdSet, + // TODO: preferredSuccessors should be a list of replica keys Optional> preferredSuccessorsOpt ) { List endQuorumRequests = new ArrayList<>(); @@ -1644,6 +1660,24 @@ RemoveRaftVoterRequestData removeVoterRequest(String cluster, ReplicaKey voter) return RaftUtil.removeVoterRequest(cluster, voter); } + UpdateRaftVoterRequestData updateVoterRequest( + ReplicaKey voter, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + return updateVoterRequest(clusterId, voter, currentEpoch(), supportedVersions, endpoints); + } + + UpdateRaftVoterRequestData updateVoterRequest( + String clusterId, + ReplicaKey voter, + int epoch, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + return RaftUtil.updateVoterRequest(clusterId, voter, epoch, supportedVersions, endpoints); + } + private short fetchRpcVersion() { if (kip853Rpc) { return 17; @@ -1708,6 +1742,14 @@ private short removeVoterRpcVersion() { } } + private short updateVoterRpcVersion() { + if (kip853Rpc) { + return 0; + } else { + throw new IllegalStateException("Reconfiguration must be enabled by calling withKip853Rpc(true)"); + } + } + private short raftRequestVersion(ApiMessage request) { if (request instanceof FetchRequestData) { return fetchRpcVersion(); @@ -1725,6 +1767,8 @@ private short raftRequestVersion(ApiMessage request) { return addVoterRpcVersion(); } else if (request instanceof RemoveRaftVoterRequestData) { return removeVoterRpcVersion(); + } else if (request instanceof UpdateRaftVoterRequestData) { + return updateVoterRpcVersion(); } else { throw new IllegalArgumentException(String.format("Request %s is not a raft request", request)); } @@ -1747,6 +1791,8 @@ private short raftResponseVersion(ApiMessage response) { return addVoterRpcVersion(); } else if (response instanceof RemoveRaftVoterResponseData) { return removeVoterRpcVersion(); + } else if (response instanceof UpdateRaftVoterResponseData) { + return updateVoterRpcVersion(); } else if (response instanceof ApiVersionsResponseData) { return 4; } else { From 35c65629917296030157de04925d5de42992982a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Wed, 31 Jul 2024 23:11:50 +0000 Subject: [PATCH 2/6] KAFKA-16533; Implement leader self updating the voter set --- .../apache/kafka/raft/KafkaRaftClient.java | 7 ++ .../org/apache/kafka/raft/LeaderState.java | 86 ++++++++++++------- .../org/apache/kafka/raft/QuorumState.java | 5 ++ .../apache/kafka/raft/internals/VoterSet.java | 10 +++ .../raft/KafkaRaftClientReconfigTest.java | 52 ++++++++++- .../apache/kafka/raft/LeaderStateTest.java | 3 + .../apache/kafka/raft/QuorumStateTest.java | 2 + .../kafka/raft/RaftClientTestContext.java | 39 +++++++-- .../kafka/raft/RaftEventSimulationTest.java | 2 + .../raft/internals/KafkaRaftMetricsTest.java | 2 + .../kafka/raft/internals/VoterSetTest.java | 4 +- .../kafka/server/common/KRaftVersion.java | 20 ++++- .../kafka/server/common/KRaftVersionTest.java | 11 ++- 13 files changed, 200 insertions(+), 43 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 8ae2b013dee56..a6549550bee33 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.AddRaftVoterResponseData; @@ -174,6 +175,7 @@ public final class KafkaRaftClient implements RaftClient { private final int fetchMaxWaitMs; private final String clusterId; private final Endpoints localListeners; + private final SupportedVersionRange localSupportedKRaftVersion; private final NetworkChannel channel; private final ReplicatedLog log; private final Random random; @@ -230,6 +232,7 @@ public KafkaRaftClient( String clusterId, Collection bootstrapServers, Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, QuorumConfig quorumConfig ) { this( @@ -246,6 +249,7 @@ public KafkaRaftClient( clusterId, bootstrapServers, localListeners, + localSupportedKRaftVersion, logContext, new Random(), quorumConfig @@ -266,6 +270,7 @@ public KafkaRaftClient( String clusterId, Collection bootstrapServers, Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, LogContext logContext, Random random, QuorumConfig quorumConfig @@ -283,6 +288,7 @@ public KafkaRaftClient( this.time = time; this.clusterId = clusterId; this.localListeners = localListeners; + this.localSupportedKRaftVersion = localSupportedKRaftVersion; this.fetchMaxWaitMs = fetchMaxWaitMs; this.logger = logContext.logger(KafkaRaftClient.class); this.random = random; @@ -497,6 +503,7 @@ public void initialize( nodeDirectoryId, partitionState, localListeners, + localSupportedKRaftVersion, quorumConfig.electionTimeoutMs(), quorumConfig.fetchTimeoutMs(), quorumStateStore, diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index e1ad8e184e73b..9a58a55bd0a5d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; @@ -67,7 +68,8 @@ public class LeaderState implements EpochState { private final int epoch; private final long epochStartOffset; private final Set grantingVoters; - private final Endpoints endpoints; + private final Endpoints localListeners; + private final SupportedVersionRange localSupportedKRaftVersion; private final VoterSet voterSetAtEpochStart; // This field is non-empty if the voter set at epoch start came from a snapshot or log segment private final OptionalLong offsetOfVotersAtEpochStart; @@ -103,14 +105,16 @@ protected LeaderState( KRaftVersion kraftVersionAtEpochStart, Set grantingVoters, BatchAccumulator accumulator, - Endpoints endpoints, + Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, int fetchTimeoutMs, LogContext logContext ) { this.localReplicaKey = localReplicaKey; this.epoch = epoch; this.epochStartOffset = epochStartOffset; - this.endpoints = endpoints; + this.localListeners = localListeners; + this.localSupportedKRaftVersion = localSupportedKRaftVersion; for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) { boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey); @@ -256,7 +260,7 @@ public void resetUpdateVoterHandlerState( handlerState -> handlerState.completeFuture( error, new LeaderAndEpoch(OptionalInt.of(localReplicaKey.id()), epoch), - endpoints + localListeners ) ); updateVoterHandlerState = state; @@ -344,36 +348,54 @@ public void appendLeaderChangeMessageAndBootstrapRecords(long currentTimeMs) { ) { builder.appendLeaderChangeMessage(currentTimeMs, leaderChangeMessage); - offsetOfVotersAtEpochStart.ifPresent(offset -> { - if (offset == -1) { - // Latest voter set came from the bootstrap checkpoint (0-0.checkpoint) - // rewrite the voter set to the log so that it is replicated to the replicas. - if (!kraftVersionAtEpochStart.isReconfigSupported()) { - throw new IllegalStateException( - String.format( - "The bootstrap checkpoint contains a set of voters %s at %s " + - "and the KRaft version is %s", - voterSetAtEpochStart, - offset, - kraftVersionAtEpochStart - ) - ); - } else { - builder.appendKRaftVersionMessage( - currentTimeMs, - new KRaftVersionRecord() - .setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION) - .setKRaftVersion(kraftVersionAtEpochStart.featureLevel()) - ); - builder.appendVotersMessage( - currentTimeMs, - voterSetAtEpochStart.toVotersRecord( - ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION + if (kraftVersionAtEpochStart.isReconfigSupported()) { + long offset = offsetOfVotersAtEpochStart.orElseThrow( + () -> new IllegalStateException( + String.format( + "The %s is %s but there is no voter set in the log or " + + "checkpoint %s", + KRaftVersion.FEATURE_NAME, + kraftVersionAtEpochStart, + voterSetAtEpochStart + ) + ) + ); + + VoterSet.VoterNode updatedVoterNode = VoterSet.VoterNode.of( + localReplicaKey, + localListeners, + localSupportedKRaftVersion + ); + + // The leader should write the latest voters record if its local listeners are different + // or it has never written a voters record to the log before. + if (offset == -1 || voterSetAtEpochStart.voterNodeNeedsUpdate(updatedVoterNode)) { + VoterSet updatedVoterSet = voterSetAtEpochStart + .updateVoter(updatedVoterNode) + .orElseThrow( + () -> new IllegalStateException( + String.format( + "Update expected for leader node %s and voter set %s", + updatedVoterNode, + voterSetAtEpochStart + ) ) ); - } + + builder.appendKRaftVersionMessage( + currentTimeMs, + new KRaftVersionRecord() + .setVersion(kraftVersionAtEpochStart.kraftVersionRecordVersion()) + .setKRaftVersion(kraftVersionAtEpochStart.featureLevel()) + ); + builder.appendVotersMessage( + currentTimeMs, + updatedVoterSet.toVotersRecord( + kraftVersionAtEpochStart.votersRecordVersion() + ) + ); } - }); + } return builder.build(); } @@ -426,7 +448,7 @@ public int epoch() { @Override public Endpoints leaderEndpoints() { - return endpoints; + return localListeners; } Map voterStates() { diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index d84e274990e10..913a7f474d52d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -17,6 +17,7 @@ package org.apache.kafka.raft; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.internals.BatchAccumulator; @@ -84,6 +85,7 @@ public class QuorumState { private final QuorumStateStore store; private final KRaftControlRecordStateMachine partitionState; private final Endpoints localListeners; + private final SupportedVersionRange localSupportedKRaftVersion; private final Random random; private final int electionTimeoutMs; private final int fetchTimeoutMs; @@ -96,6 +98,7 @@ public QuorumState( Uuid localDirectoryId, KRaftControlRecordStateMachine partitionState, Endpoints localListeners, + SupportedVersionRange localSupportedKRaftVersion, int electionTimeoutMs, int fetchTimeoutMs, QuorumStateStore store, @@ -107,6 +110,7 @@ public QuorumState( this.localDirectoryId = localDirectoryId; this.partitionState = partitionState; this.localListeners = localListeners; + this.localSupportedKRaftVersion = localSupportedKRaftVersion; this.electionTimeoutMs = electionTimeoutMs; this.fetchTimeoutMs = fetchTimeoutMs; this.store = store; @@ -550,6 +554,7 @@ public LeaderState transitionToLeader(long epochStartOffset, BatchAccumul candidateState.grantingVoters(), accumulator, localListeners, + localSupportedKRaftVersion, fetchTimeoutMs, logContext ); diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index 0f9d83ae31aac..9dffa7cf9316e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -95,6 +95,16 @@ public Optional voterNode(int voterId, ListenerName listenerName) { .map(address -> new Node(voterId, address.getHostString(), address.getPort())); } + // TODO: write documentation + public boolean voterNodeNeedsUpdate(VoterNode updatedVoterNode) { + return Optional.ofNullable(voters.get(updatedVoterNode.voterKey().id())) + .map( + node -> node.isVoter(updatedVoterNode.voterKey()) && + !node.equals(updatedVoterNode) + ) + .orElse(false); + } + /** * Returns if the node is a voter in the set of voters. * diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index bc571550d486f..5edb2e56010ca 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.KRaftVersionTest; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriterReaderTest; @@ -1589,7 +1590,7 @@ void testUpdateVoter() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - new SupportedVersionRange((short) 0, (short) 1), + KRaftVersionTest.supportedVersionRange(), newListeners ) ); @@ -1614,6 +1615,53 @@ void testUpdateVoter() throws Exception { context.assertSentUpdateVoterResponse(Errors.NONE); } + @Test + void testLeaderUpdatesVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + HashMap listenersMap = new HashMap<>(2); + listenersMap.put( + VoterSetTest.DEFAULT_LISTENER_NAME, + InetSocketAddress.createUnresolved("localhost", 9990 + local.id()) + ); + listenersMap.put( + ListenerName.normalised("ANOTHER_LISTENER"), + InetSocketAddress.createUnresolved("localhost", 8990 + local.id()) + ); + Endpoints localListeners = Endpoints.fromInetSocketAddresses(listenersMap); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .withLocalListeners(localListeners) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + assertTrue(context.client.quorum().isVoter(follower)); + + // Establish a HWM and commit the latest voter set + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + Optional updatedVoterSet = voters.updateVoter( + VoterSet.VoterNode.of( + local, + localListeners, + KRaftVersionTest.supportedVersionRange() + ) + ); + assertEquals(updatedVoterSet, context.listener.lastCommittedVoterSet()); + } + private static void verifyVotersRecord( VoterSet expectedVoterSet, ByteBuffer recordKey, @@ -1642,7 +1690,7 @@ private int randomeReplicaId() { } private static ApiVersionsResponseData apiVersionsResponse(Errors error) { - return apiVersionsResponse(error, new SupportedVersionRange((short) 0, (short) 1)); + return apiVersionsResponse(error, KRaftVersionTest.supportedVersionRange()); } private static ApiVersionsResponseData apiVersionsResponse(Errors error, SupportedVersionRange supportedVersions) { diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 8ce18ec9d44df..1bbc27566bb07 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -75,6 +76,7 @@ private LeaderState newLeaderState( voters.voterIds(), accumulator, voters.listeners(localReplicaKey.id()), + KRaftVersionTest.supportedVersionRange(), fetchTimeoutMs, logContext ); @@ -120,6 +122,7 @@ public void testRequireNonNullAccumulator() { Collections.emptySet(), null, Endpoints.empty(), + KRaftVersionTest.supportedVersionRange(), fetchTimeoutMs, logContext ) diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 236b858a0462f..9d183e43b151b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -80,6 +81,7 @@ private QuorumState buildQuorumState( localDirectoryId, mockPartitionState, localId.isPresent() ? voterSet.listeners(localId.getAsInt()) : Endpoints.empty(), + KRaftVersionTest.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, store, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 91cffc6eb2242..ee966815ff5e4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.message.VoteRequestData; import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.message.VotersRecord; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; @@ -66,6 +67,7 @@ import org.apache.kafka.raft.internals.StringSerde; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.KRaftVersionTest; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotWriter; import org.apache.kafka.snapshot.SnapshotReader; @@ -95,6 +97,7 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR; import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; @@ -170,6 +173,7 @@ public static final class Builder { private List bootstrapServers = Collections.emptyList(); private boolean kip853Rpc = false; private Optional startingVoters = Optional.empty(); + private Endpoints localListeners = Endpoints.empty(); private boolean isStartingVotersStatic = false; public Builder(int localId, Set staticVoters) { @@ -349,6 +353,11 @@ Builder withBootstrapSnapshot(Optional voters) { return this; } + Builder withLocalListeners(Endpoints localListeners) { + this.localListeners = localListeners; + return this; + } + public RaftClientTestContext build() throws IOException { VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException); @@ -368,11 +377,16 @@ public RaftClientTestContext build() throws IOException { ); } - // Compute the local listeners. Only potential voters/leader need to provide the local listeners - // If the local id is not set (must be observer), the local listener can be empty. - Endpoints localListeners = localId.isPresent() ? - startingVoters.listeners(localId.getAsInt()) : - Endpoints.empty(); + /* + * Compute the local listeners if the test didn't override it. + * Only potential voters/leader need to provide the local listeners. + * If the local id is not set (must be observer), the local listener can be empty. + */ + Endpoints localListeners = this.localListeners.isEmpty() ? + localId.isPresent() ? + startingVoters.listeners(localId.getAsInt()) : + Endpoints.empty() : + this.localListeners; QuorumConfig quorumConfig = new QuorumConfig( requestTimeoutMs, @@ -397,6 +411,7 @@ public RaftClientTestContext build() throws IOException { clusterId, bootstrapServers, localListeners, + KRaftVersionTest.supportedVersionRange(), logContext, random, quorumConfig @@ -1868,6 +1883,20 @@ OptionalLong lastCommitOffset() { } } + Optional lastCommittedVoterSet() { + return commits.stream() + .flatMap(batch -> batch.controlRecords().stream()) + .flatMap(controlRecord -> { + if (controlRecord.type() == ControlRecordType.KRAFT_VOTERS) { + return Stream.of((VotersRecord) controlRecord.message()); + } else { + return Stream.empty(); + } + }) + .reduce((accumulated, current) -> current) + .map(VoterSet::fromVotersRecord); + } + OptionalInt currentClaimedEpoch() { if (localId.isPresent() && currentLeaderAndEpoch.isLeader(localId.getAsInt())) { return OptionalInt.of(currentLeaderAndEpoch.epoch()); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index a932ec72d4fe9..7c961fae2ae71 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.raft.MockLog.LogBatch; import org.apache.kafka.raft.MockLog.LogEntry; import org.apache.kafka.raft.internals.BatchMemoryPool; +import org.apache.kafka.server.common.KRaftVersionTest; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -791,6 +792,7 @@ void start(int nodeId) { clusterId, Collections.emptyList(), endpointsFromId(nodeId, channel.listenerName()), + KRaftVersionTest.supportedVersionRange(), logContext, random, quorumConfig diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 2085c442087cc..37a467fa709d5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; @@ -82,6 +83,7 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio localDirectoryId, mockPartitionState, voterSet.listeners(localId), + KRaftVersionTest.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, new MockQuorumStateStore(), diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java index 06846636ea455..1491d28231015 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java @@ -18,10 +18,10 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -339,7 +339,7 @@ public static VoterSet.VoterNode voterNode(ReplicaKey replicaKey) { ) ) ), - new SupportedVersionRange((short) 0, (short) 0) + KRaftVersionTest.supportedVersionRange() ); } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index 975ef66607202..eb3d83ced3c6c 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -81,6 +81,24 @@ public short quorumStateVersion() { case KRAFT_VERSION_1: return (short) 1; } - throw new RuntimeException("Unknown KRaft feature level: " + this); + throw new IllegalStateException("Unsupported KRaft feature level: " + this); + } + + // TODO: write test + public short kraftVersionRecordVersion() { + switch (this) { + case KRAFT_VERSION_1: + return (short) 0; + } + throw new IllegalStateException("Unsupported KRaft feature level: " + this); + } + + // TODO: write test + public short votersRecordVersion() { + switch (this) { + case KRAFT_VERSION_1: + return (short) 0; + } + throw new IllegalStateException("Unsupported KRaft feature level: " + this); } } diff --git a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java index 4c6d417bb8b4a..04dd60271bfee 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java @@ -17,11 +17,13 @@ package org.apache.kafka.server.common; +import org.apache.kafka.common.feature.SupportedVersionRange; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; -class KRaftVersionTest { +public final class KRaftVersionTest { @Test public void testFeatureLevel() { for (int i = 0; i < KRaftVersion.values().length; i++) { @@ -59,4 +61,11 @@ public void testBootstrapMetadataVersion() { } } } + + public static SupportedVersionRange supportedVersionRange() { + return new SupportedVersionRange( + KRaftVersion.values()[0].featureLevel(), + KRaftVersion.values()[KRaftVersion.values().length - 1].featureLevel() + ); + } } From 55483a386fbe509258b452ec4e16a4e92587fe06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Thu, 1 Aug 2024 15:00:14 +0000 Subject: [PATCH 3/6] KAFKA-16533; Implement protocol tests --- .../main/scala/kafka/raft/RaftManager.scala | 6 +- .../raft/KafkaRaftClientReconfigTest.java | 619 +++++++++++++++++- .../apache/kafka/raft/LeaderStateTest.java | 6 +- .../apache/kafka/raft/QuorumStateTest.java | 4 +- .../kafka/raft/RaftClientTestContext.java | 5 +- .../kafka/raft/RaftEventSimulationTest.java | 4 +- .../raft/internals/KafkaRaftMetricsTest.java | 4 +- .../kafka/raft/internals/VoterSetTest.java | 4 +- .../apache/kafka/server/common/Features.java | 9 + .../kafka/server/common/KRaftVersionTest.java | 9 - 10 files changed, 630 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index caa77a9fd6d0f..25c8e8b294b47 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -45,6 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog} import org.apache.kafka.server.ProcessRole +import org.apache.kafka.server.common.Features import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.util.{FileLock, KafkaScheduler} import org.apache.kafka.server.fault.FaultHandler @@ -153,7 +154,7 @@ class KafkaRaftManager[T]( threadNamePrefixOpt: Option[String], val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]], bootstrapServers: JCollection[InetSocketAddress], - controllerListeners: Endpoints, + localListeners: Endpoints, fatalFaultHandler: FaultHandler ) extends RaftManager[T] with Logging { @@ -236,7 +237,8 @@ class KafkaRaftManager[T]( logContext, clusterId, bootstrapServers, - controllerListeners, + localListeners, + Features.KRAFT_VERSION.supportedVersionRange(), raftConfig ) } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 5edb2e56010ca..e98c8a90d2f88 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -39,8 +39,8 @@ import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; -import org.apache.kafka.server.common.KRaftVersionTest; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriterReaderTest; @@ -718,9 +718,6 @@ void testAddVoterTimeout() throws Exception { Collections.singletonMap(context.channel.listenerName(), newAddress) ); - // Show that the new voter is not currently a voter - assertFalse(context.client.quorum().isVoter(newVoter)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) @@ -1094,8 +1091,6 @@ public void testRemoveVoterIsLeader() throws Exception { context.becomeLeader(); int epoch = context.currentEpoch(); - assertTrue(context.client.quorum().isVoter(follower2)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0) @@ -1187,7 +1182,7 @@ void testRemoveVoterToNotLeader() throws Exception { .withUnknownLeader(3) .build(); - // Attempt to add new voter to the quorum + // Attempt to remove voter to the quorum context.deliverRequest(context.removeVoterRequest(follower1)); context.pollUntilResponse(); context.assertSentRemoveVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); @@ -1382,8 +1377,6 @@ void testRemoveVoterTimedOut() throws Exception { context.becomeLeader(); int epoch = context.currentEpoch(); - assertTrue(context.client.quorum().isVoter(follower2)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0) @@ -1428,8 +1421,6 @@ void testRemoveVoterFailsWhenLosingLeadership() throws Exception { context.becomeLeader(); int epoch = context.currentEpoch(); - assertTrue(context.client.quorum().isVoter(follower2)); - // Establish a HWM and fence previous leaders context.deliverRequest( context.fetchRequest(epoch, follower1, context.log.endOffset().offset(), epoch, 0) @@ -1543,7 +1534,7 @@ void testRemoveVoterWithPendingAddVoter() throws Exception { // Attempt to add new voter to the quorum context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); - // Attempt to remove followe while AddVoter is pending + // Attempt to remove follower while AddVoter is pending context.deliverRequest(context.removeVoterRequest(follower)); context.pollUntilResponse(); context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT); @@ -1590,7 +1581,7 @@ void testUpdateVoter() throws Exception { context.deliverRequest( context.updateVoterRequest( follower, - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); @@ -1610,7 +1601,7 @@ void testUpdateVoter() throws Exception { context.pollUntilResponse(); context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); - // Expect reply for RemoveVoter request + // Expect reply for UpdateVoter request context.pollUntilResponse(); context.assertSentUpdateVoterResponse(Errors.NONE); } @@ -1656,12 +1647,608 @@ void testLeaderUpdatesVoter() throws Exception { VoterSet.VoterNode.of( local, localListeners, - KRaftVersionTest.supportedVersionRange() + Features.KRAFT_VERSION.supportedVersionRange() ) ); assertEquals(updatedVoterSet, context.listener.lastCommittedVoterSet()); } + @Test + public void testUpdateVoterInvalidClusterId() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // empty cluster id is rejected + context.deliverRequest( + context.updateVoterRequest( + "", + follower, + epoch, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID); + + // invalid cluster id is rejected + context.deliverRequest( + context.updateVoterRequest( + "invalid-uuid", + follower, + epoch, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID); + } + + @Test + void testUpdateVoterOldEpoch() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + context.deliverRequest( + context.updateVoterRequest( + context.clusterId, + follower, + epoch - 1, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.FENCED_LEADER_EPOCH); + } + + @Test + void testUpdateVoterNewEpoch() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + context.deliverRequest( + context.updateVoterRequest( + context.clusterId, + follower, + epoch + 1, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.UNKNOWN_LEADER_EPOCH); + } + + @Test + void testUpdateVoterToNotLeader() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + // Attempt to uodate voter in the quorum + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + Endpoints.empty() + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); + } + + @Test + void testUpdateVoterWithPendingUpdateVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // Attempt to update voter again + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + // TODO: Mentioned that a Jira is going to fix this + @Test + void testUpdateVoterWithKraftVersion0() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withStaticVoters(voters) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.UNSUPPORTED_VERSION); + } + + @Test + void testUpdateVoterWithNoneVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update a replica with the same id as follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + replicaKey(follower.id(), true), + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.VOTER_NOT_FOUND); + } + + @Test + void testUpdateVoterWithNoneVoterId() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update a replica with the same id as follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + 1 + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + 1 + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + ReplicaKey.of(follower.id() + 1, follower.directoryId().get()), + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.VOTER_NOT_FOUND); + } + + @Test + void testUpdateVoterTimedOut() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // Wait for request timeout without sending a FETCH request to timeout the update voter RPC + context.time.sleep(context.requestTimeoutMs()); + + // Expect a timeout error + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testUpdateVoterFailsWhenLosingLeadership() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Handle the update voter request + context.client.poll(); + // Append the VotersRecord to the log + context.client.poll(); + + // Leader completes the UpdateVoter RPC when resigning + context.client.resign(epoch); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); + } + + @Test + void testUpdateVoterWithPendingAddVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newVoterAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newVoterListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newVoterAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newVoterListeners)); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testRemoveVoterWithPendingUpdateVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update the follower + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + follower.id() + ); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 8990 + follower.id() + ); + HashMap listenersMap = new HashMap<>(2); + listenersMap.put(context.channel.listenerName(), defaultAddress); + listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); + Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + context.deliverRequest( + context.updateVoterRequest( + follower, + Features.KRAFT_VERSION.supportedVersionRange(), + newListeners + ) + ); + + // Attempt to remove follower while UpdateVoter is pending + context.deliverRequest(context.removeVoterRequest(follower)); + context.pollUntilResponse(); + context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT); + } + private static void verifyVotersRecord( VoterSet expectedVoterSet, ByteBuffer recordKey, @@ -1690,7 +2277,7 @@ private int randomeReplicaId() { } private static ApiVersionsResponseData apiVersionsResponse(Errors error) { - return apiVersionsResponse(error, KRaftVersionTest.supportedVersionRange()); + return apiVersionsResponse(error, Features.KRAFT_VERSION.supportedVersionRange()); } private static ApiVersionsResponseData apiVersionsResponse(Errors error, SupportedVersionRange supportedVersions) { diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 1bbc27566bb07..04ff1ed29c278 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; -import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -76,7 +76,7 @@ private LeaderState newLeaderState( voters.voterIds(), accumulator, voters.listeners(localReplicaKey.id()), - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), fetchTimeoutMs, logContext ); @@ -122,7 +122,7 @@ public void testRequireNonNullAccumulator() { Collections.emptySet(), null, Endpoints.empty(), - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), fetchTimeoutMs, logContext ) diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 9d183e43b151b..77f7234d3377e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -25,8 +25,8 @@ import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; -import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -81,7 +81,7 @@ private QuorumState buildQuorumState( localDirectoryId, mockPartitionState, localId.isPresent() ? voterSet.listeners(localId.getAsInt()) : Endpoints.empty(), - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, store, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index ee966815ff5e4..40bda906fe3fa 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -66,8 +66,8 @@ import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.StringSerde; import org.apache.kafka.raft.internals.VoterSet; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; -import org.apache.kafka.server.common.KRaftVersionTest; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotWriter; import org.apache.kafka.snapshot.SnapshotReader; @@ -411,7 +411,7 @@ public RaftClientTestContext build() throws IOException { clusterId, bootstrapServers, localListeners, - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), logContext, random, quorumConfig @@ -1173,6 +1173,7 @@ UpdateRaftVoterResponseData assertSentUpdateVoterResponse(Errors error) { RaftResponse.Outbound response = sentResponses.get(0); assertInstanceOf(UpdateRaftVoterResponseData.class, response.data()); + // TODO: check the leader id, leader epocha and leader endpoint UpdateRaftVoterResponseData updateVoterResponse = (UpdateRaftVoterResponseData) response.data(); assertEquals(error, Errors.forCode(updateVoterResponse.errorCode())); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 7c961fae2ae71..9b1a325dd2f2b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.raft.MockLog.LogBatch; import org.apache.kafka.raft.MockLog.LogEntry; import org.apache.kafka.raft.internals.BatchMemoryPool; -import org.apache.kafka.server.common.KRaftVersionTest; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -792,7 +792,7 @@ void start(int nodeId) { clusterId, Collections.emptyList(), endpointsFromId(nodeId, channel.listenerName()), - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), logContext, random, quorumConfig diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 37a467fa709d5..bd65bbe993f22 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -26,8 +26,8 @@ import org.apache.kafka.raft.MockQuorumStateStore; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.KRaftVersion; -import org.apache.kafka.server.common.KRaftVersionTest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; @@ -83,7 +83,7 @@ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion kraftVersio localDirectoryId, mockPartitionState, voterSet.listeners(localId), - KRaftVersionTest.supportedVersionRange(), + Features.KRAFT_VERSION.supportedVersionRange(), electionTimeoutMs, fetchTimeoutMs, new MockQuorumStateStore(), diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java index 1491d28231015..3bf468fdf6522 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.Endpoints; -import org.apache.kafka.server.common.KRaftVersionTest; +import org.apache.kafka.server.common.Features; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -339,7 +339,7 @@ public static VoterSet.VoterNode voterNode(ReplicaKey replicaKey) { ) ) ), - KRaftVersionTest.supportedVersionRange() + Features.KRAFT_VERSION.supportedVersionRange() ); } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java index f10b95dd6966d..15269e5183681 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.server.common; +import org.apache.kafka.common.feature.SupportedVersionRange; + import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -86,6 +88,13 @@ public short latestTesting() { return featureVersions[featureVersions.length - 1].featureLevel(); } + public SupportedVersionRange supportedVersionRange() { + return new SupportedVersionRange( + minimumProduction(), + latestTesting() + ); + } + /** * Creates a FeatureVersion from a level. * diff --git a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java index 04dd60271bfee..de82bf45f3d62 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java @@ -17,8 +17,6 @@ package org.apache.kafka.server.common; -import org.apache.kafka.common.feature.SupportedVersionRange; - import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -61,11 +59,4 @@ public void testBootstrapMetadataVersion() { } } } - - public static SupportedVersionRange supportedVersionRange() { - return new SupportedVersionRange( - KRaftVersion.values()[0].featureLevel(), - KRaftVersion.values()[KRaftVersion.values().length - 1].featureLevel() - ); - } } From 1c6f426c7680ea4d249123bc237f8fd186096d87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Fri, 2 Aug 2024 00:17:26 +0000 Subject: [PATCH 4/6] KAFKA-16533; Improve update voter respoons testing --- .../org/apache/kafka/raft/LeaderState.java | 1 - .../java/org/apache/kafka/raft/RaftUtil.java | 4 +- .../raft/internals/UpdateVoterHandler.java | 2 +- .../apache/kafka/raft/internals/VoterSet.java | 15 +++- .../raft/KafkaRaftClientReconfigTest.java | 87 +++++++++++++++---- .../kafka/raft/RaftClientTestContext.java | 25 +++++- .../kafka/raft/internals/VoterSetTest.java | 32 +++++++ .../kafka/server/common/KRaftVersion.java | 2 - .../kafka/server/common/KRaftVersionTest.java | 51 +++++++++++ 9 files changed, 191 insertions(+), 28 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9a58a55bd0a5d..ecc0e68a0f885 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -53,7 +53,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -// TODO: the leader needs to update itself /** * In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests. diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index a9d3a430856d7..224da441a9c87 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -545,7 +545,7 @@ public static AddRaftVoterResponseData addVoterResponse( .setErrorCode(error.code()) .setErrorMessage(errorMessage); } - + public static RemoveRaftVoterRequestData removeVoterRequest( String clusterId, ReplicaKey voter @@ -567,7 +567,6 @@ public static RemoveRaftVoterResponseData removeVoterResponse( .setErrorMessage(errorMessage); } - // TODO: add tests public static UpdateRaftVoterRequestData updateVoterRequest( String clusterId, ReplicaKey voter, @@ -589,7 +588,6 @@ public static UpdateRaftVoterRequestData updateVoterRequest( return request; } - // TODO: add tests public static UpdateRaftVoterResponseData updateVoterResponse( Errors error, ListenerName listenerName, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java index f5b45c2c0d45e..4a73be7eda4c1 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -109,8 +109,8 @@ public CompletableFuture handleUpdateVoterRequest( ); } + // KAFKA-16538 will implement the case when the kraft.version is 0 // Check that the cluster supports kraft.version >= 1 - // TODO: File a jira to handle the kraft.version == 0 KRaftVersion kraftVersion = partitionState.lastKraftVersion(); if (!kraftVersion.isReconfigSupported()) { return CompletableFuture.completedFuture( diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index 9dffa7cf9316e..df3f0dfbd082a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -95,7 +95,12 @@ public Optional voterNode(int voterId, ListenerName listenerName) { .map(address -> new Node(voterId, address.getHostString(), address.getPort())); } - // TODO: write documentation + /** + * Return true the provided voter node is a voter and would cause a change in the voter set. + * + * @param updatedVoterNode the updated voter node + * @return true if the updated voter node is different than the node in the voter set; otherwise false. + */ public boolean voterNodeNeedsUpdate(VoterNode updatedVoterNode) { return Optional.ofNullable(voters.get(updatedVoterNode.voterKey().id())) .map( @@ -219,8 +224,12 @@ public Optional removeVoter(ReplicaKey voterKey) { return Optional.empty(); } - // TODO: write tests - // TODO: write documentation + /** + * Updates a voter in the voter set. + * + * @param voter the updated voter + * @return a new voter set if the voter was updated, otherwise {@code Optional.empty()} + */ public Optional updateVoter(VoterNode voter) { VoterNode oldVoter = voters.get(voter.voterKey().id()); if (oldVoter != null && oldVoter.isVoter(voter.voterKey())) { diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index e98c8a90d2f88..1b34ba0433c60 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -1603,7 +1603,11 @@ void testUpdateVoter() throws Exception { // Expect reply for UpdateVoter request context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.NONE); + context.assertSentUpdateVoterResponse( + Errors.NONE, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -1680,7 +1684,11 @@ public void testUpdateVoterInvalidClusterId() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID); + context.assertSentUpdateVoterResponse( + Errors.INCONSISTENT_CLUSTER_ID, + OptionalInt.of(local.id()), + epoch + ); // invalid cluster id is rejected context.deliverRequest( @@ -1693,7 +1701,11 @@ public void testUpdateVoterInvalidClusterId() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.INCONSISTENT_CLUSTER_ID); + context.assertSentUpdateVoterResponse( + Errors.INCONSISTENT_CLUSTER_ID, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -1722,7 +1734,11 @@ void testUpdateVoterOldEpoch() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.FENCED_LEADER_EPOCH); + context.assertSentUpdateVoterResponse( + Errors.FENCED_LEADER_EPOCH, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -1751,7 +1767,11 @@ void testUpdateVoterNewEpoch() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.UNKNOWN_LEADER_EPOCH); + context.assertSentUpdateVoterResponse( + Errors.UNKNOWN_LEADER_EPOCH, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -1776,7 +1796,11 @@ void testUpdateVoterToNotLeader() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); + context.assertSentUpdateVoterResponse( + Errors.NOT_LEADER_OR_FOLLOWER, + OptionalInt.empty(), + context.currentEpoch() + ); } @Test @@ -1837,7 +1861,11 @@ void testUpdateVoterWithPendingUpdateVoter() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -1854,6 +1882,7 @@ void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception { .build(); context.becomeLeader(); + int epoch = context.currentEpoch(); // Attempt to update the follower InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved( @@ -1876,10 +1905,14 @@ void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); } - // TODO: Mentioned that a Jira is going to fix this + // KAFKA-16538 is going to allow UpdateVoter RPC when the kraft.version is 0 @Test void testUpdateVoterWithKraftVersion0() throws Exception { ReplicaKey local = replicaKey(randomeReplicaId(), true); @@ -1924,7 +1957,11 @@ void testUpdateVoterWithKraftVersion0() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.UNSUPPORTED_VERSION); + context.assertSentUpdateVoterResponse( + Errors.UNSUPPORTED_VERSION, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -1971,7 +2008,11 @@ void testUpdateVoterWithNoneVoter() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.VOTER_NOT_FOUND); + context.assertSentUpdateVoterResponse( + Errors.VOTER_NOT_FOUND, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -2018,7 +2059,11 @@ void testUpdateVoterWithNoneVoterId() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.VOTER_NOT_FOUND); + context.assertSentUpdateVoterResponse( + Errors.VOTER_NOT_FOUND, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -2075,7 +2120,11 @@ void testUpdateVoterTimedOut() throws Exception { // Expect a timeout error context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -2130,7 +2179,11 @@ void testUpdateVoterFailsWhenLosingLeadership() throws Exception { // Leader completes the UpdateVoter RPC when resigning context.client.resign(epoch); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); + context.assertSentUpdateVoterResponse( + Errors.NOT_LEADER_OR_FOLLOWER, + OptionalInt.of(local.id()), + epoch + ); } @Test @@ -2196,7 +2249,11 @@ void testUpdateVoterWithPendingAddVoter() throws Exception { ) ); context.pollUntilResponse(); - context.assertSentUpdateVoterResponse(Errors.REQUEST_TIMED_OUT); + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 40bda906fe3fa..f69ee5c1016d4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -1166,17 +1166,36 @@ RemoveRaftVoterResponseData assertSentRemoveVoterResponse(Errors error) { return removeVoterResponse; } - UpdateRaftVoterResponseData assertSentUpdateVoterResponse(Errors error) { + UpdateRaftVoterResponseData assertSentUpdateVoterResponse( + Errors error, + OptionalInt leaderId, + int epoch + ) { List sentResponses = drainSentResponses(ApiKeys.UPDATE_RAFT_VOTER); assertEquals(1, sentResponses.size()); RaftResponse.Outbound response = sentResponses.get(0); assertInstanceOf(UpdateRaftVoterResponseData.class, response.data()); - // TODO: check the leader id, leader epocha and leader endpoint UpdateRaftVoterResponseData updateVoterResponse = (UpdateRaftVoterResponseData) response.data(); assertEquals(error, Errors.forCode(updateVoterResponse.errorCode())); - + assertEquals(leaderId.orElse(-1), updateVoterResponse.currentLeader().leaderId()); + assertEquals(epoch, updateVoterResponse.currentLeader().leaderEpoch()); + + if (updateVoterResponse.currentLeader().leaderId() >= 0) { + int id = updateVoterResponse.currentLeader().leaderId(); + Endpoints expectedLeaderEndpoints = startingVoters.listeners(id); + Endpoints responseEndpoints = Endpoints.fromInetSocketAddresses( + Collections.singletonMap( + channel.listenerName(), + InetSocketAddress.createUnresolved( + updateVoterResponse.currentLeader().host(), + updateVoterResponse.currentLeader().port() + ) + ) + ); + assertEquals(expectedLeaderEndpoints, responseEndpoints); + } return updateVoterResponse; } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java index 3bf468fdf6522..a4f2354d690f0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.Endpoints; @@ -115,6 +116,37 @@ void testRemoveVoter() { ); } + @Test + void testUpdateVoter() { + Map aVoterMap = voterMap(IntStream.of(1, 2, 3), true); + VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + + assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(4, true))); + assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(4, true))); + assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(3, true))); + assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(3, true))); + + VoterSet.VoterNode voter3 = aVoterMap.get(3); + VoterSet.VoterNode newVoter3 = VoterSet.VoterNode.of( + voter3.voterKey(), + Endpoints.fromInetSocketAddresses( + Collections.singletonMap( + ListenerName.normalised("ABC"), + InetSocketAddress.createUnresolved("abc", 1234) + ) + ), + new SupportedVersionRange((short) 1, (short) 1) + ); + aVoterMap.put(3, newVoter3); + + assertTrue(voterSet.voterNodeNeedsUpdate(newVoter3)); + assertEquals( + Optional.of(new VoterSet(new HashMap<>(aVoterMap))), + voterSet.updateVoter(newVoter3) + ); + } + + @Test void testCannotRemoveToEmptyVoterSet() { Map aVoterMap = voterMap(IntStream.of(1), true); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index eb3d83ced3c6c..f3bf97feb1082 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -84,7 +84,6 @@ public short quorumStateVersion() { throw new IllegalStateException("Unsupported KRaft feature level: " + this); } - // TODO: write test public short kraftVersionRecordVersion() { switch (this) { case KRAFT_VERSION_1: @@ -93,7 +92,6 @@ public short kraftVersionRecordVersion() { throw new IllegalStateException("Unsupported KRaft feature level: " + this); } - // TODO: write test public short votersRecordVersion() { switch (this) { case KRAFT_VERSION_1: diff --git a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java index de82bf45f3d62..d8309be01b5c8 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java @@ -17,9 +17,12 @@ package org.apache.kafka.server.common; +import org.apache.kafka.common.record.ControlRecordUtils; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public final class KRaftVersionTest { @Test @@ -59,4 +62,52 @@ public void testBootstrapMetadataVersion() { } } } + + @Test + public void testKraftVersionRecordVersion() { + for (KRaftVersion kraftVersion : KRaftVersion.values()) { + switch (kraftVersion) { + case KRAFT_VERSION_0: + assertThrows( + IllegalStateException.class, + () -> kraftVersion.kraftVersionRecordVersion() + ); + break; + + case KRAFT_VERSION_1: + assertEquals( + ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION, + kraftVersion.kraftVersionRecordVersion() + ); + break; + + default: + throw new RuntimeException("Unsupported value " + kraftVersion); + } + } + } + + @Test + public void tesVotersRecordVersion() { + for (KRaftVersion kraftVersion : KRaftVersion.values()) { + switch (kraftVersion) { + case KRAFT_VERSION_0: + assertThrows( + IllegalStateException.class, + () -> kraftVersion.votersRecordVersion() + ); + break; + + case KRAFT_VERSION_1: + assertEquals( + ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION, + kraftVersion.votersRecordVersion() + ); + break; + + default: + throw new RuntimeException("Unsupported value " + kraftVersion); + } + } + } } From 392f5a0119eedc3cad6851b670249281949e9820 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Fri, 2 Aug 2024 00:57:13 +0000 Subject: [PATCH 5/6] KAFKA-16533; Improve documentation --- .../raft/internals/RemoveVoterHandler.java | 2 +- .../raft/internals/UpdateVoterHandler.java | 28 ++++++++++--------- .../kafka/raft/KafkaRaftClientTest.java | 7 ++++- .../kafka/raft/RaftClientTestContext.java | 17 +++++++++-- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java index 13aaf478808ec..3a62d383dda19 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java @@ -42,7 +42,7 @@ * 2. Check that the cluster supports kraft.version 1, otherwise return the UNSUPPORTED_VERSION error. * 3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error. * 4. Append the updated VotersRecord to the log. The KRaft internal listener will read this - * uncommitted record from the log and add the new voter to the set of voters. + * uncommitted record from the log and remove the voter from the set of voters. * 5. Wait for the VotersRecord to commit using the majority of the new set of voters. Return a * REQUEST_TIMED_OUT error if it doesn't commit in time. * 6. Send the RemoveVoter successful response to the client. diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java index 4a73be7eda4c1..152de68f3fcd3 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -34,20 +34,22 @@ import java.util.concurrent.CompletableFuture; /** - * TODO: document this. + * This type implements the protocol for updating a voter from a KRaft partition. * - * 1. Wait until there are no uncommitted add or remove voter records. Note that the implementation - * may just return a REQUEST_TIMED_OUT error if there are pending operations. - * 2. Wait for the LeaderChangeMessage control record from the current epoch to get committed. Note - * that the implementation may just return a REQUEST_TIMED_OUT error if there are pending operations. - * 3. Check that the updated voter supports the current kraft.version. - * 4. If the replica id tracked doesn't have a replica directory id, update it with the replica - * directory id provided in the request. - * 5. Append the updated VotersRecord to the log if the finalized kraft.version is greater than 0. - * 6. The KRaft internal listener will read this record from the log and update the voter's - * information. This includes updating the endpoint used by the KRaft NetworkClient. - * 7. Wait for the VotersRecord to commit using the majority of the new set of voters. - * 8. Send the UpdateVoter response to the client. + * 1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known, + * otherwise return the REQUEST_TIMED_OUT error. + * 2. Check that the cluster supports kraft.version 1, otherwise return the UNSUPPORTED_VERSION error. + * 3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error. + * 4. Check that the updated voter still supports the currently finalized kraft.version, otherwise + * return the INVALID_REQUEST error. + * 5. Check that the updated voter is still listening on the default listener. + * 6. Append the updated VotersRecord to the log. The KRaft internal listener will read this + * uncommitted record from the log and update the voter in the set of voters. + * 7. Wait for the VotersRecord to commit using the majority of the voters. Return a + * REQUEST_TIMED_OUT error if it doesn't commit in time. + * 8. Send the UpdateVoter successful response to the voter. + * + * KAFKA-16538 is going to add support for handling this RPC when the kraft.version is 0. */ public final class UpdateVoterHandler { private final OptionalInt localId; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 419618c00ffe4..e74e32fb79adb 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -2730,7 +2730,12 @@ public void testEndQuorumEpochSentBasedOnFetchOffset(boolean withKip853Rpc) thro context.collectEndQuorumRequests( epoch, Utils.mkSet(closeFollower.id(), laggingFollower.id()), - Optional.of(Arrays.asList(closeFollower.id(), laggingFollower.id())) + Optional.of( + Arrays.asList( + replicaKey(closeFollower.id(), false), + replicaKey(laggingFollower.id(), false) + ) + ) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index f69ee5c1016d4..d7097d6b7f5ba 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -1202,11 +1202,14 @@ UpdateRaftVoterResponseData assertSentUpdateVoterResponse( List collectEndQuorumRequests( int epoch, Set destinationIdSet, - // TODO: preferredSuccessors should be a list of replica keys - Optional> preferredSuccessorsOpt + Optional> preferredCandidates ) { List endQuorumRequests = new ArrayList<>(); Set collectedDestinationIdSet = new HashSet<>(); + + Optional> preferredSuccessorsOpt = preferredCandidates + .map(list -> list.stream().map(ReplicaKey::id).collect(Collectors.toList())); + for (RaftRequest.Outbound raftMessage : channel.drainSendQueue()) { if (raftMessage.data() instanceof EndQuorumEpochRequestData) { EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) raftMessage.data(); @@ -1219,6 +1222,16 @@ List collectEndQuorumRequests( preferredSuccessorsOpt.ifPresent(preferredSuccessors -> assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors()) ); + preferredCandidates.ifPresent(preferred -> + assertEquals( + preferred, + partitionRequest + .preferredCandidates() + .stream() + .map(replica -> ReplicaKey.of(replica.candidateId(), replica.candidateDirectoryId())) + .collect(Collectors.toList()) + ) + ); collectedDestinationIdSet.add(raftMessage.destination().id()); endQuorumRequests.add(raftMessage); From e28c53727a9840e163cb3bb86a11dfadbac66ef0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Mon, 5 Aug 2024 12:44:10 +0000 Subject: [PATCH 6/6] KAFKA-16533; Fix update voter request/response test --- .../common/requests/RequestResponseTest.java | 16 +++++++++++---- .../apache/kafka/raft/KafkaRaftClient.java | 20 +++++++++---------- .../apache/kafka/raft/internals/VoterSet.java | 2 +- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index ac044a5a5363e..97abcd84e5a76 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -1365,8 +1365,16 @@ private UpdateRaftVoterRequest createUpdateRaftVoterRequest(short version) { } private UpdateRaftVoterResponse createUpdateRaftVoterResponse() { - return new UpdateRaftVoterResponse(new UpdateRaftVoterResponseData(). - setErrorCode((short) 0)); + return new UpdateRaftVoterResponse( + new UpdateRaftVoterResponseData() + .setErrorCode((short) 0) + .setCurrentLeader(new UpdateRaftVoterResponseData.CurrentLeader() + .setLeaderId(1) + .setLeaderEpoch(2) + .setHost("localhost") + .setPort(9999) + ) + ); } private DescribeTopicPartitionsResponse createDescribeTopicPartitionsResponse() { @@ -3020,7 +3028,7 @@ private AddPartitionsToTxnRequest createAddPartitionsToTxnRequest(short version) .setName("topic") .setPartitions(Collections.singletonList(73))).iterator()))) .iterator()); - return AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(version); + return AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(version); } } @@ -3029,7 +3037,7 @@ private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse(short versio AddPartitionsToTxnResponseData.AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction( txnId, Collections.singletonMap(new TopicPartition("t", 0), Errors.NONE)); AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setThrottleTimeMs(0); - + if (version < 4) { data.setResultsByTopicV3AndBelow(result.topicResults()); } else { diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index a6549550bee33..ed11e45b5207b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -556,7 +556,7 @@ public void initialize( logContext ); - // Specialized remove voter handler + // Specialized update voter handler this.updateVoterHandler = new UpdateVoterHandler( nodeId, partitionState, @@ -2069,10 +2069,10 @@ private CompletableFuture handleAddVoterRequest( ); } - Optional leaderValidation = validateLeaderOnlyRequest(quorum.epoch()); - if (leaderValidation.isPresent()) { + Optional leaderValidationError = validateLeaderOnlyRequest(quorum.epoch()); + if (leaderValidationError.isPresent()) { return completedFuture( - new AddRaftVoterResponseData().setErrorCode(leaderValidation.get().code()) + new AddRaftVoterResponseData().setErrorCode(leaderValidationError.get().code()) ); } @@ -2152,10 +2152,10 @@ private CompletableFuture handleRemoveVoterRequest( ); } - Optional leaderValidation = validateLeaderOnlyRequest(quorum.epoch()); - if (leaderValidation.isPresent()) { + Optional leaderValidationError = validateLeaderOnlyRequest(quorum.epoch()); + if (leaderValidationError.isPresent()) { return completedFuture( - new RemoveRaftVoterResponseData().setErrorCode(leaderValidation.get().code()) + new RemoveRaftVoterResponseData().setErrorCode(leaderValidationError.get().code()) ); } @@ -2192,11 +2192,11 @@ private CompletableFuture handleUpdateVoterRequest( ); } - Optional leaderValidation = validateLeaderOnlyRequest(data.currentLeaderEpoch()); - if (leaderValidation.isPresent()) { + Optional leaderValidationError = validateLeaderOnlyRequest(data.currentLeaderEpoch()); + if (leaderValidationError.isPresent()) { return completedFuture( RaftUtil.updateVoterResponse( - leaderValidation.get(), + leaderValidationError.get(), requestMetadata.listenerName(), quorum.leaderAndEpoch(), quorum.leaderEndpoints() diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index df3f0dfbd082a..dfbe4e4942025 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -96,7 +96,7 @@ public Optional voterNode(int voterId, ListenerName listenerName) { } /** - * Return true the provided voter node is a voter and would cause a change in the voter set. + * Return true if the provided voter node is a voter and would cause a change in the voter set. * * @param updatedVoterNode the updated voter node * @return true if the updated voter node is different than the node in the voter set; otherwise false.