Skip to content

Commit

Permalink
[fix][client] the nullValue in msgMetadata should be true by default (a…
Browse files Browse the repository at this point in the history
…pache#22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set.

(cherry picked from commit f3c177e)
(cherry picked from commit 0541176)
  • Loading branch information
liangyepianzhou authored and srinath-ctds committed Aug 23, 2024
1 parent 959f80b commit 8ad5249
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,28 @@ public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackTy
assertTrue(values.isEmpty());
}
}

@Test
public void testRedeliverMessagesWithoutValue() throws Exception {
String topic = "persistent://my-property/my-ns/testRedeliverMessagesWithoutValue";
@Cleanup Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.enableRetry(true)
.subscribe();
@Cleanup Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(true)
.create();
for (int i = 0; i < 10; i++) {
producer.newMessage().key("messages without value").send();
}

Message<Integer> message = consumer.receive();
consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);
for (int i = 0; i < 9; i++) {
assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
}
assertTrue(consumer.receive(5, TimeUnit.SECONDS).getTopicName().contains("sub-RETRY"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -224,6 +226,9 @@ public void testTamperingMessageIsDetected() throws Exception {
.create();
TypedMessageBuilderImpl<byte[]> msgBuilder = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
.value("a message".getBytes());
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(msgBuilder);
MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
.setProducerName("test")
.setSequenceId(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {

Message<byte[]> message4 = consumer.receive();
Assert.assertEquals(message4.getKey(), "key2");
Assert.assertEquals(new String(message4.getData()), "");
assertNull(message4.getData());

Message<byte[]> message5 = consumer.receive();
Assert.assertEquals(message5.getKey(), "key4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
private final transient Schema<T> schema;
private transient ByteBuffer content;
private final transient TransactionImpl txn;
private transient T value;

public TypedMessageBuilderImpl(ProducerBase<?> producer, Schema<T> schema) {
this(producer, schema, null);
Expand All @@ -65,6 +66,22 @@ public TypedMessageBuilderImpl(ProducerBase<?> producer,
}

private long beforeSend() {
if (value == null) {
msgMetadata.setNullValue(true);
} else {
getKeyValueSchema().map(keyValueSchema -> {
if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
setSeparateKeyValue(value, keyValueSchema);
return this;
} else {
return null;
}
}).orElseGet(() -> {
content = ByteBuffer.wrap(schema.encode(value));
return this;
});
}

if (txn == null) {
return -1L;
}
Expand Down Expand Up @@ -140,22 +157,8 @@ public TypedMessageBuilder<T> orderingKey(byte[] orderingKey) {

@Override
public TypedMessageBuilder<T> value(T value) {
if (value == null) {
msgMetadata.setNullValue(true);
return this;
}

return getKeyValueSchema().map(keyValueSchema -> {
if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
setSeparateKeyValue(value, keyValueSchema);
return this;
} else {
return null;
}
}).orElseGet(() -> {
content = ByteBuffer.wrap(schema.encode(value));
return this;
});
this.value = value;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.mockito.Mock;
import org.testng.annotations.Test;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Base64;

Expand All @@ -45,7 +47,7 @@ public class TypedMessageBuilderImplTest {
protected ProducerBase producerBase;

@Test
public void testDefaultValue() {
public void testDefaultValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
producerBase = mock(ProducerBase.class);

AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
Expand All @@ -63,6 +65,9 @@ public void testDefaultValue() {

// Check kv.encoding.type default, not set value
TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
Expand All @@ -73,7 +78,7 @@ public void testDefaultValue() {
}

@Test
public void testInlineValue() {
public void testInlineValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
producerBase = mock(ProducerBase.class);

AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
Expand All @@ -91,6 +96,9 @@ public void testInlineValue() {

// Check kv.encoding.type INLINE
TypedMessageBuilderImpl<KeyValue> typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
Expand All @@ -101,7 +109,7 @@ public void testInlineValue() {
}

@Test
public void testSeparatedValue() {
public void testSeparatedValue() throws Exception {
producerBase = mock(ProducerBase.class);

AvroSchema<SchemaTestUtils.Foo> fooSchema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
Expand All @@ -119,6 +127,9 @@ public void testSeparatedValue() {

// Check kv.encoding.type SEPARATED
TypedMessageBuilderImpl typedMessageBuilder = (TypedMessageBuilderImpl)typedMessageBuilderImpl.value(keyValue);
Method method = TypedMessageBuilderImpl.class.getDeclaredMethod("beforeSend");
method.setAccessible(true);
method.invoke(typedMessageBuilder);
ByteBuffer content = typedMessageBuilder.getContent();
byte[] contentByte = new byte[content.remaining()];
content.get(contentByte);
Expand Down

0 comments on commit 8ad5249

Please sign in to comment.