From 73b6da5a60552277de05b44f0a63646501080a30 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Wed, 16 Mar 2016 21:43:38 +0800 Subject: [PATCH 1/6] create st_je --- .../mllib/JavaStreamingTestExample.java | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java new file mode 100644 index 0000000000000..c34fcabcee493 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.mllib; + +import org.apache.spark.Accumulator; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.mllib.linalg.Matrix; +import org.apache.spark.mllib.linalg.SingularValueDecomposition; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import org.apache.spark.mllib.stat.test.BinarySample; +import org.apache.spark.mllib.stat.test.StreamingTest; +import org.apache.spark.mllib.stat.test.StreamingTestResult; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.StreamingContext; +import org.apache.spark.streaming.Seconds; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.DStream; +import org.apache.spark.util.Utils; +import org.apache.spark.api.java.function.VoidFunction; + +import java.util.LinkedList; + +/** + * Example for Streaming Testing. 
+ */ +public class JavaStreamingTestExample { + public static void main(String[] args) { + if (args.length != 3) { + System.err.println("Usage: JavaStreamingTestExample " + + " "); + System.exit(1); + } + + String dataDir = args[0]; + Duration batchDuration = Seconds.apply(Long.valueOf(args[1])); + int numBatchesTimeout = Integer.valueOf(args[2]); + + SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample"); + JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration); + + ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString()); + + // $example on$ + JavaDStream data = ssc.textFileStream(dataDir).map( + new Function() { + @Override + public BinarySample call(String line) throws Exception { + String[] ts = line.split(","); + boolean label = Boolean.valueOf(ts[0]); + double value = Double.valueOf(ts[1]); + return new BinarySample(label, value); + } + }); + + StreamingTest streamingTest = new StreamingTest() + .setPeacePeriod(0) + .setWindowSize(0) + .setTestMethod("welch"); + + JavaDStream out = streamingTest.registerStream(data); + out.print(); + // $example off$ + + // Stop processing if test becomes significant or we time out + final Accumulator timeoutCounter = + ssc.sparkContext().accumulator(numBatchesTimeout); + + out.foreachRDD(new VoidFunction>() { + @Override + public void call(JavaRDD rdd) throws Exception { + timeoutCounter.add(-1); + + long cntSignificant = rdd.filter(new Function() { + @Override + public Boolean call(StreamingTestResult v) throws Exception { + return v.pValue() < 0.05; + } + }).count(); + + if (timeoutCounter.value() <= 0 || cntSignificant > 0) { + rdd.context().stop(); + } + } + }); + + ssc.start(); + ssc.awaitTermination(); + } +} From c10d5be2c338e822dd8ba561581e2bd7b4a8849b Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 17 Mar 2016 11:28:08 +0800 Subject: [PATCH 2/6] fix nits --- .../mllib/JavaStreamingTestExample.java | 30 +++++++++++-------- 
1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index c34fcabcee493..a8e26f58a960a 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -19,31 +19,37 @@ import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.SingularValueDecomposition; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.mllib.linalg.distributed.RowMatrix; import org.apache.spark.mllib.stat.test.BinarySample; import org.apache.spark.mllib.stat.test.StreamingTest; import org.apache.spark.mllib.stat.test.StreamingTestResult; import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.DStream; import org.apache.spark.util.Utils; import org.apache.spark.api.java.function.VoidFunction; -import java.util.LinkedList; - /** - * Example for Streaming Testing. + * Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data + * stream arrives as text files in a directory. Stops when the two groups are statistically + * significant (p-value < 0.05) or after a user-specified timeout in number of batches is exceeded. 
+ * + * The rows of the text files must be in the form `Boolean, Double`. For example: + * false, -3.92 + * true, 99.32 + * + * Usage: + * JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout> + * + * To run on your local machine using the directory `dataDir` with 5 seconds between each batch and + * a timeout after 100 insignificant batches, call: + * $ bin/run-example mllib.JavaStreamingTestExample dataDir 5 100 + * + * As you add text files to `dataDir` the significance test will continually update every + * `batchDuration` seconds until the test becomes significant (p-value < 0.05) or the number of + * batches processed exceeds `numBatchesTimeout`. */ public class JavaStreamingTestExample { public static void main(String[] args) { From 3458acaf4ca6ae44ae4d5cc740870cd3d7d29f98 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 17 Mar 2016 12:12:07 +0800 Subject: [PATCH 3/6] add to md --- docs/mllib-statistics.md | 5 +++++ .../spark/examples/mllib/JavaStreamingTestExample.java | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md index 652d215fa8653..9bd0aacbb97f3 100644 --- a/docs/mllib-statistics.md +++ b/docs/mllib-statistics.md @@ -544,6 +544,11 @@ provides streaming hypothesis testing. {% include_example scala/org/apache/spark/examples/mllib/StreamingTestExample.scala %} + +
+[`StreamingTest`](api/java/index.html#org.apache.spark.mllib.stat.test.StreamingTest) +provides streaming hypothesis testing. +
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index a8e26f58a960a..9690a040f55b9 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -17,19 +17,21 @@ package org.apache.spark.examples.mllib; + import org.apache.spark.Accumulator; -import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.stat.test.BinarySample; import org.apache.spark.mllib.stat.test.StreamingTest; import org.apache.spark.mllib.stat.test.StreamingTestResult; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.util.Utils; -import org.apache.spark.api.java.function.VoidFunction; + /** * Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data From ff56ff56d46db9ee64924c44fb18c03c0ff91e4d Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 17 Mar 2016 12:13:45 +0800 Subject: [PATCH 4/6] add to md --- docs/mllib-statistics.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md index 9bd0aacbb97f3..b773031bc72ee 100644 --- a/docs/mllib-statistics.md +++ b/docs/mllib-statistics.md @@ -548,6 +548,8 @@ provides streaming hypothesis testing.
[`StreamingTest`](api/java/index.html#org.apache.spark.mllib.stat.test.StreamingTest) provides streaming hypothesis testing. + +{% include_example java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java %}
From 77502998be1a2b1335f57c878d43d3dcb0044077 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 17 Mar 2016 16:09:34 +0800 Subject: [PATCH 5/6] add required imports --- .../apache/spark/examples/mllib/JavaStreamingTestExample.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 9690a040f55b9..44d9d66444ada 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -19,12 +19,16 @@ import org.apache.spark.Accumulator; +// $example on$ import org.apache.spark.api.java.function.VoidFunction; +// $example off$ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; +// $example on$ import org.apache.spark.mllib.stat.test.BinarySample; import org.apache.spark.mllib.stat.test.StreamingTest; import org.apache.spark.mllib.stat.test.StreamingTestResult; +// $example off$ import org.apache.spark.SparkConf; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Seconds; From f251229b817816dec90ba3019d70367513ea76f2 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 17 Mar 2016 16:40:05 +0800 Subject: [PATCH 6/6] remove unnecessary imports --- .../apache/spark/examples/mllib/JavaStreamingTestExample.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 44d9d66444ada..2197ef9481a79 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -19,9 +19,7 @@ import 
org.apache.spark.Accumulator; -// $example on$ import org.apache.spark.api.java.function.VoidFunction; -// $example off$ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; // $example on$