Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans #6499

Closed
wants to merge 11 commits into from

Conversation

MechCoder
Copy link
Contributor

Python bindings for StreamingKMeans

Will change status to MRG once docs, tests and examples are updated.

@SparkQA
Copy link

SparkQA commented May 29, 2015

Test build #33738 has finished for PR 6499 at commit 773c555.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@mengxr
Copy link
Contributor

mengxr commented May 29, 2015

cc @freeman-lab

@MechCoder
Copy link
Contributor Author

It's still in WIP yet.

@MechCoder MechCoder changed the title [WIP] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans May 30, 2015
@MechCoder
Copy link
Contributor Author

@mengxr @freeman-lab

I would really like some help here. The StreamingKMeansModel works as it is supposed to. I have added doc tests for those.

However, the trainOn method of StreamingKMeans (https://github.com/apache/spark/pull/6499/files#diff-21c55b407050d37f67a2919470e047ebR373) seems to run indefinitely. I printed 'rdd.collect()' under if rdd, it seems the foreachRDD continues even after all the batches in the dstream are exhausted.

A small example to reproduce.

from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
stk = StreamingKMeans()
initCenters = [[0.0, 0.0], [1.0, 1.0]] 
weights = [1.0, 1.0]
stk.setInitialCenters(initCenters, weights)
dv1, dv2, dv3, dv4 = [-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]
dvc = [[dv1, dv3], [dv2, dv4]]
dvc = [sc.parallelize(i, 1) for i in dvc]
ssc = StreamingContext(sc, 2.0)
input_stream = ssc.queueStream(dvc)
stk.trainOn(input_stream)
ssc.start()

This does not seem to terminate.

@SparkQA
Copy link

SparkQA commented May 30, 2015

Test build #33807 has finished for PR 6499 at commit 1d8c66a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@MechCoder
Copy link
Contributor Author

I narrowed it down to this small test case.

from pyspark.streaming import StreamingContext
dvc = [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]]
dvc = [sc.parallelize(i, 1) for i in dvc]
ssc = StreamingContext(sc, 2.0)
input_stream = ssc.queueStream(dvc)

def get_output(rdd):
    print(rdd.collect())
input_stream.foreachRDD(get_output)
ssc.start()

Why does this run indefinitely?

@MechCoder
Copy link
Contributor Author

Figured it out thanks to @davies through mail. Will finish this up tomorrow.

@freeman-lab
Copy link
Contributor

Cool, excited to look at this! Can definitely take a pass after you update.

@MechCoder
Copy link
Contributor Author

@freeman-lab @mengxr
Please give a first pass. I've update the PR with a couple of tests.
Also, Jenkins is showing me this error in Python 3 for the doctests. I wish I could understand what's going on, but for that I would need to set up a Python 3 env.
TypeError: must be a unicode character, not bytes, related to this JIRA https://issues.apache.org/jira/browse/SPARK-7379

@SparkQA
Copy link

SparkQA commented Jun 2, 2015

Test build #33985 has finished for PR 6499 at commit 7cc2d07.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 2, 2015

Test build #33987 has finished for PR 6499 at commit eaace71.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@@ -818,6 +830,78 @@ def test_model_transform(self):
self.assertEqual(model.transform([1.0, 2.0, 3.0]), DenseVector([1.0, 2.0, 3.0]))


class StreamingKMeansTest(MLLibStreamingTestCase):
def test_model_params(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davies Could you please verify if these are the best way to add MLlib Streaming tests? I copied the idea from the tests in streaming.py.

@SparkQA
Copy link

SparkQA commented Jun 3, 2015

Test build #34090 has finished for PR 6499 at commit 538a7ee.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 3, 2015

Test build #34089 has finished for PR 6499 at commit 756b4d8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34156 has finished for PR 6499 at commit e12c590.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34158 has finished for PR 6499 at commit 2ffed68.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34190 has finished for PR 6499 at commit 33351ca.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34196 has finished for PR 6499 at commit 2ffed68.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 4, 2015

Test build #34206 has finished for PR 6499 at commit d37c473.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@MechCoder
Copy link
Contributor Author

jenkins retest this please

@SparkQA
Copy link

SparkQA commented Jun 5, 2015

Test build #34251 has finished for PR 6499 at commit d37c473.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):


{% highlight python %}
model.trainOn(trainingData)
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no output from this example. Call print.

@SparkQA
Copy link

SparkQA commented Jun 18, 2015

Test build #35161 has finished for PR 6499 at commit ff0a1fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 18, 2015

Test build #35163 has finished for PR 6499 at commit 2061a76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@MechCoder
Copy link
Contributor Author

@mengxr updated ! please review.

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35232 has finished for PR 6499 at commit 6e87635.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@MechCoder
Copy link
Contributor Author

jenkins retest this please.

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35235 has finished for PR 6499 at commit 51052d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35238 has finished for PR 6499 at commit 7722d16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35246 has finished for PR 6499 at commit 7722d16.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@MechCoder
Copy link
Contributor Author

jenkins retest this please.

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35269 has finished for PR 6499 at commit 7722d16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@mengxr
Copy link
Contributor

mengxr commented Jun 19, 2015

test this please

@mengxr
Copy link
Contributor

mengxr commented Jun 19, 2015

(run another time in case the streaming tests are flaky)

@mengxr
Copy link
Contributor

mengxr commented Jun 19, 2015

LGTM pending Jenkins.

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35289 has finished for PR 6499 at commit 7722d16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StreamingKMeansModel(KMeansModel):
    • class StreamingKMeans(object):

@mengxr
Copy link
Contributor

mengxr commented Jun 19, 2015

Merged into master. Thanks!

@asfgit asfgit closed this in 54976e5 Jun 19, 2015
@MechCoder MechCoder deleted the spark-4118 branch June 19, 2015 19:26
@MechCoder
Copy link
Contributor Author

Thanks for your reviews and help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants