diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index 34869314bab0..47b0b29a5c6e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -126,6 +126,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + private void finishCall(ResponseHeader responseHeader, ByteBufInputStream in, Call call) + throws IOException { + Message value; + if (call.responseDefaultType != null) { + Message.Builder builder = call.responseDefaultType.newBuilderForType(); + if (!builder.mergeDelimitedFrom(in)) { + // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF + // before reading any bytes out, so here we need to manually finish create the EOFException + // and finish the call + call.setException(new EOFException("EOF while reading response with type: " + + call.responseDefaultType.getClass().getName())); + return; + } + value = builder.build(); + } else { + value = null; + } + CellScanner cellBlockScanner; + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + // Maybe we could read directly from the ByteBuf. + // The problem here is that we do not know when to release it. + byte[] cellBlock = new byte[size]; + in.readFully(cellBlock); + cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); + } else { + cellBlockScanner = null; + } + call.setResponse(value, cellBlockScanner); + } + private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException { int totalSize = buf.readInt(); ByteBufInputStream in = new ByteBufInputStream(buf); @@ -166,31 +197,17 @@ private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOExcep call.setException(remoteExc); return; } - Message value; - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - if (!builder.mergeDelimitedFrom(in)) { - // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF - // before reading any bytes out, so here we need to manually throw the EOFException out - throw new EOFException( - "EOF while reading response with type: " + call.responseDefaultType.getClass().getName()); - } - value = builder.build(); - } else { - value = null; - } - CellScanner cellBlockScanner; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - // Maybe we could read directly from the ByteBuf. - // The problem here is that we do not know when to release it. - byte[] cellBlock = new byte[size]; - buf.readBytes(cellBlock); - cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); - } else { - cellBlockScanner = null; + try { + finishCall(responseHeader, in, call); + } catch (IOException e) { + // As the call has been removed from id2Call map, if we hit an exception here, the + // exceptionCaught method can not help us finish the call, so here we need to catch the + // exception and finish it + // And in netty, the decoding the frame based, when reaching here we have already read a full + // frame, so hitting exception here does not mean the stream decoding is broken, thus we do + // not need to throw the exception out and close the connection. + call.setException(e); } - call.setResponse(value, cellBlockScanner); } @Override