Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[improvement] Use Direct ByteBuffers after upgrade to 2.8.x #1631

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);


public static class KafkaHeaderAndRequest {

private static final String DEFAULT_CLIENT_HOST = "";
Expand All @@ -606,6 +605,8 @@ public static class KafkaHeaderAndRequest {
private final ByteBuf buffer;
private final SocketAddress remoteAddress;

private final AtomicBoolean released = new AtomicBoolean();

public KafkaHeaderAndRequest(RequestHeader header,
AbstractRequest request,
ByteBuf buffer,
Expand All @@ -617,6 +618,9 @@ public KafkaHeaderAndRequest(RequestHeader header,
}

public ByteBuf getBuffer() {
if (released.get()) {
throw new IllegalStateException("Already released");
}
return buffer;
}

Expand Down Expand Up @@ -650,7 +654,16 @@ public String toString() {
this.header, this.request, this.remoteAddress);
}

public void bufferReleased() {
if (!released.compareAndSet(false, true)) {
throw new IllegalStateException("Already released");
}
}

public void close() {
if (!released.compareAndSet(false, true)) {
return;
}
ReferenceCountUtil.safeRelease(this.buffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand Down Expand Up @@ -41,7 +42,7 @@ public PendingRequest(final ApiKeys apiKeys,
this.responseConsumerHandler = responseConsumerHandler;
}

public ByteBuffer serialize() {
public ByteBuf serialize() {
return KopResponseUtils.serializeRequest(requestHeader, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.streamnative.pulsar.handlers.kop.security.PlainSaslServer;
Expand Down Expand Up @@ -73,7 +72,7 @@ public TransactionMarkerChannelHandler(
private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) {
final long correlationId = pendingRequest.getCorrelationId();
pendingRequestMap.put(correlationId, pendingRequest);
channel.writeAndFlush(Unpooled.wrappedBuffer(pendingRequest.serialize())).addListener(writeFuture -> {
channel.writeAndFlush(pendingRequest.serialize()).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
pendingRequest.completeExceptionally(writeFuture.cause());
pendingRequestMap.remove(correlationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.ObjectSerializationCache;

/**
* Provide util classes to access protected fields in kafka structures.
Expand All @@ -34,12 +37,34 @@ public class KopResponseUtils {
public static ByteBuf serializeResponse(short version,
ResponseHeader responseHeader,
AbstractResponse response) {
return Unpooled.wrappedBuffer(response.serializeWithHeader(responseHeader, version));
return serializeWithHeader(response, responseHeader, version);
}

public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
return RequestUtils.serialize(requestHeader.data(), requestHeader.headerVersion(),
private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHeader header, short version) {
return serialize(header.data(), header.headerVersion(), response.data(), version);
}

public static ByteBuf serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
return serialize(requestHeader.data(), requestHeader.headerVersion(),
request.data(), request.version());
}

public static ByteBuf serialize(
Message header,
short headerVersion,
Message apiMessage,
short apiVersion
) {
ObjectSerializationCache cache = new ObjectSerializationCache();

int headerSize = header.size(cache, headerVersion);
int messageSize = apiMessage.size(cache, apiVersion);
ByteBuffer result = ByteBuffer.allocate(headerSize + messageSize);
ByteBufferAccessor writable = new ByteBufferAccessor(result);
header.write(writable, cache, headerVersion);
apiMessage.write(writable, cache, apiVersion);
result.flip();
return Unpooled.wrappedBuffer(result);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static org.testng.Assert.assertEquals;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
Expand Down Expand Up @@ -108,22 +109,24 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset
return listOffsetsPartitionResponse;
}


public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder,
SocketAddress serviceAddress) {
AbstractRequest request = builder.build(builder.apiKey().latestVersion());
SocketAddress serviceAddress) {
return buildRequest(builder, serviceAddress, builder.latestAllowedVersion());
}
public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder,
SocketAddress serviceAddress, short version) {
AbstractRequest request = builder.build(version);
assertEquals(version, request.version());
RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233);


ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(mockHeader, request);

ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest);

RequestHeader header = RequestHeader.parse(serializedRequest);
ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request);
ByteBuffer byteBuffer = byteBuf.nioBuffer();
RequestHeader header = RequestHeader.parse(byteBuffer);

ApiKeys apiKey = header.apiKey();
short apiVersion = header.apiVersion();
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, serializedRequest).request;
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuffer).request;
return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,16 @@ public void testByteBufToRequest() {
correlationId);

// 1. serialize request into ByteBuf
ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);
int size = serializedRequest.remaining();
ByteBuf inputBuf = Unpooled.buffer(size);
inputBuf.writeBytes(serializedRequest);
ByteBuf serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);

// 2. turn Bytebuf into KafkaHeaderAndRequest.
KafkaHeaderAndRequest request = handler.byteBufToRequest(inputBuf, null);
KafkaHeaderAndRequest request = handler.byteBufToRequest(serializedRequest, null);

// 3. verify byteBufToRequest works well.
assertEquals(request.getHeader().data(), header.data());
assertTrue(request.getRequest() instanceof ApiVersionsRequest);

request.close();
}


Expand Down