Skip to content
This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

Commit

Permalink
[PMEM-SPILL-34][POAE7-1119]Port RDD cache to Spark 3.1.1 as separate …
Browse files Browse the repository at this point in the history
…module (#41)
  • Loading branch information
yma11 authored May 25, 2021
1 parent 2c5450d commit 1516833
Show file tree
Hide file tree
Showing 37 changed files with 199 additions and 7,853 deletions.
145 changes: 145 additions & 0 deletions RDD-Cache/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the PMem RDD Cache module, split out of pmem-spill
     as a separate artifact built against Spark 3.1.1. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Inherit common configuration from the pmem-spill aggregator POM one directory up. -->
<parent>
<groupId>com.intel.oap</groupId>
<artifactId>pmem-spill-parent</artifactId>
<version>1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<groupId>com.intel.oap</groupId>
<artifactId>pmem-rdd-cache</artifactId>
<version>1.1.0</version>
<name>OAP Project PMem RDD Cache</name>
<packaging>jar</packaging>

<!-- Version pins for the toolchain and key dependencies.
     spark.internal.version selects which Spark release this module compiles against. -->
<properties>
<java.version>1.8</java.version>
<junit.version>4.12</junit.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.internal.version>3.1.1</spark.internal.version>
<maven-patch-plugin.version>1.2</maven-patch-plugin.version>
<maven-resource-plugin.version>2.6</maven-resource-plugin.version>
<jetty.version>9.4.18.v20190429</jetty.version>
</properties>

<dependencies>
<!-- Spark core for the targeted Spark/Scala combination; the code in this module
     extends Spark-internal classes (memory manager, storage), hence a direct dependency. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<!-- Shared PMem utilities from this project. libpmemkv-jni is excluded here;
     presumably the RDD cache path does not need the pmemkv backend — verify before relying on it. -->
<dependency>
<groupId>com.intel.oap</groupId>
<artifactId>pmem-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>io.pmem</groupId>
<artifactId>libpmemkv-jni</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Jetty artifacts pinned to a single version to avoid mixed-version conflicts
     with the Jetty shaded/provided by Spark. -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<scope>compile</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>

<build>
<!-- Encode the Spark version the jar was built against into the artifact name. -->
<finalName>${project.artifactId}-${project.version}-with-spark-${spark.internal.version}</finalName>
<plugins>
<!-- Compile Scala sources before Java so mixed Scala/Java code resolves correctly. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Java compilation targeting Java 8 with lint enabled (serial/path warnings suppressed). -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<maxmem>1024m</maxmem>
<fork>true</fork>
<compilerArgs>
<arg>-Xlint:all,-serial,-path</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package org.apache.spark.memory

import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.{MemoryAllocator, MemoryBlock}
import org.apache.spark.unsafe.memory.MemoryAllocator

/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
Expand Down Expand Up @@ -57,8 +55,6 @@ private[spark] abstract class MemoryManager(
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val extendedMemoryPool = new ExtendedMemoryPool(this)

onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
Expand All @@ -75,14 +71,7 @@ private[spark] abstract class MemoryManager(
protected[this] val pmemStorageMemory = (pmemInitialSize * pmemUsableRatio).toLong

pmemStorageMemoryPool.incrementPoolSize(pmemStorageMemory)
protected[this] val extendedMemorySize = conf.get(MEMORY_EXTENDED_SIZE)
extendedMemoryPool.incrementPoolSize((extendedMemorySize * 0.9).toLong)

private[memory] var _pMemPages = new ArrayBuffer[MemoryBlock];

private[memory] def pMemPages: ArrayBuffer[MemoryBlock] = {
_pMemPages
}
/**
* Total available on heap memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
Expand All @@ -97,9 +86,9 @@ private[spark] abstract class MemoryManager(
def maxOffHeapStorageMemory: Long

/**
* Total available pmem memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
*/
* Total available pmem memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
*/
def maxPMemStorageMemory: Long

/**
Expand Down Expand Up @@ -145,18 +134,6 @@ private[spark] abstract class MemoryManager(
taskAttemptId: Long,
memoryMode: MemoryMode): Long

/**
 * Try to acquire `numBytes` of extended (PMem-backed) memory for the current task.
 *
 * @param numBytes      number of bytes of extended memory requested
 * @param taskAttemptId id of the task attempt requesting the memory
 * @return the number of bytes actually obtained, or 0 if none could be allocated
 */
private[memory]
def acquireExtendedMemory(
numBytes: Long,
taskAttemptId: Long): Long

/**
* Release numBytes of execution memory belonging to the given task.
*/
Expand Down Expand Up @@ -208,47 +185,6 @@ private[spark] abstract class MemoryManager(
releaseStorageMemory(numBytes, memoryMode)
}

/**
 * Release `numBytes` of extended (PMem-backed) memory held by the given task
 * back to the extended memory pool.
 *
 * @param numBytes      number of bytes to release
 * @param taskAttemptId id of the task attempt releasing the memory
 */
def releaseExtendedMemory(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
extendedMemoryPool.releaseMemory(numBytes, taskAttemptId)
}

/**
 * Release all extended (PMem-backed) memory occupied by the given task.
 *
 * @param taskAttemptId id of the task attempt whose memory is released
 * @return the number of bytes that were released from the extended memory pool
 */
def releaseAllExtendedMemoryForTask(taskAttemptId: Long): Long = synchronized {
extendedMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

/**
 * Record a newly allocated PMem page so it can later be reused
 * (see getUsablePMemPage) or freed in bulk (see freeAllPMemPages).
 *
 * @param pMemPage the PMem-backed memory block to track
 */
def addPMemPages(pMemPage: MemoryBlock): Unit = synchronized {
  pMemPages.append(pMemPage)
}

/** Free every tracked PMem page through the extended memory allocator. */
def freeAllPMemPages(): Unit = synchronized {
  pMemPages.foreach(page => extendedMemoryAllocator.free(page))
}

/**
* @param size size of current page request
* @return PMem Page that suits for current page request
*/
/**
 * Look up a previously allocated PMem page that can be reused for a new page
 * request. A page is reusable when it has been logically freed (its page number
 * is FREED_IN_TMM_PAGE_NUMBER) and its size matches the request exactly.
 *
 * @param size size in bytes of the current page request
 * @return a reusable PMem page of exactly `size` bytes, or null when none exists
 */
def getUsablePMemPage(size: Long): MemoryBlock = synchronized {
  pMemPages.find { page =>
    page.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER && page.size() == size
  }.orNull
}
/**
* Execution memory currently in use, in bytes.
*/
Expand Down Expand Up @@ -349,6 +285,4 @@ private[spark] abstract class MemoryManager(
case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
}
}

private[memory] final val extendedMemoryAllocator = MemoryAllocator.EXTENDED
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,6 @@ private[spark] class UnifiedMemoryManager(
numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}

/**
 * Acquire `numBytes` of extended (PMem-backed) memory for the given task.
 * This is all-or-nothing: if the extended pool cannot satisfy the full request,
 * nothing is acquired and 0 is returned.
 *
 * @param numBytes      number of bytes of extended memory requested
 * @param taskAttemptId id of the task attempt requesting the memory
 * @return `numBytes` on success, 0 when the pool lacks sufficient free space
 */
override def acquireExtendedMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized {
  if (numBytes > extendedMemoryPool.memoryFree) {
    logInfo(s"No PMem Space left, allocation fails.")
    0L
  } else {
    extendedMemoryPool.acquireMemory(numBytes, taskAttemptId)
    numBytes
  }
}

override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
Expand Down
Loading

0 comments on commit 1516833

Please sign in to comment.