From 7977c2fda6f168ac88a953f8d0dbc42817ff825b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 1 Apr 2015 23:10:23 -0700
Subject: [PATCH] do retry on client side

---
 .../org/apache/spark/api/python/PythonRDD.scala | 11 +----------
 python/pyspark/rdd.py                           | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 7670a870e860a..19f4c95fcad74 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -613,16 +613,7 @@ private[spark] object PythonRDD extends Logging {
       setDaemon(true)
       override def run() {
         try {
-          var sock: Socket = null
-          try {
-            sock = serverSocket.accept()
-          } catch {
-            case e: SocketTimeoutException =>
-              // there is a small chance that the client had connected, so retry
-              logWarning("Timed out after 4 seconds, retry once")
-              serverSocket.setSoTimeout(10)
-              sock = serverSocket.accept()
-          }
+          val sock = serverSocket.accept()
           val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
           try {
             writeIteratorToStream(items, out)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b66d32c9d5b39..e655a0cea3889 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -113,11 +113,23 @@ def _parse_memory(s):
 
 def _load_from_socket(port, serializer):
     sock = socket.socket()
-    sock.settimeout(5)
+    sock.settimeout(1)
     try:
         sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
-        for item in serializer.load_stream(rf):
+        iter = serializer.load_stream(rf)
+        try:
+            yield next(iter)
+        except socket.timeout as e:
+            # the connection is not acknowledged by JVM, retry
+            # server will be closed after 3 seconds, then it will be refused
+            for v in _load_from_socket(port, serializer):
+                yield v
+            return
+
+        # increase the timeout, because the server side may be slowed down by GC
+        sock.settimeout(10)
+        for item in iter:
             yield item
     finally:
         sock.close()