Skip to content

Commit

Permalink
[SPARK-24578][CORE] Cap sub-region's size of returned nio buffer
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
This PR tries to fix the performance regression introduced by SPARK-21517.

In our production job, we performed many parallel computations, with high possibility, some task could be scheduled to a host-2 where it needs to read the cache block data from host-1. Often, this big transfer makes the cluster suffer time out issue (it will retry 3 times, each with 120s timeout, and then do recompute to put the cache block into the local MemoryStore).

The root cause is that we don't do `consolidateIfNeeded` anymore as we are using
```
Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
```
in ChunkedByteBuffer. If we have many small chunks, it could cause the `buf.notBuffer(...)` have very bad performance in the case that we have to call `copyByteBuf(...)` many times.

## How was this patch tested?
Existing unit tests and also test in production

Author: Wenbo Zhao <[email protected]>

Closes apache#21593 from WenboZhao/spark-24578.
  • Loading branch information
WenboZhao authored and zsxwing committed Jun 20, 2018
1 parent c5a0d11 commit 3f4bda7
Showing 1 changed file with 5 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,30 +137,15 @@ protected void deallocate() {
}

private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
ByteBuffer buffer = buf.nioBuffer();
int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
target.write(buffer) : writeNioBuffer(target, buffer);
// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
// for the case that the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
int written = target.write(buffer);
buf.skipBytes(written);
return written;
}

private int writeNioBuffer(
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int ret = 0;

try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = writeCh.write(buf);
} finally {
buf.limit(originalLimit);
}

return ret;
}

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
Expand Down

0 comments on commit 3f4bda7

Please sign in to comment.