[SC-5937][ES-2485] Use parallel file reads in directory commit read protocol

## What changes were proposed in this pull request?

This avoids being latency-bound on file reads in the directory commit read protocol. Initial testing shows the read path to be at least 5x faster with the thread pool than without it.
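For illustration, the underlying pattern is a Scala parallel collection whose `tasksupport` is backed by a large thread pool, so the per-marker reads overlap instead of paying one round of IO latency each. The sketch below is a minimal, hypothetical example (Scala 2.11/2.12 parallel collections): `readMarker` and the marker names stand in for `deserializeFileChanges(fs.open(...))` and the `FileStatus` listing used in the real protocol, and the 100-thread pool only mirrors the `newDaemonCachedThreadPool("db-atomic-commit-worker", 100)` sizing in the change.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.parallel.ThreadPoolTaskSupport
import scala.util.Try

object ParallelMarkerReads {
  // Stand-in for deserializeFileChanges(fs.open(commitFile)): any blocking, per-file read.
  def readMarker(path: String): String = s"contents of $path"

  def main(args: Array[String]): Unit = {
    // Hypothetical commit-marker names; the real code works from a directory listing.
    val markerPaths = Seq("_committed_tx1", "_committed_tx2", "_committed_tx3")

    // The work is IO-bound, so the pool is sized well above the core count.
    val pool = new ThreadPoolExecutor(
      100, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
    pool.allowCoreThreadTimeOut(true)

    // Issue all reads concurrently; each result is captured in a Try so one bad
    // file does not fail the whole batch until the caller decides to unwrap it.
    val par = markerPaths.par
    par.tasksupport = new ThreadPoolTaskSupport(pool)
    val contents: Map[String, Try[String]] =
      par.map(p => p -> Try(readMarker(p))).toList.toMap

    contents.foreach { case (p, c) => println(s"$p -> $c") }
    pool.shutdown()
  }
}
```

The diff below has the same shape: each commit marker's contents are read eagerly in parallel into `fileContents: Map[TxnId, Try[(Seq[String], Seq[String])]]`, and the existing sequential loop only unwraps the `Try` inside its original try/catch, so the parallelism changes latency but not error-handling behavior.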

## How was this patch tested?

Existing tests; perf test on dogfood.

Author: Eric Liang <[email protected]>

Closes apache#232 from ericl/sc-5937.
ericl committed Feb 17, 2017
1 parent fa4fca9 commit 2a5eab5
Showing 2 changed files with 27 additions and 10 deletions.
@@ -24,7 +24,6 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.ThreadUtils

/**
* File commit protocol optimized for cloud storage. Files are written directly to their final
@@ -153,7 +152,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)

// Commit each updated directory in parallel.
val dirs = (addedByDir.keys ++ removedByDir.keys).toSet.par
-dirs.tasksupport = DatabricksAtomicCommitProtocol.tasksupport
+dirs.tasksupport = tasksupport
dirs.foreach { dir =>
val commitMarker = new Path(dir, s"_committed_$txnId")
val output = fs.create(commitMarker)
@@ -207,11 +206,6 @@
object DatabricksAtomicCommitProtocol extends Logging {
import DatabricksAtomicReadProtocol._

-import scala.collection.parallel.ThreadPoolTaskSupport
-
-private lazy val tasksupport = new ThreadPoolTaskSupport(
-ThreadUtils.newDaemonCachedThreadPool("db-atomic-commit-worker", 100))

/**
* Traverses the given directories and cleans up uncommitted or garbage files and markers. A
* horizon may be specified beyond which we assume pending jobs have failed. Files written by
@@ -12,6 +12,7 @@ import java.io.{File, FileNotFoundException, InputStream, InputStreamReader, IOE
import java.nio.charset.StandardCharsets

import scala.collection.mutable
+import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.fs._
@@ -21,14 +22,19 @@ import org.json4s.jackson.Serialization

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}

/**
* Read-side support for DatabricksAtomicCommitProtocol.
*/
object DatabricksAtomicReadProtocol extends Logging {
type TxnId = String

+import scala.collection.parallel.ThreadPoolTaskSupport
+
+lazy val tasksupport = new ThreadPoolTaskSupport(
+ThreadUtils.newDaemonCachedThreadPool("db-atomic-commit-worker", 100))

val STARTED_MARKER = "_started_(.*)".r
val COMMITTED_MARKER = "_committed_(.*)".r
val FILE_WITH_TXN_ID = "[^_].*-tid-([a-z0-9]+)-.*".r
@@ -218,6 +224,24 @@
val corruptCommitMarkers = mutable.Set[TxnId]()
val deletedFiles = mutable.Map[String, Long]()

+// Retrieve all file contents in parallel to hide the IO latency.
+val fileContents: Map[TxnId, Try[(Seq[String], Seq[String])]] = {
+val pcol = filesAndMarkers.par
+pcol.tasksupport = tasksupport
+pcol.flatMap { stat =>
+stat.getPath.getName match {
+// We ignore zero-length commit markers (this is a commonly observed symptom of DBFS
+// cancellation bugs in practice).
+case COMMITTED_MARKER(txnId) if stat.getLen > 0 =>
+val commitFile = new Path(dir, "_committed_" + txnId)
+val result = Try(deserializeFileChanges(fs.open(commitFile)))
+Some((txnId, result))
+case _ =>
+None
+}
+}.toList.toMap
+}

filesAndMarkers.foreach { stat =>
if (stat.getModificationTime > lastModified) {
lastModified = stat.getModificationTime
@@ -226,9 +250,8 @@
// We ignore zero-length commit markers (this is a commonly observed symptom of DBFS
// cancellation bugs in practice).
case COMMITTED_MARKER(txnId) if stat.getLen > 0 =>
-val commitFile = new Path(dir, "_committed_" + txnId)
try {
-val (filesAdded, filesRemoved) = deserializeFileChanges(fs.open(commitFile))
+val (filesAdded, filesRemoved) = fileContents(txnId).get
filesRemoved.foreach { file =>
assert(stat.getModificationTime > 0)
deletedFiles(file) = stat.getModificationTime