From e448c0eba432aa4b6fd1d6c0b3d09695531f65c5 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 24 Sep 2021 00:28:06 +0800 Subject: [PATCH 1/9] [NSE-518] Arrow buffer cleanup: Support both manual release and auto release as a hybrid mode --- .../v2/arrow/SparkMemoryUtils.scala | 74 +++++-------------- 1 file changed, 20 insertions(+), 54 deletions(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index 4aeafe87b..1cb1f46a5 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -20,24 +20,20 @@ package org.apache.spark.sql.execution.datasources.v2.arrow import java.io.PrintWriter import java.util import java.util.UUID - import scala.collection.JavaConverters._ - import com.intel.oap.spark.sql.execution.datasources.v2.arrow._ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream import org.apache.arrow.dataset.jni.NativeMemoryPool import org.apache.arrow.memory.AllocationListener -import org.apache.arrow.memory.AllocationOutcome -import org.apache.arrow.memory.AutoBufferLedger +import org.apache.arrow.memory.BaseAllocator.configBuilder import org.apache.arrow.memory.BufferAllocator -import org.apache.arrow.memory.BufferLedger -import org.apache.arrow.memory.DirectAllocationListener import org.apache.arrow.memory.ImmutableConfig -import org.apache.arrow.memory.LegacyBufferLedger +import org.apache.arrow.memory.MemoryChunkCleaner +import org.apache.arrow.memory.MemoryChunkManager import org.apache.arrow.memory.RootAllocator - import org.apache.spark.TaskContext import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.TaskCompletionListener @@ -46,36 +42,6 @@ object SparkMemoryUtils extends Logging { private val DEBUG: Boolean = false - class AllocationListenerList(listeners: AllocationListener *) - extends AllocationListener { - override def onPreAllocation(size: Long): Unit = { - listeners.foreach(_.onPreAllocation(size)) - } - - override def onAllocation(size: Long): Unit = { - listeners.foreach(_.onAllocation(size)) - } - - override def onRelease(size: Long): Unit = { - listeners.foreach(_.onRelease(size)) - } - - override def onFailedAllocation(size: Long, outcome: AllocationOutcome): Boolean = { - listeners.forall(_.onFailedAllocation(size, outcome)) - } - - override def onChildAdded(parentAllocator: BufferAllocator, - childAllocator: BufferAllocator): Unit = { - listeners.foreach(_.onChildAdded(parentAllocator, childAllocator)) - - } - - override def onChildRemoved(parentAllocator: BufferAllocator, - childAllocator: BufferAllocator): Unit = { - listeners.foreach(_.onChildRemoved(parentAllocator, childAllocator)) - } - } - class TaskMemoryResources { if (!inSparkTask()) { throw new IllegalStateException("Creating TaskMemoryResources instance out of Spark task") @@ -85,22 +51,21 @@ object SparkMemoryUtils extends Logging { val isArrowAutoReleaseEnabled: Boolean = { SQLConf.get - .getConfString("spark.oap.sql.columnar.autorelease", "false").toBoolean + .getConfString("spark.oap.sql.columnar.autorelease", "true").toBoolean } - val ledgerFactory: BufferLedger.Factory = if (isArrowAutoReleaseEnabled) { - AutoBufferLedger.newFactory() + val memoryChunkManagerFactory: MemoryChunkManager.Factory = if (isArrowAutoReleaseEnabled) { + MemoryChunkCleaner.newFactoryWithManualReleaseEnabled() } else { - LegacyBufferLedger.FACTORY + MemoryChunkManager.FACTORY } val sparkManagedAllocationListener = new SparkManagedAllocationListener( new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP), sharedMetrics) - val directAllocationListener = DirectAllocationListener.INSTANCE val allocListener: AllocationListener = if (isArrowAutoReleaseEnabled) { - new AllocationListenerList(sparkManagedAllocationListener, directAllocationListener) + MemoryChunkCleaner.gcTrigger(sparkManagedAllocationListener) } else { sparkManagedAllocationListener } @@ -122,11 +87,12 @@ object SparkMemoryUtils extends Logging { private val memoryPools = new util.ArrayList[NativeMemoryPoolWrapper]() val defaultAllocator: BufferAllocator = { - val alloc = new RootAllocator(ImmutableConfig.builder() - .maxAllocation(Long.MaxValue) - .bufferLedgerFactory(ledgerFactory) - .listener(allocListener) - .build) + val alloc = new RootAllocator( + configBuilder() + .maxAllocation(Long.MaxValue) + .memoryChunkManagerFactory(memoryChunkManagerFactory) + .listener(allocListener) + .build) allocators.add(alloc) alloc } @@ -198,7 +164,7 @@ object SparkMemoryUtils extends Logging { } def release(): Unit = { - ledgerFactory match { + memoryChunkManagerFactory match { case closeable: AutoCloseable => closeable.close() case _ => @@ -272,10 +238,10 @@ object SparkMemoryUtils extends Logging { } private val allocator = new RootAllocator( - ImmutableConfig.builder() - .maxAllocation(Long.MaxValue) - .bufferLedgerFactory(AutoBufferLedger.newFactory()) - .listener(DirectAllocationListener.INSTANCE) + configBuilder() + .maxAllocation(SQLConf.get.getConf(MEMORY_OFFHEAP_SIZE)) + .memoryChunkManagerFactory(MemoryChunkCleaner.newFactory()) + .listener(MemoryChunkCleaner.gcTrigger()) .build) def globalAllocator(): BufferAllocator = { From 9875fd09209fc1b23f44e22aca6745fcb98de7e9 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 24 Sep 2021 00:29:26 +0800 Subject: [PATCH 2/9] TO BE REVERTED --- arrow-data-source/script/build_arrow.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-data-source/script/build_arrow.sh b/arrow-data-source/script/build_arrow.sh index 410e31070..7ed20fe44 100755 --- a/arrow-data-source/script/build_arrow.sh +++ b/arrow-data-source/script/build_arrow.sh @@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}" echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}" mkdir -p $ARROW_SOURCE_DIR mkdir -p $ARROW_INSTALL_DIR -git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR +git clone https://github.com/zhztheplayer/arrow-1.git --branch oap-auto-release-hybrid $ARROW_SOURCE_DIR pushd $ARROW_SOURCE_DIR cmake ./cpp \ From d883575d782ae4b549440d62f844cc9db218893f Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 24 Sep 2021 14:36:39 +0800 Subject: [PATCH 3/9] fixup --- .../execution/datasources/v2/arrow/SparkMemoryUtils.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index 1cb1f46a5..cd1da4a80 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -25,7 +25,6 @@ import com.intel.oap.spark.sql.execution.datasources.v2.arrow._ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream import org.apache.arrow.dataset.jni.NativeMemoryPool import org.apache.arrow.memory.AllocationListener -import org.apache.arrow.memory.BaseAllocator.configBuilder import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.memory.ImmutableConfig import org.apache.arrow.memory.MemoryChunkCleaner @@ -88,7 +87,7 @@ object SparkMemoryUtils extends Logging { val defaultAllocator: BufferAllocator = { val alloc = new RootAllocator( - configBuilder() + RootAllocator.configBuilder() .maxAllocation(Long.MaxValue) .memoryChunkManagerFactory(memoryChunkManagerFactory) .listener(allocListener) @@ -238,7 +237,7 @@ object SparkMemoryUtils extends Logging { } private val allocator = new RootAllocator( - configBuilder() + RootAllocator.configBuilder() .maxAllocation(SQLConf.get.getConf(MEMORY_OFFHEAP_SIZE)) .memoryChunkManagerFactory(MemoryChunkCleaner.newFactory()) .listener(MemoryChunkCleaner.gcTrigger()) From 8a7968dea7fe06629f7af3b1ef11f16b67d8e61e Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 24 Sep 2021 16:57:54 +0800 Subject: [PATCH 4/9] fix --- .../sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index cd1da4a80..d44456b28 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -50,11 +50,11 @@ object SparkMemoryUtils extends Logging { val isArrowAutoReleaseEnabled: Boolean = { SQLConf.get - .getConfString("spark.oap.sql.columnar.autorelease", "true").toBoolean + .getConfString("spark.oap.sql.columnar.autorelease", "false").toBoolean } val memoryChunkManagerFactory: MemoryChunkManager.Factory = if (isArrowAutoReleaseEnabled) { - MemoryChunkCleaner.newFactoryWithManualReleaseEnabled() + MemoryChunkCleaner.newFactory(MemoryChunkCleaner.Mode.HYBRID) } else { MemoryChunkManager.FACTORY } From 87fb32e249256bb0990420027f550cc85e529145 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 27 Sep 2021 10:42:16 +0800 Subject: [PATCH 5/9] fixup --- .../datasources/v2/arrow/SparkMemoryUtils.scala | 10 +++++++++- .../main/java/com/intel/oap/vectorized/JniUtils.java | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index d44456b28..88c4330fa 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.datasources.v2.arrow import java.io.PrintWriter import java.util import java.util.UUID + import scala.collection.JavaConverters._ + import com.intel.oap.spark.sql.execution.datasources.v2.arrow._ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream import org.apache.arrow.dataset.jni.NativeMemoryPool @@ -30,6 +32,8 @@ import org.apache.arrow.memory.ImmutableConfig import org.apache.arrow.memory.MemoryChunkCleaner import org.apache.arrow.memory.MemoryChunkManager import org.apache.arrow.memory.RootAllocator + +import org.apache.spark.SparkContext import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -236,9 +240,13 @@ object SparkMemoryUtils extends Logging { } } + private val maxAllocationSize = { + SparkContext.getOrCreate().getConf.get(MEMORY_OFFHEAP_SIZE) + } + private val allocator = new RootAllocator( RootAllocator.configBuilder() - .maxAllocation(SQLConf.get.getConf(MEMORY_OFFHEAP_SIZE)) + .maxAllocation(maxAllocationSize) .memoryChunkManagerFactory(MemoryChunkCleaner.newFactory()) .listener(MemoryChunkCleaner.gcTrigger()) .build) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java index 9f55054ce..48d84a85a 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/JniUtils.java @@ -61,7 +61,7 @@ public static JniUtils getInstance(String tmp_dir) throws IOException { try { INSTANCE = new JniUtils(tmp_dir); } catch (IllegalAccessException ex) { - throw new IOException("IllegalAccess"); + throw new IOException("IllegalAccess", ex); } } } From e28bcb6c1242acc5196a87fe6a26bf074aee9438 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 27 Sep 2021 18:10:22 +0800 Subject: [PATCH 6/9] fixup --- .../datasources/v2/arrow/SparkMemoryUtils.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index 88c4330fa..b29f73c1a 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -28,12 +28,11 @@ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream import org.apache.arrow.dataset.jni.NativeMemoryPool import org.apache.arrow.memory.AllocationListener import org.apache.arrow.memory.BufferAllocator -import org.apache.arrow.memory.ImmutableConfig import org.apache.arrow.memory.MemoryChunkCleaner import org.apache.arrow.memory.MemoryChunkManager import org.apache.arrow.memory.RootAllocator -import org.apache.spark.SparkContext +import org.apache.spark.SparkEnv import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -89,7 +88,7 @@ object SparkMemoryUtils extends Logging { private val memoryPools = new util.ArrayList[NativeMemoryPoolWrapper]() - val defaultAllocator: BufferAllocator = { + val taskDefaultAllocator: BufferAllocator = { val alloc = new RootAllocator( RootAllocator.configBuilder() .maxAllocation(Long.MaxValue) @@ -123,7 +122,7 @@ object SparkMemoryUtils extends Logging { val al = new SparkManagedAllocationListener( new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller), sharedMetrics) - val parent = defaultAllocator + val parent = taskDefaultAllocator val alloc = parent.newChildAllocator("Spark Managed Allocator - " + UUID.randomUUID().toString, al, 0, parent.getLimit).asInstanceOf[BufferAllocator] allocators.add(alloc) @@ -241,10 +240,10 @@ object SparkMemoryUtils extends Logging { } private val maxAllocationSize = { - SparkContext.getOrCreate().getConf.get(MEMORY_OFFHEAP_SIZE) + SparkEnv.get.conf.get(MEMORY_OFFHEAP_SIZE) } - private val allocator = new RootAllocator( + private val globalAlloc = new RootAllocator( RootAllocator.configBuilder() .maxAllocation(maxAllocationSize) .memoryChunkManagerFactory(MemoryChunkCleaner.newFactory()) @@ -252,7 +251,7 @@ object SparkMemoryUtils extends Logging { .build) def globalAllocator(): BufferAllocator = { - allocator + globalAlloc } def globalMemoryPool(): NativeMemoryPool = { @@ -277,7 +276,7 @@ object SparkMemoryUtils extends Logging { if (!inSparkTask()) { return globalAllocator() } - getTaskMemoryResources().defaultAllocator + getTaskMemoryResources().taskDefaultAllocator } def contextMemoryPool(): NativeMemoryPool = { From 655ff2a3f15a962116845c579466636f68fd5034 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 27 Sep 2021 21:30:19 +0800 Subject: [PATCH 7/9] fixup --- .../sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index b29f73c1a..336591edb 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -176,7 +176,8 @@ object SparkMemoryUtils extends Logging { if (allocated == 0L) { close(allocator) } else { - softClose(allocator) +// softClose(allocator) + close(allocator) } } for (pool <- memoryPools.asScala) { From 9ca6c25161abd0475c15c7721ba3636ec88381be Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 27 Sep 2021 23:40:47 +0800 Subject: [PATCH 8/9] fixup --- .../execution/datasources/v2/arrow/SparkMemoryUtils.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index 336591edb..c5d28b18d 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -176,8 +176,11 @@ object SparkMemoryUtils extends Logging { if (allocated == 0L) { close(allocator) } else { -// softClose(allocator) - close(allocator) + if (isArrowAutoReleaseEnabled) { + close(allocator) + } else { + softClose(allocator) + } } } for (pool <- memoryPools.asScala) { From 084eeda25d18d10d7424de4e2bed065020f660ab Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 29 Sep 2021 10:40:37 +0800 Subject: [PATCH 9/9] fixup --- .../sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index c5d28b18d..a5067098e 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -57,7 +57,7 @@ object SparkMemoryUtils extends Logging { } val memoryChunkManagerFactory: MemoryChunkManager.Factory = if (isArrowAutoReleaseEnabled) { - MemoryChunkCleaner.newFactory(MemoryChunkCleaner.Mode.HYBRID) + MemoryChunkCleaner.newFactory(MemoryChunkCleaner.Mode.HYBRID_WITH_LOG) } else { MemoryChunkManager.FACTORY }