From 731462a04f8e33ac507ad19b4270c783a012a33e Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 25 May 2017 15:47:59 +0800 Subject: [PATCH] [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data ## What changes were proposed in this pull request? Currently, when a task is calling spill() but it receives a killing request from driver (e.g., speculative task), the `TaskMemoryManager` will throw an `OOM` exception. And we don't catch `Fatal` exception when a error caused by `Thread.interrupt`. So for `ClosedByInterruptException`, we should throw `RuntimeException` instead of `OutOfMemoryError`. https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK ## How was this patch tested? Existing unit tests. Author: Xianyang Liu Closes #18090 from ConeyLiu/SPARK-20250. --- .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 5f91411749167..761ba9de659d5 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -19,6 +19,7 @@ import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.ArrayList; import java.util.BitSet; @@ -184,6 +185,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) { break; } } + } catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + c, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + c, e); throw new OutOfMemoryError("error while calling spill() on " + c + " : " @@ -201,6 +206,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) { Utils.bytesToString(released), consumer); got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); } + } catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + consumer, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + consumer, e); throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "