From 15d03e63453e72675390c65ddf72492caefdadee Mon Sep 17 00:00:00 2001 From: yuanbo Date: Mon, 2 Jul 2018 11:58:53 +0800 Subject: [PATCH 1/3] [SPARK-24713] AppMaster of spark streaming kafka OOM if there are hundreds of topics consumed --- .../streaming/kafka010/DirectKafkaInputDStream.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index c3221481556f5..a7fa6a9ee0752 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -188,6 +188,9 @@ private[spark] class DirectKafkaInputDStream[K, V]( paranoidPoll(c) val parts = c.assignment().asScala + // no need to read data, so pause + c.pause(parts.asJava) + // make sure new partitions are reflected in currentOffsets val newPartitions = parts.diff(currentOffsets.keySet) @@ -208,7 +211,11 @@ private[spark] class DirectKafkaInputDStream[K, V]( c.pause(newPartitions.asJava) // find latest available offsets c.seekToEnd(currentOffsets.keySet.asJava) - parts.map(tp => tp -> c.position(tp)).toMap + + val result = parts.map(tp => tp -> c.position(tp)) + // pause here because of c.position(tp) + c.pause(parts.asJava) + result.toMap } // limits the maximum number of messages per partition From 544ecb3962b8536fe5ac8046d39097afb71a5cbf Mon Sep 17 00:00:00 2001 From: Yuanbo Liu Date: Sat, 7 Jul 2018 20:53:32 +0800 Subject: [PATCH 2/3] update patch --- .../kafka010/DirectKafkaInputDStream.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index a7fa6a9ee0752..c03e85fe178ee 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -166,6 +166,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( * which would throw off consumer position. Fix position if this happens. */ private def paranoidPoll(c: Consumer[K, V]): Unit = { + // don't actually want to consume any messages, so pause all partitions + c.pause(c.assignment()) val msgs = c.poll(0) if (!msgs.isEmpty) { // position should be minimum offset per topicpartition @@ -188,9 +190,6 @@ private[spark] class DirectKafkaInputDStream[K, V]( paranoidPoll(c) val parts = c.assignment().asScala - // no need to read data, so pause - c.pause(parts.asJava) - // make sure new partitions are reflected in currentOffsets val newPartitions = parts.diff(currentOffsets.keySet) @@ -207,15 +206,9 @@ private[spark] class DirectKafkaInputDStream[K, V]( // position for new partitions determined by auto.offset.reset if no commit currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap - // don't want to consume messages, so pause - c.pause(newPartitions.asJava) // find latest available offsets c.seekToEnd(currentOffsets.keySet.asJava) - - val result = parts.map(tp => tp -> c.position(tp)) - // pause here because of c.position(tp) - c.pause(parts.asJava) - result.toMap + parts.map(tp => tp -> c.position(tp)).toMap } // limits the maximum number of messages per partition From d1a8c605e163bc09d1329cbd90560cc5165de555 Mon Sep 17 00:00:00 2001 From: Yuanbo Liu Date: Sat, 7 Jul 2018 20:56:52 +0800 Subject: [PATCH 3/3] update patch-1 --- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 3 --- 1 file changed, 3 deletions(-) diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index c03e85fe178ee..0246006acf0bd 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -262,9 +262,6 @@ private[spark] class DirectKafkaInputDStream[K, V]( tp -> c.position(tp) }.toMap } - - // don't actually want to consume any messages, so pause all partitions - c.pause(currentOffsets.keySet.asJava) } override def stop(): Unit = this.synchronized {