Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24713]AppMatser of spark streaming kafka OOM if there are hund… #21690

Closed
wants to merge 3 commits into from

Conversation

yuanboliu
Copy link
Member

We have hundreds of kafka topics need to be consumed in one application. The application master will throw OOM exception after hanging for nearly half of an hour.

OOM happens in the env with a lot of topics, and it's not convenient to set up such kind of env in the unit test. So I didn't change/add test case.

@yuanboliu
Copy link
Member Author

Can one of the admins verify this patch?

@yuanboliu
Copy link
Member Author

@koeninger Sorry to interrupt, would you please review my patch? Thanks in advance.

@koeninger
Copy link
Contributor

Jenkins, ok to test

@koeninger
Copy link
Contributor

@yuanboliu Can you clarify why repeated pause is necessary?

@SparkQA
Copy link

SparkQA commented Jul 2, 2018

Test build #92528 has finished for PR 21690 at commit 15d03e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yuanboliu
Copy link
Member Author

yuanboliu commented Jul 3, 2018

@koeninger Thanks for your review

The first pause is used to stop poll() in the method paranoidPoll
The second one is attached because of p.partition().
I'm not sure whether the state of pause will be rewritten after these methods are called, so I use pause repeatedly.

@koeninger
Copy link
Contributor

koeninger commented Jul 3, 2018

@yuanboliu From reading KafkaConsumer code, and from testing, I don't see where consumer.position() alone would un-pause topicpartitions. See below. Can you give a counter-example?

I am seeing poll() reset the paused state. When you are having the problem, are you seeing the info level log messages "poll(0) returned messages"?

If that's what's happening, I think the best we can do is call pause() in only one place, the first line of paranoidPoll, e.g.

c.pause(c.assignment)

val msgs = c.poll(0)

Here's what I saw in testing:

scala> c.paused
res34: java.util.Set[org.apache.kafka.common.TopicPartition] = []

scala> c.assignment
res35: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.pause(topics)

scala> c.paused
res37: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.position(tp)
res38: Long = 248

scala> c.paused
res39: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(0)
res40: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe

scala> c.paused
res41: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.position(tp)
res42: Long = 248

scala> c.paused
res43: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(1)
res44: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe

scala> c.paused
res45: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(100)
res46: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@28e4439b

scala> c.paused
res47: java.util.Set[org.apache.kafka.common.TopicPartition] = []

@yuanboliu
Copy link
Member Author

@koeninger Thanks for your details. Sorry quite busy this week. I will delete the last pause, test the patch on my own cluster this weekend and give feedback asap.

@koeninger
Copy link
Contributor

@yuanboliu
Copy link
Member Author

@koeninger Thanks for your reply. Agree with you. there is no need to to use pause repeatedly.
This is my test without any pause, and the app master stuck for a long time without any process

wechatworkscreenshot_abb443bd-97db-48f9-88a2-e45a65617f80

I will update my patch shortly.

@SparkQA
Copy link

SparkQA commented Jul 7, 2018

Test build #92709 has finished for PR 21690 at commit d1a8c60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yuanboliu
Copy link
Member Author

@koeninger Sorry to interrupt, could you take a look at my patch?

@koeninger
Copy link
Contributor

koeninger commented Jul 12, 2018 via email

@yuanboliu
Copy link
Member Author

After applying this patch, my application can be running successfully. This issue could happen in the case of many topics(hundreds of ) consumed.

@koeninger
Copy link
Contributor

LGTM, merging to master. Thanks!

@asfgit asfgit closed this in 0f24c6f Jul 13, 2018
@yuanboliu
Copy link
Member Author

Thanks very much

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants