From d5d86f642c3346d70d0c4ce60374871e923f5be7 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 4 May 2015 16:15:32 -0700 Subject: [PATCH] Fix incorrect lastErrorTime --- .../spark/streaming/scheduler/ReceiverTracker.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index e1d80af9ac062..f6852bd26509d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -154,12 +154,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false private def deregisterReceiver(streamId: Int, message: String, error: String) { val newReceiverInfo = receiverInfo.get(streamId) match { case Some(oldInfo) => + val lastErrorTime = + if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis() oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, - lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis()) + lastError = error, lastErrorTime = lastErrorTime) case None => logWarning("No prior receiver info") + val lastErrorTime = + if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis() ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, - lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis()) + lastError = error, lastErrorTime = lastErrorTime) } receiverInfo -= streamId listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))