Skip to content

Commit

Permalink
KAFKA-16886: Detect replica demotion in AssignmentsManager (apache#16232
Browse files Browse the repository at this point in the history
)

JBOD Brokers keep the Controller up to date with replica-to-directory
placement via AssignReplicasToDirsRequest. These requests are queued,
compacted and sent by AssignmentsManager.

The Controller returns the error NOT_LEADER_OR_FOLLOWER when handling
a AssignReplicasToDirsRequest from a broker that is not a replica.

A partition reassignment can take place, removing the Broker
as a replica before the AssignReplicasToDirsRequest successfully
reaches the Controller. AssignmentsManager retries failed
requests, and will continuously try to propagate this assignment,
until the Broker either shuts down, or is added back as a replica.

When encountering a NOT_LEADER_OR_FOLLOWER error, AssignmentsManager
should assume that the broker is no longer a replica, and stop
trying to propagate the directory assignment for that partition.

Reviewers: Luke Chen <[email protected]>
  • Loading branch information
soarez authored and gongxuanzhang committed Jun 12, 2024
1 parent e6cfe3f commit 7cc8597
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void close() throws InterruptedException {
}
}

public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, String reason) {
onAssignment(topicPartition, dirId, reason, null);
}

public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, String reason, Runnable callback) {
if (callback == null) {
callback = () -> { };
Expand Down Expand Up @@ -444,7 +448,9 @@ private static Set<AssignmentEvent> filterFailures(
} else {
acknowledged.add(topicPartition);
Errors error = Errors.forCode(partition.errorCode());
if (error != Errors.NONE) {
if (error == Errors.NOT_LEADER_OR_FOLLOWER) {
log.info("Dropping late directory assignment for partition {} into directory {} because this broker is no longer a replica", partition, event.dirId);
} else if (error != Errors.NONE) {
log.error("Controller returned error {} for assignment of partition {} into directory {}",
error.name(), partition, event.dirId);
failures.add(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
Expand Down Expand Up @@ -310,12 +313,21 @@ void testOnCompletion() throws Exception {
}

private static ClientResponse buildSuccessfulResponse(AssignReplicasToDirsRequestData request) {
return buildResponse(request, topicIdPartition -> Errors.NONE);
}

private static ClientResponse buildResponse(AssignReplicasToDirsRequestData request,
Function<TopicIdPartition, Errors> perPartitionError) {
Map<Uuid, Map<TopicIdPartition, Errors>> errors = new HashMap<>();
for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
for (AssignReplicasToDirsRequestData.PartitionData partition : topic.partitions()) {
TopicIdPartition topicIdPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex());
errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, Errors.NONE);
Errors error = perPartitionError.apply(topicIdPartition);
if (error == null) {
error = Errors.NONE;
}
errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, error);
}
}
}
Expand Down Expand Up @@ -403,4 +415,48 @@ void testQueuedReplicaToDirAssignmentsMetric() throws Exception {
}
TestUtils.retryOnExceptionWithTimeout(5_000, () -> assertEquals(8, queuedReplicaToDirAssignments.value()));
}

// AssignmentsManager retries to propagate assignments (via AssignReplicasToDirsRequest) after failures.
// When an assignment fails to propagate with NOT_LEADER_OR_FOLLOWER, AssignmentsManager should conclude
// that the broker has been removed as a replica for the partition, and stop trying to propagate it.
@Test
void testDropsOldAssignments() throws InterruptedException {
TopicIdPartition tp1 = new TopicIdPartition(TOPIC_1, 1), tp2 = new TopicIdPartition(TOPIC_1, 2);
List<AssignReplicasToDirsRequestData> requests = new ArrayList<>();
CountDownLatch readyToAssert = new CountDownLatch(2);
doAnswer(invocation -> {
AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data();
ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class);
if (readyToAssert.getCount() == 2) {
// First request, reply with a partition-level NOT_LEADER_OR_FOLLOWER error and queue a different assignment
completionHandler.onComplete(buildResponse(request, topicIdPartition -> Errors.NOT_LEADER_OR_FOLLOWER));
manager.onAssignment(tp2, DIR_1, "testDropsOldAssignments-second");
}
if (readyToAssert.getCount() == 1) {
// Second request, reply with success
completionHandler.onComplete(buildSuccessfulResponse(request));
}
requests.add(request);
readyToAssert.countDown();
return null;
}).when(channelManager).sendRequest(any(), any());

manager.onAssignment(tp1, DIR_1, "testDropsOldAssignments-first");
TestUtils.waitForCondition(() -> {
time.sleep(TimeUnit.SECONDS.toMillis(1));
manager.wakeup();
return readyToAssert.await(1, TimeUnit.MILLISECONDS);
}, "Timed out waiting for AssignReplicasToDirsRequest to be sent.");

assertEquals(Arrays.asList(
buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{
put(tp1, DIR_1);
}}),
// Even though the controller replied with NOT_LEADER_OR_FOLLOWER, the second request does not include
// partition 1, meaning AssignmentManager dropped (no longer retries) the assignment.
buildRequestData(8, 100, new HashMap<TopicIdPartition, Uuid>() {{
put(tp2, DIR_1);
}})
), requests);
}
}

0 comments on commit 7cc8597

Please sign in to comment.