diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index e4f6ba626ebbf..219892dd02648 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -55,13 +55,13 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { } } else { // Calculate how much time we should sleep to bring ourselves to the desired rate. - val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate + val targetTimeInMillis = messagesWrittenSinceSync.toDouble * 1000 / desiredRate val elapsedTimeInMillis = elapsedNanosecs / 1000000 val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis if (sleepTimeInMillis > 0) { logTrace("Natural rate is " + rate + " per second but desired rate is " + desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.") - Thread.sleep(sleepTimeInMillis) + Thread.sleep(sleepTimeInMillis.toInt) } waitToPush() }