diff --git a/.github/workflows/report_ram_log.yml b/.github/workflows/report_ram_log.yml index bb09634b6..6f87167bd 100644 --- a/.github/workflows/report_ram_log.yml +++ b/.github/workflows/report_ram_log.yml @@ -59,7 +59,8 @@ jobs: git clone https://github.com/oap-project/arrow-data-source.git cd arrow-data-source mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests - - run: | + - name: Run Maven tests + run: | cd core/ mvn test -B -DmembersOnlySuites=com.intel.oap.tpch -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DtagsToInclude=com.intel.oap.tags.CommentOnContextPR -Dexec.skip=true env: diff --git a/.github/workflows/tpch.yml b/.github/workflows/tpch.yml index 1d490adf9..819264456 100644 --- a/.github/workflows/tpch.yml +++ b/.github/workflows/tpch.yml @@ -43,7 +43,7 @@ jobs: git clone https://github.com/intel-bigdata/arrow.git cd arrow && git checkout oap-master && cd cpp mkdir build && cd build - cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON && make -j2 + cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2 sudo make install cd ../../java mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -P arrow-jni -am -Darrow.cpp.build.dir=/tmp/arrow/cpp/build/release/ -DskipTests -Dcheckstyle.skip @@ -53,11 +53,13 @@ jobs: git clone https://github.com/oap-project/arrow-data-source.git cd arrow-data-source mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn - - run: | + - name: Run Maven tests + run: | cd core/ - mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs + mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs -DargLine="-Xmx1G -XX:MaxDirectMemorySize=500M -Dio.netty.allocator.numDirectArena=1" env: - MAVEN_OPTS: "-Xmx2048m" + MALLOC_ARENA_MAX: "4" + MAVEN_OPTS: "-Xmx1G" COMMENT_TEXT_OUTPUT_PATH: "/tmp/comment_text.txt" COMMENT_IMAGE_OUTPUT_PATH: "/tmp/comment_image.png" ENABLE_TPCH_TESTS: "true" diff --git a/core/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java b/core/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java index d804951dd..f7532cf71 100644 --- a/core/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java +++ b/core/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java @@ -66,15 +66,12 @@ public final class ArrowWritableColumnVector extends WritableColumnVector { private int ordinal; private ValueVector vector; private ValueVector dictionaryVector; - private static BufferAllocator OffRecordAllocator = null; + private static BufferAllocator OffRecordAllocator = SparkMemoryUtils.globalAllocator(); public static BufferAllocator getAllocator() { return SparkMemoryUtils.contextAllocator(); } public static BufferAllocator getOffRecordAllocator() { - if (OffRecordAllocator == null) { - OffRecordAllocator = new RootAllocator(Long.MAX_VALUE); - } return OffRecordAllocator; } public static AtomicLong vectorCount = new AtomicLong(0); diff --git a/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java b/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java index 43636958e..ec176ec5c 100644 --- a/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java +++ b/core/src/main/java/com/intel/oap/vectorized/BatchIterator.java @@ -29,7 +29,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBuffer; import org.apache.arrow.vector.ipc.message.MessageSerializer; -public class BatchIterator { +public class BatchIterator implements AutoCloseable { private native boolean nativeHasNext(long nativeHandler); private native ArrowRecordBatchBuilder nativeNext(long nativeHandler); private native MetricsObject nativeFetchMetrics(long nativeHandler); @@ -89,7 +89,7 @@ public SerializableObject nextHashRelationObject() return null; } NativeSerializableObject obj = nativeNextHashRelation(nativeHandler); - SerializableObject objImpl = new SerializableObject(obj); + SerializableObject objImpl = new SerializableObject(obj, new AutoCloseable[]{this}); return objImpl; } @@ -196,6 +196,7 @@ public void setDependencies(BatchIterator[] dependencies) { nativeSetDependencies(nativeHandler, instanceIdList); } + @Override public void close() { if (!closed) { nativeClose(nativeHandler); diff --git a/core/src/main/java/com/intel/oap/vectorized/SerializableObject.java b/core/src/main/java/com/intel/oap/vectorized/SerializableObject.java index 5bded90e0..e40804e38 100644 --- a/core/src/main/java/com/intel/oap/vectorized/SerializableObject.java +++ b/core/src/main/java/com/intel/oap/vectorized/SerializableObject.java @@ -28,13 +28,15 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; +import org.apache.arrow.util.AutoCloseables; /** ArrowBufBuilder. */ public class SerializableObject implements Externalizable, KryoSerializable { public int total_size; public int[] size; private ByteBuf[] directAddrs; - private ByteBufAllocator allocator; + + private transient AutoCloseable[] resources = null; public SerializableObject() {} @@ -44,7 +46,7 @@ public SerializableObject() {} * @param memoryAddress native ArrowBuf data addr. * @param size ArrowBuf size. */ - public SerializableObject(long[] memoryAddress, int[] size) throws IOException { + public SerializableObject(long[] memoryAddress, int[] size, AutoCloseable[] resources) throws IOException { this.total_size = 0; this.size = size; directAddrs = new ByteBuf[size.length]; @@ -52,17 +54,12 @@ public SerializableObject(long[] memoryAddress, int[] size) throws IOException { this.total_size += size[i]; directAddrs[i] = Unpooled.wrappedBuffer(memoryAddress[i], size[i], false); } + this.resources = resources; } - /** - * Create an instance for NativeSerializableObject. - * - * @param memoryAddress native ArrowBuf data addr. - * @param size ArrowBuf size. - */ - public SerializableObject(NativeSerializableObject obj) + public SerializableObject(NativeSerializableObject obj, AutoCloseable[] resources) throws IOException, ClassNotFoundException { - this(obj.memoryAddress, obj.size); + this(obj.memoryAddress, obj.size, resources); } @Override @@ -70,7 +67,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept this.total_size = in.readInt(); int size_len = in.readInt(); this.size = (int[]) in.readObject(); - allocator = UnpooledByteBufAllocator.DEFAULT; + ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT; directAddrs = new ByteBuf[size_len]; for (int i = 0; i < size.length; i++) { directAddrs[i] = allocator.directBuffer(size[i], size[i]); @@ -104,7 +101,7 @@ public void read(Kryo kryo, Input in) { this.total_size = in.readInt(); int size_len = in.readInt(); this.size = in.readInts(size_len); - allocator = UnpooledByteBufAllocator.DEFAULT; + ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT; directAddrs = new ByteBuf[size_len]; for (int i = 0; i < size.length; i++) { directAddrs[i] = allocator.directBuffer(size[i], size[i]); @@ -141,6 +138,7 @@ public void write(Kryo kryo, Output out) { public void close() { releaseDirectMemory(); + releaseResources(); } public long[] getDirectMemoryAddrs() throws IOException { @@ -154,7 +152,7 @@ public long[] getDirectMemoryAddrs() throws IOException { return addrs; } - public void releaseDirectMemory() { + private void releaseDirectMemory() { if (directAddrs != null) { for (int i = 0; i < directAddrs.length; i++) { directAddrs[i].release(); @@ -162,10 +160,14 @@ public void releaseDirectMemory() { } } - public int getRefCnt() { - if (directAddrs != null) { - return directAddrs[0].refCnt(); + private void releaseResources() { + if (resources == null) { + return; + } + try { + AutoCloseables.close(resources); + } catch (Exception e) { + throw new RuntimeException(e); } - return 0; } } \ No newline at end of file diff --git a/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala index 9dce65b4a..3e58bb8f5 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala @@ -54,7 +54,7 @@ class ColumnarPluginConfig(conf: SparkConf) { conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false) val tmpFile: String = conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null) - val broadcastCacheTimeout: Int = + @deprecated val broadcastCacheTimeout: Int = conf.getInt("spark.sql.columnar.sort.broadcast.cache.timeout", defaultValue = -1) val hashCompare: Boolean = conf.getBoolean("spark.oap.sql.columnar.hashCompare", defaultValue = false) diff --git a/core/src/main/scala/com/intel/oap/execution/BroadcastColumnarRDD.scala b/core/src/main/scala/com/intel/oap/execution/BroadcastColumnarRDD.scala index 7a4246aa1..72aca6eed 100644 --- a/core/src/main/scala/com/intel/oap/execution/BroadcastColumnarRDD.scala +++ b/core/src/main/scala/com/intel/oap/execution/BroadcastColumnarRDD.scala @@ -17,15 +17,11 @@ package com.intel.oap.execution -import com.intel.oap.ColumnarPluginConfig -import com.intel.oap.expression.ConverterUtils import com.intel.oap.vectorized.CloseableColumnBatchIterator import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch private final case class BroadcastColumnarRDDPartition(index: Int) extends Partition @@ -41,9 +37,7 @@ case class BroadcastColumnarRDD( (0 until numPartitioning).map { index => new BroadcastColumnarRDDPartition(index) }.toArray } override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { - val timeout: Int = SQLConf.get.broadcastTimeout.toInt val relation = inputByteBuf.value.asReadOnlyCopy - relation.countDownClose(timeout) new CloseableColumnBatchIterator(relation.getColumnarBatchAsIter) } } diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala index 675010cc0..dffbf5c88 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala @@ -17,47 +17,26 @@ package com.intel.oap.execution -import java.io.{ByteArrayInputStream, ObjectInputStream} -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit._ - -import com.intel.oap.vectorized._ +import com.google.common.collect.Lists import com.intel.oap.ColumnarPluginConfig -import org.apache.spark.TaskContext +import com.intel.oap.expression._ +import com.intel.oap.vectorized.{ExpressionEvaluator, _} +import org.apache.arrow.gandiva.expression._ +import org.apache.arrow.vector.types.pojo.{ArrowType, Field} import org.apache.spark.rdd.RDD -import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetrics - -import scala.collection.JavaConverters._ -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.BoundReference -import org.apache.spark.sql.catalyst.expressions.BindReferences._ import org.apache.spark.sql.util.ArrowUtils -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils} +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import org.apache.arrow.vector.ipc.message.ArrowFieldNode -import org.apache.arrow.vector.ipc.message.ArrowRecordBatch -import org.apache.arrow.vector.types.pojo.ArrowType -import org.apache.arrow.vector.types.pojo.Field -import org.apache.arrow.vector.types.pojo.Schema -import org.apache.arrow.gandiva.expression._ -import org.apache.arrow.gandiva.evaluator._ -import io.netty.buffer.ArrowBuf -import io.netty.buffer.ByteBuf -import com.google.common.collect.Lists -import com.intel.oap.expression._ -import com.intel.oap.vectorized.ExpressionEvaluator -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils -import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec -import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin} -import org.apache.spark.sql.types.{StructField, StructType} /** * Performs a hash join of two child relations by first shuffling the data using the join keys. @@ -260,8 +239,6 @@ case class ColumnarBroadcastHashJoinExec( var eval_elapse: Long = 0 val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]() - val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout - streamedPlan.executeColumnar().mapPartitions { iter => ExecutorManager.tryTaskSet(numaBindingInfo) val hashRelationKernel = new ExpressionEvaluator() @@ -283,6 +260,7 @@ case class ColumnarBroadcastHashJoinExec( Field.nullable("result", new ArrowType.Int(32, true))) hashRelationKernel.build(hash_relation_schema, Lists.newArrayList(hash_relation_expr), true) val hashRelationResultIterator = hashRelationKernel.finishByIterator() + // we need to set original recordBatch to hashRelationKernel var numRows = 0 while (depIter.hasNext) { @@ -328,7 +306,6 @@ case class ColumnarBroadcastHashJoinExec( hashRelationResultIterator.close nativeKernel.close nativeIterator.close - relation.countDownClose(timeout) } // now we can return this wholestagecodegen iter @@ -451,7 +428,6 @@ case class ColumnarBroadcastHashJoinExec( val listJars = uploadAndListJars(signature) val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]() val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer() - val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout streamedPlan.executeColumnar().mapPartitions { streamIter => ExecutorManager.tryTaskSet(numaBindingInfo) @@ -518,7 +494,6 @@ case class ColumnarBroadcastHashJoinExec( hashRelationResultIterator.close nativeKernel.close nativeIterator.close - relation.countDownClose(timeout) } val resultStructType = ArrowUtils.fromArrowSchema(resCtx.outputSchema) val res = new Iterator[ColumnarBatch] { diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala index 0f58a2f21..c84471f5f 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala @@ -449,7 +449,6 @@ case class ColumnarShuffledHashJoinExec( val signature = getCodeGenSignature(hashTableType) val listJars = uploadAndListJars(signature) - val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer() if (hashTableType == 1) { diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala index 67c78a4ca..a03230e9e 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala @@ -17,33 +17,25 @@ package com.intel.oap.execution -import java.io.{ByteArrayInputStream, ObjectInputStream} +import com.google.common.collect.Lists import com.intel.oap.ColumnarPluginConfig -import com.intel.oap.execution._ import com.intel.oap.expression._ -import com.intel.oap.vectorized._ -import org.apache.spark._ +import com.intel.oap.vectorized.{BatchIterator, ExpressionEvaluator, _} +import org.apache.arrow.gandiva.expression._ +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} -import org.apache.spark.sql.util.ArrowUtils -import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager} -import org.apache.arrow.gandiva.expression._ -import org.apache.arrow.vector.types.pojo.ArrowType -import org.apache.arrow.vector.types.pojo.Field -import org.apache.arrow.vector.types.pojo.Schema -import com.intel.oap.vectorized.ExpressionEvaluator -import com.intel.oap.vectorized.BatchIterator -import com.google.common.collect.Lists import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils} -import scala.collection.mutable.ListBuffer import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer case class ColumnarCodegenContext(inputSchema: Schema, outputSchema: Schema, root: TreeNode) {} @@ -264,7 +256,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I val numOutputBatches = child.longMetric("numOutputBatches") val pipelineTime = longMetric("pipelineTime") - val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout var build_elapse: Long = 0 var eval_elapse: Long = 0 @@ -523,7 +514,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I dependentKernelIterators.foreach(_.close) nativeKernel.close nativeIterator.close - relationHolder.foreach(r => r.countDownClose(timeout)) + relationHolder.clear() } SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { close diff --git a/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala index edce539ad..440b2e55b 100644 --- a/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala +++ b/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala @@ -121,11 +121,16 @@ object ConverterUtils extends Logging { schema = new Schema(vectors.map(_.getValueVector().getField).asJava) MessageSerializer.serialize(channel, schema, option) } - MessageSerializer.serialize( - channel, - ConverterUtils - .createArrowRecordBatch(columnarBatch.numRows, vectors.map(_.getValueVector)), - option) + val batch = ConverterUtils + .createArrowRecordBatch(columnarBatch.numRows, vectors.map(_.getValueVector)) + try { + MessageSerializer.serialize( + channel, + batch, + option) + } finally { + batch.close() + } } } diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index c7686f9c0..53ec53b98 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -5,39 +5,27 @@ import java.util.concurrent._ import com.google.common.collect.Lists import com.intel.oap.expression._ -import com.intel.oap.vectorized.{ArrowWritableColumnVector, BatchIterator, ExpressionEvaluator} -import io.netty.buffer.{ByteBuf, ByteBufAllocator, ByteBufOutputStream} -import java.io.{ObjectOutputStream, OutputStream} -import java.nio.ByteBuffer - -import scala.concurrent.duration.NANOSECONDS -import scala.concurrent.{ExecutionContext, Promise} -import scala.util.control.NonFatal -import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{broadcast, SparkException} +import com.intel.oap.vectorized.{ArrowWritableColumnVector, ExpressionEvaluator} +import org.apache.arrow.gandiva.expression._ +import org.apache.arrow.vector.types.pojo.{ArrowType, Field} +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} -import org.apache.spark.sql.catalyst.expressions.BoundReference -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{ColumnarHashedRelation, SparkPlan, SQLExecution} -import org.apache.spark.sql.execution.exchange._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, _} +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, _} import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} -import org.apache.spark.TaskContext -import org.apache.spark.util.{SparkFatalException, ThreadUtils} -import org.apache.arrow.vector.ipc.message.ArrowRecordBatch -import org.apache.arrow.vector.types.pojo.ArrowType -import org.apache.arrow.vector.types.pojo.Field -import org.apache.arrow.vector.types.pojo.Schema -import org.apache.arrow.gandiva.expression._ -import org.apache.arrow.gandiva.evaluator._ -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils -import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SparkFatalException +import org.apache.spark.{SparkException, broadcast} + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Promise +import scala.concurrent.duration.NANOSECONDS +import scala.util.control.NonFatal case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) extends Exchange { @@ -139,8 +127,6 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) while (iter.hasNext) { val batch = iter.next if (batch.numRows > 0) { - (0 until batch.numCols).foreach(i => - batch.column(i).asInstanceOf[ArrowWritableColumnVector].retain()) _input += batch numRows += batch.numRows val dep_rb = ConverterUtils.createArrowRecordBatch(batch) @@ -148,9 +134,10 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) ConverterUtils.releaseArrowRecordBatch(dep_rb) } } + // Note: Do not close this object manually. GC will take over that via Cleaner for ColumnarHashedRelation val hashRelationResultIterator = hashRelationKernel.finishByIterator() - val hashRelationObj = hashRelationResultIterator.nextHashRelationObject() + hashRelationKernel.close() relation = new ColumnarHashedRelation(hashRelationObj, _input.toArray, size_raw).asReadOnlyCopy val dataSize = relation.asInstanceOf[ColumnarHashedRelation].size @@ -205,10 +192,6 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) case e: Throwable => promise.failure(e) throw e - } finally { - val timeout: Int = SQLConf.get.broadcastTimeout.toInt - if (relation != null) - relation.asInstanceOf[ColumnarHashedRelation].countDownClose(timeout) } } } diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarHashedRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarHashedRelation.scala index 090178588..f60810d8b 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarHashedRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarHashedRelation.scala @@ -18,17 +18,15 @@ package org.apache.spark.sql.execution import java.io._ -import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} -import com.esotericsoftware.kryo.{Kryo, KryoSerializable} + import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.intel.oap.expression.ConverterUtils import com.intel.oap.vectorized.{ArrowWritableColumnVector, SerializableObject} -import org.apache.spark.util.{KnownSizeEstimation, Utils} -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} -import scala.concurrent.Future -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.{Failure, Success} -import java.util.concurrent.atomic.AtomicBoolean +import org.apache.spark.sql.execution.ColumnarHashedRelation.Deallocator +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.KnownSizeEstimation +import sun.misc.Cleaner class ColumnarHashedRelation( var hashRelationObj: SerializableObject, @@ -37,60 +35,36 @@ class ColumnarHashedRelation( extends Externalizable with KryoSerializable with KnownSizeEstimation { - val refCnt: AtomicInteger = new AtomicInteger() - val closed: AtomicBoolean = new AtomicBoolean() + + createCleaner(hashRelationObj, arrowColumnarBatch) def this() = { this(null, null, 0) } + private def createCleaner(obj: SerializableObject, batch: Array[ColumnarBatch]): Unit = { + if (obj == null && batch == null) { + // no need to clean up + return + } + Cleaner.create(this, new Deallocator(obj, batch)) + } + + def asReadOnlyCopy(): ColumnarHashedRelation = { //new ColumnarHashedRelation(hashRelationObj, arrowColumnarBatch, arrowColumnarBatchSize) - refCnt.incrementAndGet() this } override def estimatedSize: Long = 0 - def close(waitTime: Int): Future[Int] = Future { - Thread.sleep(waitTime * 1000) - if (refCnt.get == 0) { - if (!closed.getAndSet(true)) { - hashRelationObj.close - arrowColumnarBatch.foreach(_.close) - } - } - refCnt.get - } - - def countDownClose(waitTime: Int = -1): Unit = { - val curRefCnt = refCnt.decrementAndGet() - if (waitTime == -1) return - if (curRefCnt == 0) { - close(waitTime).onComplete { - case Success(resRefCnt) => {} - case Failure(e) => - System.err.println(s"Failed to close ColumnarHashedRelation, exception = $e") - } - } - } - - override def finalize(): Unit = { - if (!closed.getAndSet(true)) { - hashRelationObj.close - arrowColumnarBatch.foreach(_.close) - } - } - override def writeExternal(out: ObjectOutput): Unit = { - if (closed.get()) return out.writeObject(hashRelationObj) val rawArrowData = ConverterUtils.convertToNetty(arrowColumnarBatch) out.writeObject(rawArrowData) } override def write(kryo: Kryo, out: Output): Unit = { - if (closed.get()) return kryo.writeObject(out, hashRelationObj) val rawArrowData = ConverterUtils.convertToNetty(arrowColumnarBatch) kryo.writeObject(out, rawArrowData) @@ -102,6 +76,7 @@ class ColumnarHashedRelation( arrowColumnarBatchSize = rawArrowData.length arrowColumnarBatch = ConverterUtils.convertFromNetty(null, new ByteArrayInputStream(rawArrowData)).toArray + createCleaner(hashRelationObj, arrowColumnarBatch) // retain all cols /*arrowColumnarBatch.foreach(cb => { (0 until cb.numCols).toList.foreach(i => @@ -116,6 +91,7 @@ class ColumnarHashedRelation( arrowColumnarBatchSize = rawArrowData.length arrowColumnarBatch = ConverterUtils.convertFromNetty(null, new ByteArrayInputStream(rawArrowData)).toArray + createCleaner(hashRelationObj, arrowColumnarBatch) // retain all cols /*arrowColumnarBatch.foreach(cb => { (0 until cb.numCols).toList.foreach(i => @@ -129,9 +105,6 @@ class ColumnarHashedRelation( } def getColumnarBatchAsIter: Iterator[ColumnarBatch] = { - if (closed.get()) - throw new InvalidObjectException( - s"can't getColumnarBatchAsIter from a deleted ColumnarHashedRelation.") new Iterator[ColumnarBatch] { var idx = 0 val total_len = arrowColumnarBatch.length @@ -147,5 +120,23 @@ class ColumnarHashedRelation( } } } - +} +object ColumnarHashedRelation { + + private class Deallocator ( + var hashRelationObj: SerializableObject, + var arrowColumnarBatch: Array[ColumnarBatch]) extends Runnable { + + override def run(): Unit = { + try { + Option(hashRelationObj).foreach(_.close()) + Option(arrowColumnarBatch).foreach(_.foreach(_.close)) + } catch { + case e: Exception => + // We should suppress all possible errors in Cleaner to prevent JVM from being shut down + System.err.println("ColumnarHashedRelation: Error running deaallocator") + e.printStackTrace() + } + } + } } diff --git a/core/src/test/java/com/intel/oap/tpch/MallocUtils.java b/core/src/test/java/com/intel/oap/tpch/MallocUtils.java new file mode 100644 index 000000000..f1062d76d --- /dev/null +++ b/core/src/test/java/com/intel/oap/tpch/MallocUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.tpch; + +import com.intel.oap.vectorized.JniUtils; + +import java.io.IOException; + +public class MallocUtils { + + static { + try { + JniUtils.getInstance(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Visible for testing: Try turning back allocated native memory to OS. This might have no effect + * when using Jemalloc. + */ + static native void mallocTrim(); + + /** + * Visible for testing: Print malloc statistics. + */ + static native void mallocStats(); +} diff --git a/core/src/test/scala/com/intel/oap/tpch/MemoryUsageTest.scala b/core/src/test/scala/com/intel/oap/tpch/MemoryUsageTest.scala index 769b43d60..eacd56621 100644 --- a/core/src/test/scala/com/intel/oap/tpch/MemoryUsageTest.scala +++ b/core/src/test/scala/com/intel/oap/tpch/MemoryUsageTest.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.codehaus.jackson.map.ObjectMapper import org.knowm.xchart.{BitmapEncoder, XYChartBuilder} @@ -71,7 +72,7 @@ class MemoryUsageTest extends QueryTest with SharedSparkSession { .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") // .set("spark.sql.autoBroadcastJoinThreshold", "1") .set("spark.unsafe.exceptionOnMemoryLeak", "false") - .set("spark.sql.columnar.sort.broadcast.cache.timeout", "600") + .set("spark.network.io.preferDirectBufs", "false") return conf } @@ -484,10 +485,13 @@ class MemoryUsageTest extends QueryTest with SharedSparkSession { createTPCHTables() writeCommentLine("```") writeCommentLine("Before suite starts: %s".format(genReportLine())) - (1 to 5).foreach { executionId => + (1 to 20).foreach { executionId => writeCommentLine("Iteration %d:".format(executionId)) (1 to 22).foreach(i => { runTPCHQuery(i, executionId) + MallocUtils.mallocTrim() + System.gc() + System.gc() writeCommentLine(" Query %d: %s".format(i, genReportLine())) ramMonitor.writeImage(commentImageOutputPath) }) diff --git a/cpp/src/codegen/common/hash_relation.h b/cpp/src/codegen/common/hash_relation.h index 9f3c07eff..273ff27e9 100644 --- a/cpp/src/codegen/common/hash_relation.h +++ b/cpp/src/codegen/common/hash_relation.h @@ -155,6 +155,16 @@ class HashRelation { return arrow::Status::NotImplemented("HashRelation AppendKeyColumn is abstract."); } + arrow::Status Minimize() { + if (hash_table_ == nullptr) { + return arrow::Status::OK(); + } + if (shrinkToFit(hash_table_)) { + return arrow::Status::OK(); + } + return arrow::Status::Invalid("Error minimizing hash table"); + } + arrow::Status AppendKeyColumn( std::shared_ptr in, const std::vector>& payloads) { diff --git a/cpp/src/jni/jni_wrapper.cc b/cpp/src/jni/jni_wrapper.cc index fbdaffeab..e67039079 100644 --- a/cpp/src/jni/jni_wrapper.cc +++ b/cpp/src/jni/jni_wrapper.cc @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -1602,6 +1603,20 @@ JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrap decompression_schema_holder_.Erase(schema_holder_id); } +JNIEXPORT void JNICALL +Java_com_intel_oap_tpch_MallocUtils_mallocTrim(JNIEnv* env, jobject obj) { +// malloc_stats_print(statsPrint, nullptr, nullptr); + std::cout << "Calling malloc_trim... " << std::endl; + malloc_trim(0); +} + +JNIEXPORT void JNICALL +Java_com_intel_oap_tpch_MallocUtils_mallocStats(JNIEnv* env, jobject obj) { +// malloc_stats_print(statsPrint, nullptr, nullptr); + std::cout << "Calling malloc_stats... " << std::endl; + malloc_stats(); +} + #ifdef __cplusplus } #endif diff --git a/cpp/src/third_party/row_wise_memory/hashMap.h b/cpp/src/third_party/row_wise_memory/hashMap.h index 1353b9c23..4911839fc 100755 --- a/cpp/src/third_party/row_wise_memory/hashMap.h +++ b/cpp/src/third_party/row_wise_memory/hashMap.h @@ -199,6 +199,18 @@ static inline int getValueFromBytesMapByOffset(unsafeHashMap* hashMap, return KeyAddressOffset; } +static inline bool shrinkToFit(unsafeHashMap* hashMap) { + if (hashMap->cursor >= hashMap->mapSize) { + return true; + } + auto pool = (arrow::MemoryPool*)hashMap->pool; + auto status = pool->Reallocate(hashMap->mapSize, hashMap->cursor, (uint8_t**)&hashMap->bytesMap); + if (status.ok()) { + hashMap->mapSize = hashMap->cursor; + } + return status.ok(); +} + static inline bool growHashBytesMap(unsafeHashMap* hashMap) { std::cout << "growHashBytesMap" << std::endl; int oldSize = hashMap->mapSize;