Skip to content

Commit

Permalink
KE-17215 clean SQLExecution broadcast
Browse files Browse the repository at this point in the history
cherry pick from Kyligence@8963cb1
  • Loading branch information
wuzhim committed Sep 17, 2020
1 parent 6649706 commit 56eb839
Show file tree
Hide file tree
Showing 44 changed files with 150 additions and 77 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
67 changes: 43 additions & 24 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ExecutorService, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._

Expand All @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}


/**
* Classes that represent cleaning tasks.
*/
Expand Down Expand Up @@ -112,6 +113,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking.shuffle", false)

/**
* The cleaning thread size.
*/
private val cleanupTaskThreads = sc.conf.getInt(
"spark.cleaner.referenceTracking.cleanupThreadNumber", 100)

private val cleanupExecutorPool: ExecutorService =
ThreadUtils.newDaemonFixedThreadPool(cleanupTaskThreads, "cleanup")

@volatile private var stopped = false

/** Attach a listener object to get information of when objects are cleaned. */
Expand Down Expand Up @@ -178,32 +188,41 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference]).foreach {
r =>
referenceBuffer.remove(r)
runtCleanTask(r)
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning main thread", e)
}
}
}

private def runtCleanTask(ref: CleanupTaskWeakReference) = {
cleanupExecutorPool.submit(new Runnable {
override def run(): Unit = {
try {
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
})
}

/** Perform RDD cleanup. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ private[spark] object MapOutputTracker extends Logging {
if (arr.length >= minBroadcastSize) {
// Use broadcast instead.
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
val bcast = broadcastManager.newBroadcast(arr, isLocal)
val bcast = broadcastManager.newBroadcast(arr, isLocal, null)
// toByteArray creates copy, so we can reuse out
out.reset()
out.write(BROADCAST)
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1485,10 +1485,13 @@ class SparkContext(config: SparkConf) extends Logging {
assertNotStopped()
require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val executionId = getLocalProperty("spark.sql.execution.id")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal, executionId)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
if (executionId == null) {
cleaner.foreach(_.registerBroadcastForCleanup(bc))
}
bc
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

package org.apache.spark.broadcast

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging


private[spark] class BroadcastManager(
val isDriver: Boolean,
conf: SparkConf,
Expand All @@ -32,6 +37,7 @@ private[spark] class BroadcastManager(

private var initialized = false
private var broadcastFactory: BroadcastFactory = null
var cachedBroadcast = new ConcurrentHashMap[String, ListBuffer[Long]]()

initialize()

Expand All @@ -52,8 +58,27 @@ private[spark] class BroadcastManager(

private val nextBroadcastId = new AtomicLong(0)

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
private[spark] def currentBroadcastId: Long = nextBroadcastId.get()

def cleanBroadCast(executionId: String): Unit = {
if (cachedBroadcast.containsKey(executionId)) {
cachedBroadcast.get(executionId).foreach(broadcastId => unbroadcast(broadcastId, true, false))
cachedBroadcast.remove(executionId)
}
}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, executionId: String): Broadcast[T] = {
val broadcastId = nextBroadcastId.getAndIncrement()
if (executionId != null) {
if (cachedBroadcast.containsKey(executionId)) {
cachedBroadcast.get(executionId) += broadcastId
} else {
val list = new scala.collection.mutable.ListBuffer[Long]
list += broadcastId
cachedBroadcast.put(executionId, list)
}
}
broadcastFactory.newBroadcast[T](value_, isLocal, broadcastId)
}

def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import org.apache.spark.rdd.RDD
private[spark] class ResultTask[T, U](
stageId: Int,
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
val taskBinary: Broadcast[Array[Byte]],
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.spark.shuffle.ShuffleWriter
private[spark] class ShuffleMapTask(
stageId: Int,
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
val taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
localProperties: Properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,13 @@ private[spark] class TaskSetManager(
private def maybeFinishTaskSet() {
if (isZombie && runningTasks == 0) {
sched.taskSetFinished(this)
val broadcastId = taskSet.tasks.head match {
case resultTask: ResultTask[Any, Any] =>
resultTask.taskBinary.id
case shuffleMapTask: ShuffleMapTask =>
shuffleMapTask.taskBinary.id
}
SparkEnv.get.broadcastManager.unbroadcast(broadcastId, true, false)
if (tasksSuccessful == numTasks) {
blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
taskSet.stageId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.spark.storage

import java.io.IOException
import java.util.{HashMap => JHashMap}

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

Expand Down Expand Up @@ -197,11 +198,17 @@ class BlockManagerMasterEndpoint(
// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
// The dispatcher is used as an implicit argument into the Future sequence construction.
val removeMsg = RemoveRdd(rddId)
Future.sequence(
blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)

val futures = blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
e)
0 // zero blocks were removed
}
}.toSeq

Future.sequence(futures)
}

private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
Expand All @@ -224,11 +231,21 @@ class BlockManagerMasterEndpoint(
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
s"${bm.blockManagerId}", e)
0 // zero blocks were removed
}
}.toSeq
val blocksToRemove = blockLocations.keySet().asScala
.collect {
case broadcastId@BroadcastBlockId(`broadcastId`, _) =>
broadcastId
}
blocksToRemove.foreach(blockLocations.remove)
Future.sequence(futures)
}

private def removeBlockManager(blockManagerId: BlockManagerId) {
Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/flume-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/flume-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/flume/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.2.1-kylin-r14</version>
<version>2.2.1-kylin-r15</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Loading

0 comments on commit 56eb839

Please sign in to comment.