Skip to content

Commit

Permalink
KAFKA-17973: Relax Restriction for Voters Set Change (#17728)
Browse files Browse the repository at this point in the history
Relax the voter set change validation that exists in KRaft. When reading the kraft partition and validating voter set changes allow the voter set to have more than one change.

This violates the invariant that after a voter change there are overlapping voters for all possible majorities. This is okay because the KRaft leader checks that there are no pending voter set updates when handling an add voter request and a remove voter request.

Reviewers: José Armando García Sancio <[email protected]>
  • Loading branch information
hni61223 authored Jan 13, 2025
1 parent 70d6312 commit 15e938e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public KRaftControlRecordStateMachine(
LogContext logContext
) {
this.log = log;
this.voterSetHistory = new VoterSetHistory(staticVoterSet);
this.voterSetHistory = new VoterSetHistory(staticVoterSet, logContext);
this.serde = serde;
this.bufferSupplier = bufferSupplier;
this.maxBatchSizeBytes = maxBatchSizeBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.kafka.raft.internals;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.VoterSet;

import org.slf4j.Logger;

import java.util.Optional;
import java.util.OptionalLong;

Expand All @@ -31,9 +34,11 @@
public final class VoterSetHistory {
private final VoterSet staticVoterSet;
private final LogHistory<VoterSet> votersHistory = new TreeMapLogHistory<>();
private final Logger logger;

VoterSetHistory(VoterSet staticVoterSet) {
VoterSetHistory(VoterSet staticVoterSet, LogContext logContext) {
this.staticVoterSet = staticVoterSet;
this.logger = logContext.logger(getClass());
}

/**
Expand All @@ -55,12 +60,10 @@ public void addAt(long offset, VoterSet voters) {
// all replicas.
VoterSet lastVoterSet = lastEntry.get().value();
if (!lastVoterSet.hasOverlappingMajority(voters)) {
throw new IllegalArgumentException(
String.format(
"Last voter set %s doesn't have an overlapping majority with the new voter set %s",
lastVoterSet,
voters
)
logger.info(
"Last voter set ({}) doesn't have an overlapping majority with the new voter set ({})",
lastVoterSet,
voters
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft.internals;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.raft.VoterSetTest;

Expand All @@ -33,7 +34,7 @@ public final class VoterSetHistoryTest {
@Test
void testStaticVoterSet() {
VoterSet staticVoterSet = VoterSet.fromMap(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);

assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
Expand All @@ -54,7 +55,7 @@ void testStaticVoterSet() {

@Test
void TestNoStaticVoterSet() {
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());

assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(0));
assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(100));
Expand All @@ -65,7 +66,7 @@ void TestNoStaticVoterSet() {
void testAddAt() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);

assertThrows(
IllegalArgumentException.class,
Expand Down Expand Up @@ -95,7 +96,7 @@ void testAddAt() {
void testBootstrapAddAt() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet bootstrapVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());

votersHistory.addAt(-1, bootstrapVoterSet);
assertEquals(bootstrapVoterSet, votersHistory.lastValue());
Expand Down Expand Up @@ -124,43 +125,38 @@ void testBootstrapAddAt() {

@Test
void testAddAtNonOverlapping() {
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());

Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(voterMap));

// Add a starting voter to the history
votersHistory.addAt(100, voterSet);

// Remove voter so that it doesn't overlap
VoterSet nonoverlappingRemovedSet = voterSet
// Assert multiple voters can be removed at a time
VoterSet nonOverlappingRemovedSet = voterSet
.removeVoter(voterMap.get(1).voterKey()).get()
.removeVoter(voterMap.get(2).voterKey()).get();

assertThrows(
IllegalArgumentException.class,
() -> votersHistory.addAt(200, nonoverlappingRemovedSet)
);
assertEquals(voterSet, votersHistory.lastValue());
votersHistory.addAt(200, nonOverlappingRemovedSet);

assertEquals(nonOverlappingRemovedSet, votersHistory.lastValue());

// Add voters so that it doesn't overlap
VoterSet nonoverlappingAddSet = voterSet
.addVoter(VoterSetTest.voterNode(4, true)).get()
.addVoter(VoterSetTest.voterNode(5, true)).get();
// Assert multiple voters can be added at a time
VoterSet nonOverlappingAddSet = nonOverlappingRemovedSet
.addVoter(VoterSetTest.voterNode(1, true)).get()
.addVoter(VoterSetTest.voterNode(2, true)).get();

assertThrows(
IllegalArgumentException.class,
() -> votersHistory.addAt(200, nonoverlappingAddSet)
);
assertEquals(voterSet, votersHistory.lastValue());
votersHistory.addAt(300, nonOverlappingAddSet);

assertEquals(nonOverlappingAddSet, votersHistory.lastValue());
}

@Test
void testNonoverlappingFromStaticVoterSet() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(VoterSet.empty());
VoterSetHistory votersHistory = voterSetHistory(VoterSet.empty());

// Remove voter so that it doesn't overlap
VoterSet nonoverlappingRemovedSet = staticVoterSet
Expand All @@ -175,7 +171,7 @@ void testNonoverlappingFromStaticVoterSet() {
void testTruncateTo() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);

// Add voter 4 to the voter set and voter set history
voterMap.put(4, VoterSetTest.voterNode(4, true));
Expand All @@ -201,7 +197,7 @@ void testTruncateTo() {
void testTrimPrefixTo() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);

// Add voter 4 to the voter set and voter set history
voterMap.put(4, VoterSetTest.voterNode(4, true));
Expand Down Expand Up @@ -234,7 +230,7 @@ void testTrimPrefixTo() {
void testClear() {
Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
VoterSet staticVoterSet = VoterSet.fromMap(new HashMap<>(voterMap));
VoterSetHistory votersHistory = new VoterSetHistory(staticVoterSet);
VoterSetHistory votersHistory = voterSetHistory(staticVoterSet);

// Add voter 4 to the voter set and voter set history
voterMap.put(4, VoterSetTest.voterNode(4, true));
Expand All @@ -250,4 +246,8 @@ void testClear() {

assertEquals(staticVoterSet, votersHistory.lastValue());
}

private VoterSetHistory voterSetHistory(VoterSet staticVoterSet) {
return new VoterSetHistory(staticVoterSet, new LogContext());
}
}

0 comments on commit 15e938e

Please sign in to comment.