Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Kolb committed Feb 18, 2016
1 parent dade2fa commit a05e2bf
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
* <p/>
* <pre>
* {@code
* new MatchTuples<Tuple2<String,Integer>>("name","age")
* new MatchFields<Tuple2<String,Integer>>("name","age")
* .assertThat("age", greaterThan(21))
* .assertThat("name", either(is("fritz")).or(is("peter")))
* .anyOfThem().onEachRecord();
* </pre>
*
* @param <T>
*/
public class MatchTuples<T extends Tuple> extends MatchRecords<T> {
public class MatchFields<T extends Tuple> extends MatchRecords<T> {

/**
* {@link TupleMask} used to map the keys to the inspected tuples.
Expand All @@ -53,18 +53,18 @@ public class MatchTuples<T extends Tuple> extends MatchRecords<T> {
*
* @param mask {@link TupleMask} to use.
* @param <T> type of output
* @return new instance of {@link MatchTuples}
* @return new instance of {@link MatchFields}
*/
public static <T extends Tuple> MatchTuples<T> fromMask(TupleMask<T> mask) {
return new MatchTuples<T>(mask);
public static <T extends Tuple> MatchFields<T> fromMask(TupleMask<T> mask) {
return new MatchFields<T>(mask);
}

/**
* Default Constructor.
*
* @param mask {@link TupleMask} to use.
*/
public MatchTuples(TupleMask<T> mask) {
public MatchFields(TupleMask<T> mask) {
super();
this.mask = mask;
}
Expand All @@ -76,7 +76,7 @@ public MatchTuples(TupleMask<T> mask) {
* @param first key
* @param rest of keys
*/
public MatchTuples(String first, String... rest) {
public MatchFields(String first, String... rest) {
this(new TupleMask<T>(first,rest));
}

Expand All @@ -86,7 +86,7 @@ public MatchTuples(String first, String... rest) {
* @param key of the field
* @param match matcher to use on the field
*/
public MatchTuples<T> assertThat(String key, Matcher match) {
public MatchFields<T> assertThat(String key, Matcher match) {
assertThat(new TupleMatcher<T>(KeyMatcherPair.of(key, match),mask));
return this;
}
Expand All @@ -95,7 +95,7 @@ public MatchTuples<T> assertThat(String key, Matcher match) {
* Add a {@link Matcher} to the list of assertions to verify.
* @param matcher testing the output records
*/
public MatchTuples<T> assertThatRecord(Matcher<? super T> matcher) {
public MatchFields<T> assertThatRecord(Matcher<? super T> matcher) {
super.assertThat(matcher);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import org.hamcrest.core.Is
import scala.collection.JavaConversions._

class MatchTuplesSpec extends CoreSpec {
"The MatchTuples" should "store a list of [[KeyMatcherPair]]s" in {
"The MatchFields" should "store a list of [[KeyMatcherPair]]s" in {
val matcher = Is.is(1)
val block = new MatchTuples[Fluple3[Int, Int, Int]]("1", "2", "3")
val block = new MatchFields[Fluple3[Int, Int, Int]]("1", "2", "3")
block.assertThat("1", matcher)
block.assertThat("2", matcher)
block.assertThat("2", matcher)
Expand Down Expand Up @@ -88,7 +88,7 @@ class MatchTuplesSpec extends CoreSpec {
trait AssertBlockCase {
val matcher = Is.is(1)
val block =
new MatchTuples[Fluple4[Int, Int, Int, Int]]("1", "2", "3", "4")
new MatchFields[Fluple4[Int, Int, Int, Int]]("1", "2", "3", "4")
block.assertThat("1", matcher)
block.assertThat("2", matcher)
block.assertThat("3", matcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.flinkspector.core.collection.ExpectedRecords;
import org.flinkspector.core.quantify.MatchTuples;
import org.flinkspector.core.quantify.MatchFields;
import org.flinkspector.core.quantify.OutputMatcher;
import org.flinkspector.core.trigger.FinishAtCount;
import org.flinkspector.dataset.DataSetTestBase;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testMap() throws Throwable {
*/
OutputMatcher<Tuple2<String, Integer>> matcher =
//name the values in your tuple with keys:
new MatchTuples<Tuple2<String, Integer>>("name", "value")
new MatchFields<Tuple2<String, Integer>>("name", "value")
//add an assertion using a value and hamcrest matchers
.assertThat("name", isA(String.class))
.assertThat("value", lessThan(5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import _root_.scala.language.implicitConversions
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamTestEnvironment) extends StreamExecutionEnvironment(testEnv) {
class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamTestEnvironment)
extends StreamExecutionEnvironment(testEnv) {


@throws(classOf[Throwable])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
*/
private var testEnv: DataStreamTestEnvironment = _

private var executed = false

override def beforeEach() {
testEnv = DataStreamTestEnvironment.createTestEnvironment(1)
testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Expand All @@ -55,16 +53,12 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
try super.afterEach() // To be stackable, must call super.afterEach
finally {
testEnv.close()
executed = false
}
}


def executeTest(): Unit = {
if(!executed) {
executed = true
testEnv.executeTest()
}
}

/**
Expand Down Expand Up @@ -172,7 +166,6 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
case None =>
stream.addSink(createTestSink(matcher))
}
executeTest()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package org.flinkspector.scala.datastream
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.api.scala._
import org.flinkspector.core.collection.ExpectedRecords
import org.flinkspector.core.input.InputBuilder
Expand All @@ -38,6 +39,8 @@ class DataStreamSpec extends CoreSpec with FlinkDataStream{

//use the matcher on the datastream
stream should fulfill(expected)
executeTest()

}


Expand All @@ -58,6 +61,7 @@ class DataStreamSpec extends CoreSpec with FlinkDataStream{
field(v.value should be > 4)
}
}
executeTest()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.flinkspector.core.quantify.MatchTuples;
import org.flinkspector.core.quantify.MatchFields;
import org.flinkspector.core.quantify.MatchRecords;
import org.flinkspector.core.quantify.OutputMatcher;
import org.flinkspector.datastream.StreamTestBase;

Expand All @@ -31,7 +32,7 @@

/**
* This example shows how to startWith test input with time characteristics.
* And the usage of {@link MatchTuples} to build an {@link OutputMatcher}.
* And the usage of {@link MatchFields} to build an {@link OutputMatcher}.
* <p/>
* To ensure test cases run in a few seconds the framework sets the time characteristic of the data flow, to
* EventTime. The test source emitting the input, calculates and emits watermarks based on the timestamped input.
Expand Down Expand Up @@ -79,14 +80,14 @@ public void testWindowing() {
.close();

/*
* Creates an OutputMatcher using MatchTuples.
* MatchTuples builds an OutputMatcher working on Tuples.
* Creates an OutputMatcher using MatchFields.
* MatchFields builds an OutputMatcher working on Tuples.
* You assign String identifiers to your Tuple,
* and add hamcrest matchers testing the values.
*/
OutputMatcher<Tuple2<Integer, String>> matcher =
//name the values in your tuple with keys:
new MatchTuples<Tuple2<Integer, String>>("value", "name")
new MatchFields<Tuple2<Integer, String>>("value", "name")
//add an assertion using a value and hamcrest matchers
.assertThat("value", is(3))
.assertThat("name", either(is("fritz")).or(is("peter")))
Expand All @@ -95,6 +96,11 @@ public void testWindowing() {
//define how many records need to fulfill the
.onEachRecord();

OutputMatcher<Tuple2<Integer,String>> records =
new MatchRecords<Tuple2<Integer,String>>()
.assertThat(is(Tuple2.of(3, "fritz")))
.onAnyRecord();

/*
* Use assertStream to map DataStream to an OutputMatcher.
* You're also able to combine OutputMatchers with any
Expand Down

0 comments on commit a05e2bf

Please sign in to comment.