Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-725] change the code style for ExecutorManger (#724)
Browse files Browse the repository at this point in the history
* support test with local

* change code style
  • Loading branch information
jackylee-ch authored Mar 9, 2022
1 parent fd92182 commit 34ea83e
Showing 1 changed file with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
*/
package org.apache.spark.util

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import java.io.File
import java.nio.file.Files
import java.nio.file.LinkOption
import java.nio.file.Path
import java.nio.file.Paths
import java.io.{InputStreamReader, BufferedReader}
import scala.collection.mutable.ListBuffer
import java.lang.management.ManagementFactory

import scala.util.Random

import com.intel.oap._

object ExecutorManager {
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.internal.Logging

object ExecutorManager extends Logging {
def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds
var isTaskSet: Boolean = false
def tryTaskSet(numaInfo: GazelleNumaBindingInfo) = synchronized {
def tryTaskSet(numaInfo: GazelleNumaBindingInfo): Unit = synchronized {
if (numaInfo.enableNumaBinding && !isTaskSet) {
val cmd_output =
Utils.executeAndGetOutput(
Expand All @@ -41,19 +39,16 @@ object ExecutorManager {
tmp.toList.distinct
}
val executorId = SparkEnv.get.executorId
val numCorePerExecutor = numaInfo.numCoresPerExecutor
val coreRange = numaInfo.totalCoreRange
val shouldBindNumaIdx = executorIdOnLocalNode.indexOf(executorId) % coreRange.size
//val coreStartIdx = coreRange(shouldBindNumaIdx)._1
//val coreEndIdx = coreRange(shouldBindNumaIdx)._2
System.out.println(
s"executorId is ${executorId}, executorIdOnLocalNode is ${executorIdOnLocalNode}")
logInfo(s"executorId is $executorId, executorIdOnLocalNode is $executorIdOnLocalNode")
val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)} ${getProcessId()}"
System.out.println(taskSetCmd)
logInfo(s"taskSetCmd is $taskSetCmd")
isTaskSet = true
Utils.executeCommand(Seq("bash", "-c", taskSetCmd))
}
}

def getProcessId(): Int = {
val runtimeMXBean = ManagementFactory.getRuntimeMXBean()
runtimeMXBean.getName().split("@")(0).toInt
Expand Down

0 comments on commit 34ea83e

Please sign in to comment.