-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP 96] Add message payload processor for Pulsar client #12088
[PIP 96] Add message payload processor for Pulsar client #12088
Conversation
pulsar-client/src/main/java/org/apache/pulsar/client/converter/PayloadConverterProxy.java
Outdated
Show resolved
Hide resolved
23a6bb7
to
b0d9f00
Compare
b0d9f00
to
5fdcbc3
Compare
4970aa1
to
31e346d
Compare
Now this PR is ready for review. And the PR description was updated as well. Currently there's two +1 in vote email, you can take a look first, @eolivelli @codelipenghui |
@eolivelli @315157973 @codelipenghui @hangc0276 @merlimat @rdhabalia PTAL when you have time since Pulsar 2.9.0 release cut is coming. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work
I left some minor comments PTAL
pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
Outdated
Show resolved
Hide resolved
Comments were addressed, PTAL again @eolivelli |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
Confirmed with @BewareMyPower, this PR adds docs in JavaDoc |
Master Issue: #12087
Motivation
See #12087 for details.
Modifications
MessagePayloadContext
,MessagePayload
,MessagePayloadProcessor
interfaces following the PIP 96.MessagePayloadImpl
, which usesByteBuf
as the backed buffer, as the default implementation ofMessagePayload
.MessagePayloadFactory
, theDEFAULT
field is an instance that can wrap a byte array or NIO buffer to aMessagePayloadImpl
object.MessagePayloadUtils
class to convertMessagePayload
toByteBuf
. No bytes copy happens if theMessagePayload
isMessagePayloadImpl
.ConsumerImpl
to provide methods to create a single message in batch or a normal message based on metadata and payload. Some repeated code is also removed.MessagePayloadContextImpl
as the default implementation ofMessagePayloadContext
. It uses methods fromConsumerImpl
to create aMessage
based on aMessagePayload
.MessagePayloadProcessor
inConsumerBuilder
.consumeMessagesFromConverter
method toConsumerImpl
when a payload converter is configured.Verifying this change
Make sure that the change passes the CI checks.
Add
MessagePayloadTest
to test convertingMessagePayload
toByteBuf
, it covers the cases for both the defaultMessagePayloadImpl
and another implementation based on a NIOByteBuffer
.Add
MessagePayloadProcessorTest
, which includes two tests:testDefaultProcessor
: Test the default processor could work. In addition, it checks therefCnt
to verify afterMessagePayloadContext
creates aMessage<T>
, theByteBuf
will be released.testCustomProcessor
: Test the processor for custom message format that represents a batched message.CustomBatchFormat
: Define the custom message format.CustomBatchProducer
: A producer that writes the custom formatted message to bookie directly. When the batch is full, send the batch.Does this pull request potentially affect one of the following parts: