From 2a5eab505e9621b578da6a5e56e9b9024fb330b4 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Thu, 16 Feb 2017 17:38:33 -0800
Subject: [PATCH] [SC-5937][ES-2485] Use parallel file reads in directory
 commit read protocol

## What changes were proposed in this pull request?

This avoids being latency-bound on file reads. Initial testing shows this to be at least 5x faster than without the thread pool.

## How was this patch tested?

Existing tests; perf test on dogfood.

Author: Eric Liang

Closes #232 from ericl/sc-5937.

---
 .../DatabricksAtomicCommitProtocol.scala |  8 +----
 .../DatabricksAtomicReadProtocol.scala   | 29 +++++++++++++++++--
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala
index db1bb6ff5acc4..3aa73fdd59479 100644
--- a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala
+++ b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.ThreadUtils
 
 /**
  * File commit protocol optimized for cloud storage. Files are written directly to their final
@@ -153,7 +152,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
 
     // Commit each updated directory in parallel.
     val dirs = (addedByDir.keys ++ removedByDir.keys).toSet.par
-    dirs.tasksupport = DatabricksAtomicCommitProtocol.tasksupport
+    dirs.tasksupport = tasksupport
     dirs.foreach { dir =>
       val commitMarker = new Path(dir, s"_committed_$txnId")
       val output = fs.create(commitMarker)
@@ -207,11 +206,6 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
 object DatabricksAtomicCommitProtocol extends Logging {
   import DatabricksAtomicReadProtocol._
 
-  import scala.collection.parallel.ThreadPoolTaskSupport
-
-  private lazy val tasksupport = new ThreadPoolTaskSupport(
-    ThreadUtils.newDaemonCachedThreadPool("db-atomic-commit-worker", 100))
-
   /**
    * Traverses the given directories and cleans up uncommitted or garbage files and markers. A
    * horizon may be specified beyond which we assume pending jobs have failed. Files written by
diff --git a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala
index 163c58050a46c..4886b666f2c6a 100644
--- a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala
+++ b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala
@@ -12,6 +12,7 @@ import java.io.{File, FileNotFoundException, InputStream, InputStreamReader, IOE
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
+import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs._
@@ -21,7 +22,7 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
  * Read-side support for DatabricksAtomicCommitProtocol.
@@ -29,6 +30,11 @@ import org.apache.spark.util.{Clock, SystemClock}
 object DatabricksAtomicReadProtocol extends Logging {
   type TxnId = String
 
+  import scala.collection.parallel.ThreadPoolTaskSupport
+
+  lazy val tasksupport = new ThreadPoolTaskSupport(
+    ThreadUtils.newDaemonCachedThreadPool("db-atomic-commit-worker", 100))
+
   val STARTED_MARKER = "_started_(.*)".r
   val COMMITTED_MARKER = "_committed_(.*)".r
   val FILE_WITH_TXN_ID = "[^_].*-tid-([a-z0-9]+)-.*".r
@@ -218,6 +224,24 @@ object DatabricksAtomicReadProtocol extends Logging {
     val corruptCommitMarkers = mutable.Set[TxnId]()
     val deletedFiles = mutable.Map[String, Long]()
 
+    // Retrieve all file contents in parallel to hide the IO latency.
+    val fileContents: Map[TxnId, Try[(Seq[String], Seq[String])]] = {
+      val pcol = filesAndMarkers.par
+      pcol.tasksupport = tasksupport
+      pcol.flatMap { stat =>
+        stat.getPath.getName match {
+          // We ignore zero-length commit markers (this is a commonly observed symptom of DBFS
+          // cancellation bugs in practice).
+          case COMMITTED_MARKER(txnId) if stat.getLen > 0 =>
+            val commitFile = new Path(dir, "_committed_" + txnId)
+            val result = Try(deserializeFileChanges(fs.open(commitFile)))
+            Some((txnId, result))
+          case _ =>
+            None
+        }
+      }.toList.toMap
+    }
+
     filesAndMarkers.foreach { stat =>
       if (stat.getModificationTime > lastModified) {
         lastModified = stat.getModificationTime
@@ -226,9 +250,8 @@ object DatabricksAtomicReadProtocol extends Logging {
         // We ignore zero-length commit markers (this is a commonly observed symptom of DBFS
         // cancellation bugs in practice).
        case COMMITTED_MARKER(txnId) if stat.getLen > 0 =>
-          val commitFile = new Path(dir, "_committed_" + txnId)
          try {
-            val (filesAdded, filesRemoved) = deserializeFileChanges(fs.open(commitFile))
+            val (filesAdded, filesRemoved) = fileContents(txnId).get
            filesRemoved.foreach { file =>
              assert(stat.getModificationTime > 0)
              deletedFiles(file) = stat.getModificationTime
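
Note (illustration, not part of the patch): the read-side change above parallelizes the per-marker reads with a Scala parallel collection whose `tasksupport` is backed by an explicit thread pool, and wraps each read in `Try` so a single slow or corrupt marker does not abort the whole directory scan. A minimal self-contained sketch of that pattern follows; `ParallelMarkerReadSketch`, `readCommitMarker`, and the pool sizing are hypothetical stand-ins (the patch itself reads via `fs.open` plus `deserializeFileChanges` and builds its pool with Spark's `ThreadUtils.newDaemonCachedThreadPool`), and `ThreadPoolTaskSupport` assumes the Scala 2.11/2.12-era standard library that this code targets.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.parallel.ThreadPoolTaskSupport
import scala.util.Try

object ParallelMarkerReadSketch {

  // Bounded cached pool for IO-bound reads. Spark's ThreadUtils helper additionally
  // names the threads and marks them as daemons; that detail is omitted here.
  private val pool = new ThreadPoolExecutor(
    100, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
  pool.allowCoreThreadTimeOut(true)

  private val tasksupport = new ThreadPoolTaskSupport(pool)

  // Hypothetical stand-in for fs.open(commitFile) + deserializeFileChanges(...) in the patch:
  // one latency-bound read that returns (filesAdded, filesRemoved).
  def readCommitMarker(path: String): (Seq[String], Seq[String]) =
    (Seq.empty, Seq.empty)

  /** Reads all commit markers in parallel; each failure is captured per marker as a Try. */
  def readAll(markers: Seq[(String, String)]): Map[String, Try[(Seq[String], Seq[String])]] = {
    val pcol = markers.par
    // Attach the explicit task support so the reads overlap on the IO pool instead of
    // being limited by the default CPU-sized pool.
    pcol.tasksupport = tasksupport
    pcol.map { case (txnId, path) => txnId -> Try(readCommitMarker(path)) }
      .toList
      .toMap
  }
}
```

Keeping the result as `Map[TxnId, Try[...]]` defers error handling to the existing per-marker `try`/`catch` in the sequential pass, which is why the patch calls `fileContents(txnId).get` inside the original `try` block rather than unwrapping results eagerly.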