From 0459f97c8086b4bb80e8367105ab66836ea76a47 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Wed, 26 Feb 2020 14:59:32 -0800 Subject: [PATCH 1/4] check inflight requests to determine whether to use respond or prepareResponse --- .../apache/kafka/clients/producer/KafkaProducerTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 14f39a984f838..9494f4f23647d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -767,7 +767,13 @@ public void testInitTransactionTimeout() { assertThrows(TimeoutException.class, producer::initTransactions); - client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); + assertTrue("Should at most have one pending quest to find coordinator", client.inFlightRequestCount() <= 1); + if (client.inFlightRequestCount() > 0) { + client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); + } else { + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); + } + client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); // retry initialization should work From 51f798e557b7da96f25438dfb7814004d81a429a Mon Sep 17 00:00:00 2001 From: abbccdda Date: Wed, 26 Feb 2020 15:50:53 -0800 Subject: [PATCH 2/4] avoid check --- .../org/apache/kafka/clients/producer/KafkaProducerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 9494f4f23647d..97fec5447241b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -767,7 +767,6 @@ public void testInitTransactionTimeout() { assertThrows(TimeoutException.class, producer::initTransactions); - assertTrue("Should at most have one pending quest to find coordinator", client.inFlightRequestCount() <= 1); if (client.inFlightRequestCount() > 0) { client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); } else { From 7c70978d80532c6156650386392c5c902e6f2998 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Wed, 26 Feb 2020 16:24:34 -0800 Subject: [PATCH 3/4] use one time poll --- .../src/test/java/org/apache/kafka/clients/MockClient.java | 7 +------ .../apache/kafka/clients/producer/KafkaProducerTest.java | 5 ++++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 1f626435c13e4..eaf5dcb4f8fd1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -44,12 +44,7 @@ * A mock network client for use testing code */ public class MockClient implements KafkaClient { - public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() { - @Override - public boolean matches(AbstractRequest body) { - return true; - } - }; + public static final RequestMatcher ALWAYS_TRUE = body -> true; private static class FutureResponse { private final Node node; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 97fec5447241b..0daf432d4b497 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -747,8 +747,9 @@ public void testPartitionsForWithNullTopic() { @Test public void testInitTransactionTimeout() { Map configs = new HashMap<>(); + final long maxBlockMs = 500L; configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); - configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); Time time = new MockTime(1); @@ -767,6 +768,8 @@ public void testInitTransactionTimeout() { assertThrows(TimeoutException.class, producer::initTransactions); + // Explicit poll once here to clean up any unfinished find coordinator response + client.poll(maxBlockMs, time.milliseconds()); if (client.inFlightRequestCount() > 0) { client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); } else { From 6c04a30667cd9e797a151b138e1e48b2f8b228c3 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Wed, 26 Feb 2020 20:52:39 -0800 Subject: [PATCH 4/4] prepare response with matcher --- .../kafka/clients/producer/KafkaProducerTest.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 0daf432d4b497..f672918eac201 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -747,9 +747,8 @@ public void testPartitionsForWithNullTopic() { @Test public void testInitTransactionTimeout() { Map configs = new HashMap<>(); - final long maxBlockMs = 500L; configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); - configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); Time time = new MockTime(1); @@ -768,13 +767,10 @@ public void testInitTransactionTimeout() { assertThrows(TimeoutException.class, producer::initTransactions); - // Explicit poll once here to clean up any unfinished find coordinator response - client.poll(maxBlockMs, time.milliseconds()); - if (client.inFlightRequestCount() > 0) { - client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); - } else { - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); - } + client.prepareResponse( + request -> request instanceof FindCoordinatorRequest && + ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), + FindCoordinatorResponse.prepareResponse(Errors.NONE, host1)); client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));