Skip to content

Commit

Permalink
[SPARK-12513][STREAMING] SocketReceiver hang in Netcat example
Browse files Browse the repository at this point in the history
Explicitly close client side socket connection before restart socket receiver.

Author: guoxu1231 <[email protected]>
Author: Shawn Guo <[email protected]>

Closes #10464 from guoxu1231/SPARK-12513.
  • Loading branch information
powerLambda authored and srowen committed Jan 4, 2016
1 parent 9fd7a2f commit 962aac4
Showing 1 changed file with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.streaming.dstream

import java.io._
import java.net.{Socket, UnknownHostException}
import java.net.{ConnectException, Socket}

import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand Down Expand Up @@ -51,7 +51,20 @@ class SocketReceiver[T: ClassTag](
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {

private var socket: Socket = _

def onStart() {

logInfo(s"Connecting to $host:$port")
try {
socket = new Socket(host, port)
} catch {
case e: ConnectException =>
restart(s"Error connecting to $host:$port", e)
return
}
logInfo(s"Connected to $host:$port")

// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
Expand All @@ -60,37 +73,34 @@ class SocketReceiver[T: ClassTag](
}

def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
// in case restart thread close it twice
synchronized {
if (socket != null) {
socket.close()
socket = null
logInfo(s"Closed socket to $host:$port")
}
}
}

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
store(iterator.next())
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
onStop()
}
}
}
Expand Down

0 comments on commit 962aac4

Please sign in to comment.