diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 10e0ee2ee3d9f..54d337925dcf9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1177,11 +1177,11 @@ public CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> { cnx.removeProducer(producerId); + closeAndClearPendingMessages(); if (exception == null || !cnx.ctx().channel().isActive()) { // Either we've received the success response for the close producer command from the broker, or the // connection did break in the meantime. In any case, the producer is gone. log.info("[{}] [{}] Closed Producer", topic, producerName); - closeAndClearPendingMessages(); closeFuture.complete(null); } else { closeFuture.completeExceptionally(exception); @@ -1193,7 +1193,8 @@ public CompletableFuture closeAsync() { return closeFuture; } - private synchronized void closeAndClearPendingMessages() { + @VisibleForTesting + protected synchronized void closeAndClearPendingMessages() { setState(State.Closed); client.cleanupProducer(this); PulsarClientException ex = new PulsarClientException.AlreadyClosedException( diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index f9df63759394a..5f690ead6c592 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -22,12 +22,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; + import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import io.netty.util.HashedWheelTimer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -68,4 +75,34 @@ public void testPopulateMessageSchema() { verify(msg).setSchemaState(MessageImpl.SchemaState.Ready); } + @Test + public void testClearPendingMessageWhenCloseAsync() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + Mockito.doReturn(1L).when(client).newProducerId(); + ClientConfigurationData clientConf = new ClientConfigurationData(); + clientConf.setStatsIntervalSeconds(-1); + Mockito.doReturn(clientConf).when(client).getConfiguration(); + Mockito.doReturn(new InstrumentProvider(null)).when(client).instrumentProvider(); + ConnectionPool connectionPool = mock(ConnectionPool.class); + Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon(); + Mockito.doReturn(connectionPool).when(client).getCnxPool(); + HashedWheelTimer timer = mock(HashedWheelTimer.class); + Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any()); + Mockito.doReturn(timer).when(client).timer(); + ProducerConfigurationData producerConf = new ProducerConfigurationData(); + producerConf.setSendTimeoutMs(-1); + ProducerImpl producer = Mockito.spy(new ProducerImpl<>(client, "topicName", producerConf, null, 0, null, null, Optional.empty())); + + // make sure throw exception when send request to broker + ClientCnx clientCnx = mock(ClientCnx.class); + CompletableFuture tCompletableFuture = new CompletableFuture<>(); + tCompletableFuture.completeExceptionally(new PulsarClientException("error")); + when(clientCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(tCompletableFuture); + Mockito.doReturn(clientCnx).when(producer).cnx(); + + // run closeAsync and verify + CompletableFuture voidCompletableFuture = producer.closeAsync(); + verify(producer).closeAndClearPendingMessages(); + } + }