Integrate support for checksums. #35

Merged 1 commit on Jul 12, 2023
README.md (2 changes: 1 addition & 1 deletion)
@@ -31,12 +31,12 @@ These configuration values need to be passed to Spark to load and configure the

Individual blocks are hashed in order to get improved performance when accessing them on the remote filesystem.
The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`
- `spark.shuffle.checksum.enabled`: `false` - Disables checksums for Shuffle files. Reason: This is not yet supported.

### Debug options / optimizations

These are optional configuration values that control how s3-shuffle behaves.

- `spark.shuffle.checksum.enabled`: `false` - Disables checksums on Shuffle files (default: `true`, recommended: `false`).
- `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
- `spark.shuffle.s3.alwaysCreateIndex`: Always create an index file, even if all partitions have empty length (
default: `false`)
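The README change above moves `spark.shuffle.checksum.enabled` from a required `false` ("not yet supported") into the optional debug settings, since checksums are now supported end to end. As a point of reference, here is a minimal sketch of how these options could be passed when building a Spark session; the config keys come from the README above and the manager/plugin class names from the example scripts below, while the session setup, app name, and bucket are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative wiring only: the keys are taken from the README and example
// scripts in this PR; the app name and bucket are placeholders.
val spark = SparkSession.builder()
  .appName("s3-shuffle-example")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.S3ShuffleManager")
  .config("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")
  .config("spark.shuffle.s3.rootDir", "s3a://example-bucket/shuffle")
  // Checksums are now supported; set to false only to trade safety for speed.
  .config("spark.shuffle.checksum.enabled", "true")
  .getOrCreate()
```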
examples/sql/run_benchmark.sh (7 changes: 4 additions & 3 deletions)
@@ -24,6 +24,8 @@ EXECUTOR_MEM=${EXECUTOR_MEM:-13000M}
EXECUTOR_MEMORY_OVERHEAD=${EXECUTOR_MEMORY_OVERHEAD:-3000M}
INSTANCES=${INSTANCES:-4}

CHECKSUM_ENABLED=${CHECKSUM_ENABLED:-"true"}

EXTRA_CLASSPATHS='/opt/spark/jars/*'
EXECUTOR_JAVA_OPTIONS="-Dsun.nio.PageAlignDirectMemory=true"
DRIVER_JAVA_OPTIONS="-Dsun.nio.PageAlignDirectMemory=true"
@@ -54,9 +56,8 @@ SPARK_S3_SHUFFLE_CONFIG=(
--conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=false
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=${SHUFFLE_DESTINATION}
--conf spark.kubernetes.executor.podTemplateFile=${SCRIPT_DIR}/../templates/executor_nfs.yml
)

if (( "$USE_S3_SHUFFLE" == 0 )); then
@@ -70,7 +71,7 @@ if (( "$USE_NFS_SHUFFLE" == 1 )); then
SPARK_S3_SHUFFLE_CONFIG=(
--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=false
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=local:///nfs/
--conf spark.kubernetes.executor.podTemplateFile=${SCRIPT_DIR}/../templates/executor_nfs.yml
--conf spark.kubernetes.driver.podTemplateFile=${SCRIPT_DIR}/../templates/driver_nfs.yml
examples/sql/run_tpcds.sh (3 changes: 3 additions & 0 deletions)
@@ -10,6 +10,9 @@ set -euo pipefail
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
cd "${SCRIPT_DIR}"

# Shuffle on S3
export USE_S3_SHUFFLE=${USE_S3_SHUFFLE:-1}

export SIZE=${SIZE:-100}
export PROCESS_TAG=tpcds-${SIZE}
./run_single_query.sh tpcds
examples/terasort/run.sh (5 changes: 3 additions & 2 deletions)
@@ -25,6 +25,7 @@ SIZE=${SIZE:-1g}

# Shuffle on S3
USE_S3_SHUFFLE=${USE_S3_SHUFFLE:-1}
CHECKSUM_ENABLED=${CHECKSUM_ENABLED:-"true"}

EXTRA_CLASSPATHS='/opt/spark/jars/*'
EXECUTOR_JAVA_OPTIONS="-Dsun.nio.PageAlignDirectMemory=true"
@@ -54,7 +55,7 @@ SPARK_S3_SHUFFLE_CONFIG=(
--conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=false
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=${SHUFFLE_DESTINATION}
)

@@ -71,7 +72,7 @@ if (( "$USE_NFS_SHUFFLE" == 1 )); then
SPARK_S3_SHUFFLE_CONFIG=(
--conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
--conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
--conf spark.shuffle.checksum.enabled=false
--conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
--conf spark.shuffle.s3.rootDir=local:///nfs/
--conf spark.kubernetes.executor.podTemplateFile=${SCRIPT_DIR}/../templates/executor_nfs.yml
--conf spark.kubernetes.driver.podTemplateFile=${SCRIPT_DIR}/../templates/driver_nfs.yml
@@ -106,9 +106,12 @@ class S3ShuffleMapOutputWriter(
bufferedStream.close()
}

// Write index
// Write index and checksum.
if (partitionLengths.sum > 0 || S3ShuffleDispatcher.get.alwaysCreateIndex) {
S3ShuffleHelper.writePartitionLengths(shuffleId, mapId, partitionLengths)
if (dispatcher.checksumEnabled) {
S3ShuffleHelper.writeChecksum(shuffleId, mapId, checksums)
}
}
MapOutputCommitMessage.of(partitionLengths)
}
@@ -25,6 +25,10 @@ class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends S
val out = dispatcher.createBlock(ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID))
// Note: HDFS does not expose a nio-buffered write interface.
Utils.copyStream(in, out, closeStreams = true)

if (dispatcher.checksumEnabled) {
S3ShuffleHelper.writeChecksum(shuffleId, mapId, checksums)
}
S3ShuffleHelper.writePartitionLengths(shuffleId, mapId, partitionLengths)
}
}
@@ -7,7 +7,7 @@ package org.apache.spark.shuffle.helper

import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, config}
import org.apache.spark.shuffle.ConcurrentObjectMap
import org.apache.spark.storage._
import org.apache.spark.{SparkConf, SparkEnv}
@@ -36,6 +36,7 @@ class S3ShuffleDispatcher extends Logging {
val forceBatchFetch: Boolean = conf.getBoolean("spark.shuffle.s3.forceBatchFetch", defaultValue = false)
val prefetchBatchSize: Int = conf.getInt("spark.shuffle.s3.prefetchBatchSize", defaultValue = 25)
val prefetchThreadPoolSize: Int = conf.getInt("spark.shuffle.s3.prefetchThreadPoolSize", defaultValue = 100)
val checksumEnabled: Boolean = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED)

val appDir = f"/${startTime}-${appId}/"
val fs: FileSystem = FileSystem.get(URI.create(rootDir), {
@@ -50,6 +51,7 @@
logInfo(s"- spark.shuffle.s3.forceBatchFetch=${forceBatchFetch}")
logInfo(s"- spark.shuffle.s3.prefetchBlockSize=${prefetchBatchSize}")
logInfo(s"- spark.shuffle.s3.prefetchThreadPoolSize=${prefetchThreadPoolSize}")
logInfo(s"- ${config.SHUFFLE_CHECKSUM_ENABLED.key}=${checksumEnabled}")

def removeRoot(): Boolean = {
Range(0, 10).map(idx => {
@@ -5,20 +5,22 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.ConcurrentObjectMap
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.{BlockId, ShuffleIndexBlockId}
import org.apache.spark.storage.{BlockId, ShuffleChecksumBlockId, ShuffleIndexBlockId}

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.ByteBuffer
import java.util
import java.util.zip.{Adler32, CRC32, Checksum}
import java.util.regex.Pattern
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.{Await, Future, blocking}

object S3ShuffleHelper extends Logging {
private lazy val serializer = SparkEnv.get.serializer
private lazy val dispatcher = S3ShuffleDispatcher.get

private val cachedChecksums = new ConcurrentObjectMap[ShuffleChecksumBlockId, Array[Long]]()
private val cachedArrayLengths = new ConcurrentObjectMap[ShuffleIndexBlockId, Array[Long]]()
private val cachedIndexBlocks = new ConcurrentObjectMap[Int, Array[ShuffleIndexBlockId]]()

@@ -42,18 +44,16 @@
* @param partitionLengths
*/
def writePartitionLengths(shuffleId: Int, mapId: Long, partitionLengths: Array[Long]): Unit = {
writePartitionLengths(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), partitionLengths)
writeArrayAsBlock(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID), partitionLengths)
}

/**
* Write partitionLengths for blockId.
*
* @param blockId
* @param partitionLengths
*/
def writePartitionLengths(blockId: ShuffleIndexBlockId, partitionLengths: Array[Long]): Unit = {
def writeChecksum(shuffleId: Int, mapId: Long, checksums: Array[Long]): Unit = {
writeArrayAsBlock(ShuffleChecksumBlockId(shuffleId = shuffleId, mapId = mapId, reduceId = 0), checksums)
}

def writeArrayAsBlock(blockId: BlockId, array: Array[Long]): Unit = {
val serializerInstance = serializer.newInstance()
val buffer = serializerInstance.serialize[Array[Long]](partitionLengths)
val buffer = serializerInstance.serialize[Array[Long]](array)
val file = new BufferedOutputStream(dispatcher.createBlock(blockId))
file.write(buffer.array(), buffer.arrayOffset(), buffer.limit())
file.flush()
@@ -106,10 +106,33 @@
* @return
*/
def getPartitionLengthsCached(blockId: ShuffleIndexBlockId): Array[Long] = {
cachedArrayLengths.getOrElsePut(blockId, getPartitionLengths)
cachedArrayLengths.getOrElsePut(blockId, readBlockAsArray)
}

def getChecksumsCached(shuffleId: Int, mapId: Long): Array[Long] = {
cachedChecksums.getOrElsePut(ShuffleChecksumBlockId(shuffleId, mapId, 0), readBlockAsArray)
}

def getChecksums(shuffleId: Int, mapId: Long): Array[Long] = {
getChecksums(ShuffleChecksumBlockId(shuffleId = shuffleId, mapId = mapId, reduceId = 0))
}

def getChecksums(blockId: ShuffleChecksumBlockId): Array[Long] = {
readBlockAsArray(blockId)
}

def createChecksumAlgorithm(algorithm: String): Checksum = {
algorithm match {
case "ADLER32" =>
new Adler32()
case "CRC32" =>
new CRC32()
case _ =>
throw new UnsupportedOperationException(f"Spark-S3-Shuffle: Unsupported shuffle checksum algorithm: ${algorithm}. Check with Spark.")
}
}

private def getPartitionLengths(blockId: ShuffleIndexBlockId): Array[Long] = {
private def readBlockAsArray(blockId: BlockId) = {
val file = new BufferedInputStream(dispatcher.openBlock(blockId))
var buffer = new Array[Byte](1024)
var numBytes = 0
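The helper now persists one checksum value per reduce partition as an `Array[Long]`, using the same serialization path as the partition lengths (`writeArrayAsBlock` / `readBlockAsArray`). Below is a minimal, self-contained sketch of how such per-partition values can be computed with the `java.util.zip` `Checksum` implementations that `createChecksumAlgorithm` selects; the partition payloads are made up for illustration:

```scala
import java.util.zip.{Adler32, CRC32, Checksum}

// Hypothetical per-partition payloads; in the real writer these are the
// serialized shuffle partitions produced by the map task.
val partitions: Array[Array[Byte]] =
  Array("alpha".getBytes("UTF-8"), "bravo".getBytes("UTF-8"), Array.emptyByteArray)

def partitionChecksums(algorithm: String, parts: Array[Array[Byte]]): Array[Long] =
  parts.map { bytes =>
    // Mirrors createChecksumAlgorithm: ADLER32 and CRC32 are the two
    // algorithms Spark's spark.shuffle.checksum.algorithm setting allows.
    val c: Checksum = algorithm match {
      case "ADLER32" => new Adler32()
      case "CRC32"   => new CRC32()
      case other     => throw new UnsupportedOperationException(s"Unsupported checksum algorithm: $other")
    }
    c.update(bytes, 0, bytes.length)
    c.getValue // one Long per reduce partition
  }

// An array like this is what writeChecksum(shuffleId, mapId, checksums) stores
// next to the index file, and what getChecksumsCached returns on the read side.
val checksums: Array[Long] = partitionChecksums("ADLER32", partitions)
```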
@@ -0,0 +1,91 @@
/**
* Copyright 2023- IBM Inc. All rights reserved
* SPDX-License-Identifier: Apache2.0
*/

package org.apache.spark.storage

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.helper.S3ShuffleHelper

import java.io.InputStream
import java.util.zip.Checksum

/**
* Validates checksum stored for blockId on stream with checksumAlgorithm.
*/
class S3ChecksumValidationStream(
blockId: BlockId,
stream: InputStream,
checksumAlgorithm: String) extends InputStream with Logging {

private val (shuffleId: Int, mapId: Long, startReduceId: Int, endReduceId: Int) = blockId match {
case ShuffleBlockId(shuffleId, mapId, reduceId) => (shuffleId, mapId, reduceId, reduceId + 1)
case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, endReduceId) => (shuffleId, mapId, startReduceId, endReduceId)
case _ => throw new SparkException(s"S3ChecksumValidationStream does not support block type ${blockId}")
}

private val checksum: Checksum = S3ShuffleHelper.createChecksumAlgorithm(checksumAlgorithm)
private val lengths: Array[Long] = S3ShuffleHelper.getPartitionLengthsCached(shuffleId, mapId)
private val referenceChecksums: Array[Long] = S3ShuffleHelper.getChecksumsCached(shuffleId, mapId)

private var pos: Long = 0
private var reduceId: Int = startReduceId
private var blockLength: Long = lengths(reduceId)
private def eof(): Boolean = reduceId > endReduceId

validateChecksum()

override def read(): Int = synchronized {
val res = stream.read()
if (res > 0) {
checksum.update(res)
pos += 1
validateChecksum()
}
if (eof() && res >= 0) {
throw new SparkException(s"Read ${res} bytes even though we're at end of stream.")
}
res
}

override def read(b: Array[Byte], off: Int, len: Int): Int = synchronized {
val l: Int = scala.math.min(len, blockLength - pos).toInt
val res = stream.read(b, off, l)
if (res > 0) {
checksum.update(b, off, res)
pos += res
validateChecksum()
}
if (eof() && res >= 0) {
logError(s"Read ${res} bytes even though we're at end of stream.")
}
res
}

private def validateChecksum(): Unit = synchronized {
if (pos != blockLength) {
return
}
if (checksum.getValue != referenceChecksums(reduceId)) {
throw new SparkException(s"Invalid checksum detected for ${blockId.name}")
}
checksum.reset()
pos = 0
reduceId += 1
if (reduceId < endReduceId) {
blockLength = lengths(reduceId)
if (blockLength == 0) {
validateChecksum()
}
} else {
blockLength = Long.MaxValue
}
}

override def close(): Unit = {
super.close()
stream.close()
}
}
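For intuition (not part of this diff), here is a simplified, self-contained version of what `S3ChecksumValidationStream` does: update a running checksum as bytes are consumed and compare it against the stored reference at each partition boundary. Real data, lengths, and reference checksums come from the shuffle and checksum files; here they are fabricated:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.zip.{Adler32, Checksum}

// Validate `data` that holds the partitions back to back, assuming
// `lengths(i)` bytes belong to reduce partition i and `reference(i)` is the
// checksum stored for it by the writer.
def validate(data: Array[Byte], lengths: Array[Long], reference: Array[Long]): Unit = {
  val in: InputStream = new ByteArrayInputStream(data)
  val checksum: Checksum = new Adler32()
  for ((length, reduceId) <- lengths.zipWithIndex) {
    val buf = new Array[Byte](length.toInt)
    var read = 0
    while (read < buf.length) {
      val n = in.read(buf, read, buf.length - read)
      require(n >= 0, s"Unexpected end of stream in partition $reduceId")
      read += n
    }
    checksum.update(buf, 0, buf.length)
    if (checksum.getValue != reference(reduceId)) {
      throw new IllegalStateException(s"Invalid checksum detected for reduce partition $reduceId")
    }
    checksum.reset()
  }
}
```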
@@ -8,7 +8,7 @@ package org.apache.spark.storage
import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}

class S3ShuffleBlockIterator(
shuffleBlocks: Iterator[BlockId]
shuffleBlocks: Iterator[BlockId],
) extends Iterator[(BlockId, S3ShuffleBlockStream)] {

private val dispatcher = S3ShuffleDispatcher.get
src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala (16 changes: 13 additions & 3 deletions)
@@ -30,9 +30,10 @@ import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReadMetricsReporter,
import org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchBlockInfo
import org.apache.spark.util.{CompletionIterator, ThreadUtils}
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.{InterruptibleIterator, SparkConf, SparkEnv, TaskContext}
import org.apache.spark.{InterruptibleIterator, SparkConf, SparkEnv, SparkException, TaskContext}

import java.io.BufferedInputStream
import java.io.{BufferedInputStream, InputStream}
import java.util.zip.{CheckedInputStream, Checksum}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

@@ -56,6 +57,9 @@
private val dep = handle.dependency
private val maxBufferSize = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

private val checksumEnabled: Boolean = conf.get(config.SHUFFLE_CHECKSUM_ENABLED)
private val checksumAlgorithm: String = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)

private val fetchContinousBlocksInBatch: Boolean = {
val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
val compressed = conf.get(config.SHUFFLE_COMPRESS)
@@ -113,8 +117,14 @@
stream.read()
stream.reset()

val checkedStream = if (checksumEnabled) {
new S3ChecksumValidationStream(blockId, stream, checksumAlgorithm)
} else {
stream
}

serializerInstance
.deserializeStream(serializerManager.wrapStream(blockId, stream))
.deserializeStream(serializerManager.wrapStream(blockId, checkedStream))
.asKeyValueIterator
}(S3ShuffleReader.asyncExecutionContext)
}