Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Raft module Cleanup #16205

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private QuorumStateData readStateFromFile(File file) {
if (dataVersion < LOWEST_SUPPORTED_VERSION || dataVersion > HIGHEST_SUPPORTED_VERSION) {
throw new IllegalStateException(
String.format(
"data_version (%d) is not within the min (%d) and max ($d) supported version",
"data_version (%d) is not within the min (%d) and max (%d) supported version",
dataVersion,
LOWEST_SUPPORTED_VERSION,
HIGHEST_SUPPORTED_VERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface LogHistory<T> {
*/
void clear();

final static class Entry<T> {
final class Entry<T> {
private final long offset;
private final T value;

Expand All @@ -101,9 +101,7 @@ public boolean equals(Object o) {
Entry<?> that = (Entry<?>) o;

if (offset != that.offset) return false;
if (!Objects.equals(value, that.value)) return false;

return true;
return Objects.equals(value, that.value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ public boolean equals(Object o) {
ReplicaKey that = (ReplicaKey) o;

if (id != that.id) return false;
if (!Objects.equals(directoryId, that.directoryId)) return false;

return true;
return Objects.equals(directoryId, that.directoryId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ public boolean hasOverlappingMajority(VoterSet that) {
.collect(Collectors.toSet());

if (Utils.diff(HashSet::new, thisReplicaKeys, thatReplicaKeys).size() > 1) return false;
if (Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() > 1) return false;

return true;
return Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() <= 1;
}

@Override
Expand Down Expand Up @@ -314,9 +312,7 @@ public boolean equals(Object o) {

if (!Objects.equals(voterKey, that.voterKey)) return false;
if (!Objects.equals(supportedKRaftVersion, that.supportedKRaftVersion)) return false;
if (!Objects.equals(listeners, that.listeners)) return false;

return true;
return Objects.equals(listeners, that.listeners);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
Expand Down Expand Up @@ -122,7 +123,7 @@ public void testLeaderListenerNotified(boolean entireLog) throws Exception {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -164,7 +165,7 @@ public void testFollowerListenerNotified(boolean entireLog) throws Exception {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -210,7 +211,7 @@ public void testSecondListenerNotified(boolean entireLog) throws Exception {
// Check that the second listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = secondListener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -245,7 +246,7 @@ public void testListenerRenotified() throws Exception {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}

// Generate a new snapshot
Expand All @@ -264,7 +265,7 @@ public void testListenerRenotified() throws Exception {
// Check that listener was notified of the second snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(secondSnapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -660,7 +661,7 @@ public void testFetchSnapshotRequestAsLeader() throws Exception {
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.build();

context.becomeLeader();
Expand Down Expand Up @@ -712,7 +713,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajo
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.build();

int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
Expand Down Expand Up @@ -909,7 +910,7 @@ public void testFetchSnapshotRequestWithInvalidPosition() throws Exception {
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.build();

context.becomeLeader();
Expand Down Expand Up @@ -1136,12 +1137,12 @@ public void testFetchResponseWithSnapshotId() throws Exception {
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot);

// Check that listener was notified of the new snapshot
try (SnapshotReader<String> reader = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, reader.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader);
}
}

Expand Down Expand Up @@ -1239,12 +1240,12 @@ public void testFetchSnapshotResponsePartialData() throws Exception {
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot);

// Check that listener was notified of the new snapshot
try (SnapshotReader<String> reader = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, reader.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader);
}
}

Expand Down
22 changes: 11 additions & 11 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ public void testResignInOlderEpochIgnored() throws Exception {
context.client.poll();

// Ensure we are still leader even after expiration of the election timeout.
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);
context.client.poll();
context.assertElectedLeader(currentEpoch, localId);
}
Expand Down Expand Up @@ -607,7 +607,7 @@ public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
resignedEpoch, OptionalInt.of(localId));

// After the election timer, we should become a candidate.
context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntil(context.client.quorum()::isCandidate);
assertEquals(resignedEpoch + 1, context.currentEpoch());
assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
Expand Down Expand Up @@ -693,7 +693,7 @@ public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

context.assertUnknownLeader(0);
context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());

context.pollUntilRequest();
context.assertVotedCandidate(1, localId);
Expand Down Expand Up @@ -737,7 +737,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree() throws Excep
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

context.assertUnknownLeader(0);
context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());

context.pollUntilRequest();
context.assertVotedCandidate(1, localId);
Expand Down Expand Up @@ -1118,7 +1118,7 @@ public void testVoteRequestTimeout() throws Exception {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
context.assertUnknownLeader(0);

context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);

Expand Down Expand Up @@ -1361,7 +1361,7 @@ public void testRetryElection() throws Exception {

context.assertUnknownLeader(0);

context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);

Expand Down Expand Up @@ -2090,7 +2090,7 @@ public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception {
context.assertUnknownLeader(epoch - 1);

// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);

// Wait until the vote requests are inflight
context.pollUntilRequest();
Expand Down Expand Up @@ -2696,7 +2696,7 @@ public void testFollowerLogReconciliation() throws Exception {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, otherNodeId)
.appendToLog(lastEpoch, Arrays.asList("foo", "bar"))
.appendToLog(lastEpoch, Arrays.asList("baz"))
.appendToLog(lastEpoch, singletonList("baz"))
.build();

context.assertElectedLeader(epoch, otherNodeId);
Expand Down Expand Up @@ -2827,7 +2827,7 @@ public void testClusterAuthorizationFailedInVote() throws Exception {
.build();

// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);

Expand Down Expand Up @@ -3186,7 +3186,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {

// Timeout the election and become candidate
int candidateEpoch = epoch + 2;
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, localId);

Expand Down Expand Up @@ -3232,7 +3232,7 @@ public void testHandleLeaderChangeFiresAfterUnattachedRegistration() throws Exce
LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), epoch);
assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());

// Transition to follower and the expect a leader changed notification
// Transition to follower and then expect a leader changed notification
context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId));
context.pollUntilResponse();

Expand Down
4 changes: 2 additions & 2 deletions raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot()
appendBatch(numberOfRecords, 2);
appendBatch(numberOfRecords, 4);

// offset is not equal to oldest snapshot's offset
// offset is not equal to the oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3);
assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2)), resultOffsetAndEpoch);
}
Expand All @@ -845,7 +845,7 @@ public void testValidateEpochLessThanFirstEpochInLog() {

appendBatch(numberOfRecords, 3);

// offset is not equal to oldest snapshot's offset
// offset is not equal to the oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2);
assertEquals(ValidOffsetAndEpoch.diverging(olderEpochSnapshotId), resultOffsetAndEpoch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ static RaftClientTestContext initializeAsLeader(int localId, Set<Integer> voters

public void becomeLeader() throws Exception {
int currentEpoch = currentEpoch();
time.sleep(electionTimeoutMs * 2);
time.sleep(electionTimeoutMs * 2L);
expectAndGrantVotes(currentEpoch + 1);
expectBeginEpoch(currentEpoch + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception {
List<BatchAccumulator.CompletedBatch<String>> drained = acc.drain();
assertEquals(1, drained.size());
assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
drained.stream().forEach(completedBatch -> {
drained.forEach(completedBatch -> {
completedBatch.data.batches().forEach(recordBatch -> {
assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); });
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -69,7 +70,7 @@ void testBuildBatch(CompressionType compressionType) {

records.forEach(record -> builder.appendRecord(record, null));
MemoryRecords builtRecordSet = builder.build();
assertTrue(builder.bytesNeeded(Arrays.asList("a"), null).isPresent());
assertTrue(builder.bytesNeeded(Collections.singletonList("a"), null).isPresent());
assertThrows(IllegalStateException.class, () -> builder.appendRecord("a", null));

List<MutableRecordBatch> builtBatches = Utils.toList(builtRecordSet.batchIterator());
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testHasRoomForUncompressed(int batchSize) {

String record = "i am a record";

while (!builder.bytesNeeded(Arrays.asList(record), null).isPresent()) {
while (!builder.bytesNeeded(Collections.singletonList(record), null).isPresent()) {
builder.appendRecord(record, null);
}

Expand Down