Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: convert ProduceRequestTest to KRaft #17780

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.server
import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, TopicDescription}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
Expand All @@ -35,6 +36,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.ValueSource

import java.util.concurrent.TimeUnit
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -84,14 +86,41 @@ class ProduceRequestTest extends BaseRequestTest {
new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}

@ParameterizedTest
private def getPartitionToLeader(
admin: Admin,
topic: String
): Map[Int, Int] = {
var topicDescription: TopicDescription = null
TestUtils.waitUntilTrue(() => {
val topicMap = admin.
describeTopics(java.util.Arrays.asList(topic)).
allTopicNames().get(10, TimeUnit.MINUTES)
topicDescription = topicMap.get(topic)
topicDescription != null
}, "Timed out waiting to describe topic " + topic)
topicDescription.partitions().asScala.map(p => {
p.partition() -> p.leader().id()
}).toMap
}

@ParameterizedTest(name = "quorum=kraft")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to indicate in the name the test is running via kraft? Just double checking since the quorum is not a parameter in the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test name is what TestInfo.scala looks at to determine if the test is kraft. Changing the test name was just an easy way of getting it to be kraft

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok -- just wasn't sure if we also needed the quorum param.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, no, since we changed the test name directly. sorry I guess that was a bit obscure :)

soon, we will have everything as kraft and we can get rid of this.

@MethodSource(Array("timestampConfigProvider"))
def testProduceWithInvalidTimestamp(messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
val topic = "topic"
val partition = 0
val topicConfig = new Properties
topicConfig.setProperty(messageTimeStampConfig, "1000")
val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
val admin = createAdminClient()
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
brokers = brokers,
controllers = controllerServers,
numPartitions = 1,
replicationFactor = 1,
topicConfig = topicConfig
)
val partitionToLeader = getPartitionToLeader(admin, topic)
val leader = partitionToLeader(partition)

def createRecords(magicValue: Byte, timestamp: Long, codec: Compression): MemoryRecords = {
Expand Down Expand Up @@ -138,7 +167,14 @@ class ProduceRequestTest extends BaseRequestTest {
val partition = 0

// Create a single-partition topic and find a broker which is not the leader
val partitionToLeader = createTopic(topic)
val admin = createAdminClient()
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
brokers = brokers,
controllers = controllerServers
)
val partitionToLeader = getPartitionToLeader(admin, topic)
val leader = partitionToLeader(partition)
val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
assertTrue(nonReplicaOpt.isDefined)
Expand Down