diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 372a265e90..d08a5b2b2f 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -195,7 +195,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
       CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
           appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
           partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
-          readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable);
+          readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable, false);
       ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, merger, copyPhase, reporter, metrics,
           shuffleReadClient, blockIdBitmap.getLongCardinality(), RssMRConfig.toRssConf(rssJobConf));
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index e3c315557a..20974e7313 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -231,6 +231,11 @@ public class RssSparkConfig {
           + "whether this conf is set or not"))
       .createWithDefault("");
 
+  public static final ConfigEntry<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE = createBooleanBuilder(
+      new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConf.OFF_HEAP_MEMORY_ENABLE.key())
+          .doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description()))
+      .createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue());
+
   public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder(
       new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
       .createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index cb24be53dd..c905d27c66 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -41,6 +41,7 @@
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.RssUtils;
 
 public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {
 
@@ -74,9 +75,14 @@ public RssShuffleDataIterator(
     this.codec = compress ? Codec.newInstance(rssConf) : null;
   }
 
-  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, int size) {
+  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
     clearDeserializationStream();
-    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), true);
+    // Unpooled.wrappedBuffer returns a ByteBuf, but that ByteBuf won't release the direct/heap memory
+    // when the ByteBuf itself is released. This is because the UnpooledDirectByteBuf's doFree flag is
+    // false when it is constructed from a user-provided ByteBuffer.
+    // The `releaseOnClose` parameter therefore has no effect on the underlying memory, so we release
+    // the data ByteBuffer manually.
+    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data), true);
     deserializationStream = serializerInstance.deserializeStream(byteBufInputStream);
     return deserializationStream.asKeyValueIterator();
   }
@@ -89,6 +95,7 @@ private void clearDeserializationStream() {
         LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
       }
     }
+
     if (deserializationStream != null) {
       deserializationStream.close();
     }
@@ -109,10 +116,10 @@ public boolean hasNext() {
       long fetchDuration = System.currentTimeMillis() - startFetch;
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (rawData != null) {
-        int uncompressedLen = uncompress(rawBlock, rawData);
+        uncompress(rawBlock, rawData);
         // create new iterator for shuffle data
         long startSerialization = System.currentTimeMillis();
-        recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
+        recordsIterator = createKVIterator(uncompressedData);
         long serializationDuration = System.currentTimeMillis() - startSerialization;
         readTime += fetchDuration;
         serializeTime += serializationDuration;
@@ -139,8 +146,11 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
     int uncompressedLen = rawBlock.getUncompressLength();
     if (codec != null) {
       if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
-        // todo: support off-heap bytebuffer
-        uncompressedData = ByteBuffer.allocate(uncompressedLen);
+        if (uncompressedData != null) {
+          RssUtils.releaseByteBuffer(uncompressedData);
+        }
+        uncompressedData = rawData.isDirect()
+            ? ByteBuffer.allocateDirect(uncompressedLen) : ByteBuffer.allocate(uncompressedLen);
       }
       uncompressedData.clear();
       long startDecompress = System.currentTimeMillis();
@@ -148,6 +158,9 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
       unCompressedBytesLength += uncompressedLen;
       long decompressDuration = System.currentTimeMillis() - startDecompress;
       decompressTime += decompressDuration;
+      // uncompressedData's limit is not updated by `codec.decompress`, but `createKVIterator`
+      // relies on it, so update the limit here.
+      uncompressedData.limit(uncompressedData.position() + uncompressedLen);
     } else {
       uncompressedData = rawData;
     }
@@ -162,6 +175,11 @@ public Product2<K, C> next() {
 
   public BoxedUnit cleanup() {
     clearDeserializationStream();
+    // Uncompressed data is released in this class, while compressed data is released in ShuffleReadClientImpl.
+    // So if codec is null, we don't release the data here when the stream is closed.
+    if (codec != null) {
+      RssUtils.releaseByteBuffer(uncompressedData);
+    }
     if (shuffleReadClient != null) {
       shuffleReadClient.close();
     }
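Reviewer note, not part of the patch: the comment in `createKVIterator` is the crux of the memory-ownership change. Below is a minimal, self-contained sketch of that rule using only Netty's public API (`Unpooled`, `ByteBufInputStream`, `PlatformDependent`): wrapping a caller-supplied `ByteBuffer` never transfers ownership of the memory to Netty, so `releaseOnClose` only drops the wrapper and the direct memory must be freed by the caller, which is exactly what the iterator now does via `RssUtils.releaseByteBuffer`.

```java
import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;

public class WrappedBufferReleaseSketch {
  public static void main(String[] args) throws Exception {
    // Stand-in for the decompressed shuffle data held in a direct buffer.
    ByteBuffer uncompressedData = ByteBuffer.allocateDirect(64);
    uncompressedData.put("hello uniffle".getBytes()).flip();

    // Wrapping a user-provided ByteBuffer does not give Netty ownership of the memory:
    // releasing the wrapper (even with releaseOnClose = true) will not free the direct buffer.
    ByteBuf wrapped = Unpooled.wrappedBuffer(uncompressedData);
    try (ByteBufInputStream in = new ByteBufInputStream(wrapped, true)) {
      byte[] first = new byte[5];
      in.readFully(first);
      System.out.println(new String(first)); // prints "hello"
    }

    // The direct memory therefore has to be released explicitly, mirroring
    // RssUtils.releaseByteBuffer in this patch.
    PlatformDependent.freeDirectBuffer(uncompressedData);
  }
}
```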
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 96252f443e..ec97611325 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -121,7 +121,8 @@ public Iterator<Product2<K, C>> read() {
     CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
         appId, shuffleId, startPartition, storageType, basePath, indexReadLimit, readBufferSize,
         partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
-        shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable);
+        shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable,
+        rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
     ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
     RssShuffleDataIterator rssShuffleDataIterator = new RssShuffleDataIterator(
         shuffleDependency.serializer(), shuffleReadClient,
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4bd44d1b62..7cba50514c 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -227,7 +227,8 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
       CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
           appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize, 1,
           partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
-          hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable);
+          hadoopConf, dataDistributionType, expectedTaskIdsBitmapFilterEnable,
+          rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
       ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssShuffleDataIterator iterator = new RssShuffleDataIterator<>(
           shuffleDependency.serializer(), shuffleReadClient,
diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 39d9ded0ea..710d9a1360 100644
--- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -87,7 +87,8 @@ public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request) {
         request.getHadoopConf(),
         request.getIdHelper(),
         request.getShuffleDataDistributionType(),
-        request.isExpectedTaskIdsBitmapFilterEnable()
+        request.isExpectedTaskIdsBitmapFilterEnable(),
+        request.isOffHeapEnabled()
     );
   }
 }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 852a253ffb..4773e30e79 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
   private final List<ShuffleServerInfo> shuffleServerInfoList;
   private int shuffleId;
   private int partitionId;
-  private byte[] readBuffer;
+  private ByteBuffer readBuffer;
   private Roaring64NavigableMap blockIdBitmap;
   private Roaring64NavigableMap taskIdBitmap;
   private Roaring64NavigableMap pendingBlockIds;
@@ -78,7 +78,8 @@ public ShuffleReadClientImpl(
       Configuration hadoopConf,
       IdHelper idHelper,
       ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.blockIdBitmap = blockIdBitmap;
@@ -106,6 +107,9 @@ public ShuffleReadClientImpl(
     if (expectedTaskIdsBitmapFilterEnable) {
       request.useExpectedTaskIdsBitmapFilter();
     }
+    if (offHeapEnabled) {
+      request.enableOffHeap();
+    }
 
     List<Long> removeBlockIds = Lists.newArrayList();
     blockIdBitmap.forEach(bid -> {
@@ -142,7 +146,7 @@ public ShuffleReadClientImpl(
     this(storageType, appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
         partitionNum, readBufferSize, storageBasePath, blockIdBitmap, taskIdBitmap,
         shuffleServerInfoList, hadoopConf,
-        idHelper, ShuffleDataDistributionType.NORMAL, false);
+        idHelper, ShuffleDataDistributionType.NORMAL, false, false);
   }
 
   @Override
@@ -219,8 +223,10 @@ public CompressedShuffleBlock readShuffleBlockData() {
     }
 
     if (bs != null) {
-      return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
-          bs.getOffset(), bs.getLength()), bs.getUncompressLength());
+      ByteBuffer compressedBuffer = readBuffer.duplicate();
+      compressedBuffer.position(bs.getOffset());
+      compressedBuffer.limit(bs.getOffset() + bs.getLength());
+      return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
     }
     // current segment hasn't data, try next segment
     return readShuffleBlockData();
@@ -238,8 +244,11 @@ private int read() {
     if (sdr == null) {
       return 0;
     }
-    readBuffer = sdr.getData();
-    if (readBuffer == null || readBuffer.length == 0) {
+    if (readBuffer != null) {
+      RssUtils.releaseByteBuffer(readBuffer);
+    }
+    readBuffer = sdr.getDataBuffer();
+    if (readBuffer == null || readBuffer.capacity() == 0) {
       return 0;
     }
     bufferSegmentQueue.addAll(sdr.getBufferSegments());
@@ -253,6 +262,9 @@ public void checkProcessedBlockIds() {
 
   @Override
   public void close() {
+    if (readBuffer != null) {
+      RssUtils.releaseByteBuffer(readBuffer);
+    }
     if (clientReadHandler != null) {
       clientReadHandler.close();
     }
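Reviewer note, not part of the patch: `readShuffleBlockData()` now slices the shared `readBuffer` with `duplicate()` plus `position()`/`limit()` instead of `ByteBuffer.wrap(array, offset, length)`, so each block view is zero-copy and works for both heap and direct buffers. A small sketch of the idiom; the `blockView` helper name is mine, not the client's API.

```java
import java.nio.ByteBuffer;

public class BlockViewSketch {

  /**
   * Create a view of one block inside a larger read buffer without copying,
   * the way readShuffleBlockData() slices readBuffer per BufferSegment.
   */
  static ByteBuffer blockView(ByteBuffer readBuffer, int offset, int length) {
    ByteBuffer view = readBuffer.duplicate(); // shares the same memory, independent position/limit
    view.position(offset);
    view.limit(offset + length);
    return view;
  }

  public static void main(String[] args) {
    ByteBuffer readBuffer = ByteBuffer.allocateDirect(16);
    for (int i = 0; i < 16; i++) {
      readBuffer.put((byte) i);
    }

    ByteBuffer block = blockView(readBuffer, 4, 3);
    System.out.println(block.remaining());      // 3
    System.out.println(block.get());            // 4, the first byte of the block
    // The original buffer's cursor is untouched, so other blocks can be sliced later.
    System.out.println(readBuffer.position());  // 16
  }
}
```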
diff --git a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
index a4b4a3251b..a6f676fe4e 100644
--- a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++ b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
@@ -45,6 +45,7 @@ public class CreateShuffleReadClientRequest {
   private IdHelper idHelper;
   private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL;
   private boolean expectedTaskIdsBitmapFilterEnable = false;
+  private boolean offHeapEnabled = false;
 
   public CreateShuffleReadClientRequest(
       String appId,
@@ -61,33 +62,14 @@ public CreateShuffleReadClientRequest(
       List<ShuffleServerInfo> shuffleServerInfoList,
       Configuration hadoopConf,
       ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
     this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
         partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
-        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
+        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, offHeapEnabled);
     this.shuffleDataDistributionType = dataDistributionType;
   }
 
-  public CreateShuffleReadClientRequest(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      String storageType,
-      String basePath,
-      int indexReadLimit,
-      int readBufferSize,
-      int partitionNumPerRange,
-      int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      boolean expectedTaskIdsBitmapFilterEnable) {
-    this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
-        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
-        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
-  }
-
   public CreateShuffleReadClientRequest(
       String appId,
       int shuffleId,
@@ -103,7 +85,8 @@ public CreateShuffleReadClientRequest(
       List<ShuffleServerInfo> shuffleServerInfoList,
       Configuration hadoopConf,
       IdHelper idHelper,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -119,6 +102,28 @@ public CreateShuffleReadClientRequest(
     this.hadoopConf = hadoopConf;
     this.idHelper = idHelper;
     this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
+    this.offHeapEnabled = offHeapEnabled;
+  }
+
+  public CreateShuffleReadClientRequest(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      String storageType,
+      String basePath,
+      int indexReadLimit,
+      int readBufferSize,
+      int partitionNumPerRange,
+      int partitionNum,
+      Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap taskIdBitmap,
+      List<ShuffleServerInfo> shuffleServerInfoList,
+      Configuration hadoopConf,
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
+    this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
+        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
+        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, offHeapEnabled);
   }
 
   public String getAppId() {
@@ -184,4 +189,8 @@ public ShuffleDataDistributionType getShuffleDataDistributionType() {
   public boolean isExpectedTaskIdsBitmapFilterEnable() {
     return expectedTaskIdsBitmapFilterEnable;
   }
+
+  public boolean isOffHeapEnabled() {
+    return offHeapEnabled;
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index 1e8109e48f..19e93dd10f 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -17,13 +17,14 @@
 
 package org.apache.uniffle.common;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 
 public class ShuffleDataResult {
 
-  private final byte[] data;
+  private final ByteBuffer data;
   private final List<BufferSegment> bufferSegments;
 
   public ShuffleDataResult() {
@@ -34,12 +35,29 @@ public ShuffleDataResult(byte[] data) {
     this(data, Lists.newArrayList());
   }
 
-  public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+  public ShuffleDataResult(ByteBuffer data, List<BufferSegment> bufferSegments) {
     this.data = data;
     this.bufferSegments = bufferSegments;
   }
 
+  public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+    this(data != null ? ByteBuffer.wrap(data) : null, bufferSegments);
+  }
+
   public byte[] getData() {
+    if (data == null) {
+      return null;
+    }
+    if (data.hasArray()) {
+      return data.array();
+    }
+    ByteBuffer dataBuffer = data.duplicate();
+    byte[] byteArray = new byte[dataBuffer.remaining()];
+    dataBuffer.get(byteArray);
+    return byteArray;
+  }
+
+  public ByteBuffer getDataBuffer() {
     return data;
   }
 
@@ -48,7 +66,7 @@ public List<BufferSegment> getBufferSegments() {
   }
 
   public boolean isEmpty() {
-    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.length == 0;
+    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
   }
 
 }
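Reviewer note, not part of the patch: a sketch of the two access paths the reworked `ShuffleDataResult` exposes. `getDataBuffer()` hands out the buffer itself (zero copy), while `getData()` has to copy when the buffer is direct because a direct `ByteBuffer` has no backing array; duplicating before the copy keeps the original cursor untouched.

```java
import java.nio.ByteBuffer;

public class DataBufferAccessSketch {
  public static void main(String[] args) {
    // Stand-in for a ShuffleDataResult backed by a direct buffer.
    ByteBuffer data = ByteBuffer.allocateDirect(4);
    data.put(new byte[]{1, 2, 3, 4}).flip();

    // getDataBuffer()-style access: zero copy, the caller shares the off-heap memory.
    ByteBuffer zeroCopy = data;

    // getData()-style access: no backing array for a direct buffer, so copy to the heap.
    byte[] copy;
    if (data.hasArray()) {
      copy = data.array();
    } else {
      ByteBuffer dup = data.duplicate();
      copy = new byte[dup.remaining()];
      dup.get(copy);
    }

    System.out.println(copy.length);           // 4
    System.out.println(zeroCopy.remaining());  // still 4, untouched by the copy
  }
}
```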
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 33d20ddf21..35d8172c05 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -102,4 +102,10 @@ public class RssClientConf {
       .intType()
       .noDefaultValue()
       .withDescription("internal configuration to indicate which port is actually bind for shuffle manager service.");
+
+  public static final ConfigOption<Boolean> OFF_HEAP_MEMORY_ENABLE = ConfigOptions
+      .key("rss.client.off.heap.memory.enable")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether the client uses off-heap memory to process shuffle data");
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 92688083fa..3935ab2f1b 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,6 +31,7 @@
 import java.net.InetAddress;
 import java.net.InterfaceAddress;
 import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -47,6 +48,7 @@
 import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
 import io.netty.channel.unix.Errors;
+import io.netty.util.internal.PlatformDependent;
 import org.eclipse.jetty.util.MultiException;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -367,4 +369,11 @@ public static List<String> getConfiguredLocalDirs(RssConf conf) {
       return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
     }
   }
+
+  public static void releaseByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null || !byteBuffer.isDirect()) {
+      return;
+    }
+    PlatformDependent.freeDirectBuffer(byteBuffer);
+  }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
index 526a4e8491..5affe92aa6 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
@@ -30,9 +30,10 @@ public class ShuffleDataResultTest {
   @Test
   public void testEmpty() {
     List<BufferSegment> segments = Collections.singletonList(new BufferSegment(1, 2, 3, 4, 5, 6));
+    byte[] bytes = null;
     assertTrue(new ShuffleDataResult().isEmpty());
     assertTrue(new ShuffleDataResult(new byte[1]).isEmpty());
-    assertTrue(new ShuffleDataResult(null, segments).isEmpty());
+    assertTrue(new ShuffleDataResult(bytes, segments).isEmpty());
     assertTrue(new ShuffleDataResult(new byte[0], segments).isEmpty());
     assertTrue(new ShuffleDataResult(new byte[1], null).isEmpty());
     assertTrue(new ShuffleDataResult(new byte[1], Collections.emptyList()).isEmpty());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 413a5334a5..43bbceb6fc 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -191,6 +191,7 @@ The important configuration is listed as following.
 |spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
 |spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool of unregistering|
 |spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when doing unregister to remote shuffle-servers|
+|spark.rss.client.off.heap.memory.enable|false|Whether the client uses off-heap memory to process shuffle data|
 
 ### MapReduce Specialized Setting
 
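Reviewer note, not part of the patch: one way a job could enable the new option, matching the row added to `client_guide.md`. The documented key is `RssClientConf.OFF_HEAP_MEMORY_ENABLE` (`rss.client.off.heap.memory.enable`) with the Spark prefix prepended; the shuffle-manager setting shown is the usual Uniffle one and is only included to make the snippet self-contained.

```java
import org.apache.spark.SparkConf;

public class OffHeapConfSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        // Route shuffle through Uniffle as documented in client_guide.md.
        .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
        // The new switch added by this patch; defaults to false.
        .set("spark.rss.client.off.heap.memory.enable", "true");
    System.out.println(conf.get("spark.rss.client.off.heap.memory.enable")); // "true"
  }
}
```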
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
index 43b4a7c0bc..9bbc0e2863 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
@@ -20,24 +20,36 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Random;
 
 import com.google.common.collect.Maps;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
 public class RepartitionWithHdfsMultiStorageRssTest extends RepartitionTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RepartitionWithHdfsMultiStorageRssTest.class);
+
   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());
+    Random random = new Random();
+    // todo: switch to a parameterized test here once we can solve the issue that
+    // such test cases take too long.
+    boolean useOffHeap = random.nextInt() % 2 == 0;
+    LOG.info("use off heap: " + useOffHeap);
+    dynamicConf.put(RssSparkConfig.RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE.key(), String.valueOf(useOffHeap));
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java b/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
index b3be22bbb0..18341e25ee 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
@@ -17,9 +17,41 @@
 
 package org.apache.uniffle.storage.api;
 
+import java.nio.ByteBuffer;
+
 public interface FileReader {
+
+  /**
+   * Read the given length of the file data starting from the offset position
+   * and return it as a byte array.
+   *
+   * @param offset the file offset from which we start to read
+   * @param length the length of the data which we need to read
+   * @return file data
+   */
   byte[] read(long offset, int length);
 
+  /**
+   * Read the data from the current position to the end of the file
+   * and return it as a byte array.
+   * @return file data
+   */
   byte[] read();
+
+  /**
+   * Read the given length of the file data starting from the offset position
+   * and return it as a direct byte buffer.
+   *
+   * @param offset the file offset from which we start to read
+   * @param length the length of the data which we need to read
+   * @return file data
+   */
+  ByteBuffer readAsByteBuffer(long offset, int length);
+
+  /**
+   * Read the data from the current position to the end of the file
+   * and return it as a direct byte buffer.
+   * @return file data
+   */
+  ByteBuffer readAsByteBuffer();
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 08dd548cee..efcc8fd67c 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -163,8 +163,8 @@ private ClientReadHandler getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request) {
         request.getHadoopConf(),
         request.getDistributionType(),
         request.getExpectTaskIds(),
-        ssi.getId()
-    );
+        ssi.getId(),
+        request.isOffHeapEnabled());
   }
 
   public ShuffleDeleteHandler createShuffleDeleteHandler(CreateShuffleDeleteHandlerRequest request) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index a8ac642565..003d03e484 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -54,6 +54,7 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
   private int readHandlerIndex;
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
+  private boolean offHeapEnable = false;
 
   public HdfsClientReadHandler(
       String appId,
@@ -69,7 +70,8 @@ public HdfsClientReadHandler(
       Configuration hadoopConf,
       ShuffleDataDistributionType distributionType,
       Roaring64NavigableMap expectTaskIds,
-      String shuffleServerId) {
+      String shuffleServerId,
+      boolean offHeapEnable) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -84,6 +86,7 @@ public HdfsClientReadHandler(
     this.distributionType = distributionType;
     this.expectTaskIds = expectTaskIds;
     this.shuffleServerId = shuffleServerId;
+    this.offHeapEnable = offHeapEnable;
   }
 
   // Only for test
@@ -101,7 +104,7 @@ public HdfsClientReadHandler(
       Configuration hadoopConf) {
     this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange, partitionNum,
         readBufferSize, expectBlockIds, processBlockIds, storageBasePath, hadoopConf, ShuffleDataDistributionType.NORMAL,
-        Roaring64NavigableMap.bitmapOf(), null);
+        Roaring64NavigableMap.bitmapOf(), null, false);
   }
 
   protected void init(String fullShufflePath) {
@@ -139,7 +142,7 @@ protected void init(String fullShufflePath) {
         HdfsShuffleReadHandler handler = new HdfsShuffleReadHandler(
             appId, shuffleId, partitionId, filePrefix, readBufferSize, expectBlockIds,
             processBlockIds, hadoopConf,
-            distributionType, expectTaskIds);
+            distributionType, expectTaskIds, offHeapEnable);
         readHandlers.add(handler);
       } catch (Exception e) {
         LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
index 38d6439e0c..1c9cb41aac 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
@@ -19,6 +19,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,35 @@ public byte[] read() {
     }
   }
 
+  @Override
+  public ByteBuffer readAsByteBuffer(long offset, int length) {
+    try {
+      fsDataInputStream.seek(offset);
+      ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+      readFully(buffer);
+      buffer.flip();
+      return buffer;
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path + " with offset[" + offset + "], length[" + length + "]", e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }
+
+  @Override
+  public ByteBuffer readAsByteBuffer() {
+    try {
+      long length = getFileLen();
+      if (length - fsDataInputStream.getPos() > Integer.MAX_VALUE) {
+        LOG.warn("File " + path + " is too long to read into a single byte buffer");
+        return ByteBuffer.allocateDirect(0);
+      }
+      return readAsByteBuffer(fsDataInputStream.getPos(), (int) length);
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path, e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }
+
   public long getOffset() throws IOException {
     return fsDataInputStream.getPos();
   }
@@ -90,6 +120,15 @@ public synchronized void close() throws IOException {
     }
   }
 
+  private void readFully(ByteBuffer buffer) throws IOException {
+    while (buffer.hasRemaining()) {
+      int result = fsDataInputStream.read(buffer);
+      if (result < 0) {
+        return;
+      }
+    }
+  }
+
   public Path getPath() {
     return path;
   }
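Reviewer note, not part of the patch: the shape of the `readAsByteBuffer`/`readFully` pair above, as a standalone sketch. `FSDataInputStream.read(ByteBuffer)` may return fewer bytes than requested, hence the loop; it also requires the underlying stream to implement `ByteBufferReadable` (HDFS streams do, some local/checksum streams do not). The local-file reader further below simply does not implement the off-heap path yet.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

public class DirectReadSketch {

  /**
   * Read `length` bytes starting at `offset` into a newly allocated direct buffer.
   * A single read(ByteBuffer) call may fill only part of the buffer, so loop until
   * the buffer is full or EOF is reached; flip() then exposes exactly what was read.
   */
  static ByteBuffer readAsByteBuffer(FSDataInputStream in, long offset, int length) throws IOException {
    in.seek(offset);
    ByteBuffer buffer = ByteBuffer.allocateDirect(length);
    while (buffer.hasRemaining()) {
      int read = in.read(buffer);
      if (read < 0) {
        break; // EOF before `length` bytes; callers compare the result size with the expected one
      }
    }
    buffer.flip();
    return buffer;
  }
}
```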
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 3e749679f6..0be589b611 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.storage.handler.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
   protected final String filePrefix;
   protected final HdfsFileReader indexReader;
   protected final HdfsFileReader dataReader;
+  protected final boolean offHeapEnabled;
 
   public HdfsShuffleReadHandler(
       String appId,
@@ -54,12 +56,14 @@ public HdfsShuffleReadHandler(
       Roaring64NavigableMap processBlockIds,
       Configuration conf,
       ShuffleDataDistributionType distributionType,
-      Roaring64NavigableMap expectTaskIds) throws Exception {
+      Roaring64NavigableMap expectTaskIds,
+      boolean offHeapEnabled) throws Exception {
     super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds,
         distributionType, expectTaskIds);
     this.filePrefix = filePrefix;
     this.indexReader = createHdfsReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
     this.dataReader = createHdfsReader(ShuffleStorageUtils.generateDataFileName(filePrefix), conf);
+    this.offHeapEnabled = offHeapEnabled;
   }
 
   // Only for test
@@ -73,7 +77,7 @@ public HdfsShuffleReadHandler(
       Roaring64NavigableMap processBlockIds,
       Configuration conf) throws Exception {
     this(appId, shuffleId, partitionId, filePrefix, readBufferSize, expectBlockIds,
-        processBlockIds, conf, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf());
+        processBlockIds, conf, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf(), false);
   }
 
   @Override
@@ -106,17 +110,23 @@ protected ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) {
       return null;
     }
 
-    byte[] data = readShuffleData(shuffleDataSegment.getOffset(), expectedLength);
-    if (data.length == 0) {
+    ByteBuffer data;
+    if (offHeapEnabled) {
+      data = readShuffleDataByteBuffer(shuffleDataSegment.getOffset(), expectedLength);
+    } else {
+      data = ByteBuffer.wrap(readShuffleData(shuffleDataSegment.getOffset(), expectedLength));
+    }
+    int length = data.limit() - data.position();
+    if (length == 0) {
       LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {} from file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, length, shuffleDataSegment, filePrefix);
       return null;
     }
 
     ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data, shuffleDataSegment.getBufferSegments());
     if (shuffleDataResult.isEmpty()) {
       LOG.warn("Shuffle data is empty, expected length {}, data length {}, segment {} in file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, length, shuffleDataSegment, filePrefix);
       return null;
     }
 
@@ -133,6 +143,17 @@ protected byte[] readShuffleData(long offset, int expectedLength) {
     return data;
   }
 
+  private ByteBuffer readShuffleDataByteBuffer(long offset, int expectedLength) {
+    ByteBuffer data = dataReader.readAsByteBuffer(offset, expectedLength);
+    int length = data.limit() - data.position();
+    if (length != expectedLength) {
+      LOG.warn("Fail to read byte buffer expected[{}] data, actual[{}] from file {}.data",
+          expectedLength, length, filePrefix);
+      return ByteBuffer.allocateDirect(0);
+    }
+    return data;
+  }
+
   private long getDataFileLen() {
     try {
       return dataReader.getFileLen();
@@ -168,7 +189,4 @@ public List<ShuffleDataSegment> getShuffleDataSegments() {
     return shuffleDataSegments;
   }
 
-  public String getFilePrefix() {
-    return filePrefix;
-  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
index 99eb0cd0b6..cceab2e74c 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
@@ -21,6 +21,7 @@
 import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
@@ -76,6 +77,16 @@ public byte[] read() {
     }
   }
 
+  @Override
+  public ByteBuffer readAsByteBuffer(long offset, int length) {
+    throw new UnsupportedOperationException("Local file reader doesn't support off-heap read yet");
+  }
+
+  @Override
+  public ByteBuffer readAsByteBuffer() {
+    throw new UnsupportedOperationException("Local file reader doesn't support off-heap read yet");
+  }
+
   @Override
   public synchronized void close() {
     if (dataInputStream != null) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 0801893a79..a3aaab7251 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -46,6 +46,7 @@ public class CreateShuffleReadHandlerRequest {
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
   private boolean expectedTaskIdsBitmapFilterEnable;
+  private boolean offHeapEnabled;
 
   private IdHelper idHelper;
 
@@ -195,4 +196,12 @@ public IdHelper getIdHelper() {
   public void setIdHelper(IdHelper idHelper) {
     this.idHelper = idHelper;
   }
+
+  public void enableOffHeap() {
+    this.offHeapEnabled = true;
+  }
+
+  public boolean isOffHeapEnabled() {
+    return offHeapEnabled;
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index e9abec1597..d268775c1d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -120,14 +120,6 @@ public static String getShuffleDataPath(String appId, int shuffleId, int start,
         String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(start), String.valueOf(end)));
   }
 
-  public static String getUploadShuffleDataPath(String appId, int shuffleId, int partitionId) {
-    return String.join(
-        HDFS_PATH_SEPARATOR,
-        appId,
-        String.valueOf(shuffleId),
-        String.valueOf(partitionId));
-  }
-
   public static String getCombineDataPath(String appId, int shuffleId) {
     return String.join(
         HDFS_PATH_SEPARATOR,
@@ -193,10 +185,6 @@ public static void createDirIfNotExist(FileSystem fileSystem, String pathString) {
     }
   }
 
-  // index file header is $PartitionNum | [($PartitionId | $PartitionFileLength | $PartitionDataFileLength), ] | $CRC
-  public static long getIndexFileHeaderLen(int partitionNum) {
-    return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
-  }
 
   public static long uploadFile(File file, HdfsFileWriter writer, int bufferSize) throws IOException {
     try (FileInputStream inputStream = new FileInputStream(file)) {