Skip to content

Commit

Permalink
KAFKA-16713: Define initial set of RPCs for KIP-932 (apache#16022)
Browse files Browse the repository at this point in the history
This PR defines the initial set of RPCs for KIP-932. The RPCs for the admin client and state management are not in this PR.

Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy <[email protected]>
  • Loading branch information
AndrewJSchofield authored Jun 3, 2024
1 parent 8507693 commit 8f82f14
Show file tree
Hide file tree
Showing 35 changed files with 2,100 additions and 9 deletions.
56 changes: 56 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/ShareGroupState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* The share group state.
*/
public enum ShareGroupState {
UNKNOWN("Unknown"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty");

private final static Map<String, ShareGroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));

private final String name;

ShareGroupState(String name) {
this.name = name;
}

/**
* Case-insensitive share group state lookup by string name.
*/
public static ShareGroupState parse(String name) {
ShareGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
return state == null ? UNKNOWN : state;
}

@Override
public String toString() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Thrown when the share coordinator rejected the request because the share-group state epoch did not match.
*/
public class FencedStateEpochException extends ApiException {
private static final long serialVersionUID = 1L;

public FencedStateEpochException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Thrown when the acknowledgement of delivery of a record could not be completed because the record
* state is invalid.
*/
public class InvalidRecordStateException extends ApiException {

private static final long serialVersionUID = 1L;

public InvalidRecordStateException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Thrown when the share session epoch is invalid.
*/
public class InvalidShareSessionEpochException extends RetriableException {
private static final long serialVersionUID = 1L;

public InvalidShareSessionEpochException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Thrown when the share session was not found.
*/
public class ShareSessionNotFoundException extends RetriableException {
private static final long serialVersionUID = 1L;

public ShareSessionNotFoundException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ public enum ApiKeys {
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS);
DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS),
SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT),
SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
SHARE_FETCH(ApiMessageType.SHARE_FETCH),
SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
Expand All @@ -64,12 +65,14 @@
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
Expand Down Expand Up @@ -109,6 +112,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
Expand Down Expand Up @@ -394,7 +398,11 @@ public enum Errors {
UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new),
INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new),
TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new);
TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new),
INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new),
SHARE_SESSION_NOT_FOUND(122, "The share session was not found.", ShareSessionNotFoundException::new),
INVALID_SHARE_SESSION_EPOCH(123, "The share session epoch is invalid.", InvalidShareSessionEpochException::new),
FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return ListClientMetricsResourcesRequest.parse(buffer, apiVersion);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsRequest.parse(buffer, apiVersion);
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatRequest.parse(buffer, apiVersion);
case SHARE_GROUP_DESCRIBE:
return ShareGroupDescribeRequest.parse(buffer, apiVersion);
case SHARE_FETCH:
return ShareFetchRequest.parse(buffer, apiVersion);
case SHARE_ACKNOWLEDGE:
return ShareAcknowledgeRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return ListClientMetricsResourcesResponse.parse(responseBuffer, version);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsResponse.parse(responseBuffer, version);
case SHARE_GROUP_HEARTBEAT:
return ShareGroupHeartbeatResponse.parse(responseBuffer, version);
case SHARE_GROUP_DESCRIBE:
return ShareGroupDescribeResponse.parse(responseBuffer, version);
case SHARE_FETCH:
return ShareFetchResponse.parse(responseBuffer, version);
case SHARE_ACKNOWLEDGE:
return ShareAcknowledgeResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShareAcknowledgeRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<ShareAcknowledgeRequest> {

private final ShareAcknowledgeRequestData data;

public Builder(ShareAcknowledgeRequestData data) {
this(data, false);
}

public Builder(ShareAcknowledgeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.SHARE_ACKNOWLEDGE, enableUnstableLastVersion);
this.data = data;
}

public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareFetchMetadata metadata,
Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareAcknowledgeRequestData data = new ShareAcknowledgeRequestData();
data.setGroupId(groupId);
if (metadata != null) {
data.setMemberId(metadata.memberId().toString());
data.setShareSessionEpoch(metadata.epoch());
}

// Build a map of topics to acknowledge keyed by topic ID, and within each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition>> ackMap = new HashMap<>();

for (Map.Entry<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
TopicIdPartition tip = acknowledgeEntry.getKey();
Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition> partMap = ackMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
ShareAcknowledgeRequestData.AcknowledgePartition ackPartition = partMap.get(tip.partition());
if (ackPartition == null) {
ackPartition = new ShareAcknowledgeRequestData.AcknowledgePartition()
.setPartitionIndex(tip.partition());
partMap.put(tip.partition(), ackPartition);
}
ackPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
}

// Finally, build up the data to fetch
data.setTopics(new ArrayList<>());
ackMap.forEach((topicId, partMap) -> {
ShareAcknowledgeRequestData.AcknowledgeTopic ackTopic = new ShareAcknowledgeRequestData.AcknowledgeTopic()
.setTopicId(topicId)
.setPartitions(new ArrayList<>());
data.topics().add(ackTopic);

partMap.forEach((index, ackPartition) -> ackTopic.partitions().add(ackPartition));
});

return new ShareAcknowledgeRequest.Builder(data, true);
}

public ShareAcknowledgeRequestData data() {
return data;
}

@Override
public ShareAcknowledgeRequest build(short version) {
return new ShareAcknowledgeRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final ShareAcknowledgeRequestData data;

public ShareAcknowledgeRequest(ShareAcknowledgeRequestData data, short version) {
super(ApiKeys.SHARE_ACKNOWLEDGE, version);
this.data = data;
}

@Override
public ShareAcknowledgeRequestData data() {
return data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
return new ShareAcknowledgeResponse(new ShareAcknowledgeResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code()));
}

public static ShareAcknowledgeRequest parse(ByteBuffer buffer, short version) {
return new ShareAcknowledgeRequest(
new ShareAcknowledgeRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}
Loading

0 comments on commit 8f82f14

Please sign in to comment.