Spark 3.1: Configure the number of prefixes.
Signed-off-by: Pascal Spörri <[email protected]>
pspoerri committed Jul 14, 2023
1 parent ed0b7c5 commit d440c16
Showing 5 changed files with 62 additions and 35 deletions.
30 changes: 18 additions & 12 deletions README.md
```diff
@@ -26,33 +26,39 @@ These configuration values need to be passed to Spark to load and configure the
 - `s3a://zrlio-tmp/` (Hadoop-AWS + AWS-SDK)
 - `cos://zrlio-tmp.resources/` (Hadoop-Cloud + Stocator)
 
-Individual blocks are hashed in order to get improved performance when accessing them on the remote filesystem.
-The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`
+Individual blocks are prefixed in order to get improved performance when accessing them on the remote filesystem.
+The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`.
+
+The number of prefixes can be controlled with the option `spark.shuffle.s3.folderPrefixes`.
 
 ### Features
 
 Changing these values might have an impact on performance.
 
+- `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
+- `spark.shuffle.s3.folderPrefixes`: The number of prefixes to use when storing files on S3
+  (default: `10`, minimum: `1`).
+
+  **Note**: This option can be used to optimize performance on object stores which have a prefix ratelimit.
+- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`). Controls how many partitions are prefetched
+  concurrently per task.
+- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`). The total size of the thread
+  pool used for prefetching the shuffle blocks.
 - `spark.shuffle.checksum.algorithm`: Checksum algorithm (default: `ADLER32`, supported: `ADLER32`, `CRC32`), backport
   from Spark 3.2.0.
 - `spark.shuffle.checksum.enabled`: Enables checksums on Shuffle files (default: `false`), backport from Spark 3.2.0.
 
   **Note**: Creates additional overhead if active.
 
-- `spark.shuffle.s3.alwaysCreateIndex`: Always create an index file, even if all partitions have empty length (
-  default: `false`)
-
-  **Note**: Creates additional overhead if active.
+### Debug configuration options
 
-- `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
-- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`). Controls how many partitions are prefetched
-  concurrently per task.
-- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`). The total size of the thread
-  pool used for prefetching the shuffle blocks.
+Configuration options used for debugging:
 
-### Unsafe configuration options
+- `spark.shuffle.s3.alwaysCreateIndex`: Always create an index file, even if all partitions have empty length (
+  default: `false`)
 
-Unsafe configurations options:
+  **Note**: Creates additional overhead if active.
 
 - `spark.shuffle.s3.useBlockManager`: Use the Spark block manager to compute blocks (default: `true`).
```
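
For reference, the same options can also be set programmatically. A minimal sketch, assuming a placeholder bucket URL and tuning values; the manager and plugin class names are the ones used in `examples/sql/run_benchmark.sh` below:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch only: the bucket URL and tuning values below are placeholders.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.S3ShuffleManager")
  .set("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")
  .set("spark.shuffle.s3.rootDir", "s3a://my-bucket/shuffle") // placeholder bucket
  .set("spark.shuffle.s3.folderPrefixes", "32")               // spread writes over 32 prefixes
  .set("spark.shuffle.s3.cleanup", "true")                    // delete shuffle files on teardown
  .set("spark.shuffle.checksum.enabled", "false")             // checksums add overhead if active

val spark = SparkSession.builder().config(conf).getOrCreate()
```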

2 changes: 1 addition & 1 deletion examples/sql/run_benchmark.sh
```diff
@@ -49,7 +49,7 @@ SPARK_S3_SHUFFLE_CONFIG=(
     --conf spark.hadoop.fs.s3a.access.key=${S3A_ACCESS_KEY}
     --conf spark.hadoop.fs.s3a.secret.key=${S3A_SECRET_KEY}
     --conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
-    --conf spark.shuffle.s3.useBlockManager=${USE_BLOCK_MANAGER:-false}
+    --conf spark.shuffle.s3.useBlockManager=${USE_BLOCK_MANAGER:-true}
     --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
     --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
     --conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
```
S3ShuffleDispatcher.scala

```diff
@@ -26,39 +26,47 @@ class S3ShuffleDispatcher extends Logging {
   val appId: String = conf.getAppId
   val startTime: String = conf.get("spark.app.startTime")
 
+  // Required
   val rootDir = conf.get("spark.shuffle.s3.rootDir", defaultValue = "sparkS3shuffle")
   private val isCOS = rootDir.startsWith("cos://")
   private val isS3A = rootDir.startsWith("s3a://")
 
-  val checksumAlgorithm: String = conf.get("spark.shuffle.checksum.algorithm", defaultValue = "ADLER32")
-  val checksumEnabled: Boolean = conf.getBoolean("spark.shuffle.checksum.enabled", defaultValue = false)
+  // Optional
+  val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
+  val folderPrefixes: Int = conf.getInt("spark.shuffle.s3.folderPrefixes", defaultValue = 10)
   val prefetchBatchSize: Int = conf.getInt("spark.shuffle.s3.prefetchBatchSize", defaultValue = 25)
   val prefetchThreadPoolSize: Int = conf.getInt("spark.shuffle.s3.prefetchThreadPoolSize", defaultValue = 100)
-  val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
-  val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
 
+  // Debug
+  val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
   val useBlockManager: Boolean = conf.getBoolean("spark.shuffle.s3.useBlockManager", defaultValue = true)
   val forceBatchFetch: Boolean = conf.getBoolean("spark.shuffle.s3.forceBatchFetch", defaultValue = false)
 
+  // Backports
+  val checksumAlgorithm: String = conf.get("spark.shuffle.checksum.algorithm", defaultValue = "ADLER32")
+  val checksumEnabled: Boolean = conf.getBoolean("spark.shuffle.checksum.enabled", defaultValue = false)
 
   val appDir = f"/${startTime}-${appId}/"
   val fs: FileSystem = FileSystem.get(URI.create(rootDir), {
     SparkHadoopUtil.newConfiguration(conf)
   })
 
-  logInfo(s"- spark.shuffle.checksum.algorithm=${checksumAlgorithm} (backported from Spark 3.2.0)")
-  logInfo(s"- spark.shuffle.checksum.enabled=${checksumEnabled} (backported from Spark 3.2.0)")
+  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir})")
   logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")
   logInfo(s"- spark.shuffle.s3.prefetchBatchSize=${prefetchBatchSize}")
   logInfo(s"- spark.shuffle.s3.prefetchThreadPoolSize=${prefetchThreadPoolSize}")
-  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir})")
+  logInfo(s"- spark.shuffle.s3.folderPrefixes=${folderPrefixes}")
 
+  // Debug
   logInfo(s"- spark.shuffle.s3.alwaysCreateIndex=${alwaysCreateIndex} (default: false)")
   logInfo(s"- spark.shuffle.s3.useBlockManager=${useBlockManager} (default: true)")
   logInfo(s"- spark.shuffle.s3.forceBatchFetch=${forceBatchFetch} (default: false)")
 
+  // Backports
+  logInfo(s"- spark.shuffle.checksum.algorithm=${checksumAlgorithm} (backported from Spark 3.2.0)")
+  logInfo(s"- spark.shuffle.checksum.enabled=${checksumEnabled} (backported from Spark 3.2.0)")
 
   def removeRoot(): Boolean = {
-    Range(0, 10).map(idx => {
+    Range(0, folderPrefixes).map(idx => {
       Future {
         fs.delete(new Path(f"${rootDir}/${idx}${appDir}"), true)
       }
@@ -75,12 +83,13 @@ class S3ShuffleDispatcher extends Logging {
       case ShuffleIndexBlockId(_, mapId, _) =>
         mapId
       case _ => 0
-    }) % 10
+    }) % folderPrefixes
     new Path(f"${rootDir}/${idx}${appDir}/${blockId.name}")
   }
 
   def getPath(blockId: ShuffleChecksumBlockId): Path = {
-    new Path(f"${rootDir}/${blockId.mapId % 10}${appDir}/${blockId.name}")
+    val idx = blockId.mapId % folderPrefixes
+    new Path(f"${rootDir}/${idx}${appDir}/${blockId.name}")
   }
 
   /**
```
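
Both `getPath` variants place a block into one of `folderPrefixes` top-level folders under `rootDir`. A minimal sketch of the resulting key layout, with made-up values for the root directory, application directory, and map ID:

```scala
// Illustration only: every concrete value below is made up.
val rootDir = "s3a://my-bucket/sparkS3shuffle"
val folderPrefixes = 10
val appDir = "/1689321600000-app-0001/" // f"/${startTime}-${appId}/"

val mapId = 42L
val idx = mapId % folderPrefixes // 42 % 10 = 2 -> prefix folder "2"
val path = f"${rootDir}/${idx}${appDir}shuffle_0_${mapId}_0.index"
// s3a://my-bucket/sparkS3shuffle/2/1689321600000-app-0001/shuffle_0_42_0.index
```

Because blocks are distributed over `mapId % folderPrefixes`, raising `spark.shuffle.s3.folderPrefixes` spreads requests over more object-store key prefixes.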
S3ShuffleHelper.scala

```diff
@@ -7,7 +7,7 @@ import org.apache.spark.shuffle.ConcurrentObjectMap
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage.{BlockId, ShuffleIndexBlockId}
 
-import java.io.{BufferedInputStream, BufferedOutputStream}
+import java.io.{BufferedInputStream, BufferedOutputStream, IOException}
 import java.nio.ByteBuffer
 import java.util
 import java.util.zip.{Adler32, CRC32, Checksum}
@@ -92,12 +92,16 @@ object S3ShuffleHelper extends Logging {
         regex.matcher(path.getName).matches()
       }
     }
-    Range(0, 10).map(idx => {
+    Range(0, dispatcher.folderPrefixes).map(idx => {
       Future {
         val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        dispatcher.fs.listStatus(path, shuffleIndexFilter).map(v => {
-          BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
-        })
+        try {
+          dispatcher.fs.listStatus(path, shuffleIndexFilter).map(v => {
+            BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
+          })
+        } catch {
+          case _: IOException => Array.empty[ShuffleIndexBlockId]
+        }
       }
     }).flatMap(Await.result(_, Duration.Inf)).toArray
   }
```
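
The helper fans out one `Future` per prefix folder and, after this change, treats a prefix that cannot be listed as empty instead of failing the whole lookup. The same pattern, collapsed into a self-contained sketch with a hypothetical `listPrefix` standing in for `fs.listStatus`:

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical stand-in for listing one prefix folder; may throw.
def listPrefix(idx: Int): Array[String] =
  if (idx % 2 == 0) Array(s"block-$idx") else throw new IOException(s"prefix $idx missing")

// One Future per prefix; an unreadable prefix yields an empty array.
val blocks: Array[String] = Range(0, 4)
  .map(idx => Future {
    try listPrefix(idx)
    catch { case _: IOException => Array.empty[String] }
  })
  .flatMap(Await.result(_, Duration.Inf))
  .toArray
// blocks == Array("block-0", "block-2")
```

The listings run concurrently, while `Await.result` collects the results in prefix order.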
S3ShuffleManager.scala

```diff
@@ -30,6 +30,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
 import org.apache.spark.storage.S3ShuffleReader
 
+import java.io.IOException
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -153,21 +154,28 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
         }
       }
 
-      Range(0, 10).flatMap(idx => {
+      Range(0, dispatcher.folderPrefixes).flatMap(idx => {
         val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
-          Future {
-            dispatcher.fs.delete(f.getPath, false)
-          }
-        })
+        try {
+          dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
+            Future {
+              dispatcher.fs.delete(f.getPath, false)
+            }
+          })
+        } catch {
+          case _: IOException => {
+            logDebug(s"Unable to delete ${path.getName}")
+            List()
+          }
+        }
       }).foreach(Await.result(_, Duration.Inf))
     }
     true
   }
 
   /** Shut down this ShuffleManager. */
   override def stop(): Unit = {
-    val cleanupRequired = registeredShuffleIds.size > 0
+    val cleanupRequired = registeredShuffleIds.nonEmpty
     registeredShuffleIds.foreach(
       shuffleId => {
         purgeCaches(shuffleId)
```
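
The cleanup path applies the same tolerance to deletion: a prefix folder that cannot be listed is logged and skipped. For comparison, an equivalent formulation of that pattern using `Future.sequence` with `recover` — a sketch built around a hypothetical `deletePrefix`, not the code the commit actually uses:

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical stand-in for deleting the contents of one prefix folder.
def deletePrefix(idx: Int): Boolean =
  if (idx == 3) throw new IOException(s"prefix $idx not listable") else true

// One Future per prefix; `recover` turns an IOException into `false`
// instead of failing the whole cleanup.
val cleanup: Future[Seq[Boolean]] = Future.sequence(
  Range(0, 8).map(idx => Future(deletePrefix(idx)).recover { case _: IOException => false })
)
println(Await.result(cleanup, Duration.Inf)) // Vector(true, true, true, false, true, true, true, true)
```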
