Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector #2991

Closed
wants to merge 23 commits into from

Conversation

jerryshao
Copy link
Contributor

Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in Spark Streaming is enabled. Details and design doc can be seen in SPARK-4062.

@jerryshao
Copy link
Contributor Author

Hi @tdas , would you mind taking a look at this? Thanks a lot.

@SparkQA
Copy link

SparkQA commented Oct 29, 2014

Test build #22425 has started for PR 2991 at commit 5cc4cb1.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 29, 2014

Test build #22425 has finished for PR 2991 at commit 5cc4cb1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22425/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Oct 29, 2014

Test build #22438 has started for PR 2991 at commit 09d57c5.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 29, 2014

Test build #22438 has finished for PR 2991 at commit 09d57c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22438/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Oct 29, 2014

Test build #22446 has started for PR 2991 at commit bb57d05.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 29, 2014

Test build #22446 has finished for PR 2991 at commit bb57d05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22446/
Test PASSed.

@tdas
Copy link
Contributor

tdas commented Oct 30, 2014

@jerryshao Thanks for this change! I am taking a look at this. But the code does not merge. So please update the code to the master branch.

@jerryshao
Copy link
Contributor Author

OK, will do :).

@SparkQA
Copy link

SparkQA commented Oct 31, 2014

Test build #22595 has started for PR 2991 at commit 48752b2.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 31, 2014

Test build #22595 has finished for PR 2991 at commit 48752b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22595/
Test PASSed.

@jerryshao
Copy link
Contributor Author

Hi @tdas , would you mind reviewing this code? Thanks a lot.

waitToPush()
currentBuffer += data
listener.onStoreData(data, metadata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of calling onStore data right after inserting into the buffer? If something needs to be done right after insertion then that can be done in the caller class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason of putting onStoreData here is that this is synchronized with the same mutex, if we put this into caller class, we should use another mutex, this will add some other synchronization logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually adding synch logic in caller is more obvious, and introduces minimum extra logic in the BlockGenerator, keeping the BlockGenerator more generic. It is tricky to document stuff like "method 1 and method 2 of BlockGenerator are called in the same lock, but not other methods".

@tdas
Copy link
Contributor

tdas commented Nov 7, 2014

I have made some high level comments, but I am going to do another deeper roudn to understand the logic and correctness. In the mean time, since the WAL has been merged, could you test whether this patch is working as expected with the WAL turned on?

@jerryshao
Copy link
Contributor Author

Thanks a lot for your review, I will address the above comments, besides we are under test with StreamBench, the result will return to you later.

private lazy val blockOffsetMap =
new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]

private lazy val blockGeneratorListener = new BlockGeneratorListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to define the named class for this generator listener.

@SparkQA
Copy link

SparkQA commented Nov 10, 2014

Test build #23142 has started for PR 2991 at commit c174454.

  • This patch merges cleanly.

@tdas
Copy link
Contributor

tdas commented Nov 13, 2014

@jerryshao Here is another round of changes from me.
You correctly identified a flaw in the lock logic in the last change I made. I played around with different implementations, and I came up with two implementation that I think are correct, and tries to preserve the BlockGenerator performance for existing receivers. Both of them are pulls requests on your repo.

  1. Refactor 1, Refactor 1: Achieve deadlock-free lock ordering by subclassing BlockGenerator and taking receiver lock first in BlockGenerator.updateCurrentBuffer jerryshao/apache-spark#7: Here I attempt to fix the flaw by taking receiver lock before updating the buffer. And this is done by extending the BlockGenerator and overriding the updateCurrentBuffer method to take receiver lock first. This ensures deadlock free locking by always taking locks in the same order - receiver lock followed by block generator lock. The default block generator code path is not affected, so other receiver should not be affected either.
  2. Refactor2, Refactor 2: Removed the receiver's locks and essentially reverted to Saisai's original design jerryshao/apache-spark#6: I essentially reverted back to your original proposal. :) As I tried out various different implementations, and finally got "Refactor 1", I realized that it was more complicated than what you proposed. So I reverted back to that, and added a lot of scala docs explaining the behavior. Personally I am in favor of this now.

Besides I also eliminated more duplicate code form the unit tests and merged two unit tests to reduce test run times. Please take a look.

@tdas
Copy link
Contributor

tdas commented Nov 13, 2014

I did more refactoring for Refactoring 2, to create jerryshao#8 . This is what I finally recommend for merging. Please take a look. I have run the 3 kafka testsuites repeatedly for a long time, and they seemed consistently pass.

@jerryshao
Copy link
Contributor Author

OK, I will, thanks a lot, greatly appreciated.

Refactor 3 = Refactor 2 + refactored KafkaStreamSuite further to elimite KafkaTestUtils, and made Java testsuite more robust
@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23345 has started for PR 2991 at commit 5461f1c.

  • This patch merges cleanly.

@tdas
Copy link
Contributor

tdas commented Nov 14, 2014

Lets see if this passes jenkins, I hadnt tried that yet

@jerryshao
Copy link
Contributor Author

Hi TD, this test is so flaky, it fails several times in my local test:

- block addition, block to batch allocation and cleanup with write ahead log *** FAILED *** (21 milliseconds)
[info]   java.io.FileNotFoundException: File /tmp/1415929501402-0/receivedBlockMetadata/log-0-1000 does not exist.
[info]   at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:397)
[info]   at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:324)
[info]   at org.apache.spark.streaming.util.WriteAheadLogSuite$.getLogFilesInDirectory(WriteAheadLogSuite.scala:344)
[info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite.getWriteAheadLogFiles(ReceivedBlockTrackerSuite.scala:226)
[info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite$$anonfun$4.apply$mcV$sp(ReceivedBlockTrackerSuite.scala:171)
[info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite$$anonfun$4.apply(ReceivedBlockTrackerSuite.scala:96)
[info]   at org.apache.spark.streaming.ReceivedBlockTrackerSuite$$anonfun$4.apply(ReceivedBlockTrackerSuite.scala:96)
[info]   at 
....

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23345 has finished for PR 2991 at commit 5461f1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23345/
Test PASSed.

@jerryshao
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23352 has started for PR 2991 at commit 5461f1c.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23352 has finished for PR 2991 at commit 5461f1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23352/
Test PASSed.

@jerryshao
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23362 has started for PR 2991 at commit 5461f1c.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23362 timed out for PR 2991 at commit 5461f1c after a configured wait of 120m.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23362/
Test FAILed.

@tdas
Copy link
Contributor

tdas commented Nov 14, 2014

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23380 has started for PR 2991 at commit 5461f1c.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Nov 14, 2014

Test build #23380 has finished for PR 2991 at commit 5461f1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23380/
Test PASSed.

@tdas
Copy link
Contributor

tdas commented Nov 14, 2014

Alright, I am merging this! Thanks @jerryshao for all the hard work. Though, I encourage you to continue testing this receiver on your side prior to the spark 1.2 release for stability and correctness.

asfgit pushed a commit that referenced this pull request Nov 14, 2014
…afka connector

Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in Spark Streaming is enabled. Details and design doc can be seen in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

Author: jerryshao <[email protected]>
Author: Tathagata Das <[email protected]>
Author: Saisai Shao <[email protected]>

Closes #2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver

(cherry picked from commit 5930f64)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit asfgit closed this in 5930f64 Nov 14, 2014
@jerryshao
Copy link
Contributor Author

Yes, I will, thanks a lot, greatly appreciate your help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants