From 962aac4db99f3988c07ccb23439327c18ec178f1 Mon Sep 17 00:00:00 2001 From: guoxu1231 Date: Mon, 4 Jan 2016 14:23:07 +0000 Subject: [PATCH] [SPARK-12513][STREAMING] SocketReceiver hang in Netcat example Explicitly close client side socket connection before restart socket receiver. Author: guoxu1231 Author: Shawn Guo Closes #10464 from guoxu1231/SPARK-12513. --- .../dstream/SocketInputDStream.scala | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 10644b9201918..e70fc87c39d95 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import java.io._ -import java.net.{Socket, UnknownHostException} +import java.net.{ConnectException, Socket} import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -51,7 +51,20 @@ class SocketReceiver[T: ClassTag]( storageLevel: StorageLevel ) extends Receiver[T](storageLevel) with Logging { + private var socket: Socket = _ + def onStart() { + + logInfo(s"Connecting to $host:$port") + try { + socket = new Socket(host, port) + } catch { + case e: ConnectException => + restart(s"Error connecting to $host:$port", e) + return + } + logInfo(s"Connected to $host:$port") + // Start the thread that receives data over a connection new Thread("Socket Receiver") { setDaemon(true) @@ -60,20 +73,22 @@ class SocketReceiver[T: ClassTag]( } def onStop() { - // There is nothing much to do as the thread calling receive() - // is designed to stop by itself isStopped() returns false + // in case restart thread close it twice + synchronized { + if (socket != null) { + socket.close() + socket = null + logInfo(s"Closed socket to $host:$port") + } + } } /** Create a socket connection and receive data until receiver is stopped */ def receive() { - var socket: Socket = null try { - logInfo("Connecting to " + host + ":" + port) - socket = new Socket(host, port) - logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { - store(iterator.next) + store(iterator.next()) } if (!isStopped()) { restart("Socket data stream had no more data") @@ -81,16 +96,11 @@ class SocketReceiver[T: ClassTag]( logInfo("Stopped receiving") } } catch { - case e: java.net.ConnectException => - restart("Error connecting to " + host + ":" + port, e) case NonFatal(e) => logWarning("Error receiving data", e) restart("Error receiving data", e) } finally { - if (socket != null) { - socket.close() - logInfo("Closed socket to " + host + ":" + port) - } + onStop() } } }