Skip to content

Commit

Permalink
Add test to ensure we don't clean up files when config=false.
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangxb1987 committed May 24, 2018
1 parent 0df8e4e commit 4a4ab59
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.text.SimpleDateFormat
import java.util.{Date, Locale, UUID}
import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.function.Supplier

import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
Expand All @@ -49,7 +50,8 @@ private[deploy] class Worker(
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
val securityMgr: SecurityManager,
externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
extends ThreadSafeRpcEndpoint with Logging {

private val host = rpcEnv.address.host
Expand Down Expand Up @@ -97,9 +99,9 @@ private[deploy] class Worker(
private val APP_DATA_RETENTION_SECONDS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

// Whether or not cleanup the non-shuffle files on executor finishes.
// Whether or not to clean up the non-shuffle files on executor death.
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.worker.cleanup.nonShuffleFiles.enabled", true)
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorDeath", true)

private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None
Expand Down Expand Up @@ -146,7 +148,11 @@ private[deploy] class Worker(
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)

// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val shuffleService = if (externalShuffleServiceSupplier != null) {
externalShuffleServiceSupplier.get()
} else {
new ExternalShuffleService(conf, securityMgr)
}

private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@

package org.apache.spark.deploy.worker

import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier

import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
Expand All @@ -29,22 +38,30 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {

import org.apache.spark.deploy.DeployTestUtils._

@Mock(answer = RETURNS_SMART_NULLS) private var shuffleService: ExternalShuffleService = _

/**
 * Builds a Command for tests in which every argument except the last is empty;
 * the last argument carries the supplied Java options.
 *
 * @param javaOpts Java options to place in the command
 * @return the constructed Command
 */
def cmd(javaOpts: String*): Command = {
  val opts = Seq(javaOpts : _*)
  Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, opts)
}
/**
 * Creates a SparkConf with `loadDefaults = false` and applies the given
 * key/value pairs to it via `setAll`.
 *
 * @param opts configuration entries as (key, value) pairs
 * @return the resulting SparkConf
 */
def conf(opts: (String, String)*): SparkConf = {
  val base = new SparkConf(loadDefaults = false)
  base.setAll(opts)
}

private var _worker: Worker = _

private def makeWorker(conf: SparkConf): Worker = {
private def makeWorker(
conf: SparkConf,
shuffleServiceSupplier: Supplier[ExternalShuffleService] = null): Worker = {
assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, securityMgr)
"Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
_worker
}

// Initialise the @Mock-annotated fields of this suite (e.g. shuffleService)
// before the tests run, via Mockito's annotation processing.
before {
MockitoAnnotations.initMocks(this)
}

after {
if (_worker != null) {
_worker.rpcEnv.shutdown()
Expand Down Expand Up @@ -194,4 +211,36 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
assert(worker.finishedDrivers.size === expectedValue)
}
}

// Flag set to true: the helper expects the shuffle service cleanup hook to be invoked.
test("cleanup non-shuffle files after executor death when config " +
  "spark.storage.cleanupFilesAfterExecutorDeath=true") {
  testCleanupFilesWithConfig(value = true)
}

// Flag set to false: the helper expects the shuffle service cleanup hook NOT to be invoked.
test("don't cleanup non-shuffle files after executor death when config " +
  "spark.storage.cleanupFilesAfterExecutorDeath=false") {
  testCleanupFilesWithConfig(value = false)
}

/**
 * Builds a Worker backed by the mocked shuffle service, reports one executor as exited and
 * checks whether `executorRemoved` was invoked on the shuffle service.
 *
 * @param value value written to `spark.storage.cleanupFilesAfterExecutorDeath`; it is also the
 *              expected outcome, i.e. whether the cleanup hook should have been called
 */
private def testCleanupFilesWithConfig(value: Boolean) = {
  val sparkConf =
    new SparkConf().set("spark.storage.cleanupFilesAfterExecutorDeath", value.toString)

  // Flipped to true by the stubbed answer when the mock's executorRemoved is invoked.
  val cleanupInvoked = new AtomicBoolean(false)
  val recordCleanup = new Answer[Unit] {
    override def answer(invocation: InvocationOnMock): Unit = {
      cleanupInvoked.set(true)
    }
  }
  when(shuffleService.executorRemoved(any[String], any[String])).thenAnswer(recordCleanup)

  // Hand the mock to the Worker through the supplier parameter of makeWorker.
  val mockSupplier = new Supplier[ExternalShuffleService] {
    override def get: ExternalShuffleService = shuffleService
  }
  val worker = makeWorker(sparkConf, mockSupplier)

  // Register ten executors under "app1", then report executor 0 as exited.
  (0 until 10).foreach { i =>
    worker.executors += s"app1/$i" -> createExecutorRunner(i)
  }
  val executorExited = ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None)
  worker.handleExecutorStateChanged(executorExited)

  assert(cleanupInvoked.get() == value)
}
}
2 changes: 1 addition & 1 deletion docs/spark-standalone.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ SPARK_WORKER_OPTS supports the following system properties:
</td>
</tr>
<tr>
<td><code>spark.worker.cleanup.nonShuffleFiles.enabled</code></td>
<td><code>spark.storage.cleanupFilesAfterExecutorDeath</code></td>
<td>true</td>
<td>
Enable cleanup of non-shuffle files (such as temp. shuffle blocks, cached RDD/broadcast blocks,
Expand Down

0 comments on commit 4a4ab59

Please sign in to comment.