diff --git a/README.md b/README.md
index 502a0eb..14b06b9 100644
--- a/README.md
+++ b/README.md
@@ -26,33 +26,39 @@ These configuration values need to be passed to Spark to load and configure the
 - `s3a://zrlio-tmp/` (Hadoop-AWS + AWS-SDK)
 - `cos://zrlio-tmp.resources/` (Hadoop-Cloud + Stocator)

-Individual blocks are hashed in order to get improved performance when accessing them on the remote filesystem.
-The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`
+Individual blocks are prefixed in order to get improved performance when accessing them on the remote filesystem.
+The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`.
+
+The number of prefixes can be controlled with the option `spark.shuffle.s3.folderPrefixes`.

 ### Features

 Changing these values might have an impact on performance.

+- `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
+- `spark.shuffle.s3.folderPrefixes`: The number of prefixes to use when storing files on S3
+  (default: `10`, minimum: `1`).
+
+  **Note**: This option can be used to optimize performance on object stores which have a per-prefix rate limit.
+- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`). Controls how many partitions are prefetched
+  concurrently per task.
+- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`). The total size of the thread
+  pool used for prefetching the shuffle blocks.
 - `spark.shuffle.checksum.algorithm`: Checksum algorithm (default: `ADLER32`, supported: `ADLER32`, `CRC32`), backport
   from Spark 3.2.0.
 - `spark.shuffle.checksum.enabled`: Enables checksums on Shuffle files (default: `false`), backport from Spark 3.2.0.

   **Note**: Creates additional overhead if active.

-- `spark.shuffle.s3.alwaysCreateIndex`: Always create an index file, even if all partitions have empty length (
-  default: `false`)
-  **Note**: Creates additional overhead if active.
+### Debug configuration options

-- `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
-- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`). Controls how many partitions are prefetched
-  concurrently per task.
-- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`). The total size of the thread
-  pool used for prefetching the shuffle blocks.
+Configuration options used for debugging:

-### Unsafe configuration options
+- `spark.shuffle.s3.alwaysCreateIndex`: Always create an index file, even if all partitions have empty length
+  (default: `false`)

-Unsafe configurations options:
+  **Note**: Creates additional overhead if active.

 - `spark.shuffle.s3.useBlockManager`: Use the Spark block manager to compute blocks (default: `true`).
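For reference, the documented options can also be wired up programmatically. A minimal driver-side sketch — the bucket URI and prefix count are illustrative placeholders, not values from this patch; the shuffle manager and sort IO plugin class match the values used in examples/sql/run_benchmark.sh:

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration enabling the S3 shuffle plugin together with
// the new spark.shuffle.s3.folderPrefixes option.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.S3ShuffleManager")
  .set("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")
  .set("spark.shuffle.s3.rootDir", "s3a://my-bucket/shuffle") // placeholder bucket
  .set("spark.shuffle.s3.folderPrefixes", "32")               // spread load across 32 prefixes
  .set("spark.shuffle.s3.cleanup", "true")                    // the default
```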
diff --git a/examples/sql/run_benchmark.sh b/examples/sql/run_benchmark.sh
index a41d34c..0bc2a2c 100755
--- a/examples/sql/run_benchmark.sh
+++ b/examples/sql/run_benchmark.sh
@@ -49,7 +49,7 @@ SPARK_S3_SHUFFLE_CONFIG=(
   --conf spark.hadoop.fs.s3a.access.key=${S3A_ACCESS_KEY}
   --conf spark.hadoop.fs.s3a.secret.key=${S3A_SECRET_KEY}
   --conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT}
-  --conf spark.shuffle.s3.useBlockManager=${USE_BLOCK_MANAGER:-false}
+  --conf spark.shuffle.s3.useBlockManager=${USE_BLOCK_MANAGER:-true}
   --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager"
   --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO
   --conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED}
diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
index 175ffd8..fef062a 100644
--- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
+++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
@@ -26,39 +26,47 @@ class S3ShuffleDispatcher extends Logging {
   val appId: String = conf.getAppId
   val startTime: String = conf.get("spark.app.startTime")

+  // Required
   val rootDir = conf.get("spark.shuffle.s3.rootDir", defaultValue = "sparkS3shuffle")
   private val isCOS = rootDir.startsWith("cos://")
   private val isS3A = rootDir.startsWith("s3a://")

-  val checksumAlgorithm: String = conf.get("spark.shuffle.checksum.algorithm", defaultValue = "ADLER32")
-  val checksumEnabled: Boolean = conf.getBoolean("spark.shuffle.checksum.enabled", defaultValue = false)
+  // Optional
+  val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
+  val folderPrefixes: Int = conf.getInt("spark.shuffle.s3.folderPrefixes", defaultValue = 10)
   val prefetchBatchSize: Int = conf.getInt("spark.shuffle.s3.prefetchBatchSize", defaultValue = 25)
   val prefetchThreadPoolSize: Int = conf.getInt("spark.shuffle.s3.prefetchThreadPoolSize", defaultValue = 100)
-  val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
-  val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
+
+  // Debug
+  val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
   val useBlockManager: Boolean = conf.getBoolean("spark.shuffle.s3.useBlockManager", defaultValue = true)
   val forceBatchFetch: Boolean = conf.getBoolean("spark.shuffle.s3.forceBatchFetch", defaultValue = false)

+  // Backports
+  val checksumAlgorithm: String = conf.get("spark.shuffle.checksum.algorithm", defaultValue = "ADLER32")
+  val checksumEnabled: Boolean = conf.getBoolean("spark.shuffle.checksum.enabled", defaultValue = false)
+
   val appDir = f"/${startTime}-${appId}/"
   val fs: FileSystem = FileSystem.get(URI.create(rootDir), {
     SparkHadoopUtil.newConfiguration(conf)
   })

-  logInfo(s"- spark.shuffle.checksum.algorithm=${checksumAlgorithm} (backported from Spark 3.2.0)")
-  logInfo(s"- spark.shuffle.checksum.enabled=${checksumEnabled} (backported from Spark 3.2.0)")
+  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir})")
   logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")
   logInfo(s"- spark.shuffle.s3.prefetchBlockSize=${prefetchBatchSize}")
   logInfo(s"- spark.shuffle.s3.prefetchThreadPoolSize=${prefetchThreadPoolSize}")
-  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir})")
+  logInfo(s"- spark.shuffle.s3.folderPrefixes=${folderPrefixes}")
+  // Debug
   logInfo(s"- spark.shuffle.s3.alwaysCreateIndex=${alwaysCreateIndex} (default: false)")
   logInfo(s"- spark.shuffle.s3.useBlockManager=${useBlockManager} (default: true)")
   logInfo(s"- spark.shuffle.s3.forceBatchFetch=${forceBatchFetch} (default: false)")
-
+  // Backports
+  logInfo(s"- spark.shuffle.checksum.algorithm=${checksumAlgorithm} (backported from Spark 3.2.0)")
+  logInfo(s"- spark.shuffle.checksum.enabled=${checksumEnabled} (backported from Spark 3.2.0)")

   def removeRoot(): Boolean = {
-    Range(0, 10).map(idx => {
+    Range(0, folderPrefixes).map(idx => {
       Future {
         fs.delete(new Path(f"${rootDir}/${idx}${appDir}"), true)
       }
@@ -75,12 +83,13 @@ class S3ShuffleDispatcher extends Logging {
       case ShuffleIndexBlockId(_, mapId, _) =>
         mapId
       case _ => 0
-    }) % 10
+    }) % folderPrefixes
     new Path(f"${rootDir}/${idx}${appDir}/${blockId.name}")
   }

   def getPath(blockId: ShuffleChecksumBlockId): Path = {
-    new Path(f"${rootDir}/${blockId.mapId % 10}${appDir}/${blockId.name}")
+    val idx = blockId.mapId % folderPrefixes
+    new Path(f"${rootDir}/${idx}${appDir}/${blockId.name}")
   }

   /**
diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala
index dc98b62..5b0deb0 100644
--- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala
+++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala
@@ -7,7 +7,7 @@ import org.apache.spark.shuffle.ConcurrentObjectMap
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage.{BlockId, ShuffleIndexBlockId}

-import java.io.{BufferedInputStream, BufferedOutputStream}
+import java.io.{BufferedInputStream, BufferedOutputStream, IOException}
 import java.nio.ByteBuffer
 import java.util
 import java.util.zip.{Adler32, CRC32, Checksum}
@@ -92,12 +92,16 @@ object S3ShuffleHelper extends Logging {
         regex.matcher(path.getName).matches()
       }
     }
-    Range(0, 10).map(idx => {
+    Range(0, dispatcher.folderPrefixes).map(idx => {
       Future {
         val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        dispatcher.fs.listStatus(path, shuffleIndexFilter).map(v => {
-          BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
-        })
+        try {
+          dispatcher.fs.listStatus(path, shuffleIndexFilter).map(v => {
+            BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
+          })
+        } catch {
+          case _: IOException => Array.empty[ShuffleIndexBlockId]
+        }
       }
     }).flatMap(Await.result(_, Duration.Inf)).toArray
   }
diff --git a/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala b/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala
index afefdb5..82be178 100644
--- a/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala
+++ b/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala
@@ -30,6 +30,7 @@ import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
 import org.apache.spark.storage.S3ShuffleReader

+import java.io.IOException
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -153,13 +154,20 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager wi
       }
     }

-    Range(0, 10).flatMap(idx => {
+    Range(0, dispatcher.folderPrefixes).flatMap(idx => {
       val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-      dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
-        Future {
-          dispatcher.fs.delete(f.getPath, false)
+      try {
+        dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
+          Future {
+            dispatcher.fs.delete(f.getPath, false)
+          }
+        })
+      } catch {
+        case _: IOException => {
+          logDebug(s"Unable to delete ${path.getName}")
+          List()
         }
-      })
+      }
     }).foreach(Await.result(_, Duration.Inf))
   }
   true
@@ -167,7 +175,7 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager wi

   /** Shut down this ShuffleManager. */
   override def stop(): Unit = {
-    val cleanupRequired = registeredShuffleIds.size > 0
+    val cleanupRequired = registeredShuffleIds.nonEmpty
     registeredShuffleIds.foreach(
       shuffleId => {
        purgeCaches(shuffleId)
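For reference, a compact sketch of the prefix scheme the dispatcher changes above implement. The names mirror the patch, but the root directory, app directory, and block name are made-up illustrative values (the real code builds a Hadoop `Path` from `f"${rootDir}/${idx}${appDir}/${blockId.name}"`):

```scala
// Shuffle files are spread across `folderPrefixes` top-level directories so
// that object stores with per-prefix rate limits see the load fanned out.
val rootDir = "s3a://my-bucket/shuffle" // placeholder
val appDir = "/1700000000000-app-0001/" // f"/${startTime}-${appId}/"
val folderPrefixes = 10                 // spark.shuffle.s3.folderPrefixes

def pathFor(mapId: Long, name: String): String = {
  val idx = mapId % folderPrefixes // .data, .index and checksum files of a
  f"${rootDir}/${idx}${appDir}${name}" // map task all share one prefix
}

// pathFor(12, "shuffle_0_12_0.index")
//   => s3a://my-bucket/shuffle/2/1700000000000-app-0001/shuffle_0_12_0.index
```

Since only the prefixes actually hit by some `mapId` ever get created, a prefix directory may be missing when shuffle files are listed or cleaned up. That is why the `listStatus` calls above are now wrapped in `try`/`catch`: an `IOException` is treated as an empty listing (or logged and skipped during cleanup) instead of failing the job.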