Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16533; Update voter handling #16735

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+" },
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
"about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
{ "name": "VoterId", "type": "int32", "versions": "0+",
"about": "The replica id of the voter getting updated in the topic partition" },
{ "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error" }
"about": "The error code, or 0 if there was no error" },
{ "name": "CurrentLeader", "type": "CurrentLeader", "versions": "0+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like RequestResponseTest.createUpdateRaftVoterResponse has not been updated to set these new fields.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

"taggedVersions": "0+", "tag": 0, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1", "entityType" : "brokerId",
"about": "The replica id of the current leader or -1 if the leader is unknown" },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "The latest known leader epoch" },
{ "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname" },
{ "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port" }
]
}
]
}
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Features
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
Expand Down Expand Up @@ -153,7 +154,7 @@ class KafkaRaftManager[T](
threadNamePrefixOpt: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
bootstrapServers: JCollection[InetSocketAddress],
controllerListeners: Endpoints,
localListeners: Endpoints,
fatalFaultHandler: FaultHandler
) extends RaftManager[T] with Logging {

Expand Down Expand Up @@ -236,7 +237,8 @@ class KafkaRaftManager[T](
logContext,
clusterId,
bootstrapServers,
controllerListeners,
localListeners,
Features.KRAFT_VERSION.supportedVersionRange(),
raftConfig
)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,6 @@ class ControllerApis(

def handleUpdateRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
throw new UnsupportedVersionException("handleUpdateRaftVoter is not supported yet.")
handleRaftRequest(request, response => new UpdateRaftVoterResponse(response.asInstanceOf[UpdateRaftVoterResponseData]))
}
}
28 changes: 28 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/Endpoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.network.ListenerName;
Expand Down Expand Up @@ -131,6 +132,21 @@ public DescribeQuorumResponseData.ListenerCollection toDescribeQuorumResponseLis
return listeners;
}

public UpdateRaftVoterRequestData.ListenerCollection toUpdateVoterRequest() {
UpdateRaftVoterRequestData.ListenerCollection listeners =
new UpdateRaftVoterRequestData.ListenerCollection(endpoints.size());
for (Map.Entry<ListenerName, InetSocketAddress> entry : endpoints.entrySet()) {
listeners.add(
new UpdateRaftVoterRequestData.Listener()
.setName(entry.getKey().value())
.setHost(entry.getValue().getHostString())
.setPort(entry.getValue().getPort())
);
}

return listeners;
}

private static final Endpoints EMPTY = new Endpoints(Collections.emptyMap());
public static Endpoints empty() {
return EMPTY;
Expand Down Expand Up @@ -272,4 +288,16 @@ public static Endpoints fromAddVoterRequest(AddRaftVoterRequestData.ListenerColl

return new Endpoints(listeners);
}

public static Endpoints fromUpdateVoterRequest(UpdateRaftVoterRequestData.ListenerCollection endpoints) {
Map<ListenerName, InetSocketAddress> listeners = new HashMap<>(endpoints.size());
for (UpdateRaftVoterRequestData.Listener endpoint : endpoints) {
listeners.put(
ListenerName.normalised(endpoint.name()),
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
);
}

return new Endpoints(listeners);
}
}
106 changes: 105 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
Expand All @@ -39,6 +40,8 @@
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -76,6 +79,7 @@
import org.apache.kafka.raft.internals.RemoveVoterHandler;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.raft.internals.UpdateVoterHandler;
import org.apache.kafka.raft.internals.VoterSet;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
Expand Down Expand Up @@ -171,6 +175,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
private final int fetchMaxWaitMs;
private final String clusterId;
private final Endpoints localListeners;
private final SupportedVersionRange localSupportedKRaftVersion;
private final NetworkChannel channel;
private final ReplicatedLog log;
private final Random random;
Expand Down Expand Up @@ -207,6 +212,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
// Specialized handlers
private volatile AddVoterHandler addVoterHandler;
private volatile RemoveVoterHandler removeVoterHandler;
private volatile UpdateVoterHandler updateVoterHandler;

/**
* Create a new instance.
Expand All @@ -226,6 +232,7 @@ public KafkaRaftClient(
String clusterId,
Collection<InetSocketAddress> bootstrapServers,
Endpoints localListeners,
SupportedVersionRange localSupportedKRaftVersion,
QuorumConfig quorumConfig
) {
this(
Expand All @@ -242,6 +249,7 @@ public KafkaRaftClient(
clusterId,
bootstrapServers,
localListeners,
localSupportedKRaftVersion,
logContext,
new Random(),
quorumConfig
Expand All @@ -262,6 +270,7 @@ public KafkaRaftClient(
String clusterId,
Collection<InetSocketAddress> bootstrapServers,
Endpoints localListeners,
SupportedVersionRange localSupportedKRaftVersion,
LogContext logContext,
Random random,
QuorumConfig quorumConfig
Expand All @@ -279,6 +288,7 @@ public KafkaRaftClient(
this.time = time;
this.clusterId = clusterId;
this.localListeners = localListeners;
this.localSupportedKRaftVersion = localSupportedKRaftVersion;
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
Expand Down Expand Up @@ -349,6 +359,7 @@ private void onUpdateLeaderHighWatermark(
// add or remove voter request that need to be completed
addVoterHandler.highWatermarkUpdated(state);
removeVoterHandler.highWatermarkUpdated(state);
updateVoterHandler.highWatermarkUpdated(state);

// After updating the high watermark, we first clear the append
// purgatory so that we have an opportunity to route the pending
Expand Down Expand Up @@ -492,6 +503,7 @@ public void initialize(
nodeDirectoryId,
partitionState,
localListeners,
localSupportedKRaftVersion,
quorumConfig.electionTimeoutMs(),
quorumConfig.fetchTimeoutMs(),
quorumStateStore,
Expand Down Expand Up @@ -543,6 +555,15 @@ public void initialize(
quorumConfig.requestTimeoutMs(),
logContext
);

// Specialized remove voter handler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: specialized update voter handler

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.updateVoterHandler = new UpdateVoterHandler(
nodeId,
partitionState,
channel.listenerName(),
time,
quorumConfig.requestTimeoutMs()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be passed in vs imported in UpdateVoterHandler?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the question. In general inputs to a type or method should be limited to what they require. This avoid passing a configuration object with many methods where only one is used or required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, you can ignore this comment. I had mistakenly thought it would be possible to import the underlying config in UpdateVoterHandler

);
}

@Override
Expand Down Expand Up @@ -2071,7 +2092,8 @@ private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage(
String.format(
"Add voter request didn't include the default listener: %s",
"Add voter request didn't include the endpoint (%s) for the default listener %s",
newVoterEndpoints,
channel.listenerName()
)
)
Expand Down Expand Up @@ -2153,6 +2175,84 @@ private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(
);
}

private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
RaftRequest.Inbound requestMetadata,
long currentTimeMs
) {
UpdateRaftVoterRequestData data = (UpdateRaftVoterRequestData) requestMetadata.data();

if (!hasValidClusterId(data.clusterId())) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INCONSISTENT_CLUSTER_ID,
requestMetadata.listenerName(),
quorum.leaderAndEpoch(),
quorum.leaderEndpoints()
)
);
}

Optional<Errors> leaderValidation = validateLeaderOnlyRequest(data.currentLeaderEpoch());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I know this is the convention handleFetchSnapshotRequest uses too, but could we rename to leaderValidationError?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to the variable name? Replace "leaderValidation" with "leaderValidationError"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (leaderValidation.isPresent()) {
return completedFuture(
RaftUtil.updateVoterResponse(
leaderValidation.get(),
requestMetadata.listenerName(),
quorum.leaderAndEpoch(),
quorum.leaderEndpoints()
)
);
}

Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
if (!voter.isPresent() || !voter.get().directoryId().isPresent()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
requestMetadata.listenerName(),
quorum.leaderAndEpoch(),
quorum.leaderEndpoints()
)
);
}

Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do want to catch any illegalArgumentExceptions that might be thrown from this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Catching IllegalArgumentExceptions is not enough. Let me file an issue to audit all of the validation of the new RPCs: https://issues.apache.org/jira/browse/KAFKA-17264

if (!voterEndpoints.address(channel.listenerName()).isPresent()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder if we might benefit from the update voter response schema including an error message field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this before. The ErrorMessage field is a human readable message meant to be displayed to the end user. This is why the admin RPCs (AddVoter and RemoveVoter) have the additional ErrorMessage field. UpdateVoter is an internal RPC. Internal RPCs don't tend to have a human readable error message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about some logging in the error cases? although the RPC is internal, an invalid update could stem from operator action (and it could be useful for them to dig into logs to understand why an update failed)

requestMetadata.listenerName(),
quorum.leaderAndEpoch(),
quorum.leaderEndpoints()
)
);
}

UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions = data.kRaftVersionFeature();
if (supportedKraftVersions.minSupportedVersion() < 0 ||
supportedKraftVersions.maxSupportedVersion() < 0 ||
supportedKraftVersions.maxSupportedVersion() < supportedKraftVersions.minSupportedVersion()
) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
requestMetadata.listenerName(),
quorum.leaderAndEpoch(),
quorum.leaderEndpoints()
)
);
}

return updateVoterHandler.handleUpdateVoterRequest(
quorum.leaderStateOrThrow(),
requestMetadata.listenerName(),
voter.get(),
voterEndpoints,
supportedKraftVersions,
currentTimeMs
);
}

private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
// Only elected leaders are sent in the request/response header, so if we have an elected
// leaderId, it should be consistent with what is in the message.
Expand Down Expand Up @@ -2419,6 +2519,10 @@ private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
responseFuture = handleRemoveVoterRequest(request, currentTimeMs);
break;

case UPDATE_RAFT_VOTER:
responseFuture = handleUpdateVoterRequest(request, currentTimeMs);
break;

default:
throw new IllegalArgumentException("Unexpected request type " + apiKey);
}
Expand Down
Loading