From ca76991436326464ce3960464bea0cd6688d8fc5 Mon Sep 17 00:00:00 2001 From: stczwd Date: Fri, 28 Jan 2022 16:59:48 +0800 Subject: [PATCH] support test with local --- .../apache/spark/util/ExecutorManager.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala index 4b40519a8..e15a88443 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala @@ -16,21 +16,19 @@ */ package org.apache.spark.util -import org.apache.spark.{SparkConf, SparkContext, SparkEnv} -import java.io.File -import java.nio.file.Files -import java.nio.file.LinkOption -import java.nio.file.Path -import java.nio.file.Paths -import java.io.{InputStreamReader, BufferedReader} -import scala.collection.mutable.ListBuffer import java.lang.management.ManagementFactory + +import scala.util.Random + import com.intel.oap._ -object ExecutorManager { +import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.internal.Logging + +object ExecutorManager extends Logging { def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds var isTaskSet: Boolean = false - def tryTaskSet(numaInfo: GazelleNumaBindingInfo) = synchronized { + def tryTaskSet(numaInfo: GazelleNumaBindingInfo): Unit = synchronized { if (numaInfo.enableNumaBinding && !isTaskSet) { val cmd_output = Utils.executeAndGetOutput( @@ -41,19 +39,21 @@ object ExecutorManager { tmp.toList.distinct } val executorId = SparkEnv.get.executorId - val numCorePerExecutor = numaInfo.numCoresPerExecutor val coreRange = numaInfo.totalCoreRange - val shouldBindNumaIdx = executorIdOnLocalNode.indexOf(executorId) % coreRange.size - //val coreStartIdx = coreRange(shouldBindNumaIdx)._1 - //val coreEndIdx = coreRange(shouldBindNumaIdx)._2 - System.out.println( - s"executorId is ${executorId}, executorIdOnLocalNode is ${executorIdOnLocalNode}") + val shouldBindNumaIdx = if (executorIdOnLocalNode.isEmpty) { + // support run with out yarn, such as local + Random.nextInt(coreRange.length - 1) + } else { + executorIdOnLocalNode.indexOf(executorId) % coreRange.length + } + logInfo(s"executorId is $executorId, executorIdOnLocalNode is $executorIdOnLocalNode") val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)} ${getProcessId()}" - System.out.println(taskSetCmd) + logInfo(s"taskSetCmd is $taskSetCmd") isTaskSet = true Utils.executeCommand(Seq("bash", "-c", taskSetCmd)) } } + def getProcessId(): Int = { val runtimeMXBean = ManagementFactory.getRuntimeMXBean() runtimeMXBean.getName().split("@")(0).toInt