Skip to content

Commit

Permalink
[#596] feat(netty): Use off heap memory to read HDFS data (#806)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Use off-heap memory to read HDFS data.
2. Remove some unused code.
(To do: use off-heap memory to read HDFS index data.)

### Why are the changes needed?
Fix: #596 

### Does this PR introduce _any_ user-facing change?
Yes; documentation is added.

### How was this patch tested?
Passes the original (existing) tests.
  • Loading branch information
jerqi authored Apr 13, 2023
1 parent b0ae6db commit c6cde5d
Show file tree
Hide file tree
Showing 22 changed files with 265 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable);
readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable, false);
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
shuffleReadClient, blockIdBitmap.getLongCardinality(), RssMRConfig.toRssConf(rssJobConf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ public class RssSparkConfig {
+ "whether this conf is set or not"))
.createWithDefault("");

/**
 * Spark-side entry for the client off-heap memory switch.
 * The key is {@code SPARK_RSS_CONFIG_PREFIX} followed by the key of
 * {@code RssClientConf.OFF_HEAP_MEMORY_ENABLE}; the doc text and the default value are taken
 * from that same client option so the two definitions cannot drift apart.
 */
public static final ConfigEntry<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE = createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConf.OFF_HEAP_MEMORY_ENABLE.key())
.doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description()))
.createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue());

public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.RssUtils;

public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {

Expand Down Expand Up @@ -74,9 +75,14 @@ public RssShuffleDataIterator(
this.codec = compress ? Codec.newInstance(rssConf) : null;
}

public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, int size) {
public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
clearDeserializationStream();
byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), true);
// Unpooled.wrappedBuffer returns a ByteBuf, but releasing that ByteBuf does not free the
// underlying direct/heap memory, because the UnpooledDirectByteBuf's doFree is false
// when it is constructed from a user-provided ByteBuffer.
// The `releaseOnClose` parameter therefore has no effect on that memory; we release the
// data ByteBuffer manually.
byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data), true);
deserializationStream = serializerInstance.deserializeStream(byteBufInputStream);
return deserializationStream.asKeyValueIterator();
}
Expand All @@ -89,6 +95,7 @@ private void clearDeserializationStream() {
LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
}
}

if (deserializationStream != null) {
deserializationStream.close();
}
Expand All @@ -109,10 +116,10 @@ public boolean hasNext() {
long fetchDuration = System.currentTimeMillis() - startFetch;
shuffleReadMetrics.incFetchWaitTime(fetchDuration);
if (rawData != null) {
int uncompressedLen = uncompress(rawBlock, rawData);
uncompress(rawBlock, rawData);
// create new iterator for shuffle data
long startSerialization = System.currentTimeMillis();
recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
recordsIterator = createKVIterator(uncompressedData);
long serializationDuration = System.currentTimeMillis() - startSerialization;
readTime += fetchDuration;
serializeTime += serializationDuration;
Expand All @@ -139,15 +146,21 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
int uncompressedLen = rawBlock.getUncompressLength();
if (codec != null) {
if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
// todo: support off-heap bytebuffer
uncompressedData = ByteBuffer.allocate(uncompressedLen);
if (uncompressedData != null) {
RssUtils.releaseByteBuffer(uncompressedData);
}
uncompressedData = rawData.isDirect()
? ByteBuffer.allocateDirect(uncompressedLen) : ByteBuffer.allocate(uncompressedLen);
}
uncompressedData.clear();
long startDecompress = System.currentTimeMillis();
codec.decompress(rawData, uncompressedLen, uncompressedData, 0);
unCompressedBytesLength += uncompressedLen;
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
// uncompressedData's limit is not updated by `codec.decompress`, however this information is used
// by `createKVIterator`. Update limit here.
uncompressedData.limit(uncompressedData.position() + uncompressedLen);
} else {
uncompressedData = rawData;
}
Expand All @@ -162,6 +175,11 @@ public Product2<K, C> next() {

public BoxedUnit cleanup() {
clearDeserializationStream();
// Uncompressed data is released in this class; compressed data is released in the class ShuffleReadClientImpl.
// So if codec is null, we don't release the data when the stream is closed.
if (codec != null) {
RssUtils.releaseByteBuffer(uncompressedData);
}
if (shuffleReadClient != null) {
shuffleReadClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public Iterator<Product2<K, C>> read() {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, startPartition, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable);
shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable,
rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator rssShuffleDataIterator = new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize,
1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable);
hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable,
rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator<K, C> iterator = new RssShuffleDataIterator<>(
shuffleDependency.serializer(), shuffleReadClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest
request.getHadoopConf(),
request.getIdHelper(),
request.getShuffleDataDistributionType(),
request.isExpectedTaskIdsBitmapFilterEnable()
request.isExpectedTaskIdsBitmapFilterEnable(),
request.isOffHeapEnabled()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
private final List<ShuffleServerInfo> shuffleServerInfoList;
private int shuffleId;
private int partitionId;
private byte[] readBuffer;
private ByteBuffer readBuffer;
private Roaring64NavigableMap blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private Roaring64NavigableMap pendingBlockIds;
Expand Down Expand Up @@ -78,7 +78,8 @@ public ShuffleReadClientImpl(
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType,
boolean expectedTaskIdsBitmapFilterEnable) {
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.blockIdBitmap = blockIdBitmap;
Expand Down Expand Up @@ -106,6 +107,9 @@ public ShuffleReadClientImpl(
if (expectedTaskIdsBitmapFilterEnable) {
request.useExpectedTaskIdsBitmapFilter();
}
if (offHeapEnabled) {
request.enableOffHeap();
}

List<Long> removeBlockIds = Lists.newArrayList();
blockIdBitmap.forEach(bid -> {
Expand Down Expand Up @@ -142,7 +146,7 @@ public ShuffleReadClientImpl(
this(storageType, appId, shuffleId, partitionId, indexReadLimit,
partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
idHelper, ShuffleDataDistributionType.NORMAL, false);
idHelper, ShuffleDataDistributionType.NORMAL, false, false);
}

@Override
Expand Down Expand Up @@ -219,8 +223,10 @@ public CompressedShuffleBlock readShuffleBlockData() {
}

if (bs != null) {
return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
bs.getOffset(), bs.getLength()), bs.getUncompressLength());
ByteBuffer compressedBuffer = readBuffer.duplicate();
compressedBuffer.position(bs.getOffset());
compressedBuffer.limit(bs.getOffset() + bs.getLength());
return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
}
// current segment hasn't data, try next segment
return readShuffleBlockData();
Expand All @@ -238,8 +244,11 @@ private int read() {
if (sdr == null) {
return 0;
}
readBuffer = sdr.getData();
if (readBuffer == null || readBuffer.length == 0) {
if (readBuffer != null) {
RssUtils.releaseByteBuffer(readBuffer);
}
readBuffer = sdr.getDataBuffer();
if (readBuffer == null || readBuffer.capacity() == 0) {
return 0;
}
bufferSegmentQueue.addAll(sdr.getBufferSegments());
Expand All @@ -253,6 +262,9 @@ public void checkProcessedBlockIds() {

@Override
public void close() {
if (readBuffer != null) {
RssUtils.releaseByteBuffer(readBuffer);
}
if (clientReadHandler != null) {
clientReadHandler.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class CreateShuffleReadClientRequest {
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL;
private boolean expectedTaskIdsBitmapFilterEnable = false;
private boolean offHeapEnabled = false;

public CreateShuffleReadClientRequest(
String appId,
Expand All @@ -61,33 +62,14 @@ public CreateShuffleReadClientRequest(
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
ShuffleDataDistributionType dataDistributionType,
boolean expectedTaskIdsBitmapFilterEnable) {
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, offHeapEnabled);
this.shuffleDataDistributionType = dataDistributionType;
}

public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
int partitionId,
String storageType,
String basePath,
int indexReadLimit,
int readBufferSize,
int partitionNumPerRange,
int partitionNum,
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
boolean expectedTaskIdsBitmapFilterEnable) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
}

public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
Expand All @@ -103,7 +85,8 @@ public CreateShuffleReadClientRequest(
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
IdHelper idHelper,
boolean expectedTaskIdsBitmapFilterEnable) {
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
Expand All @@ -119,6 +102,28 @@ public CreateShuffleReadClientRequest(
this.hadoopConf = hadoopConf;
this.idHelper = idHelper;
this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
this.offHeapEnabled = offHeapEnabled;
}

/**
 * Convenience constructor for callers that do not supply their own {@code IdHelper}.
 * Delegates to the {@code IdHelper}-taking constructor with a new {@code DefaultIdHelper};
 * every other argument is forwarded unchanged.
 *
 * @param expectedTaskIdsBitmapFilterEnable forwarded as-is to the delegated constructor
 * @param offHeapEnabled forwarded as-is; exposed later through {@code isOffHeapEnabled()}
 */
public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
int partitionId,
String storageType,
String basePath,
int indexReadLimit,
int readBufferSize,
int partitionNumPerRange,
int partitionNum,
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
boolean expectedTaskIdsBitmapFilterEnable,
boolean offHeapEnabled) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, offHeapEnabled);
}

public String getAppId() {
Expand Down Expand Up @@ -184,4 +189,8 @@ public ShuffleDataDistributionType getShuffleDataDistributionType() {
/**
 * @return the {@code expectedTaskIdsBitmapFilterEnable} flag held by this request
 */
public boolean isExpectedTaskIdsBitmapFilterEnable() {
return expectedTaskIdsBitmapFilterEnable;
}

/**
 * @return the {@code offHeapEnabled} flag held by this request
 */
public boolean isOffHeapEnabled() {
return offHeapEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.uniffle.common;

import java.nio.ByteBuffer;
import java.util.List;

import com.google.common.collect.Lists;

public class ShuffleDataResult {

private final byte[] data;
private final ByteBuffer data;
private final List<BufferSegment> bufferSegments;

public ShuffleDataResult() {
Expand All @@ -34,12 +35,29 @@ public ShuffleDataResult(byte[] data) {
this(data, Lists.newArrayList());
}

public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
public ShuffleDataResult(ByteBuffer data, List<BufferSegment> bufferSegments) {
this.data = data;
this.bufferSegments = bufferSegments;
}

/**
 * Creates a result from a heap byte array.
 * A non-null array is wrapped (not copied) into a {@link ByteBuffer}; a {@code null} array is
 * passed on as a {@code null} buffer.
 *
 * @param data the raw bytes, may be {@code null}
 * @param bufferSegments the segments associated with {@code data}
 */
public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
  this(data == null ? null : ByteBuffer.wrap(data), bufferSegments);
}

/**
 * Returns the content of this result as a byte array.
 *
 * <p>The backing array is returned directly (no copy) only when it is exactly the buffer's
 * content, which is the case for buffers created by wrapping a whole {@code byte[]}. For any
 * other buffer (direct, sliced, or with a non-zero position or a reduced limit) the bytes
 * between position and limit are copied into a new array, so heap and direct buffers yield
 * the same logical content.
 *
 * @return the bytes between the buffer's position and limit, or {@code null} when this result
 *     holds no buffer
 */
public byte[] getData() {
  if (data == null) {
    return null;
  }
  // Hand out the backing array itself only when it matches the buffer content byte for byte;
  // otherwise array() would expose bytes outside [position, limit) or shifted by arrayOffset.
  if (data.hasArray()
      && data.arrayOffset() == 0
      && data.position() == 0
      && data.remaining() == data.array().length) {
    return data.array();
  }
  // Work on a duplicate so the bulk get does not move the position of the shared buffer.
  ByteBuffer dataBuffer = data.duplicate();
  byte[] byteArray = new byte[dataBuffer.remaining()];
  dataBuffer.get(byteArray);
  return byteArray;
}

/**
 * Returns the buffer held by this result as-is, without copying or duplicating it.
 *
 * @return the data buffer; may be {@code null}
 */
public ByteBuffer getDataBuffer() {
return data;
}

Expand All @@ -48,7 +66,7 @@ public List<BufferSegment> getBufferSegments() {
}

public boolean isEmpty() {
return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.length == 0;
return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,10 @@ public class RssClientConf {
.intType()
.noDefaultValue()
.withDescription("internal configuration to indicate which port is actually bind for shuffle manager service.");

/**
 * Boolean client option {@code rss.client.off.heap.memory.enable}; defaults to {@code false}.
 * NOTE(review): presumably this makes shuffle reads use direct (off-heap) buffers; confirm
 * against the read handlers that consume the flag.
 */
public static final ConfigOption<Boolean> OFF_HEAP_MEMORY_ENABLE = ConfigOptions
.key("rss.client.off.heap.memory.enable")
.booleanType()
.defaultValue(false)
.withDescription("Client can use off heap memory");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Enumeration;
Expand All @@ -47,6 +48,7 @@
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
import io.netty.channel.unix.Errors;
import io.netty.util.internal.PlatformDependent;
import org.eclipse.jetty.util.MultiException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -367,4 +369,11 @@ public static List<String> getConfiguredLocalDirs(RssConf conf) {
return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
}
}

/**
 * Frees the memory of a direct {@link ByteBuffer} through Netty's
 * {@code PlatformDependent.freeDirectBuffer}.
 * {@code null} and non-direct (heap) buffers are ignored.
 *
 * @param byteBuffer the buffer to release; may be {@code null}
 */
public static void releaseByteBuffer(ByteBuffer byteBuffer) {
  boolean isReleasable = byteBuffer != null && byteBuffer.isDirect();
  if (isReleasable) {
    PlatformDependent.freeDirectBuffer(byteBuffer);
  }
}
}
Loading

0 comments on commit c6cde5d

Please sign in to comment.