Skip to content

Commit

Permalink
HBASE-22905 Avoid temp ByteBuffer allocation in (apache#538)
Browse files Browse the repository at this point in the history
BlockingRpcConnection#writeRequest
(cherry picked from commit ed0bed8)

Change-Id: I0cb22a366c673682f22b4743b1fa1a27d60befe6
  • Loading branch information
chenxu14 authored and Jenkins committed Sep 2, 2019
1 parent 18a946b commit ae11808
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
Expand Down Expand Up @@ -70,6 +69,8 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
Expand Down Expand Up @@ -599,37 +600,44 @@ private void tracedWriteRequest(Call call) throws IOException {
* @see #readResponse()
*/
private void writeRequest(Call call) throws IOException {
ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
this.compressor, call.cells);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
ByteBuf cellBlock = null;
try {
cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
call.cells, PooledByteBufAllocator.DEFAULT);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);

setupIOstreams();
setupIOstreams();

// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}

calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in the
// pending calls map.
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in
// the pending calls map.
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
} finally {
if (cellBlock != null) {
cellBlock.release();
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand All @@ -41,7 +40,7 @@
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
Expand All @@ -62,19 +61,19 @@ class IPCUtil {
* @throws IOException if write action fails
*/
public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock) throws IOException {
final ByteBuf cellBlock) throws IOException {
// Must calculate total size and write that first so other side can read it all in in one
// swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) {
totalSize += cellBlock.remaining();
totalSize += cellBlock.readableBytes();
}
return write(dos, header, param, cellBlock, totalSize);
}

private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize) throws IOException {
final ByteBuf cellBlock, final int totalSize) throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
Expand All @@ -83,7 +82,7 @@ private static int write(final OutputStream dos, final Message header, final Mes
param.writeDelimitedTo(dos);
}
if (cellBlock != null) {
dos.write(cellBlock.array(), 0, cellBlock.remaining());
cellBlock.readBytes(dos, cellBlock.readableBytes());
}
dos.flush();
return totalSize;
Expand Down

0 comments on commit ae11808

Please sign in to comment.