From b838f35867edaafd95c57d5581cbb5ac15bbcdde Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 1 Apr 2015 22:15:43 -0700
Subject: [PATCH 1/3] retry after timeout

---
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 11 ++++++++++-
 python/pyspark/rdd.py                                 |  1 +
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 19f4c95fcad74..7670a870e860a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -613,7 +613,16 @@ private[spark] object PythonRDD extends Logging {
       setDaemon(true)
       override def run() {
         try {
-          val sock = serverSocket.accept()
+          var sock: Socket = null
+          try {
+            sock = serverSocket.accept()
+          } catch {
+            case e: SocketTimeoutException =>
+              // there is a small chance that the client has already connected, so retry
+              logWarning("Timed out after 3 seconds, retrying once")
+              serverSocket.setSoTimeout(10)
+              sock = serverSocket.accept()
+          }
           val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
           try {
             writeIteratorToStream(items, out)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c337a43c8a7fc..b66d32c9d5b39 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -113,6 +113,7 @@ def _parse_memory(s):
 
 def _load_from_socket(port, serializer):
     sock = socket.socket()
+    sock.settimeout(5)
     try:
         sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)

From 7977c2fda6f168ac88a953f8d0dbc42817ff825b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 1 Apr 2015 23:10:23 -0700
Subject: [PATCH 2/3] do retry on client side

---
 .../org/apache/spark/api/python/PythonRDD.scala | 11 +----------
 python/pyspark/rdd.py                           | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 7670a870e860a..19f4c95fcad74 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -613,16 +613,7 @@ private[spark] object PythonRDD extends Logging {
       setDaemon(true)
       override def run() {
         try {
-          var sock: Socket = null
-          try {
-            sock = serverSocket.accept()
-          } catch {
-            case e: SocketTimeoutException =>
-              // there is a small chance that the client has already connected, so retry
-              logWarning("Timed out after 3 seconds, retrying once")
-              serverSocket.setSoTimeout(10)
-              sock = serverSocket.accept()
-          }
+          val sock = serverSocket.accept()
           val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
           try {
             writeIteratorToStream(items, out)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b66d32c9d5b39..e655a0cea3889 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -113,11 +113,23 @@ def _parse_memory(s):
 
 def _load_from_socket(port, serializer):
     sock = socket.socket()
-    sock.settimeout(5)
+    sock.settimeout(1)
     try:
         sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
-        for item in serializer.load_stream(rf):
+        iter = serializer.load_stream(rf)
+        try:
+            yield next(iter)
+        except socket.timeout as e:
+            # the connection was not acknowledged by the JVM, so retry
+            # the server will be closed after 3 seconds, then the connection will be refused
+            for v in _load_from_socket(port, serializer):
+                yield v
+            return
+
+        # increase the timeout, because the server side may be slowed down by GC
+        sock.settimeout(10)
+        for item in iter:
             yield item
     finally:
         sock.close()

From e5a51a21b90eecafe0e5677a529ad12cf9a1033d Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 1 Apr 2015 23:51:05 -0700
Subject: [PATCH 3/3] remove setReuseAddress

---
 .../org/apache/spark/api/python/PythonRDD.scala |  1 -
 python/pyspark/rdd.py                           | 16 ++--------------
 2 files changed, 2 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 19f4c95fcad74..36cf2af0857dd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -605,7 +605,6 @@ private[spark] object PythonRDD extends Logging {
    */
   private def serveIterator[T](items: Iterator[T], threadName: String): Int = {
     val serverSocket = new ServerSocket(0, 1)
-    serverSocket.setReuseAddress(true)
     // Close the socket if no connection in 3 seconds
     serverSocket.setSoTimeout(3000)
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e655a0cea3889..2d05611321ed6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -113,23 +113,11 @@ def _parse_memory(s):
 
 def _load_from_socket(port, serializer):
     sock = socket.socket()
-    sock.settimeout(1)
+    sock.settimeout(3)
    try:
         sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
-        iter = serializer.load_stream(rf)
-        try:
-            yield next(iter)
-        except socket.timeout as e:
-            # the connection was not acknowledged by the JVM, so retry
-            # the server will be closed after 3 seconds, then the connection will be refused
-            for v in _load_from_socket(port, serializer):
-                yield v
-            return
-
-        # increase the timeout, because the server side may be slowed down by GC
-        sock.settimeout(10)
-        for item in iter:
+        for item in serializer.load_stream(rf):
             yield item
     finally:
         sock.close()
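
The series nets out to a simple rendezvous contract: the JVM serves the collected items on a fresh ephemeral port and waits at most 3 seconds for a single client, and the Python side connects with a matching 3-second timeout, so a missed rendezvous fails fast on either end instead of hanging. What follows is a minimal Python sketch of the server half of that contract, for illustration only: the helper name serve_iterator and the length-prefixed framing are assumptions, standing in for the Scala serveIterator and writeIteratorToStream in PythonRDD.scala.

import socket
import struct
import threading


def serve_iterator(items):
    # Bind an ephemeral port, mirroring `new ServerSocket(0, 1)`.
    # Note: no SO_REUSEADDR, matching patch 3, so the port is held
    # exclusively by this socket while it is open.
    server = socket.socket()
    server.bind(("localhost", 0))
    server.listen(1)
    # Close the socket if no connection in 3 seconds, as in
    # serverSocket.setSoTimeout(3000).
    server.settimeout(3)
    port = server.getsockname()[1]

    def run():
        try:
            try:
                conn, _ = server.accept()  # raises socket.timeout after 3 s
            except socket.timeout:
                return  # no client showed up: give up rather than retry
            with conn:
                for item in items:
                    payload = str(item).encode("utf-8")
                    # Length-prefixed frames stand in for writeIteratorToStream.
                    conn.sendall(struct.pack(">i", len(payload)) + payload)
        finally:
            server.close()

    threading.Thread(target=run, daemon=True).start()
    return port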
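
The client half, sketched under the same assumptions: like the final _load_from_socket, it is a generator that applies one 3-second timeout to connect() and to every read, and it propagates failure rather than retrying. The framing matches the hypothetical server sketch above; the real code delegates to serializer.load_stream(rf).

import socket
import struct


def load_from_socket(port):
    sock = socket.socket()
    # 3 seconds matches the server's accept timeout: if the JVM has
    # already given up and closed its socket, connect() is refused
    # immediately instead of the client blocking forever.
    sock.settimeout(3)
    try:
        sock.connect(("localhost", port))
        rf = sock.makefile("rb", 65536)
        while True:
            header = rf.read(4)
            if len(header) < 4:  # server closed the stream: end of data
                return
            (length,) = struct.unpack(">i", header)
            yield rf.read(length)
    finally:
        sock.close()


# Example rendezvous: stream five items through the loopback socket.
port = serve_iterator(range(5))
print(list(load_from_socket(port)))  # [b'0', b'1', b'2', b'3', b'4']

Compared with patch 2, there is no client-side retry and no bumped 10-second read timeout. The patch-2 comments trace the hang to connections the JVM never acknowledged; with setReuseAddress removed, the ephemeral port belongs to a single server socket for its lifetime, so the remaining failure mode (the server timing out first) surfaces as an immediate connection refusal the caller can see, rather than a silent hang.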