Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
support test with local
Browse files Browse the repository at this point in the history
  • Loading branch information
jackylee-ch committed Jan 28, 2022
1 parent 35caf0a commit ca76991
Showing 1 changed file with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
*/
package org.apache.spark.util

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import java.io.File
import java.nio.file.Files
import java.nio.file.LinkOption
import java.nio.file.Path
import java.nio.file.Paths
import java.io.{InputStreamReader, BufferedReader}
import scala.collection.mutable.ListBuffer
import java.lang.management.ManagementFactory

import scala.util.Random

import com.intel.oap._

object ExecutorManager {
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.internal.Logging

object ExecutorManager extends Logging {
def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds
var isTaskSet: Boolean = false
def tryTaskSet(numaInfo: GazelleNumaBindingInfo) = synchronized {
def tryTaskSet(numaInfo: GazelleNumaBindingInfo): Unit = synchronized {
if (numaInfo.enableNumaBinding && !isTaskSet) {
val cmd_output =
Utils.executeAndGetOutput(
Expand All @@ -41,19 +39,21 @@ object ExecutorManager {
tmp.toList.distinct
}
val executorId = SparkEnv.get.executorId
val numCorePerExecutor = numaInfo.numCoresPerExecutor
val coreRange = numaInfo.totalCoreRange
val shouldBindNumaIdx = executorIdOnLocalNode.indexOf(executorId) % coreRange.size
//val coreStartIdx = coreRange(shouldBindNumaIdx)._1
//val coreEndIdx = coreRange(shouldBindNumaIdx)._2
System.out.println(
s"executorId is ${executorId}, executorIdOnLocalNode is ${executorIdOnLocalNode}")
val shouldBindNumaIdx = if (executorIdOnLocalNode.isEmpty) {
// support run with out yarn, such as local
Random.nextInt(coreRange.length - 1)
} else {
executorIdOnLocalNode.indexOf(executorId) % coreRange.length
}
logInfo(s"executorId is $executorId, executorIdOnLocalNode is $executorIdOnLocalNode")
val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)} ${getProcessId()}"
System.out.println(taskSetCmd)
logInfo(s"taskSetCmd is $taskSetCmd")
isTaskSet = true
Utils.executeCommand(Seq("bash", "-c", taskSetCmd))
}
}

def getProcessId(): Int = {
val runtimeMXBean = ManagementFactory.getRuntimeMXBean()
runtimeMXBean.getName().split("@")(0).toInt
Expand Down

0 comments on commit ca76991

Please sign in to comment.