From 672209f2909a95e891f3c779bfb2f0e534239851 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Mon, 28 May 2018 10:50:17 +0800 Subject: [PATCH] [SPARK-24334] Fix race condition in ArrowPythonRunner causes unclean shutdown of Arrow memory allocator ## What changes were proposed in this pull request? There is a race condition of closing Arrow VectorSchemaRoot and Allocator in the writer thread of ArrowPythonRunner. The race results in memory leak exception when closing the allocator. This patch removes the closing routine from the TaskCompletionListener and make the writer thread responsible for cleaning up the Arrow memory. This issue be reproduced by this test: ``` def test_memory_leak(self): from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode # Have all data in a single executor thread so it can trigger the race condition easier with self.sql_conf({'spark.sql.shuffle.partitions': 1}): df = self.spark.range(0, 1000) df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \ .withColumn('id', explode(col('id'))) \ .withColumn('v', array([lit(i) for i in range(0, 1000)])) pandas_udf(df.schema, PandasUDFType.GROUPED_MAP) def foo(pdf): xxx return pdf result = df.groupby('id').apply(foo) with QuietTest(self.sc): with self.assertRaises(py4j.protocol.Py4JJavaError) as context: result.count() self.assertTrue('Memory leaked' not in str(context.exception)) ``` Note: Because of the race condition, the test case cannot reproduce the issue reliably so it's not added to test cases. ## How was this patch tested? Because of the race condition, the bug cannot be unit test easily. So far it has only happens on large amount of data. This is currently tested manually. Author: Li Jin Closes #21397 from icexelloss/SPARK-24334-arrow-memory-leak. --- .../execution/python/ArrowPythonRunner.scala | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 5fcdcddca7d51..01e19bddbfb66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -70,19 +70,13 @@ class ArrowPythonRunner( val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"stdout writer for $pythonExec", 0, Long.MaxValue) - val root = VectorSchemaRoot.create(arrowSchema, allocator) - val arrowWriter = ArrowWriter.create(root) - - context.addTaskCompletionListener { _ => - root.close() - allocator.close() - } - - val writer = new ArrowStreamWriter(root, null, dataOut) - writer.start() Utils.tryWithSafeFinally { + val arrowWriter = ArrowWriter.create(root) + val writer = new ArrowStreamWriter(root, null, dataOut) + writer.start() + while (inputIterator.hasNext) { val nextBatch = inputIterator.next() @@ -94,8 +88,21 @@ class ArrowPythonRunner( writer.writeBatch() arrowWriter.reset() } - } { + // end writes footer to the output stream and doesn't clean any resources. + // It could throw exception if the output stream is closed, so it should be + // in the try block. writer.end() + } { + // If we close root and allocator in TaskCompletionListener, there could be a race + // condition where the writer thread keeps writing to the VectorSchemaRoot while + // it's being closed by the TaskCompletion listener. + // Closing root and allocator here is cleaner because root and allocator is owned + // by the writer thread and is only visible to the writer thread. + // + // If the writer thread is interrupted by TaskCompletionListener, it should either + // (1) in the try block, in which case it will get an InterruptedException when + // performing io, and goes into the finally block or (2) in the finally block, + // in which case it will ignore the interruption and close the resources. root.close() allocator.close() }