Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #39 from jikunshang/plasma_dependency
Browse files Browse the repository at this point in the history
[SQL-DS-CACHE-38][POAE7-892]update Plasma dependency
  • Loading branch information
Ferdinand Xu authored Mar 12, 2021
2 parents 5393388 + 20b4885 commit f0cd386
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
6 changes: 5 additions & 1 deletion Plasma-based-cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</parent>
<artifactId>plasma-sql-ds-cache</artifactId>
<version>1.1.0</version>
<name>Plasma Based SQL DS Cacahe</name>
<name>Plasma Based SQL DS Cache</name>
<packaging>jar</packaging>


Expand Down Expand Up @@ -274,6 +274,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--
We've changed something based on Apache Arrow. For mvn build, will use this dependency
since it's been published on central maven repo.
-->
<dependency>
<groupId>com.intel.arrow</groupId>
<artifactId>arrow-plasma</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ class ExternalCache(fiberType: FiberType) extends OapCache with Logging {
val objectId = hash(fiberId.toString)
val plasmaClient = plasmaClientPool(clientRoundRobin.getAndAdd(1) % clientPoolSize)
try {
val buf: ByteBuffer = plasmaClient.create(objectId, fiberLength.toInt)
val buf: ByteBuffer = plasmaClient.create(objectId, fiberLength.toInt, null)
ExternalDataFiber(buf, objectId, plasmaClient)
}
catch {
Expand Down Expand Up @@ -1065,19 +1065,18 @@ class ExternalCache(fiberType: FiberType) extends OapCache with Logging {
val objectId = hash(fiberId.toString)
if(contains(fiberId)) {
var fiberCache : FiberCache = null
try{
logDebug(s"Cache hit, get from external cache.")
val plasmaClient = plasmaClientPool(clientRoundRobin.getAndAdd(1) % clientPoolSize)
val buf: ByteBuffer = plasmaClient.getObjAsByteBuffer(objectId, -1, false)
logDebug(s"Cache hit, get from external cache.")
val plasmaClient = plasmaClientPool(clientRoundRobin.getAndAdd(1) % clientPoolSize)
val buf: ByteBuffer = plasmaClient.getObjAsByteBuffer(objectId, -1, false)
if(buf.asInstanceOf[DirectBuffer].address() == 0) {
logWarning("Get return an invalid value.")
fiberCache = cache(fiberId)
cacheMissCount.addAndGet(1)
} else {
cacheHitCount.addAndGet(1)
fiberCache = ExternalDataFiber(buf, objectId, plasmaClient)
}
catch {
case getException : plasma.exceptions.PlasmaGetException =>
logWarning("Get exception: " + getException.getMessage)
fiberCache = cache(fiberId)
cacheMissCount.addAndGet(1)
}

fiberCache.fiberId = fiberId
fiberCache.occupy()
cacheGuardian.addRemovalFiber(fiberId, fiberCache)
Expand Down

0 comments on commit f0cd386

Please sign in to comment.