Add in more explicit synchronized calls to go along with GuardedBy to
make things more readable. This doesn't change the actual locking
because all of these places were already synchronized by the calling
functions.
tgravescs committed Feb 27, 2020
1 parent f9c1a05 commit 9e79f1a
Showing 1 changed file with 15 additions and 12 deletions.
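
The change is mechanical: fields already annotated with @GuardedBy("this") keep the annotation, and the methods that touch them now wrap their bodies in an explicit synchronized block, even though every current caller already holds the YarnAllocator lock. Because JVM intrinsic locks are reentrant, re-acquiring the lock in the callee is harmless, so behavior is unchanged while the locking contract becomes visible at each method. A minimal standalone sketch of the pattern (class and member names below are illustrative, not taken from the Spark source; the annotation assumed here is javax.annotation.concurrent.GuardedBy):

import javax.annotation.concurrent.GuardedBy

class IdAllocator {
  // Documented as protected by this object's intrinsic lock.
  @GuardedBy("this")
  private var nextId: Int = 0

  // Explicit synchronized even if callers already hold the lock:
  // intrinsic locks are reentrant, so this adds clarity, not deadlock risk.
  def allocate(): Int = synchronized {
    nextId += 1
    nextId
  }

  // Reads of the guarded field also go through the lock.
  def current: Int = synchronized { nextId }
}

Calling allocate() from another synchronized method of the same instance behaves exactly as before, since the thread already owns the monitor.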
@@ -150,6 +150,7 @@ private[yarn] class YarnAllocator(
    *
    * @see SPARK-12864
    */
+  @GuardedBy("this")
   private var executorIdCounter: Int =
     driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

@@ -224,7 +225,7 @@ private[yarn] class YarnAllocator(
     numLocalityAwareTasksPerResourceProfileId.values.sum
   }
 
-  def getNumExecutorsStarting: Int = {
+  def getNumExecutorsStarting: Int = synchronized {
     numExecutorsStartingPerResourceProfileId.values.map(_.get()).sum
   }

@@ -263,20 +264,20 @@ private[yarn] class YarnAllocator(
   }
 
   private def getOrUpdateAllocatedHostToContainersMapForRPId(
-      rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] = {
+      rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] = synchronized {
     allocatedHostToContainersMapPerRPId.getOrElseUpdate(rpId,
       new HashMap[String, mutable.Set[ContainerId]]())
   }
 
-  private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = {
+  private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = synchronized {
     runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[String]())
   }
 
-  private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = {
+  private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = synchronized {
     numExecutorsStartingPerResourceProfileId.getOrElseUpdate(rpId, new AtomicInteger(0))
   }
 
-  private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int = {
+  private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int = synchronized {
     targetNumExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
       SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf))
   }
@@ -285,7 +286,8 @@ private[yarn] class YarnAllocator(
    * A sequence of pending container requests at the given location for each ResourceProfile id
    * that have not yet been fulfilled.
    */
-  private def getPendingAtLocation(location: String): Map[Int, Seq[ContainerRequest]] = {
+  private def getPendingAtLocation(
+      location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
     val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
     rpIdToYarnResource.map { case (id, profResource) =>
       val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
@@ -297,7 +299,7 @@ private[yarn] class YarnAllocator(
 
   // if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
   private def createYarnResourceForResourceProfile(
-      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
     resourceProfileToTotalExecs.foreach { case (rp, num) =>
       if (!rpIdToYarnResource.contains(rp.id)) {
         // Start with the application or default settings
@@ -439,7 +441,7 @@ private[yarn] class YarnAllocator(
    *
    * Visible for testing.
    */
-  def updateResourceRequests(): Unit = {
+  def updateResourceRequests(): Unit = synchronized {
     val pendingAllocatePerResourceProfileId = getPendingAllocate
     val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
       val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
@@ -663,7 +665,7 @@ private[yarn] class YarnAllocator(
       allocatedContainer: Container,
       location: String,
       containersToUse: ArrayBuffer[Container],
-      remaining: ArrayBuffer[Container]): Unit = {
+      remaining: ArrayBuffer[Container]): Unit = synchronized {
     val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
     // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
     // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
@@ -693,7 +695,7 @@ private[yarn] class YarnAllocator(
   /**
    * Launches executors in the allocated containers.
    */
-  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
     for (container <- containersToUse) {
       val rpId = getResourceProfileIdFromPriority(container.getPriority)
       executorIdCounter += 1
@@ -769,7 +771,8 @@ private[yarn] class YarnAllocator(
   }
 
   // Visible for testing.
-  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+  private[yarn] def processCompletedContainers(
+      completedContainers: Seq[ContainerStatus]): Unit = synchronized {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
       val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId,
@@ -913,7 +916,7 @@ private[yarn] class YarnAllocator(
     }
   }
 
-  private def internalReleaseContainer(container: Container): Unit = {
+  private def internalReleaseContainer(container: Container): Unit = synchronized {
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
   }
