Skip to content

Commit

Permalink
[Fix][Client] Fix pending message not complete when closeAsync (#23761)
Browse files Browse the repository at this point in the history
(cherry picked from commit e0a9e4c)
  • Loading branch information
pengxiangrui127 authored and lhotari committed Dec 20, 2024
1 parent 7915b66 commit 4d5d672
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1122,11 +1122,11 @@ public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
closeAndClearPendingMessages();
if (exception == null || !cnx.ctx().channel().isActive()) {
// Either we've received the success response for the close producer command from the broker, or the
// connection did break in the meantime. In any case, the producer is gone.
log.info("[{}] [{}] Closed Producer", topic, producerName);
closeAndClearPendingMessages();
closeFuture.complete(null);
} else {
closeFuture.completeExceptionally(exception);
Expand All @@ -1138,7 +1138,8 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}

private synchronized void closeAndClearPendingMessages() {
@VisibleForTesting
protected synchronized void closeAndClearPendingMessages() {
setState(State.Closed);
client.cleanupProducer(this);
PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import io.netty.util.HashedWheelTimer;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -66,4 +72,33 @@ public void testPopulateMessageSchema() {
verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
}

@Test
public void testClearPendingMessageWhenCloseAsync() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
Mockito.doReturn(1L).when(client).newProducerId();
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setStatsIntervalSeconds(-1);
Mockito.doReturn(clientConf).when(client).getConfiguration();
ConnectionPool connectionPool = mock(ConnectionPool.class);
Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon();
Mockito.doReturn(connectionPool).when(client).getCnxPool();
HashedWheelTimer timer = mock(HashedWheelTimer.class);
Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any());
Mockito.doReturn(timer).when(client).timer();
ProducerConfigurationData producerConf = new ProducerConfigurationData();
producerConf.setSendTimeoutMs(-1);
ProducerImpl<?> producer = Mockito.spy(new ProducerImpl<>(client, "topicName", producerConf, null, 0, null, null, Optional.empty()));

// make sure throw exception when send request to broker
ClientCnx clientCnx = mock(ClientCnx.class);
CompletableFuture<ProducerResponse> tCompletableFuture = new CompletableFuture<>();
tCompletableFuture.completeExceptionally(new PulsarClientException("error"));
when(clientCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(tCompletableFuture);
Mockito.doReturn(clientCnx).when(producer).cnx();

// run closeAsync and verify
CompletableFuture<Void> voidCompletableFuture = producer.closeAsync();
verify(producer).closeAndClearPendingMessages();
}

}

0 comments on commit 4d5d672

Please sign in to comment.