Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP 96] Add message payload processor for Pulsar client #12088

Merged
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
339c5e2
Add MessagePayload classes
BewareMyPower Sep 24, 2021
c35a0fe
Add MessageContext classes
BewareMyPower Sep 24, 2021
837978c
Add PayloadConverter interface
BewareMyPower Sep 24, 2021
cfabc26
Fix refCnt error
BewareMyPower Sep 24, 2021
db9f84b
Add tests for default converter
BewareMyPower Sep 24, 2021
9630f41
Add tests for custom batch format
BewareMyPower Sep 24, 2021
243d76a
Move default converter to tests
BewareMyPower Sep 24, 2021
f5a0276
Add tests for refCnt when the iteration was stopped
BewareMyPower Sep 24, 2021
6fdaabd
Remove recycle() method from MessagePayload
BewareMyPower Sep 25, 2021
6ec4a28
Add cleanup() method to PayloadConverter interface
BewareMyPower Sep 25, 2021
e5fc958
Rename cleanup method to afterConvert
BewareMyPower Sep 28, 2021
b050d63
Rename methods of EntryContext
BewareMyPower Sep 28, 2021
441f8a0
Use the actual number of messages
BewareMyPower Sep 28, 2021
a45483d
Add MessagePayloadFactory to avoid raw use of MessagePayloadImpl
BewareMyPower Sep 29, 2021
668b1ca
Add tests for refCnt
BewareMyPower Sep 29, 2021
f0b1de1
Test BatchMessageAcker works when converter is configured
BewareMyPower Sep 29, 2021
2b4fd06
Add whenInterrupted() method to converter
BewareMyPower Sep 29, 2021
2ebd276
Add whenInterrupted call
BewareMyPower Sep 29, 2021
985e35e
Use PayloadProcessor instead of PayloadConverter
BewareMyPower Sep 29, 2021
9635eb4
Test default payload processor
BewareMyPower Sep 29, 2021
8b3b61a
Remove the Iterable interfaces of custom batch
BewareMyPower Sep 30, 2021
a551152
Catch Throwable in pulsar-client
BewareMyPower Sep 30, 2021
4bd13b2
Rename some interfaces and classes
BewareMyPower Sep 30, 2021
723c8f5
Fix tests
BewareMyPower Sep 30, 2021
31e346d
Add tests for custom batch
BewareMyPower Sep 30, 2021
121685d
Move CustomBatchFormat test to MessagePayloadProcessorTest for CI
BewareMyPower Oct 5, 2021
1cabbd2
Return a copy of an empty array instead of a reference
BewareMyPower Oct 5, 2021
b9ca0ed
Revert changes for MessageImpl#setBrokerEntryMetadata
BewareMyPower Oct 5, 2021
c9e8a1f
Skip null messages
BewareMyPower Oct 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.processor;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
* A batch message whose format is customized.
*
* 1. First 2 bytes represent the number of messages.
* 2. Each message is a string, whose format is
* 1. First 2 bytes represent the length `N`.
* 2. Followed N bytes are the bytes of the string.
*/
public class CustomBatchFormat {

public static final String KEY = "entry.format";
public static final String VALUE = "custom";

@AllArgsConstructor
@Getter
public static class Metadata {
private final int numMessages;
}

public static ByteBuf serialize(Iterable<String> strings) {
final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024);
buf.writeShort(0);
short numMessages = 0;
for (String s : strings) {
writeString(buf, s);
numMessages++;
}
buf.setShort(0, numMessages);
return buf;
}

private static void writeString(final ByteBuf buf, final String s) {
final byte[] bytes = Schema.STRING.encode(s);
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}

public static Metadata readMetadata(final ByteBuf buf) {
return new Metadata(buf.readShort());
}

public static byte[] readMessage(final ByteBuf buf) {
final short length = buf.readShort();
final byte[] bytes = new byte[length];
buf.readBytes(bytes);
return bytes;
}

@Test
public void testMultipleStrings() {
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
final List<List<String>> inputs = new ArrayList<>();
inputs.add(Collections.emptyList());
inputs.add(Collections.singletonList("java"));
inputs.add(Arrays.asList("hello", "world", "java"));

for (List<String> input : inputs) {
final ByteBuf buf = serialize(input);

final Metadata metadata = readMetadata(buf);
final List<String> parsedTokens = new ArrayList<>();
for (int i = 0; i < metadata.getNumMessages(); i++) {
parsedTokens.add(Schema.STRING.decode(readMessage(buf)));
}

Assert.assertEquals(parsedTokens, input);
Assert.assertEquals(parsedTokens.size(), input.size());

Assert.assertEquals(buf.refCnt(), 1);
buf.release();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.processor;

import io.netty.buffer.ByteBuf;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessagePayloadContext;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessagePayload;
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessagePayloadUtils;

@Slf4j
public class CustomBatchPayloadProcessor implements MessagePayloadProcessor {

@Override
public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
Consumer<Message<T>> messageConsumer) throws Exception {
final String value = context.getProperty(CustomBatchFormat.KEY);
if (value == null || !value.equals(CustomBatchFormat.VALUE)) {
DEFAULT.process(payload, context, schema, messageConsumer);
return;
}

final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(payload);
try {
final int numMessages = CustomBatchFormat.readMetadata(buf).getNumMessages();
for (int i = 0; i < numMessages; i++) {
final MessagePayload singlePayload =
MessagePayloadFactory.DEFAULT.wrap(CustomBatchFormat.readMessage(buf));
try {
messageConsumer.accept(
context.getMessageAt(i, numMessages, singlePayload, false, schema));
} finally {
singlePayload.release();
}
}
} finally {
buf.release();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.processor;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;

@RequiredArgsConstructor
@Slf4j
public class CustomBatchProducer {

private final List<String> messages = new ArrayList<>();
private final PersistentTopic persistentTopic;
private final int batchingMaxMessages;

public void sendAsync(final String value) {
messages.add(value);
if (messages.size() >= batchingMaxMessages) {
flush();
}
}

public void flush() {
final ByteBuf buf = CustomBatchFormat.serialize(messages);
final ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None,
createCustomMetadata(), buf);
buf.release();
persistentTopic.publishMessage(headerAndPayload, (e, ledgerId, entryId) -> {
if (e == null) {
log.info("Send successfully to {} ({}, {})", persistentTopic.getName(), ledgerId, entryId);
} else {
log.error("Failed to send: {}", e.getMessage());
}
});
messages.clear();
}

private static MessageMetadata createCustomMetadata() {
final MessageMetadata messageMetadata = new MessageMetadata();
// Here are required fields
messageMetadata.setProducerName("");
messageMetadata.setSequenceId(0L);
messageMetadata.setPublishTime(0L);
// Add the property to identify the message format
messageMetadata.addProperty().setKey(CustomBatchFormat.KEY).setValue(CustomBatchFormat.VALUE);
return messageMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.processor;

import java.util.function.Consumer;
import lombok.Getter;
import org.apache.pulsar.client.api.MessagePayloadContext;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessagePayload;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessagePayloadImpl;

/**
 * A processor for Pulsar format messages that also maintains a total reference count.
 *
 * After delegating each payload to the default processor, it adds the reference count of the
 * payload's underlying ByteBuf to a running total. It is used to verify that
 * {@link MessagePayloadContext#getMessageAt} and {@link MessagePayloadContext#asSingleMessage}
 * have released the ByteBuf successfully.
 */
public class DefaultProcessorWithRefCnt implements MessagePayloadProcessor {

    @Getter
    int totalRefCnt = 0;

    @Override
    public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema,
                            Consumer<Message<T>> messageConsumer) throws Exception {
        DEFAULT.process(payload, context, schema, messageConsumer);
        // Only reached when the default processor returned normally.
        final MessagePayloadImpl processed = (MessagePayloadImpl) payload;
        final int remainingRefCnt = processed.getByteBuf().refCnt();
        totalRefCnt = totalRefCnt + remainingRefCnt;
    }
}
Loading