This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

[SQL-DS-CACHE-190] Put plasma detector in separate object to avoid unnecessary dependency of Arrow (#191)
yma11 authored Aug 2, 2021
1 parent 6bc51ce commit f225d0a
Showing 2 changed files with 24 additions and 22 deletions.
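
Why the move matters (per the commit message): keeping plasmaServerDetect inside OapCache made that object's class file reference Arrow's plasma types even for deployments that never enable the external cache; a standalone ExternalCacheDetector confines the Arrow reference to one class that is only loaded on the plasma/external code paths. Below is a minimal, hypothetical Scala sketch of that isolation pattern; all names are invented for illustration and are not part of this commit (only the PlasmaClient constructor signature is taken from the diff):

// Hypothetical sketch of the isolation pattern; not part of this commit.
// Only Detector's class file refers to the optional Arrow plasma types, so
// classes that never call Detector.available never force those types
// (or the plasma_java JNI library) to be present.
object Detector {
  def available(socketPath: String): Boolean =
    try {
      // Constructor signature as used in the code changed by this commit.
      new org.apache.arrow.plasma.PlasmaClient(socketPath, "", 0)
      true
    } catch {
      // Deliberately broad: covers a missing Arrow jar (LinkageError) as
      // well as an unreachable plasma-store-server (PlasmaClientException).
      case e: Throwable =>
        Console.err.println(s"external cache not available: ${e.getMessage}")
        false
    }
}

object Consumer {
  // Compiles and loads without any reference to Arrow.
  def pickCache(useExternal: Boolean, socketPath: String): String =
    if (useExternal && Detector.available(socketPath)) "external" else "simple"
}
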
@@ -31,7 +31,6 @@ import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.datasources.OapException
import org.apache.spark.sql.execution.datasources.oap.filecache.FiberType.FiberType
-import org.apache.spark.sql.execution.datasources.oap.filecache.OapCache.plasmaServerDetect
import org.apache.spark.sql.execution.datasources.oap.utils.PersistentMemoryConfigUtils
import org.apache.spark.sql.internal.oap.OapConf
import org.apache.spark.storage.{BlockManager, TestBlockId}

@@ -135,7 +134,7 @@ private[sql] object MemoryManager extends Logging {
      case "tmp" => new TmpDramMemoryManager(sparkEnv)
      case "kmem" => new DaxKmemMemoryManager(sparkEnv)
      case "plasma" =>
-        if (plasmaServerDetect(sparkEnv)) {
+        if (ExternalCacheDetector.plasmaServerDetect(sparkEnv)) {
          new PlasmaMemoryManager(sparkEnv)
        } else {
          new OffHeapMemoryManager(sparkEnv)

@@ -164,7 +163,7 @@ private[sql] object MemoryManager extends Logging {
      case "noevict" => new HybridMemoryManager(sparkEnv)
      case "vmem" => new TmpDramMemoryManager(sparkEnv)
      case "external" =>
-        if (plasmaServerDetect(sparkEnv)) {
+        if (ExternalCacheDetector.plasmaServerDetect(sparkEnv)) {
          new PlasmaMemoryManager(sparkEnv)
        } else {
          new OffHeapMemoryManager(sparkEnv)

@@ -217,24 +217,6 @@ private[filecache] class CacheGuardian(maxMemory: Long) extends Thread with Logg
}

private[filecache] object OapCache extends Logging {
-  def plasmaServerDetect(sparkEnv: SparkEnv): Boolean = {
-    val socket = sparkEnv.conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
-    try {
-      System.loadLibrary("plasma_java")
-    } catch {
-      case e: Exception => logError(s"load plasma jni lib failed " + e.getMessage)
-    }
-    var plasmaDetected: Boolean = true;
-    try {
-      val conn: plasma.PlasmaClient = new plasma.PlasmaClient(socket, "", 0)
-    } catch {
-      case e: PlasmaClientException =>
-        logWarning("External cache strategy requires plasma-store-server launched, " +
-          "failed to detect plasma-store-server, will fallback to simpleCache." + e.getMessage)
-        plasmaDetected = false;
-    }
-    plasmaDetected
-  }
  def cacheFallBackDetect(sparkEnv: SparkEnv,
      fallBackEnabled: Boolean = true,
      fallBackRes: Boolean = true): Boolean = {

@@ -307,7 +289,7 @@ private[filecache] object OapCache extends Logging {

    oapCacheOpt match {
      case "external" =>
-        if (plasmaServerDetect(sparkEnv)) new ExternalCache(fiberType)
+        if (ExternalCacheDetector.plasmaServerDetect(sparkEnv)) new ExternalCache(fiberType)
        else new SimpleOapCache()
      case "guava" =>
        if (cacheFallBackDetect(sparkEnv, fallBackEnabled.toBoolean, fallBackRes.toBoolean)) {

@@ -962,6 +944,27 @@ class MixCache(dataCacheMemory: Long,
}
}

+object ExternalCacheDetector extends Logging {
+  def plasmaServerDetect(sparkEnv: SparkEnv): Boolean = {
+    val socket = sparkEnv.conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
+    try {
+      System.loadLibrary("plasma_java")
+    } catch {
+      case e: Exception => logError(s"load plasma jni lib failed " + e.getMessage)
+    }
+    var plasmaDetected: Boolean = true;
+    try {
+      val conn: plasma.PlasmaClient = new plasma.PlasmaClient(socket, "", 0)
+    } catch {
+      case e: plasma.exceptions.PlasmaClientException =>
+        logWarning("External cache strategy requires plasma-store-server launched, " +
+          "failed to detect plasma-store-server, will fallback to simpleCache." + e.getMessage)
+        plasmaDetected = false;
+    }
+    plasmaDetected
+  }
+}

class ExternalCache(fiberType: FiberType) extends OapCache with Logging {
  private val conf = SparkEnv.get.conf
  private val externalStoreCacheSocket: String =
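
After the move, both changed files reach the probe through the new object. A small hypothetical caller, mirroring the fallback behaviour visible in the changed call sites (the chooser object and its wiring are invented for illustration):

// Hypothetical caller sketch; only the detector call and its fallback
// semantics come from this commit. Assumes a SparkEnv whose conf provides
// OAP_EXTERNAL_CACHE_SOCKET_PATH.
import org.apache.spark.SparkEnv
import org.apache.spark.sql.execution.datasources.oap.filecache.ExternalCacheDetector

object CacheBackendChooser {
  def choose(sparkEnv: SparkEnv): String =
    if (ExternalCacheDetector.plasmaServerDetect(sparkEnv)) {
      "external"  // plasma-store-server detected: ExternalCache / PlasmaMemoryManager
    } else {
      "simple"    // detector logged a warning: fall back to SimpleOapCache / OffHeapMemoryManager
    }
}
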
