From 8dffa8d7deacdcd3a44e4aef2f2e61d04429d03b Mon Sep 17 00:00:00 2001 From: Kunshang Ji Date: Thu, 11 Mar 2021 13:57:53 +0800 Subject: [PATCH 1/3] use standard Plasma API --- .../datasources/oap/filecache/OapCache.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala index ad36ab808..fad836095 100644 --- a/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala +++ b/Plasma-based-cache/src/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala @@ -996,7 +996,7 @@ class ExternalCache(fiberType: FiberType) extends OapCache with Logging { val objectId = hash(fiberId.toString) val plasmaClient = plasmaClientPool(clientRoundRobin.getAndAdd(1) % clientPoolSize) try { - val buf: ByteBuffer = plasmaClient.create(objectId, fiberLength.toInt) + val buf: ByteBuffer = plasmaClient.create(objectId, fiberLength.toInt, null) ExternalDataFiber(buf, objectId, plasmaClient) } catch { @@ -1065,19 +1065,18 @@ class ExternalCache(fiberType: FiberType) extends OapCache with Logging { val objectId = hash(fiberId.toString) if(contains(fiberId)) { var fiberCache : FiberCache = null - try{ - logDebug(s"Cache hit, get from external cache.") - val plasmaClient = plasmaClientPool(clientRoundRobin.getAndAdd(1) % clientPoolSize) - val buf: ByteBuffer = plasmaClient.getObjAsByteBuffer(objectId, -1, false) + logDebug(s"Cache hit, get from external cache.") + val plasmaClient = plasmaClientPool(clientRoundRobin.getAndAdd(1) % clientPoolSize) + val buf: ByteBuffer = plasmaClient.getObjAsByteBuffer(objectId, -1, false) + if(buf.asInstanceOf[DirectBuffer].address() == 0) { + logWarning("Get return an invalid value.") + fiberCache = cache(fiberId) + cacheMissCount.addAndGet(1) + } else { cacheHitCount.addAndGet(1) fiberCache = ExternalDataFiber(buf, objectId, plasmaClient) } - catch { - case getException : plasma.exceptions.PlasmaGetException => - logWarning("Get exception: " + getException.getMessage) - fiberCache = cache(fiberId) - cacheMissCount.addAndGet(1) - } + fiberCache.fiberId = fiberId fiberCache.occupy() cacheGuardian.addRemovalFiber(fiberId, fiberCache) From 6ce2375c496296da3173814ebf97ed0ebd9bdd70 Mon Sep 17 00:00:00 2001 From: Kunshang Ji Date: Thu, 11 Mar 2021 14:22:29 +0800 Subject: [PATCH 2/3] add comments in pom file to illustrate dependency. --- Plasma-based-cache/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Plasma-based-cache/pom.xml b/Plasma-based-cache/pom.xml index b9c27cd19..e90712fc0 100644 --- a/Plasma-based-cache/pom.xml +++ b/Plasma-based-cache/pom.xml @@ -274,6 +274,10 @@ test-jar test + com.intel.arrow arrow-plasma From 20b48859f5475bbf6a6ef1c8c664f22f44a8bebd Mon Sep 17 00:00:00 2001 From: Kunshang Ji Date: Thu, 11 Mar 2021 14:23:11 +0800 Subject: [PATCH 3/3] fix typo --- Plasma-based-cache/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Plasma-based-cache/pom.xml b/Plasma-based-cache/pom.xml index e90712fc0..ca6acb1b1 100644 --- a/Plasma-based-cache/pom.xml +++ b/Plasma-based-cache/pom.xml @@ -23,7 +23,7 @@ plasma-sql-ds-cache 1.1.0 - Plasma Based SQL DS Cacahe + Plasma Based SQL DS Cache jar