Skip to content

Commit

Permalink
[apache#804] improvement: Optimize CRC calculation of ByteBuffer (apa…
Browse files Browse the repository at this point in the history
…che#805)

### What changes were proposed in this pull request?
1. remove unnecessary methods
2. Optimize crc method

### Why are the changes needed?

Fix: apache#804

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New ut
  • Loading branch information
jerqi authored Apr 10, 2023
1 parent 9752471 commit cae7cd9
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,20 @@ public static long getCrc32(byte[] buf, int offset, int length) {
return crc32.getValue();
}

// you may need to flip at first
public static long getCrc32(ByteBuffer byteBuffer) {
if (byteBuffer.hasArray()) {
return getCrc32(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
} else {
byte[] byteArray = new byte[byteBuffer.remaining()];
byteBuffer.get(byteArray);
return getCrc32(byteArray);
return getCrc32(byteBuffer, byteBuffer.position(), byteBuffer.limit() - byteBuffer.position());
}

public static long getCrc32(ByteBuffer byteBuffer, int offset, int length) {
CRC32 crc32 = new CRC32();
ByteBuffer crcBuffer = byteBuffer.duplicate();
crcBuffer.position(offset);
for (int i = 0; i < length; ) {
int len = Math.min(LENGTH_PER_CRC, length - i);
crcBuffer.limit(crcBuffer.position() + len);
crc32.update(crcBuffer);
i += len;
}
return crc32.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public void crc32TestWithByteBuff() throws Exception {
buffer.flip();
long expectedChecksum = ChecksumUtils.getCrc32(data);
assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer));
assertEquals(length, buffer.position());

// test heap ByteBuffer
path = Paths.get(file.getAbsolutePath());
Expand All @@ -89,4 +88,29 @@ public void crc32TestWithByteBuff() throws Exception {
assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer));

}

@Test
public void crc32ByteBufferTest() throws Exception {
int length = 32 * 1024 * 1024;
byte[] data = new byte[length];
Random random = new Random();
random.nextBytes(data);
long expectCrc = ChecksumUtils.getCrc32(data);
ByteBuffer originBuffer = ByteBuffer.allocateDirect(length);
originBuffer.put(data);
originBuffer.flip();
assertEquals(expectCrc, ChecksumUtils.getCrc32(ByteBuffer.wrap(data)));
ByteBuffer directBuffer = ByteBuffer.allocateDirect(length);
directBuffer.put(data);
directBuffer.flip();
assertEquals(expectCrc, ChecksumUtils.getCrc32(directBuffer));
assertEquals(originBuffer, directBuffer);
int offset = random.nextInt(15);
ByteBuffer directOffsetBuffer = ByteBuffer.allocateDirect(length + offset);
byte[] dataOffset = new byte[offset];
random.nextBytes(dataOffset);
directOffsetBuffer.put(dataOffset);
directOffsetBuffer.put(data);
assertEquals(expectCrc, ChecksumUtils.getCrc32(directOffsetBuffer, offset, length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand All @@ -31,10 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.api.FileWriter;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;

public class HdfsFileWriter implements FileWriter, Closeable {

Expand Down Expand Up @@ -103,30 +100,6 @@ public void writeIndex(FileBasedShuffleSegment segment) throws IOException {
fsDataOutputStream.writeLong(segment.getTaskAttemptId());
}

// index file header is PartitionNum | [(PartitionId | PartitionFileLength | PartitionDataFileLength), ] | CRC
public void writeHeader(List<Integer> partitionList,
List<Long> indexFileSizeList,
List<Long> dataFileSizeList) throws IOException {
ByteBuffer headerContentBuf = ByteBuffer.allocate(
(int)ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size()) - ShuffleStorageUtils.getHeaderCrcLen());
fsDataOutputStream.writeInt(partitionList.size());
headerContentBuf.putInt(partitionList.size());
for (int i = 0; i < partitionList.size(); i++) {
fsDataOutputStream.writeInt(partitionList.get(i));
fsDataOutputStream.writeLong(indexFileSizeList.get(i));
fsDataOutputStream.writeLong(dataFileSizeList.get(i));
headerContentBuf.putInt(partitionList.get(i));
headerContentBuf.putLong(indexFileSizeList.get(i));
headerContentBuf.putLong(dataFileSizeList.get(i));
}
headerContentBuf.flip();
fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
long len = ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size());
if (fsDataOutputStream.getPos() != len) {
throw new IOException("Fail to write index header");
}
}

public long nextOffset() {
return nextOffset;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,6 @@ public static String generateIndexFileName(String fileNamePrefix) {
return fileNamePrefix + Constants.SHUFFLE_INDEX_FILE_SUFFIX;
}

public static String generateAbsoluteFilePrefix(String base, String key, int partition, String id) {
return String.join(
HDFS_PATH_SEPARATOR,
base,
key,
String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(partition), String.valueOf(partition)),
id);
}

public static List<DataFileSegment> mergeSegments(
String path, List<FileBasedShuffleSegment> segments, int readBufferSize) {
List<DataFileSegment> dataFileSegments = Lists.newArrayList();
Expand Down Expand Up @@ -207,10 +198,6 @@ public static long getIndexFileHeaderLen(int partitionNum) {
return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
}

public static int getHeaderCrcLen() {
return 8;
}

public static long uploadFile(File file, HdfsFileWriter writer, int bufferSize) throws IOException {
try (FileInputStream inputStream = new FileInputStream(file)) {
return writer.copy(inputStream, bufferSize);
Expand Down

0 comments on commit cae7cd9

Please sign in to comment.