Skip to content

Commit

Permalink
Copy ByteBufPair buffers when using with SSL (#2401)
Browse files Browse the repository at this point in the history
The netty SSL handler uses a coalescing buffer queue, which modifies
the buffers used to queue the writes so that SSL_write can be given
larger chunks, thereby increasing the 'goodput'.

If we pass in a retained duplicate as we have been doing until now,
then later clients will be passed junk, as SSL will have modified cached
entry buffers.

This patch introduces a copying ByteBufPair encoder, which is only
used with SSL connections.
  • Loading branch information
ivankelly committed Aug 28, 2018
1 parent c9a3699 commit 92b8097
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}

ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerCnx(brokerService));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ public void initChannel(SocketChannel ch) throws Exception {
conf.getTlsTrustCertsFilePath());
}
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}

ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public ReferenceCounted touch(Object hint) {
}

public static final Encoder ENCODER = new Encoder();
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();

@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {
Expand All @@ -132,4 +133,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

}
@Sharable
public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;

// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
try {
ctx.write(b.getFirst().copy(), ctx.voidPromise());
ctx.write(b.getSecond().copy(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
} else {
ctx.write(msg, promise);
}
}
}

}

0 comments on commit 92b8097

Please sign in to comment.