[SPARK-24713] AppMaster of spark streaming kafka OOM if there are hund…
We have hundreds of Kafka topics that need to be consumed in one application. The application master throws an OOM exception after hanging for nearly half an hour.

The OOM only happens in an environment with a lot of topics, and it's not convenient to set up that kind of environment in a unit test, so I didn't change or add a test case.

Author: Yuanbo Liu <[email protected]>
Author: yuanbo <[email protected]>

Closes #21690 from yuanboliu/master.
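The fix relies on the Kafka consumer's `pause()` semantics: a paused partition is excluded from record fetching, so `poll(0)` can still refresh partition metadata and positions without buffering records. With hundreds of topics, polling without pausing buffers records for every assigned partition at once, which is what exhausted the application master's heap. The sketch below models that effect in Python; `FakeConsumer` and its record counts are hypothetical stand-ins, not the real Kafka client API:

```python
# Minimal sketch of why pausing before poll() avoids buffering records.
# FakeConsumer is a hypothetical model: a real Kafka consumer's poll()
# prefetches and buffers records for every non-paused assigned partition.

class FakeConsumer:
    def __init__(self, assignment):
        self._assignment = set(assignment)  # all assigned partitions
        self._paused = set()                # partitions excluded from fetching
        self.buffered = 0                   # records currently held in memory

    def assignment(self):
        return set(self._assignment)

    def pause(self, partitions):
        self._paused |= set(partitions)

    def poll(self, timeout_ms):
        # Fetch (and buffer) records only for non-paused partitions.
        active = self._assignment - self._paused
        self.buffered += 1000 * len(active)  # pretend 1000 records each
        return []  # positions still advance; records are withheld


# Polling 500 partitions without pausing buffers records for all of them.
naive = FakeConsumer(range(500))
naive.poll(0)
print(naive.buffered)  # 500000

# The patch's pattern: pause the whole assignment before polling.
careful = FakeConsumer(range(500))
careful.pause(careful.assignment())
careful.poll(0)
print(careful.buffered)  # 0
```

This is also why the patch moves the pause into `paranoidPoll` itself rather than leaving it at individual call sites: every code path that polls is then guaranteed to pause first.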
Yuanbo Liu authored and koeninger committed Jul 13, 2018
1 parent 0ce11d0 commit 0f24c6f
Showing 1 changed file with 2 additions and 5 deletions.
@@ -166,6 +166,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
    * which would throw off consumer position. Fix position if this happens.
    */
   private def paranoidPoll(c: Consumer[K, V]): Unit = {
+    // don't actually want to consume any messages, so pause all partitions
+    c.pause(c.assignment())
     val msgs = c.poll(0)
     if (!msgs.isEmpty) {
       // position should be minimum offset per topicpartition
@@ -204,8 +206,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
       // position for new partitions determined by auto.offset.reset if no commit
       currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap

-      // don't want to consume messages, so pause
-      c.pause(newPartitions.asJava)
       // find latest available offsets
       c.seekToEnd(currentOffsets.keySet.asJava)
       parts.map(tp => tp -> c.position(tp)).toMap
@@ -262,9 +262,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
         tp -> c.position(tp)
       }.toMap
     }
-
-    // don't actually want to consume any messages, so pause all partitions
-    c.pause(currentOffsets.keySet.asJava)
   }

   override def stop(): Unit = this.synchronized {
