Skip to content

Commit

Permalink
KAFKA-6234; Increased timeout value for lowWatermark response to fix …
Browse files Browse the repository at this point in the history
…transient failures (#4238)

Removed timeout from get call that caused the test to fail occasionally, this will instead fall back to the wrapping waitUntilTrue timeout. Also added unnesting of exceptions from ExecutionException that was originally missing and put the retrieved value for lowWatermark in the fail message for better readability in case of test failure.

Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
soenkeliebau authored and Jason Gustafson committed Apr 12, 2018
1 parent fb3a948 commit 886daf5
Showing 1 changed file with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionRepli
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.junit.{After, Before, Ignore, Rule, Test}
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.resource.{Resource, ResourceType}
import org.junit.rules.Timeout
Expand Down Expand Up @@ -741,7 +741,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}

@Test
@Ignore // Disabled temporarily until flakiness is resolved
def testLogStartOffsetCheckpoint(): Unit = {
createTopic(topic, numPartitions = 2, replicationFactor = serverCount)

Expand All @@ -751,8 +750,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {

sendRecords(producers.head, 10, topicPartition)
var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
var lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
assertEquals(5L, lowWatermark)
var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark)
assertEquals(Some(5), lowWatermark)

for (i <- 0 until serverCount) {
killBroker(i)
Expand All @@ -767,16 +766,16 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// Need to retry if leader is not available for the partition
result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(0L)).asJava)

lowWatermark = None
val future = result.lowWatermarks().get(topicPartition)
try {
lowWatermark = future.get(1000L, TimeUnit.MILLISECONDS).lowWatermark()
lowWatermark == 5L
lowWatermark = Some(future.get.lowWatermark)
lowWatermark.contains(5L)
} catch {
case e: LeaderNotAvailableException => false
}

}, "Expected low watermark of the partition to be 5L")

case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] ||
e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
}
}, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
client.close()
}

Expand Down

0 comments on commit 886daf5

Please sign in to comment.