This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-45] BHJ memory leak #50

Merged
merged 11 commits on Feb 1, 2021
3 changes: 2 additions & 1 deletion .github/workflows/report_ram_log.yml
@@ -59,7 +59,8 @@ jobs:
git clone https://github.com/oap-project/arrow-data-source.git
cd arrow-data-source
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests
- run: |
- name: Run Maven tests
run: |
cd core/
mvn test -B -DmembersOnlySuites=com.intel.oap.tpch -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DtagsToInclude=com.intel.oap.tags.CommentOnContextPR -Dexec.skip=true
env:
10 changes: 6 additions & 4 deletions .github/workflows/tpch.yml
@@ -43,7 +43,7 @@ jobs:
git clone https://github.com/intel-bigdata/arrow.git
cd arrow && git checkout oap-master && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON && make -j2
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2
sudo make install
cd ../../java
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -P arrow-jni -am -Darrow.cpp.build.dir=/tmp/arrow/cpp/build/release/ -DskipTests -Dcheckstyle.skip
@@ -53,11 +53,13 @@ jobs:
git clone https://github.com/oap-project/arrow-data-source.git
cd arrow-data-source
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
- run: |
- name: Run Maven tests
run: |
cd core/
mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs
mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs -DargLine="-Xmx1G -XX:MaxDirectMemorySize=500M -Dio.netty.allocator.numDirectArena=1"
env:
MAVEN_OPTS: "-Xmx2048m"
MALLOC_ARENA_MAX: "4"
MAVEN_OPTS: "-Xmx1G"
COMMENT_TEXT_OUTPUT_PATH: "/tmp/comment_text.txt"
COMMENT_IMAGE_OUTPUT_PATH: "/tmp/comment_image.png"
ENABLE_TPCH_TESTS: "true"
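
The tightened settings above (-Xmx1G, -XX:MaxDirectMemorySize=500M, a single Netty direct arena, MALLOC_ARENA_MAX=4) make the TPC-H job fail fast when off-heap memory is not returned, instead of letting a leak hide inside a large address space. A minimal stand-alone sketch (not part of this patch) of how the direct-memory cap turns a leak into a hard error:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch only: run with -XX:MaxDirectMemorySize=500M.
// Direct buffers that are never released exhaust the capped budget quickly,
// which is how the tightened CI settings surface an off-heap leak as a hard failure.
public class DirectMemoryCapDemo {
  public static void main(String[] args) {
    List<ByteBuffer> leaked = new ArrayList<>();
    for (int i = 0; ; i++) {
      // 16 MB per allocation; roughly 31 iterations exceed a 500 MB cap.
      leaked.add(ByteBuffer.allocateDirect(16 * 1024 * 1024));
      System.out.println("allocated " + (i + 1) * 16 + " MB of direct memory");
      // Eventually throws java.lang.OutOfMemoryError: Direct buffer memory.
    }
  }
}
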
@@ -66,15 +66,12 @@ public final class ArrowWritableColumnVector extends WritableColumnVector {
private int ordinal;
private ValueVector vector;
private ValueVector dictionaryVector;
private static BufferAllocator OffRecordAllocator = null;
private static BufferAllocator OffRecordAllocator = SparkMemoryUtils.globalAllocator();

public static BufferAllocator getAllocator() {
return SparkMemoryUtils.contextAllocator();
}
public static BufferAllocator getOffRecordAllocator() {
if (OffRecordAllocator == null) {
OffRecordAllocator = new RootAllocator(Long.MAX_VALUE);
}
return OffRecordAllocator;
}
public static AtomicLong vectorCount = new AtomicLong(0);
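
For context: the old getOffRecordAllocator() lazily built a private RootAllocator(Long.MAX_VALUE) that was never closed and never visible to Spark's memory accounting, so buffers allocated through it could leak silently. Delegating to SparkMemoryUtils.globalAllocator() keeps those allocations inside the shared, accounted allocator hierarchy. A minimal sketch (illustrative names, not from the patch) of why an accounted allocator surfaces such leaks:

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Sketch only: a child of a shared, accounted allocator reports memory that was
// never released, while a private allocator that is simply abandoned hides it.
public class AllocatorAccountingDemo {
  public static void main(String[] args) {
    try (BufferAllocator root = new RootAllocator(Long.MAX_VALUE)) {
      BufferAllocator child = root.newChildAllocator("off-record", 0, Long.MAX_VALUE);
      child.buffer(64);                                   // allocate 64 bytes and "forget" to release them
      System.out.println(root.getAllocatedMemory());      // the leak shows up in the shared root's accounting
      child.close();                                      // throws IllegalStateException: outstanding buffers
    }
  }
}
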
@@ -29,7 +29,7 @@
import org.apache.arrow.vector.ipc.message.ArrowBuffer;
import org.apache.arrow.vector.ipc.message.MessageSerializer;

public class BatchIterator {
public class BatchIterator implements AutoCloseable {
private native boolean nativeHasNext(long nativeHandler);
private native ArrowRecordBatchBuilder nativeNext(long nativeHandler);
private native MetricsObject nativeFetchMetrics(long nativeHandler);
@@ -89,7 +89,7 @@ public SerializableObject nextHashRelationObject()
return null;
}
NativeSerializableObject obj = nativeNextHashRelation(nativeHandler);
SerializableObject objImpl = new SerializableObject(obj);
SerializableObject objImpl = new SerializableObject(obj, new AutoCloseable[]{this});
return objImpl;
}

@@ -196,6 +196,7 @@ public void setDependencies(BatchIterator[] dependencies) {
nativeSetDependencies(nativeHandler, instanceIdList);
}

@Override
public void close() {
if (!closed) {
nativeClose(nativeHandler);
36 changes: 19 additions & 17 deletions core/src/main/java/com/intel/oap/vectorized/SerializableObject.java
@@ -28,13 +28,15 @@
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.arrow.util.AutoCloseables;

/** ArrowBufBuilder. */
public class SerializableObject implements Externalizable, KryoSerializable {
public int total_size;
public int[] size;
private ByteBuf[] directAddrs;
private ByteBufAllocator allocator;

private transient AutoCloseable[] resources = null;

public SerializableObject() {}

@@ -44,33 +46,28 @@ public SerializableObject() {}
* @param memoryAddress native ArrowBuf data addr.
* @param size ArrowBuf size.
*/
public SerializableObject(long[] memoryAddress, int[] size) throws IOException {
public SerializableObject(long[] memoryAddress, int[] size, AutoCloseable[] resources) throws IOException {
this.total_size = 0;
this.size = size;
directAddrs = new ByteBuf[size.length];
for (int i = 0; i < size.length; i++) {
this.total_size += size[i];
directAddrs[i] = Unpooled.wrappedBuffer(memoryAddress[i], size[i], false);
}
this.resources = resources;
}

/**
* Create an instance for NativeSerializableObject.
*
* @param memoryAddress native ArrowBuf data addr.
* @param size ArrowBuf size.
*/
public SerializableObject(NativeSerializableObject obj)
public SerializableObject(NativeSerializableObject obj, AutoCloseable[] resources)
throws IOException, ClassNotFoundException {
this(obj.memoryAddress, obj.size);
this(obj.memoryAddress, obj.size, resources);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.total_size = in.readInt();
int size_len = in.readInt();
this.size = (int[]) in.readObject();
allocator = UnpooledByteBufAllocator.DEFAULT;
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
directAddrs = new ByteBuf[size_len];
for (int i = 0; i < size.length; i++) {
directAddrs[i] = allocator.directBuffer(size[i], size[i]);
@@ -104,7 +101,7 @@ public void read(Kryo kryo, Input in) {
this.total_size = in.readInt();
int size_len = in.readInt();
this.size = in.readInts(size_len);
allocator = UnpooledByteBufAllocator.DEFAULT;
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
directAddrs = new ByteBuf[size_len];
for (int i = 0; i < size.length; i++) {
directAddrs[i] = allocator.directBuffer(size[i], size[i]);
@@ -141,6 +138,7 @@ public void write(Kryo kryo, Output out) {

public void close() {
releaseDirectMemory();
releaseResources();
}

public long[] getDirectMemoryAddrs() throws IOException {
@@ -154,18 +152,22 @@ public long[] getDirectMemoryAddrs() throws IOException {
return addrs;
}

public void releaseDirectMemory() {
private void releaseDirectMemory() {
if (directAddrs != null) {
for (int i = 0; i < directAddrs.length; i++) {
directAddrs[i].release();
}
}
}

public int getRefCnt() {
if (directAddrs != null) {
return directAddrs[0].refCnt();
private void releaseResources() {
if (resources == null) {
return;
}
try {
AutoCloseables.close(resources);
} catch (Exception e) {
throw new RuntimeException(e);
}
return 0;
}
}
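
This is the heart of the leak fix: the hash-relation buffers wrapped by SerializableObject are backed by native memory owned by the BatchIterator that produced them, but nothing previously tied the iterator's lifetime to the object, so cleanup depended on the now-deprecated broadcast cache timeout. With BatchIterator implementing AutoCloseable and passing itself in as a resource, SerializableObject.close() releases its direct ByteBufs and then closes the producers. A minimal sketch of that ownership chain (class names here are illustrative, not from the patch):

import org.apache.arrow.util.AutoCloseables;

// Sketch of the ownership chain used by SerializableObject: the holder releases
// its own direct buffers, then closes whatever produced them (e.g. a native iterator).
final class NativeBufferHolder implements AutoCloseable {
  private final AutoCloseable[] upstream;   // e.g. the BatchIterator that owns the native memory

  NativeBufferHolder(AutoCloseable... upstream) {
    this.upstream = upstream;
  }

  @Override
  public void close() throws Exception {
    // 1) release the wrapped ByteBufs here (elided in this sketch)
    // 2) close the producers so their native handles are freed deterministically
    AutoCloseables.close(upstream);
  }
}

final class FakeNativeIterator implements AutoCloseable {
  @Override
  public void close() {
    System.out.println("native handle closed");
  }
}

// Usage: closing the holder (e.g. from a task-completion listener) now also
// closes the iterator, instead of waiting for a timeout.
class ChainDemo {
  public static void main(String[] args) throws Exception {
    try (NativeBufferHolder holder = new NativeBufferHolder(new FakeNativeIterator())) {
      // use holder ...
    } // prints "native handle closed"
  }
}
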
@@ -54,7 +54,7 @@ class ColumnarPluginConfig(conf: SparkConf) {
conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false)
val tmpFile: String =
conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null)
val broadcastCacheTimeout: Int =
@deprecated val broadcastCacheTimeout: Int =
conf.getInt("spark.sql.columnar.sort.broadcast.cache.timeout", defaultValue = -1)
val hashCompare: Boolean =
conf.getBoolean("spark.oap.sql.columnar.hashCompare", defaultValue = false)
@@ -17,15 +17,11 @@

package com.intel.oap.execution

import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.expression.ConverterUtils
import com.intel.oap.vectorized.CloseableColumnBatchIterator
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

private final case class BroadcastColumnarRDDPartition(index: Int) extends Partition
@@ -41,9 +37,7 @@ case class BroadcastColumnarRDD(
(0 until numPartitioning).map { index => new BroadcastColumnarRDDPartition(index) }.toArray
}
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
val timeout: Int = SQLConf.get.broadcastTimeout.toInt
val relation = inputByteBuf.value.asReadOnlyCopy
relation.countDownClose(timeout)
new CloseableColumnBatchIterator(relation.getColumnarBatchAsIter)
}
}
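
With the timeout-based relation.countDownClose(...) removed here, broadcast-relation batches are released through the normal task lifecycle instead (the wholestage-codegen path further down registers cleanup via SparkMemoryUtils.addLeakSafeTaskCompletionListener). A minimal sketch of the underlying Spark pattern, assuming it runs inside a task on an executor; the helper name is illustrative, not from the patch:

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskCompletionListener;

// Sketch only: deterministic cleanup tied to task completion instead of a wall-clock timeout.
// `nativeResources` stands in for the kernels/iterators/relations held by the task.
class TaskScopedCleanup {
  static void register(AutoCloseable nativeResources) {
    TaskContext.get().addTaskCompletionListener(new TaskCompletionListener() {
      @Override
      public void onTaskCompletion(TaskContext context) {
        try {
          nativeResources.close();   // runs whether the task succeeds, fails, or is killed
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    });
  }
}
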
@@ -17,47 +17,26 @@

package com.intel.oap.execution

import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit._

import com.intel.oap.vectorized._
import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import org.apache.spark.TaskContext
import com.intel.oap.expression._
import com.intel.oap.vectorized.{ExpressionEvaluator, _}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetrics

import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.expressions.BindReferences._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.arrow.vector.ipc.message.ArrowFieldNode
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.gandiva.evaluator._
import io.netty.buffer.ArrowBuf
import io.netty.buffer.ByteBuf
import com.google.common.collect.Lists
import com.intel.oap.expression._
import com.intel.oap.vectorized.ExpressionEvaluator
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin}
import org.apache.spark.sql.types.{StructField, StructType}

/**
* Performs a hash join of two child relations by first shuffling the data using the join keys.
@@ -260,8 +239,6 @@ case class ColumnarBroadcastHashJoinExec(
var eval_elapse: Long = 0
val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]()

val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout

streamedPlan.executeColumnar().mapPartitions { iter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
val hashRelationKernel = new ExpressionEvaluator()
@@ -283,6 +260,7 @@ case class ColumnarBroadcastHashJoinExec(
Field.nullable("result", new ArrowType.Int(32, true)))
hashRelationKernel.build(hash_relation_schema, Lists.newArrayList(hash_relation_expr), true)
val hashRelationResultIterator = hashRelationKernel.finishByIterator()

// we need to set original recordBatch to hashRelationKernel
var numRows = 0
while (depIter.hasNext) {
@@ -328,7 +306,6 @@ case class ColumnarBroadcastHashJoinExec(
hashRelationResultIterator.close
nativeKernel.close
nativeIterator.close
relation.countDownClose(timeout)
}

// now we can return this wholestagecodegen iter
@@ -451,7 +428,6 @@ case class ColumnarBroadcastHashJoinExec(
val listJars = uploadAndListJars(signature)
val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]()
val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()
val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout

streamedPlan.executeColumnar().mapPartitions { streamIter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
@@ -518,7 +494,6 @@ case class ColumnarBroadcastHashJoinExec(
hashRelationResultIterator.close
nativeKernel.close
nativeIterator.close
relation.countDownClose(timeout)
}
val resultStructType = ArrowUtils.fromArrowSchema(resCtx.outputSchema)
val res = new Iterator[ColumnarBatch] {
@@ -449,7 +449,6 @@ case class ColumnarShuffledHashJoinExec(

val signature = getCodeGenSignature(hashTableType)
val listJars = uploadAndListJars(signature)
val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout
val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()

if (hashTableType == 1) {
@@ -17,33 +17,25 @@

package com.intel.oap.execution

import java.io.{ByteArrayInputStream, ObjectInputStream}
import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.execution._
import com.intel.oap.expression._
import com.intel.oap.vectorized._
import org.apache.spark._
import com.intel.oap.vectorized.{BatchIterator, ExpressionEvaluator, _}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.Schema
import com.intel.oap.vectorized.ExpressionEvaluator
import com.intel.oap.vectorized.BatchIterator
import com.google.common.collect.Lists
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils}

import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

case class ColumnarCodegenContext(inputSchema: Schema, outputSchema: Schema, root: TreeNode) {}

Expand Down Expand Up @@ -264,7 +256,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I

val numOutputBatches = child.longMetric("numOutputBatches")
val pipelineTime = longMetric("pipelineTime")
val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout

var build_elapse: Long = 0
var eval_elapse: Long = 0
@@ -523,7 +514,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
dependentKernelIterators.foreach(_.close)
nativeKernel.close
nativeIterator.close
relationHolder.foreach(r => r.countDownClose(timeout))
relationHolder.clear()
}
SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
close