diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 0676a1a32637b..2b3ca623c5768 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionRepli import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ -import org.junit.{After, Before, Ignore, Rule, Test} +import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} import org.apache.kafka.common.resource.{Resource, ResourceType} import org.junit.rules.Timeout @@ -733,7 +733,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { } @Test - @Ignore // Disabled temporarily until flakiness is resolved def testLogStartOffsetCheckpoint(): Unit = { TestUtils.createTopic(zkUtils, topic, 2, serverCount, servers) @@ -743,8 +742,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { sendRecords(producers.head, 10, topicPartition) var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava) - var lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark() - assertEquals(5L, lowWatermark) + var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark) + assertEquals(Some(5), lowWatermark) for (i <- 0 until serverCount) { killBroker(i) @@ -759,16 +758,16 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { // Need to retry if leader is not available for the partition result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(0L)).asJava) + lowWatermark = None val future = result.lowWatermarks().get(topicPartition) try { - lowWatermark = future.get(1000L, TimeUnit.MILLISECONDS).lowWatermark() - lowWatermark == 5L + lowWatermark = Some(future.get.lowWatermark) + lowWatermark.contains(5L) } catch { - case e: LeaderNotAvailableException => false - } - - }, "Expected low watermark of the partition to be 5L") - + case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] || + e.getCause.isInstanceOf[NotLeaderForPartitionException] => false + } + }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}") client.close() }