
Commit 6bc51ce

[SQL-DS-CACHE-188][POAE7-1253] improvement of fallback from plasma cache to simple cache (#189)

yma11 authored Jul 30, 2021
1 parent 1c497b8 commit 6bc51ce
Showing 5 changed files with 38 additions and 21 deletions.

ParquetFileReader.java
@@ -597,18 +597,18 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options)
   }
 
   private final InputFile file;
-  protected final SeekableInputStream f;
+  public final SeekableInputStream f;
   private final ParquetReadOptions options;
-  protected final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
+  public final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
   private final FileMetaData fileMetaData; // may be null
-  protected final List<BlockMetaData> blocks;
+  public final List<BlockMetaData> blocks;
 
   // not final. in some cases, this may be lazily loaded for backward-compat.
   private ParquetMetadata footer;
 
-  protected int currentBlock = 0;
-  protected ColumnChunkPageReadStore currentRowGroup = null;
-  protected DictionaryPageReader nextDictionaryReader = null;
+  public int currentBlock = 0;
+  public ColumnChunkPageReadStore currentRowGroup = null;
+  public DictionaryPageReader nextDictionaryReader = null;
 
   /**
    * @param configuration the Hadoop conf
@@ -823,7 +823,7 @@ public boolean skipNextRowGroup() {
     return advanceToNextBlock();
   }
 
-  protected boolean advanceToNextBlock() {
+  public boolean advanceToNextBlock() {
     if (currentBlock == blocks.size()) {
       return false;
     }
@@ -1036,7 +1036,7 @@ public BytesInput readAsBytesInput(int size) throws IOException {
   /**
    * deals with a now fixed bug where compressedLength was missing a few bytes.
    */
-  protected class WorkaroundChunk extends Chunk {
+  public class WorkaroundChunk extends Chunk {
 
     private final SeekableInputStream f;
 
@@ -1098,7 +1098,7 @@ public BytesInput readAsBytesInput(int size) throws IOException {
    */
   static class ChunkDescriptor {
 
-    protected final ColumnDescriptor col;
+    public final ColumnDescriptor col;
     private final ColumnChunkMetaData metadata;
     private final long fileOffset;
     private final int size;
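
Every `protected` member touched above becomes `public`, so code outside `org.apache.parquet.hadoop` (such as OAP's cache-aware readers) can drive row-group iteration directly instead of subclassing. A hypothetical sketch of what the widened visibility enables; `RowGroupWalker` is illustrative and not part of this commit:

```scala
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical wrapper in a non-Parquet package: before this commit,
// blocks and advanceToNextBlock() were protected and unreachable from here.
class RowGroupWalker(reader: ParquetFileReader) {
  // Total number of row groups in the file, via the now-public metadata list.
  def totalRowGroups: Int = reader.blocks.size()

  // Step through row groups until advanceToNextBlock() reports exhaustion.
  def skipToEnd(): Int = {
    var stepped = 0
    while (reader.advanceToNextBlock()) stepped += 1
    stepped
  }
}
```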

MemoryManager.scala
@@ -135,7 +135,7 @@ private[sql] object MemoryManager extends Logging {
case "tmp" => new TmpDramMemoryManager(sparkEnv)
case "kmem" => new DaxKmemMemoryManager(sparkEnv)
case "plasma" =>
if (plasmaServerDetect()) {
if (plasmaServerDetect(sparkEnv)) {
new PlasmaMemoryManager(sparkEnv)
} else {
new OffHeapMemoryManager(sparkEnv)
@@ -164,7 +164,7 @@ private[sql] object MemoryManager extends Logging {
case "noevict" => new HybridMemoryManager(sparkEnv)
case "vmem" => new TmpDramMemoryManager(sparkEnv)
case "external" =>
if (plasmaServerDetect()) {
if (plasmaServerDetect(sparkEnv)) {
new PlasmaMemoryManager(sparkEnv)
} else {
new OffHeapMemoryManager(sparkEnv)
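
Both arms now share one fallback contract: use plasma-backed memory only when a live server answers, otherwise fall back to plain off-heap memory. Distilled as a standalone sketch (the helper name is illustrative, and a common `MemoryManager` base type is assumed, as the match arms imply; `plasmaServerDetect` is defined in `OapCache`, shown below):

```scala
// Illustrative helper capturing the pattern repeated in both match arms:
// probe the plasma store first, then pick the memory manager accordingly.
def selectPlasmaOrOffHeap(sparkEnv: SparkEnv): MemoryManager =
  if (plasmaServerDetect(sparkEnv)) new PlasmaMemoryManager(sparkEnv)
  else new OffHeapMemoryManager(sparkEnv)
```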

OapCache.scala
@@ -217,15 +217,23 @@ private[filecache] class CacheGuardian(maxMemory: Long) extends Thread with Logging {
 }
 
 private[filecache] object OapCache extends Logging {
-  def plasmaServerDetect(): Boolean = {
-    val command = "ps -ef" #| "grep plasma"
-    val plasmaServerStatus = command.!!
-    if (plasmaServerStatus.indexOf("plasma-store-server") == -1) {
-      logWarning("External cache strategy requires plasma-store-server launched, " +
-        "failed to detect plasma-store-server, will fallback to simpleCache.")
-      return false
+  def plasmaServerDetect(sparkEnv: SparkEnv): Boolean = {
+    val socket = sparkEnv.conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
+    try {
+      System.loadLibrary("plasma_java")
+    } catch {
+      case e: Exception => logError(s"load plasma jni lib failed " + e.getMessage)
     }
-    true
+    var plasmaDetected: Boolean = true;
+    try {
+      val conn: plasma.PlasmaClient = new plasma.PlasmaClient(socket, "", 0)
+    } catch {
+      case e: PlasmaClientException =>
+        logWarning("External cache strategy requires plasma-store-server launched, " +
+          "failed to detect plasma-store-server, will fallback to simpleCache." + e.getMessage)
+        plasmaDetected = false;
+    }
+    plasmaDetected
   }
   def cacheFallBackDetect(sparkEnv: SparkEnv,
     fallBackEnabled: Boolean = true,
@@ -299,7 +307,7 @@ private[filecache] object OapCache extends Logging {

     oapCacheOpt match {
       case "external" =>
-        if (plasmaServerDetect()) new ExternalCache(fiberType)
+        if (plasmaServerDetect(sparkEnv)) new ExternalCache(fiberType)
         else new SimpleOapCache()
       case "guava" =>
         if (cacheFallBackDetect(sparkEnv, fallBackEnabled.toBoolean, fallBackRes.toBoolean)) {
@@ -956,7 +964,8 @@ class MixCache(dataCacheMemory: Long,

 class ExternalCache(fiberType: FiberType) extends OapCache with Logging {
   private val conf = SparkEnv.get.conf
-  private val externalStoreCacheSocket: String = "/tmp/plasmaStore"
+  private val externalStoreCacheSocket: String =
+    conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
   private var cacheInit: Boolean = false
   private var externalDBClient: ExternalDBClient = null
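
The old probe shelled out to `ps -ef | grep`, which only proves that a process with a matching name exists somewhere on the node; the new probe constructs a real `PlasmaClient` against the configured socket, so detection fails exactly when connections would fail. A standalone sketch of the same check, assuming the Arrow plasma Java bindings (`org.apache.arrow.plasma`) that the diff references as `plasma.PlasmaClient`:

```scala
import org.apache.arrow.plasma
import org.apache.arrow.plasma.exceptions.PlasmaClientException

// Connection-based probe: true only if a plasma store answers on socketPath.
def probePlasma(socketPath: String): Boolean =
  try {
    System.loadLibrary("plasma_java")           // JNI library backing the client
    new plasma.PlasmaClient(socketPath, "", 0)  // throws if no server on the socket
    true
  } catch {
    case _: PlasmaClientException | _: UnsatisfiedLinkError => false
  }
```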

OapConf.scala
@@ -772,4 +772,10 @@ object OapConf {
       .stringConf
       .createWithDefault("RedisClient")
 
+  val OAP_EXTERNAL_CACHE_SOCKET_PATH =
+    SqlConfAdapter.buildConf("spark.sql.oap.external.cache.socket.path")
+      .internal()
+      .doc("The socket path of plasma cache")
+      .stringConf
+      .createWithDefault("/tmp/plasmaStore")
 }
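
Callers resolve the entry through the Spark conf, as the lookups added in `plasmaServerDetect` and `ExternalCache` above do; a minimal sketch:

```scala
import org.apache.spark.SparkEnv

// Sketch: resolves to "/tmp/plasmaStore" unless
// spark.sql.oap.external.cache.socket.path is set on the Spark conf.
val socketPath: String = SparkEnv.get.conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
```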

docs/User-Guide.md (2 additions, 0 deletions)
@@ -295,6 +295,8 @@ spark.oap.cache.strategy external
 spark.sql.oap.dcpmm.free.wait.threshold 50000000000
 # according to your executor core number
 spark.executor.sql.oap.cache.external.client.pool.size 10
+# The socket path of plasma server, default is /tmp/plasmaStore
+spark.sql.oap.external.cache.socket.path /tmp/plasmaStore
 ```
 Start Plasma service manually

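When starting the Plasma service manually, the store must listen on the same socket that spark.sql.oap.external.cache.socket.path points to. A typical manual launch of the Arrow object store (the memory size in bytes is illustrative; `-s` sets the socket path):

```
plasma-store-server -m 15000000000 -s /tmp/plasmaStore
```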
