diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index cfd89d3bb28ab..070d2fe11d605 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -141,19 +141,45 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof ByteBufPair) { ByteBufPair b = (ByteBufPair) msg; + ChannelPromise compositePromise = ctx.newPromise(); + compositePromise.addListener(future -> { + // release the ByteBufPair after the write operation is completed + ReferenceCountUtil.safeRelease(b); + // complete the promise passed as an argument unless it's a void promise + if (!promise.isVoid()) { + if (future.isSuccess()) { + promise.setSuccess(); + } else { + promise.setFailure(future.cause()); + } + } + }); + // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached // for multiple requests. - try { - ctx.write(b.getFirst().copy(), ctx.voidPromise()); - ctx.write(b.getSecond().copy(), promise); - } finally { - ReferenceCountUtil.safeRelease(b); - } + ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise()); + ctx.write(nioBufferCopy(b.getSecond()), compositePromise); } else { ctx.write(msg, promise); } } + + // Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method. + // This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is + // not thread safe when the ByteBuf instance is shared across multiple threads. + // This method works around the issue. + // Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation. + // This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after + // the write method completes. + private ByteBuf nioBufferCopy(ByteBuf buf) { + int nioBufferCount = buf.nioBufferCount(); + if (nioBufferCount > 1) { + return Unpooled.wrappedBuffer(buf.nioBuffers()); + } else { + return Unpooled.wrappedBuffer(buf.nioBuffer()); + } + } } }