Skip to content

Commit

Permalink
KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs
Browse files Browse the repository at this point in the history
Implement the add voter, remove voter, and update voter RPCs for
KIP-853. This is just adding the RPC handling; the current
implementation in RaftManager just throws UnsupportedVersionException.

Reviewers: Andrew Schofield <[email protected]>, José Armando García Sancio <[email protected]>
  • Loading branch information
cmccabe authored and TaiJuWu committed Jun 8, 2024
1 parent 15df653 commit 9333742
Show file tree
Hide file tree
Showing 31 changed files with 1,161 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Options for {@link Admin#addRaftVoter}.
*/
@InterfaceStability.Stable
public class AddRaftVoterOptions extends AbstractOptions<AddRaftVoterOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
* The result of {@link org.apache.kafka.clients.admin.Admin#addRaftVoter(int, org.apache.kafka.common.Uuid, java.util.Set, org.apache.kafka.clients.admin.AddRaftVoterOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Stable
public class AddRaftVoterResult {
private final KafkaFuture<Void> result;

AddRaftVoterResult(KafkaFuture<Void> result) {
this.result = result;
}

/**
* Returns a future that completes when the voter has been added.
*/
public KafkaFuture<Void> all() {
return result;
}

}
56 changes: 56 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,62 @@ default ListClientMetricsResourcesResult listClientMetricsResources() {
*/
Uuid clientInstanceId(Duration timeout);

/**
* Add a new voter node to the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
*/
default AddRaftVoterResult addRaftVoter(
int voterId,
Uuid voterDirectoryId,
Set<RaftVoterEndpoint> endpoints
) {
return addRaftVoter(voterId, voterDirectoryId, endpoints, new AddRaftVoterOptions());
}

/**
* Add a new voter node to the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
* @param options The options to use when adding the new voter node.
*/
AddRaftVoterResult addRaftVoter(
int voterId,
Uuid voterDirectoryId,
Set<RaftVoterEndpoint> endpoints,
AddRaftVoterOptions options
);

/**
* Remove a voter node from the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
*/
default RemoveRaftVoterResult removeRaftVoter(
int voterId,
Uuid voterDirectoryId
) {
return removeRaftVoter(voterId, voterDirectoryId, new RemoveRaftVoterOptions());
}

/**
* Remove a voter node from the KRaft metadata quorum.
*
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param options The options to use when removing the voter node.
*/
RemoveRaftVoterResult removeRaftVoter(
int voterId,
Uuid voterDirectoryId,
RemoveRaftVoterOptions options
);

/**
* Get the metrics kept by the adminClient
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ public Uuid clientInstanceId(Duration timeout) {
return delegate.clientInstanceId(timeout);
}

@Override
public AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints, AddRaftVoterOptions options) {
return delegate.addRaftVoter(voterId, voterDirectoryId, endpoints, options);
}

@Override
public RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId, RemoveRaftVoterOptions options) {
return delegate.removeRaftVoter(voterId, voterDirectoryId, options);
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return delegate.metrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
Expand Down Expand Up @@ -154,6 +155,7 @@
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
Expand All @@ -170,6 +172,8 @@
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddRaftVoterRequest;
import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.AlterClientQuotasRequest;
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
Expand Down Expand Up @@ -232,6 +236,7 @@
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.UnregisterBrokerRequest;
Expand Down Expand Up @@ -4604,6 +4609,101 @@ void handleFailure(Throwable throwable) {
return new ListClientMetricsResourcesResult(future);
}

@Override
public AddRaftVoterResult addRaftVoter(
int voterId,
Uuid voterDirectoryId,
Set<RaftVoterEndpoint> endpoints,
AddRaftVoterOptions options
) {
NodeProvider provider = new LeastLoadedBrokerOrActiveKController();

final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call(
"addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {

@Override
AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
AddRaftVoterRequestData.ListenerCollection listeners =
new AddRaftVoterRequestData.ListenerCollection();
endpoints.forEach(endpoint ->
listeners.add(new AddRaftVoterRequestData.Listener().
setName(endpoint.name()).
setHost(endpoint.host()).
setPort(endpoint.port())));
return new AddRaftVoterRequest.Builder(
new AddRaftVoterRequestData().
setVoterId(voterId) .
setVoterDirectoryId(voterDirectoryId).
setListeners(listeners));
}

@Override
void handleResponse(AbstractResponse response) {
AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response;
if (addResponse.data().errorCode() != Errors.NONE.code()) {
ApiError error = new ApiError(
addResponse.data().errorCode(),
addResponse.data().errorMessage());
future.completeExceptionally(error.exception());
} else {
future.complete(null);
}
}

@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new AddRaftVoterResult(future);
}

@Override
public RemoveRaftVoterResult removeRaftVoter(
int voterId,
Uuid voterDirectoryId,
RemoveRaftVoterOptions options
) {
NodeProvider provider = new LeastLoadedBrokerOrActiveKController();

final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call(
"removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {

@Override
RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {
return new RemoveRaftVoterRequest.Builder(
new RemoveRaftVoterRequestData().
setVoterId(voterId) .
setVoterDirectoryId(voterDirectoryId));
}

@Override
void handleResponse(AbstractResponse response) {
AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response;
if (addResponse.data().errorCode() != Errors.NONE.code()) {
ApiError error = new ApiError(
addResponse.data().errorCode(),
addResponse.data().errorMessage());
future.completeExceptionally(error.exception());
} else {
future.complete(null);
}
}

@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
};
runnable.call(call, now);
return new RemoveRaftVoterResult(future);
}

@Override
public Uuid clientInstanceId(Duration timeout) {
if (timeout.isNegative()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Locale;
import java.util.Objects;

/**
* An endpoint for a raft quorum voter.
*/
@InterfaceStability.Stable
public class RaftVoterEndpoint {
private final String name;
private final String host;
private final int port;

static String requireNonNullAllCapsNonEmpty(String input) {
if (input == null) {
throw new IllegalArgumentException("Null argument not allowed.");
}
if (!input.trim().equals(input)) {
throw new IllegalArgumentException("Leading or trailing whitespace is not allowed.");
}
if (input.isEmpty()) {
throw new IllegalArgumentException("Empty string is not allowed.");
}
if (!input.toUpperCase(Locale.ROOT).equals(input)) {
throw new IllegalArgumentException("String must be UPPERCASE.");
}
return input;
}

/**
* Create an endpoint for a metadata quorum voter.
*
* @param name The human-readable name for this endpoint. For example, CONTROLLER.
* @param host The DNS hostname for this endpoint.
* @param port The network port for this endpoint.
*/
public RaftVoterEndpoint(
String name,
String host,
int port
) {
this.name = requireNonNullAllCapsNonEmpty(name);
this.host = Objects.requireNonNull(host);
this.port = port;
}

public String name() {
return name;
}

public String host() {
return host;
}

public int port() {
return port;
}

@Override
public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) return false;
RaftVoterEndpoint other = (RaftVoterEndpoint) o;
return name.equals(other.name) &&
host.equals(other.host) &&
port == other.port;
}

@Override
public int hashCode() {
return Objects.hash(name, host, port);
}

@Override
public String toString() {
return "RaftVoterEndpoint" +
"(name=" + name +
", host=" + host +
", port=" + port +
")";
}
}
Loading

0 comments on commit 9333742

Please sign in to comment.