diff --git a/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java index f74510c6d8..32ecf86761 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java @@ -40,14 +40,20 @@ public static long getCrc32(byte[] buf, int offset, int length) { return crc32.getValue(); } - // you may need to flip at first public static long getCrc32(ByteBuffer byteBuffer) { - if (byteBuffer.hasArray()) { - return getCrc32(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining()); - } else { - byte[] byteArray = new byte[byteBuffer.remaining()]; - byteBuffer.get(byteArray); - return getCrc32(byteArray); + return getCrc32(byteBuffer, byteBuffer.position(), byteBuffer.limit() - byteBuffer.position()); + } + + public static long getCrc32(ByteBuffer byteBuffer, int offset, int length) { + CRC32 crc32 = new CRC32(); + ByteBuffer crcBuffer = byteBuffer.duplicate(); + crcBuffer.position(offset); + for (int i = 0; i < length; ) { + int len = Math.min(LENGTH_PER_CRC, length - i); + crcBuffer.limit(crcBuffer.position() + len); + crc32.update(crcBuffer); + i += len; } + return crc32.getValue(); } } diff --git a/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java index 7e24ddfef6..fc822f427f 100644 --- a/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java @@ -76,7 +76,6 @@ public void crc32TestWithByteBuff() throws Exception { buffer.flip(); long expectedChecksum = ChecksumUtils.getCrc32(data); assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer)); - assertEquals(length, buffer.position()); // test heap ByteBuffer path = Paths.get(file.getAbsolutePath()); @@ -89,4 +88,29 @@ public void crc32TestWithByteBuff() throws Exception { assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer)); } + + @Test + public void crc32ByteBufferTest() throws Exception { + int length = 32 * 1024 * 1024; + byte[] data = new byte[length]; + Random random = new Random(); + random.nextBytes(data); + long expectCrc = ChecksumUtils.getCrc32(data); + ByteBuffer originBuffer = ByteBuffer.allocateDirect(length); + originBuffer.put(data); + originBuffer.flip(); + assertEquals(expectCrc, ChecksumUtils.getCrc32(ByteBuffer.wrap(data))); + ByteBuffer directBuffer = ByteBuffer.allocateDirect(length); + directBuffer.put(data); + directBuffer.flip(); + assertEquals(expectCrc, ChecksumUtils.getCrc32(directBuffer)); + assertEquals(originBuffer, directBuffer); + int offset = random.nextInt(15); + ByteBuffer directOffsetBuffer = ByteBuffer.allocateDirect(length + offset); + byte[] dataOffset = new byte[offset]; + random.nextBytes(dataOffset); + directOffsetBuffer.put(dataOffset); + directOffsetBuffer.put(data); + assertEquals(expectCrc, ChecksumUtils.getCrc32(directOffsetBuffer, offset, length)); + } } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java index 6750d6ba2e..5e179a6bc2 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java @@ -21,7 +21,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -31,10 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.uniffle.common.util.ChecksumUtils; import org.apache.uniffle.storage.api.FileWriter; import org.apache.uniffle.storage.common.FileBasedShuffleSegment; -import org.apache.uniffle.storage.util.ShuffleStorageUtils; public class HdfsFileWriter implements FileWriter, Closeable { @@ -103,30 +100,6 @@ public void writeIndex(FileBasedShuffleSegment segment) throws IOException { fsDataOutputStream.writeLong(segment.getTaskAttemptId()); } - // index file header is PartitionNum | [(PartitionId | PartitionFileLength | PartitionDataFileLength), ] | CRC - public void writeHeader(List partitionList, - List indexFileSizeList, - List dataFileSizeList) throws IOException { - ByteBuffer headerContentBuf = ByteBuffer.allocate( - (int)ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size()) - ShuffleStorageUtils.getHeaderCrcLen()); - fsDataOutputStream.writeInt(partitionList.size()); - headerContentBuf.putInt(partitionList.size()); - for (int i = 0; i < partitionList.size(); i++) { - fsDataOutputStream.writeInt(partitionList.get(i)); - fsDataOutputStream.writeLong(indexFileSizeList.get(i)); - fsDataOutputStream.writeLong(dataFileSizeList.get(i)); - headerContentBuf.putInt(partitionList.get(i)); - headerContentBuf.putLong(indexFileSizeList.get(i)); - headerContentBuf.putLong(dataFileSizeList.get(i)); - } - headerContentBuf.flip(); - fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf)); - long len = ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size()); - if (fsDataOutputStream.getPos() != len) { - throw new IOException("Fail to write index header"); - } - } - public long nextOffset() { return nextOffset; } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java deleted file mode 100644 index 07edff1739..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.handler.impl; - -import java.nio.ByteBuffer; -import java.util.List; - -import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.common.util.ChecksumUtils; -import org.apache.uniffle.storage.util.ShuffleStorageUtils; - -public class ShuffleIndexHeader { - - private static final Logger LOG = LoggerFactory.getLogger(ShuffleIndexHeader.class); - - private int partitionNum; - private List indexes = Lists.newArrayList(); - private long crc; - - public ShuffleIndexHeader(int partitionNum, List indexes, long crc) { - this.partitionNum = partitionNum; - this.indexes = indexes; - this.crc = crc; - } - - public void setPartitionNum(int partitionNum) { - this.partitionNum = partitionNum; - } - - public int getPartitionNum() { - return partitionNum; - } - - public List getIndexes() { - return indexes; - } - - public long getCrc() { - return crc; - } - - public void setCrc(long crc) { - this.crc = crc; - } - - public int getHeaderLen() { - return (int) ShuffleStorageUtils.getIndexFileHeaderLen(partitionNum); - } - - // No side effects on byteBuffer - public static ShuffleIndexHeader extractHeader(ByteBuffer byteBuffer) { - try { - int partitionNum = byteBuffer.getInt(); - ByteBuffer headerContentBuf = ByteBuffer.allocate( - (int) ShuffleStorageUtils.getIndexFileHeaderLen(partitionNum) - - ShuffleStorageUtils.getHeaderCrcLen()); - headerContentBuf.putInt(partitionNum); - List entries = Lists.newArrayList(); - - for (int i = 0; i < partitionNum; i++) { - int partitionId = byteBuffer.getInt(); - long partitionLength = byteBuffer.getLong(); - long partitionDataFileLength = byteBuffer.getLong(); - headerContentBuf.putInt(partitionId); - headerContentBuf.putLong(partitionLength); - headerContentBuf.putLong(partitionDataFileLength); - - ShuffleIndexHeader.Entry entry - = new ShuffleIndexHeader.Entry(partitionId, partitionLength, partitionDataFileLength); - entries.add(entry); - } - - headerContentBuf.flip(); - long crc = byteBuffer.getLong(); - long actualCrc = ChecksumUtils.getCrc32(headerContentBuf); - if (crc != actualCrc) { - LOG.error("Read header exception, expected crc[{}] != actual crc[{}]", crc, actualCrc); - return null; - } - // clear the side effect on byteBuffer - byteBuffer.clear(); - return new ShuffleIndexHeader(partitionNum, entries, crc); - } catch (Exception e) { - LOG.error("Fail to extract header from {}, with exception", byteBuffer.toString(), e); - return null; - } - } - - static class Entry { - int partitionId; - long partitionIndexLength; - long partitionDataLength; - - Entry(int partitionId, long partitionIndexLength, long partitionDataLength) { - this.partitionId = partitionId; - this.partitionIndexLength = partitionIndexLength; - this.partitionDataLength = partitionDataLength; - } - - public int getPartitionId() { - return partitionId; - } - - public long getPartitionIndexLength() { - return partitionIndexLength; - } - - public long getPartitionDataLength() { - return partitionDataLength; - } - } -} diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java index 4b19408f01..e9abec1597 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java +++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java @@ -56,15 +56,6 @@ public static String generateIndexFileName(String fileNamePrefix) { return fileNamePrefix + Constants.SHUFFLE_INDEX_FILE_SUFFIX; } - public static String generateAbsoluteFilePrefix(String base, String key, int partition, String id) { - return String.join( - HDFS_PATH_SEPARATOR, - base, - key, - String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(partition), String.valueOf(partition)), - id); - } - public static List mergeSegments( String path, List segments, int readBufferSize) { List dataFileSegments = Lists.newArrayList(); @@ -207,10 +198,6 @@ public static long getIndexFileHeaderLen(int partitionNum) { return 4 + (4 + 8 + 8) * (long) partitionNum + 8; } - public static int getHeaderCrcLen() { - return 8; - } - public static long uploadFile(File file, HdfsFileWriter writer, int bufferSize) throws IOException { try (FileInputStream inputStream = new FileInputStream(file)) { return writer.copy(inputStream, bufferSize);