diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala index 0774c3f31..090031127 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.TaskContext import org.apache.spark.unsafe.Platform @@ -104,7 +105,9 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child = override def hasNext: Boolean = { rowIterator.hasNext } - + TaskContext.get().addTaskCompletionListener[Unit] { _ => + arrowBuf.close() + } override def next(): ColumnarBatch = { var isUnsafeRow = true var firstRow = InternalRow.apply()