Skip to content

Commit

Permalink
KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (apache#15988)
Browse files Browse the repository at this point in the history
This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup.

Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
dongnuo123 authored May 23, 2024
1 parent e692fee commit 14b5c4d
Show file tree
Hide file tree
Showing 12 changed files with 820 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,11 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
);
}

// Using a read operation is okay here as we ignore the last committed offset in the snapshot registry.
// This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves.
return runtime.scheduleReadOperation(
return runtime.scheduleWriteOperation(
"classic-group-heartbeat",
topicPartitionFor(request.groupId()),
(coordinator, __) -> coordinator.classicGroupHeartbeat(context, request)
Duration.ofMillis(config.offsetCommitTimeoutMs),
coordinator -> coordinator.classicGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"classic-group-heartbeat",
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,10 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
* @param context The request context.
* @param request The actual Heartbeat request.
*
* @return The HeartbeatResponse.
* @return A Result containing the heartbeat response and
* a list of records to update the state machine.
*/
public HeartbeatResponseData classicGroupHeartbeat(
public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(
RequestContext context,
HeartbeatRequestData request
) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1300,4 +1300,27 @@ public boolean allMembersUseClassicProtocolExcept(String memberId) {
return numClassicProtocolMembers() == members().size() - 1 &&
!getOrMaybeCreateMember(memberId, false).useClassicProtocol();
}

/**
* Checks whether the member has any unreleased partition.
*
* @param member The member to check.
* @return A boolean indicating whether the member has partitions in the target
* assignment that hasn't been revoked by other members.
*/
public boolean waitingOnUnreleasedPartition(ConsumerGroupMember member) {
if (member.state() == MemberState.UNRELEASED_PARTITIONS) {
for (Map.Entry<Uuid, Set<Integer>> entry : targetAssignment().get(member.memberId()).partitions().entrySet()) {
Uuid topicId = entry.getKey();
Set<Integer> assignedPartitions = member.assignedPartitions().getOrDefault(topicId, Collections.emptySet());

for (int partition : entry.getValue()) {
if (!assignedPartitions.contains(partition) && currentPartitionEpoch(topicId, partition) != -1) {
return true;
}
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ public void run() {
timer.add(task);
}

@Override
public void scheduleIfAbsent(
String key,
long delay,
TimeUnit unit,
boolean retry,
TimeoutOperation<Void, U> operation
) {
if (!tasks.containsKey(key)) {
schedule(key, delay, unit, retry, 500, operation);
}
}

@Override
public void cancel(String key) {
TimerTask prevTask = tasks.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ interface TimeoutOperation<T, U> {
*/
void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoff, TimeoutOperation<T, U> operation);

/**
* Add an operation to the timer if there's no operation with the same key.
*
* @param key The key to identify this operation.
* @param delay The delay to wait before expiring.
* @param unit The delay unit.
* @param retry A boolean indicating whether the operation should
* be retried on failure.
* @param operation The operation to perform upon expiration.
*/
void scheduleIfAbsent(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation<T, U> operation);

/**
* Remove an operation corresponding to a given key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,10 @@ public void testHeartbeat() throws Exception {

service.startup(() -> 1);

when(runtime.scheduleReadOperation(
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new HeartbeatResponseData()
Expand Down Expand Up @@ -708,9 +709,10 @@ public void testHeartbeatCoordinatorNotAvailableException() throws Exception {

service.startup(() -> 1);

when(runtime.scheduleReadOperation(
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(
new CoordinatorLoadInProgressException(null)
Expand Down Expand Up @@ -740,9 +742,10 @@ public void testHeartbeatCoordinatorException() throws Exception {

service.startup(() -> 1);

when(runtime.scheduleReadOperation(
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("classic-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(
new RebalanceInProgressException()
Expand Down
Loading

0 comments on commit 14b5c4d

Please sign in to comment.