[apache#596] Use off heap memory to read HDFS data
jerqi committed Apr 10, 2023
1 parent cae7cd9 commit e302695
Showing 9 changed files with 85 additions and 32 deletions.
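
The theme of the change: the HDFS read path previously copied shuffle data into on-heap byte[] buffers; this commit switches that path to direct (off-heap) ByteBuffers, slices blocks out of the fetched buffer without copying, and frees the memory eagerly through a new RssUtils.releaseByteBuffer helper instead of waiting for GC. Local file reads stay on-heap for now. A minimal sketch of the buffer lifecycle the commit introduces (class name and size are illustrative, not from the commit):

    import java.nio.ByteBuffer;
    import io.netty.buffer.Unpooled;

    public class OffHeapReadSketch {
      public static void main(String[] args) {
        int length = 4096;                                     // hypothetical block size
        ByteBuffer buffer = ByteBuffer.allocateDirect(length); // off-heap, not GC-managed
        // ... fill from HDFS and hand zero-copy slices to the decompressor ...
        boolean released = Unpooled.wrappedBuffer(buffer).release(); // ask Netty to deallocate
        System.out.println("released=" + released);
      }
    }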
@@ -28,6 +28,7 @@
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.uniffle.common.util.RssUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Product2;
@@ -73,9 +74,9 @@ public RssShuffleDataIterator(
     this.codec = compress ? Codec.newInstance(rssConf) : null;
   }
 
-  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, int size) {
+  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
     clearDeserializationStream();
-    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), true);
+    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data), codec != null);
     deserializationStream = serializerInstance.deserializeStream(byteBufInputStream);
     return deserializationStream.asKeyValueIterator();
   }
@@ -108,10 +109,10 @@ public boolean hasNext() {
       long fetchDuration = System.currentTimeMillis() - startFetch;
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (rawData != null) {
-        int uncompressedLen = uncompress(rawBlock, rawData);
+        uncompress(rawBlock, rawData);
         // create new iterator for shuffle data
         long startSerialization = System.currentTimeMillis();
-        recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
+        recordsIterator = createKVIterator(uncompressedData);
         long serializationDuration = System.currentTimeMillis() - startSerialization;
         readTime += fetchDuration;
         serializeTime += serializationDuration;
@@ -137,11 +138,7 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
 
     int uncompressedLen = rawBlock.getUncompressLength();
     if (codec != null) {
-      if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
-        // todo: support off-heap bytebuffer
-        uncompressedData = ByteBuffer.allocate(uncompressedLen);
-      }
-      uncompressedData.clear();
+      uncompressedData = ByteBuffer.allocateDirect(uncompressedLen);
       long startDecompress = System.currentTimeMillis();
       codec.decompress(rawData, uncompressedLen, uncompressedData, 0);
       unCompressedBytesLength += uncompressedLen;
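
Note on the RssShuffleDataIterator change: the explicit size parameter is gone because a ByteBuffer's position and limit already delimit the readable bytes, and Unpooled.wrappedBuffer(ByteBuffer) wraps exactly that remaining region without copying. The releaseOnClose flag becomes codec != null, presumably so the stream releases the buffer it wraps only on the decompression path, where uncompressedData is a freshly allocated direct buffer it owns. A runnable sketch of the wrapping pattern (data contents are illustrative):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import io.netty.buffer.ByteBufInputStream;
    import io.netty.buffer.Unpooled;

    public class WrappedBufferSketch {
      public static void main(String[] args) throws Exception {
        ByteBuffer direct = ByteBuffer.allocateDirect(64);
        direct.put("shuffle-bytes".getBytes(StandardCharsets.UTF_8)).flip(); // [position, limit) holds the payload
        // releaseOnClose = true: closing the stream releases the ByteBuf wrapper.
        try (ByteBufInputStream in = new ByteBufInputStream(Unpooled.wrappedBuffer(direct), true)) {
          byte[] out = new byte[in.available()];
          in.read(out);
          System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "shuffle-bytes"
        }
      }
    }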
@@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
   private final List<ShuffleServerInfo> shuffleServerInfoList;
   private int shuffleId;
   private int partitionId;
-  private byte[] readBuffer;
+  private ByteBuffer readBuffer;
   private Roaring64NavigableMap blockIdBitmap;
   private Roaring64NavigableMap taskIdBitmap;
   private Roaring64NavigableMap pendingBlockIds;
@@ -219,8 +219,10 @@ public CompressedShuffleBlock readShuffleBlockData() {
     }
 
     if (bs != null) {
-      return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
-          bs.getOffset(), bs.getLength()), bs.getUncompressLength());
+      ByteBuffer compressedBuffer = readBuffer.duplicate();
+      compressedBuffer.position(bs.getOffset());
+      compressedBuffer.limit(bs.getOffset() + bs.getLength());
+      return new CompressedShuffleBlock(compressedBuffer, bs.getUncompressLength());
     }
     // current segment hasn't data, try next segment
    return readShuffleBlockData();
@@ -238,8 +240,14 @@ private int read() {
     if (sdr == null) {
       return 0;
     }
-    readBuffer = sdr.getData();
-    if (readBuffer == null || readBuffer.length == 0) {
+    if (readBuffer != null) {
+      boolean isReleased = RssUtils.releaseByteBuffer(readBuffer);
+      if (!isReleased) {
+        LOG.warn("release read byte buffer fail, it shouldn't happen frequently");
+      }
+    }
+    readBuffer = sdr.getDataBuffer();
+    if (readBuffer == null || readBuffer.capacity() == 0) {
       return 0;
     }
     bufferSegmentQueue.addAll(sdr.getBufferSegments());
@@ -253,6 +261,12 @@ public void checkProcessedBlockIds() {
 
   @Override
   public void close() {
+    if (readBuffer != null) {
+      boolean isReleased = RssUtils.releaseByteBuffer(readBuffer);
+      if (!isReleased) {
+        LOG.warn("release read byte buffer fail when the read client is closed");
+      }
+    }
     if (clientReadHandler != null) {
       clientReadHandler.close();
     }
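
Note on the ShuffleReadClientImpl change: ByteBuffer.wrap(array, offset, length) no longer applies because readBuffer may now be a direct buffer with no backing array, so a block is carved out with duplicate() plus position/limit. duplicate() shares the underlying memory but keeps independent cursors, so each CompressedShuffleBlock gets a zero-copy view of its segment. A sketch of that slicing (offsets are hypothetical):

    import java.nio.ByteBuffer;

    public class DuplicateSliceSketch {
      public static void main(String[] args) {
        ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024); // stands in for the fetched shuffle data
        int offset = 128;
        int length = 256;
        ByteBuffer block = readBuffer.duplicate(); // same memory, independent position/limit
        block.position(offset);
        block.limit(offset + length);              // view covers exactly [offset, offset + length)
        System.out.println(block.remaining());     // prints 256
      }
    }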
@@ -17,13 +17,14 @@
 
 package org.apache.uniffle.common;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 
 public class ShuffleDataResult {
 
-  private final byte[] data;
+  private final ByteBuffer data;
   private final List<BufferSegment> bufferSegments;
 
   public ShuffleDataResult() {
@@ -35,11 +36,15 @@ public ShuffleDataResult(byte[] data) {
   }
 
   public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
-    this.data = data;
+    this.data = ByteBuffer.wrap(data);
     this.bufferSegments = bufferSegments;
   }
 
   public byte[] getData() {
-    return data;
+    return data.array();
+  }
+
+  public ByteBuffer getDataBuffer() {
+    return data;
   }
 
@@ -48,7 +53,7 @@ public List<BufferSegment> getBufferSegments() {
   }
 
   public boolean isEmpty() {
-    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.length == 0;
+    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
   }
 
 }
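
One caveat worth noting: getData() now returns data.array(), which is only valid for heap buffers; a direct ByteBuffer has no backing array, and array() throws UnsupportedOperationException. That is safe here because the byte[] constructors wrap on-heap arrays, but callers holding an off-heap result should use getDataBuffer(). A small sketch of the distinction:

    import java.nio.ByteBuffer;

    public class ArrayAccessSketch {
      public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[8]);
        System.out.println(heap.hasArray());   // true  -> heap.array() is fine
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        System.out.println(direct.hasArray()); // false -> direct.array() would throw
      }
    }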
@@ -31,6 +31,7 @@
 import java.net.InetAddress;
 import java.net.InterfaceAddress;
 import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -46,6 +47,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.unix.Errors;
 import org.eclipse.jetty.util.MultiException;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -364,4 +366,8 @@ public static List<String> getConfiguredLocalDirs(RssConf conf) {
       return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
     }
   }
+
+  public static boolean releaseByteBuffer(ByteBuffer byteBuffer) {
+    return Unpooled.wrappedBuffer(byteBuffer).release();
+  }
 }
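
The new RssUtils helper leans on Netty for deallocation: Unpooled.wrappedBuffer(byteBuffer) wraps the buffer in a ByteBuf whose release() is intended to free the underlying direct memory once the reference count reaches zero, and the boolean result is what the call sites above log on failure. A usage sketch (the buffer and its size are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.uniffle.common.util.RssUtils;

    public class ReleaseSketch {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(4096);
        // ... consume the data ...
        if (!RssUtils.releaseByteBuffer(buf)) {
          System.err.println("release reported failure; refCnt did not reach zero");
        }
        // 'buf' must not be touched after a successful release.
      }
    }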
@@ -17,9 +17,15 @@
 
 package org.apache.uniffle.storage.api;
 
+import java.nio.ByteBuffer;
+
 public interface FileReader {
 
   byte[] read(long offset, int length);
 
   byte[] read();
+
+  ByteBuffer readByteBuffer(long offset, int length);
+
+  ByteBuffer readByteBuffer();
 }
@@ -19,6 +19,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,34 @@ public byte[] read() {
     }
   }
 
+  @Override
+  public ByteBuffer readByteBuffer(long offset, int length) {
+    try {
+      fsDataInputStream.seek(offset);
+      ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+      readFully(buffer);
+      buffer.flip();
+      return buffer;
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path + " with offset[" + offset + "], length[" + length + "]", e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }
+
+  private void readFully(ByteBuffer buffer) throws IOException {
+    while (buffer.hasRemaining()) {
+      int result = fsDataInputStream.read(buffer);
+      if (result < 0) {
+        return;
+      }
+    }
+  }
+
+  @Override
+  public ByteBuffer readByteBuffer() {
+    return null;
+  }
+
   public long getOffset() throws IOException {
     return fsDataInputStream.getPos();
   }
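
Note on the HDFS reader: readByteBuffer seeks, allocates a direct buffer of the requested length, and fills it with Hadoop's ByteBuffer-based FSDataInputStream.read(ByteBuffer) (supported when the underlying stream implements ByteBufferReadable). The loop in readFully is needed because each read call may return fewer bytes than remain; hitting end-of-stream early returns silently, so after flip() the buffer's limit reflects the bytes actually read. A stream-agnostic sketch of the same loop, runnable without HDFS:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    public class ReadFullySketch {
      // Same contract as FSDataInputStream.read(ByteBuffer): >= 0 bytes read, or -1 at EOF.
      static void readFully(ReadableByteChannel ch, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
          if (ch.read(buffer) < 0) {
            return; // EOF before the buffer filled; caller observes a short read via flip()
          }
        }
      }

      public static void main(String[] args) throws IOException {
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(new byte[10]));
        ByteBuffer buffer = ByteBuffer.allocateDirect(16); // ask for more than is available
        readFully(ch, buffer);
        buffer.flip();
        System.out.println(buffer.remaining()); // prints 10, the bytes actually read
      }
    }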
@@ -168,7 +168,4 @@ public List<ShuffleDataSegment> getShuffleDataSegments() {
     return shuffleDataSegments;
   }
 
-  public String getFilePrefix() {
-    return filePrefix;
-  }
 }
@@ -21,6 +21,7 @@
 import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
@@ -76,6 +77,16 @@ public byte[] read() {
     }
   }
 
+  @Override
+  public ByteBuffer readByteBuffer(long offset, int length) {
+    throw new UnsupportedOperationException("Local file reader don't support off heap read now");
+  }
+
+  @Override
+  public ByteBuffer readByteBuffer() {
+    throw new UnsupportedOperationException("Local file reader don't support off heap read now");
+  }
+
   @Override
   public synchronized void close() {
     if (dataInputStream != null) {
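
Local file reads stay on-heap in this commit, so the local reader rejects the new methods with UnsupportedOperationException, while the HDFS reader's no-arg readByteBuffer() above returns null instead; callers have to cope with both behaviors. A caller that might traverse the local path could fall back to the heap API; a hypothetical guard, not present in the commit:

    import java.nio.ByteBuffer;
    import org.apache.uniffle.storage.api.FileReader;

    public class HeapFallbackSketch {
      // 'reader' may be a local-file implementation that rejects off-heap reads.
      static ByteBuffer readPreferringOffHeap(FileReader reader, long offset, int length) {
        try {
          return reader.readByteBuffer(offset, length);
        } catch (UnsupportedOperationException e) {
          return ByteBuffer.wrap(reader.read(offset, length)); // heap fallback
        }
      }
    }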
@@ -120,14 +120,6 @@ public static String getShuffleDataPath(String appId, int shuffleId, int start,
         String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(start), String.valueOf(end)));
   }
 
-  public static String getUploadShuffleDataPath(String appId, int shuffleId, int partitionId) {
-    return String.join(
-        HDFS_PATH_SEPARATOR,
-        appId,
-        String.valueOf(shuffleId),
-        String.valueOf(partitionId));
-  }
-
   public static String getCombineDataPath(String appId, int shuffleId) {
     return String.join(
         HDFS_PATH_SEPARATOR,
@@ -193,10 +185,6 @@ public static void createDirIfNotExist(FileSystem fileSystem, String pathString)
     }
   }
 
-  // index file header is $PartitionNum | [($PartitionId | $PartitionFileLength | $PartitionDataFileLength), ] | $CRC
-  public static long getIndexFileHeaderLen(int partitionNum) {
-    return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
-  }
 
   public static long uploadFile(File file, HdfsFileWriter writer, int bufferSize) throws IOException {
     try (FileInputStream inputStream = new FileInputStream(file)) {
