From ea703c8990e9265aaf665635c6bd11cecb8e5c83 Mon Sep 17 00:00:00 2001 From: Yike Xiao Date: Fri, 27 Dec 2024 18:07:18 +0800 Subject: [PATCH] [fix][client] Cannot access message data inside ProducerInterceptor#onSendAcknowledgement --- .../pulsar/client/api/InterceptorsTest.java | 44 +++++++++++++++++++ .../pulsar/client/impl/ProducerImpl.java | 28 ++++++------ 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index f71cdc551411b..68d082adbb6b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -21,10 +21,12 @@ import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -216,6 +218,48 @@ public void onSendAcknowledgement(Producer producer, Message mes producer.close(); } + @Test + public void testProducerInterceptorAccessMessageData() throws PulsarClientException { + List messageDataInBeforeSend = Collections.synchronizedList(new ArrayList<>()); + List messageDataOnSendAcknowledgement = Collections.synchronizedList(new ArrayList<>()); + ProducerInterceptor interceptor = new ProducerInterceptor<>() { + @Override + public void close() { + } + + @Override + public Message beforeSend(Producer producer, Message message) { + messageDataInBeforeSend.add(new String(message.getData())); + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + messageDataOnSendAcknowledgement.add(new String(message.getData())); + } + }; + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .intercept(interceptor) + .create(); + + final String messageValue = UUID.randomUUID().toString(); + try { + producer.newMessage().value(messageValue).send(); + } catch (Exception ignore) { + } + Assert.assertEquals(messageDataInBeforeSend.size(), 1, + "Message data should be available in beforeSend"); + Assert.assertEquals(messageDataInBeforeSend.get(0), messageValue, + "Message data should be available in beforeSend"); + Assert.assertEquals(messageDataOnSendAcknowledgement.size(), 1, + "Message data should be available in onSendAcknowledgement"); + Assert.assertEquals(messageDataOnSendAcknowledgement.get(0), messageValue, + "Message data should be available in onSendAcknowledgement"); + } + @Test public void testConsumerInterceptorWithErrors() throws PulsarClientException { ConsumerInterceptor interceptor = new ConsumerInterceptor() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 54d337925dcf9..64b706cc5faa9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -436,20 +436,22 @@ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl< if (payload == null) { log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", topic, producerName); - } else { - ReferenceCountUtil.safeRelease(payload); } - if (e != null) { - latencyHistogram.recordFailure(latencyNanos); - stats.incrementSendFailed(); - onSendAcknowledgement(msg, null, e); - sendCallback.getFuture().completeExceptionally(e); - } else { - latencyHistogram.recordSuccess(latencyNanos); - publishedBytesCounter.add(msgSize); - stats.incrementNumAcksReceived(latencyNanos); - onSendAcknowledgement(msg, msg.getMessageId(), null); - sendCallback.getFuture().complete(msg.getMessageId()); + try { + if (e != null) { + latencyHistogram.recordFailure(latencyNanos); + stats.incrementSendFailed(); + onSendAcknowledgement(msg, null, e); + sendCallback.getFuture().completeExceptionally(e); + } else { + latencyHistogram.recordSuccess(latencyNanos); + publishedBytesCounter.add(msgSize); + stats.incrementNumAcksReceived(latencyNanos); + onSendAcknowledgement(msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); + } + } finally { + ReferenceCountUtil.safeRelease(payload); } }