Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] the nullValue in msgMetadata should be true by default #22372

Merged
merged 10 commits into from
Aug 3, 2024

Conversation

liangyepianzhou
Copy link
Contributor

Motivation

When a message is not set value, the nullValue message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a SchemaSerializationException when calling reconsumerLater.


org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)

Modifications

When a message is not set value, the nullValue message metadata should be true and change to false after the value is set.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@liangyepianzhou liangyepianzhou self-assigned this Mar 27, 2024
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Mar 27, 2024
Copy link
Contributor

@Technoboy- Technoboy- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@codecov-commenter
Copy link

codecov-commenter commented Apr 1, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 73.49%. Comparing base (bbc6224) to head (f825433).
Report is 488 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #22372      +/-   ##
============================================
- Coverage     73.57%   73.49%   -0.08%     
- Complexity    32624    33244     +620     
============================================
  Files          1877     1919      +42     
  Lines        139502   144104    +4602     
  Branches      15299    15745     +446     
============================================
+ Hits         102638   105914    +3276     
- Misses        28908    30079    +1171     
- Partials       7956     8111     +155     
Flag Coverage Δ
inttests 27.84% <66.66%> (+3.25%) ⬆️
systests 24.72% <100.00%> (+0.40%) ⬆️
unittests 72.55% <100.00%> (-0.29%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...he/pulsar/client/impl/TypedMessageBuilderImpl.java 83.96% <100.00%> (+0.12%) ⬆️

... and 528 files with indirect coverage changes

1. Setting isNullValue field will improve the entry size and
2. The default value of isNullValue is set to true will make the break changes for some usages.
This reverts commit af2a4ba.
@liangyepianzhou liangyepianzhou added type/bug The PR fixed a bug or issue reported a bug release/3.1.5 labels Apr 7, 2024
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
@liangyepianzhou liangyepianzhou merged commit f3c177e into apache:master Aug 3, 2024
51 checks passed
@liangyepianzhou liangyepianzhou deleted the no-value branch August 3, 2024 07:30
Technoboy- pushed a commit that referenced this pull request Aug 22, 2024
…22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set.

(cherry picked from commit f3c177e)
Technoboy- pushed a commit that referenced this pull request Aug 22, 2024
…22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set.

(cherry picked from commit f3c177e)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 22, 2024
…pache#22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set.

(cherry picked from commit f3c177e)
(cherry picked from commit 0541176)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 23, 2024
…pache#22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set.

(cherry picked from commit f3c177e)
(cherry picked from commit 0541176)
@lhotari
Copy link
Member

lhotari commented Sep 3, 2024

The bug fixed by this PR might also be related to #23245

grssam pushed a commit to grssam/pulsar that referenced this pull request Sep 4, 2024
…pache#22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set. Otherwise, the message data will be set as a [] when the value is not set, that would cause the message data to be encoded and throw a `SchemaSerializationException` when calling `reconsumerLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When a message is not set value, the `nullValue` message metadata should be true and change to false after the value is set.
@nodece nodece mentioned this pull request Nov 12, 2024
1 task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants