From 03d7198563088383ea1321bdbd93bd2209544999 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Wed, 5 Jun 2024 11:40:04 +0530 Subject: [PATCH] MINOR: Raft module Cleanup --- .../kafka/raft/FileQuorumStateStore.java | 2 +- .../kafka/raft/internals/LogHistory.java | 6 ++--- .../kafka/raft/internals/ReplicaKey.java | 4 +-- .../apache/kafka/raft/internals/VoterSet.java | 8 ++---- .../raft/KafkaRaftClientSnapshotTest.java | 25 ++++++++++--------- .../kafka/raft/KafkaRaftClientTest.java | 22 ++++++++-------- .../org/apache/kafka/raft/MockLogTest.java | 4 +-- .../kafka/raft/RaftClientTestContext.java | 2 +- .../raft/internals/BatchAccumulatorTest.java | 2 +- .../raft/internals/BatchBuilderTest.java | 5 ++-- 10 files changed, 37 insertions(+), 43 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java b/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java index 698a2ca67ec7a..439d19a6aa8e7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java +++ b/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java @@ -108,7 +108,7 @@ private QuorumStateData readStateFromFile(File file) { if (dataVersion < LOWEST_SUPPORTED_VERSION || dataVersion > HIGHEST_SUPPORTED_VERSION) { throw new IllegalStateException( String.format( - "data_version (%d) is not within the min (%d) and max ($d) supported version", + "data_version (%d) is not within the min (%d) and max (%d) supported version", dataVersion, LOWEST_SUPPORTED_VERSION, HIGHEST_SUPPORTED_VERSION diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java index 6751400678e14..a3a37daf80c71 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java @@ -76,7 +76,7 @@ public interface LogHistory { */ void clear(); - final static class Entry { + final class Entry { private final long offset; private final T value; @@ -101,9 +101,7 @@ public boolean equals(Object o) { Entry that = (Entry) o; if (offset != that.offset) return false; - if (!Objects.equals(value, that.value)) return false; - - return true; + return Objects.equals(value, that.value); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java b/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java index 7d799a9bd6d40..002a2dee1914a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/ReplicaKey.java @@ -45,9 +45,7 @@ public boolean equals(Object o) { ReplicaKey that = (ReplicaKey) o; if (id != that.id) return false; - if (!Objects.equals(directoryId, that.directoryId)) return false; - - return true; + return Objects.equals(directoryId, that.directoryId); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index 3ab41f5788cfd..393cb373b316c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -248,9 +248,7 @@ public boolean hasOverlappingMajority(VoterSet that) { .collect(Collectors.toSet()); if (Utils.diff(HashSet::new, thisReplicaKeys, thatReplicaKeys).size() > 1) return false; - if (Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() > 1) return false; - - return true; + return Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() <= 1; } @Override @@ -314,9 +312,7 @@ public boolean equals(Object o) { if (!Objects.equals(voterKey, that.voterKey)) return false; if (!Objects.equals(supportedKRaftVersion, that.supportedKRaftVersion)) return false; - if (!Objects.equals(listeners, that.listeners)) return false; - - return true; + return Objects.equals(listeners, that.listeners); } @Override diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 299fa819d5882..5c7b00e30953b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -122,7 +123,7 @@ public void testLeaderListenerNotified(boolean entireLog) throws Exception { // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -164,7 +165,7 @@ public void testFollowerListenerNotified(boolean entireLog) throws Exception { // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -210,7 +211,7 @@ public void testSecondListenerNotified(boolean entireLog) throws Exception { // Check that the second listener was notified of the new snapshot try (SnapshotReader snapshot = secondListener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -245,7 +246,7 @@ public void testListenerRenotified() throws Exception { // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } // Generate a new snapshot @@ -264,7 +265,7 @@ public void testListenerRenotified() throws Exception { // Check that listener was notified of the second snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { assertEquals(secondSnapshotId, snapshot.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot); } } @@ -660,7 +661,7 @@ public void testFetchSnapshotRequestAsLeader() throws Exception { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) .build(); context.becomeLeader(); @@ -712,7 +713,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajo List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) .build(); int resignLeadershipTimeout = context.checkQuorumTimeoutMs; @@ -909,7 +910,7 @@ public void testFetchSnapshotRequestWithInvalidPosition() throws Exception { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch(), Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) .build(); context.becomeLeader(); @@ -1136,12 +1137,12 @@ public void testFetchResponseWithSnapshotId() throws Exception { // Check that the snapshot was written to the log RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot); // Check that listener was notified of the new snapshot try (SnapshotReader reader = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, reader.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader); } } @@ -1239,12 +1240,12 @@ public void testFetchSnapshotResponsePartialData() throws Exception { // Check that the snapshot was written to the log RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot); // Check that listener was notified of the new snapshot try (SnapshotReader reader = context.listener.drainHandledSnapshot().get()) { assertEquals(snapshotId, reader.snapshotId()); - SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader); + SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader); } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 049b648d8811a..00588a4230595 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -465,7 +465,7 @@ public void testResignInOlderEpochIgnored() throws Exception { context.client.poll(); // Ensure we are still leader even after expiration of the election timeout. - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); context.client.poll(); context.assertElectedLeader(currentEpoch, localId); } @@ -607,7 +607,7 @@ public void testElectionTimeoutAfterUserInitiatedResign() throws Exception { resignedEpoch, OptionalInt.of(localId)); // After the election timer, we should become a candidate. - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntil(context.client.quorum()::isCandidate); assertEquals(resignedEpoch + 1, context.currentEpoch()); assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1), @@ -693,7 +693,7 @@ public void testInitializeAsCandidateAndBecomeLeader() throws Exception { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(1, localId); @@ -737,7 +737,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree() throws Excep RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(1, localId); @@ -1118,7 +1118,7 @@ public void testVoteRequestTimeout() throws Exception { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); @@ -1361,7 +1361,7 @@ public void testRetryElection() throws Exception { context.assertUnknownLeader(0); - context.time.sleep(2 * context.electionTimeoutMs()); + context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); @@ -2090,7 +2090,7 @@ public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception { context.assertUnknownLeader(epoch - 1); // Sleep a little to ensure that we become a candidate - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); // Wait until the vote requests are inflight context.pollUntilRequest(); @@ -2696,7 +2696,7 @@ public void testFollowerLogReconciliation() throws Exception { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) .withElectedLeader(epoch, otherNodeId) .appendToLog(lastEpoch, Arrays.asList("foo", "bar")) - .appendToLog(lastEpoch, Arrays.asList("baz")) + .appendToLog(lastEpoch, singletonList("baz")) .build(); context.assertElectedLeader(epoch, otherNodeId); @@ -2827,7 +2827,7 @@ public void testClusterAuthorizationFailedInVote() throws Exception { .build(); // Sleep a little to ensure that we become a candidate - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); @@ -3186,7 +3186,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception { // Timeout the election and become candidate int candidateEpoch = epoch + 2; - context.time.sleep(context.electionTimeoutMs() * 2); + context.time.sleep(context.electionTimeoutMs() * 2L); context.client.poll(); context.assertVotedCandidate(candidateEpoch, localId); @@ -3232,7 +3232,7 @@ public void testHandleLeaderChangeFiresAfterUnattachedRegistration() throws Exce LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), epoch); assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch()); - // Transition to follower and the expect a leader changed notification + // Transition to follower and then expect a leader changed notification context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId)); context.pollUntilResponse(); diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index e0c40d301be34..f57669da86411 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -827,7 +827,7 @@ public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() appendBatch(numberOfRecords, 2); appendBatch(numberOfRecords, 4); - // offset is not equal to oldest snapshot's offset + // offset is not equal to the oldest snapshot's offset ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3); assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2)), resultOffsetAndEpoch); } @@ -845,7 +845,7 @@ public void testValidateEpochLessThanFirstEpochInLog() { appendBatch(numberOfRecords, 3); - // offset is not equal to oldest snapshot's offset + // offset is not equal to the oldest snapshot's offset ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2); assertEquals(ValidOffsetAndEpoch.diverging(olderEpochSnapshotId), resultOffsetAndEpoch); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 8d6b9c1cad989..19f2fb61ec685 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -407,7 +407,7 @@ static RaftClientTestContext initializeAsLeader(int localId, Set voters public void becomeLeader() throws Exception { int currentEpoch = currentEpoch(); - time.sleep(electionTimeoutMs * 2); + time.sleep(electionTimeoutMs * 2L); expectAndGrantVotes(currentEpoch + 1); expectBeginEpoch(currentEpoch + 1); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java index bcb4f29a9d915..9b70afbd7f53d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java @@ -493,7 +493,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception { List> drained = acc.drain(); assertEquals(1, drained.size()); assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds())); - drained.stream().forEach(completedBatch -> { + drained.forEach(completedBatch -> { completedBatch.data.batches().forEach(recordBatch -> { assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); }); }); diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java index 824ec20afc0c7..ba356fff4b6a1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -69,7 +70,7 @@ void testBuildBatch(CompressionType compressionType) { records.forEach(record -> builder.appendRecord(record, null)); MemoryRecords builtRecordSet = builder.build(); - assertTrue(builder.bytesNeeded(Arrays.asList("a"), null).isPresent()); + assertTrue(builder.bytesNeeded(Collections.singletonList("a"), null).isPresent()); assertThrows(IllegalStateException.class, () -> builder.appendRecord("a", null)); List builtBatches = Utils.toList(builtRecordSet.batchIterator()); @@ -113,7 +114,7 @@ public void testHasRoomForUncompressed(int batchSize) { String record = "i am a record"; - while (!builder.bytesNeeded(Arrays.asList(record), null).isPresent()) { + while (!builder.bytesNeeded(Collections.singletonList(record), null).isPresent()) { builder.appendRecord(record, null); }