diff --git a/flinkspector-core/src/main/java/org/flinkspector/core/quantify/MatchTuples.java b/flinkspector-core/src/main/java/org/flinkspector/core/quantify/MatchFields.java similarity index 83% rename from flinkspector-core/src/main/java/org/flinkspector/core/quantify/MatchTuples.java rename to flinkspector-core/src/main/java/org/flinkspector/core/quantify/MatchFields.java index ac1b29b..2de482d 100644 --- a/flinkspector-core/src/main/java/org/flinkspector/core/quantify/MatchTuples.java +++ b/flinkspector-core/src/main/java/org/flinkspector/core/quantify/MatchFields.java @@ -33,7 +33,7 @@ *

*

  * {@code
- * new MatchTuples>("name","age")
+ * new MatchFields>("name","age")
  * 		.assertThat("age", greaterThan(21))
  * 		.assertThat("name", either(is("fritz")).or(is("peter")))
  * 		.anyOfThem().onEachRecord();
@@ -41,7 +41,7 @@
  *
  * @param 
  */
-public class MatchTuples extends MatchRecords {
+public class MatchFields extends MatchRecords {
 
 	/**
 	 * {@link TupleMask} used to map the keys to the inspected tuples.
@@ -53,10 +53,10 @@ public class MatchTuples extends MatchRecords {
 	 *
 	 * @param mask {@link TupleMask} to use.
 	 * @param   type of output
-	 * @return new instance of {@link MatchTuples}
+	 * @return new instance of {@link MatchFields}
 	 */
-	public static  MatchTuples fromMask(TupleMask mask) {
-		return new MatchTuples(mask);
+	public static  MatchFields fromMask(TupleMask mask) {
+		return new MatchFields(mask);
 	}
 
 	/**
@@ -64,7 +64,7 @@ public static  MatchTuples fromMask(TupleMask mask) {
 	 *
 	 * @param mask {@link TupleMask} to use.
 	 */
-	public MatchTuples(TupleMask mask) {
+	public MatchFields(TupleMask mask) {
 		super();
 		this.mask = mask;
 	}
@@ -76,7 +76,7 @@ public MatchTuples(TupleMask mask) {
 	 * @param first key
 	 * @param rest of keys
 	 */
-	public MatchTuples(String first, String... rest) {
+	public MatchFields(String first, String... rest) {
 		this(new TupleMask(first,rest));
 	}
 
@@ -86,7 +86,7 @@ public MatchTuples(String first, String... rest) {
 	 * @param key   of the field
 	 * @param match matcher to use on the field
 	 */
-	public MatchTuples assertThat(String key, Matcher match) {
+	public MatchFields assertThat(String key, Matcher match) {
 		assertThat(new TupleMatcher(KeyMatcherPair.of(key, match),mask));
 		return this;
 	}
@@ -95,7 +95,7 @@ public MatchTuples assertThat(String key, Matcher match) {
 	 * Add a {@link Matcher} to the list of assertions to verify.
 	 * @param matcher testing the output records
 	 */
-	public MatchTuples assertThatRecord(Matcher matcher) {
+	public MatchFields assertThatRecord(Matcher matcher) {
 		super.assertThat(matcher);
 		return this;
 	}
diff --git a/flinkspector-core/src/test/scala/org/flinkspector/core/quantify/MatchTuplesSpec.scala b/flinkspector-core/src/test/scala/org/flinkspector/core/quantify/MatchTuplesSpec.scala
index 933b0ba..32b127f 100644
--- a/flinkspector-core/src/test/scala/org/flinkspector/core/quantify/MatchTuplesSpec.scala
+++ b/flinkspector-core/src/test/scala/org/flinkspector/core/quantify/MatchTuplesSpec.scala
@@ -23,9 +23,9 @@ import org.hamcrest.core.Is
 import scala.collection.JavaConversions._
 
 class MatchTuplesSpec extends CoreSpec {
-  "The MatchTuples" should "store a list of [[KeyMatcherPair]]s" in {
+  "The MatchFields" should "store a list of [[KeyMatcherPair]]s" in {
     val matcher = Is.is(1)
-    val block = new MatchTuples[Fluple3[Int, Int, Int]]("1", "2", "3")
+    val block = new MatchFields[Fluple3[Int, Int, Int]]("1", "2", "3")
     block.assertThat("1", matcher)
     block.assertThat("2", matcher)
     block.assertThat("2", matcher)
@@ -88,7 +88,7 @@ class MatchTuplesSpec extends CoreSpec {
   trait AssertBlockCase {
     val matcher = Is.is(1)
     val block =
-      new MatchTuples[Fluple4[Int, Int, Int, Int]]("1", "2", "3", "4")
+      new MatchFields[Fluple4[Int, Int, Int, Int]]("1", "2", "3", "4")
     block.assertThat("1", matcher)
     block.assertThat("2", matcher)
     block.assertThat("3", matcher)
diff --git a/flinkspector-dataset/src/test/java/org/flinkspector/dataset/examples/BatchTest.java b/flinkspector-dataset/src/test/java/org/flinkspector/dataset/examples/BatchTest.java
index 000f4c5..298ccea 100644
--- a/flinkspector-dataset/src/test/java/org/flinkspector/dataset/examples/BatchTest.java
+++ b/flinkspector-dataset/src/test/java/org/flinkspector/dataset/examples/BatchTest.java
@@ -20,7 +20,7 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.flinkspector.core.collection.ExpectedRecords;
-import org.flinkspector.core.quantify.MatchTuples;
+import org.flinkspector.core.quantify.MatchFields;
 import org.flinkspector.core.quantify.OutputMatcher;
 import org.flinkspector.core.trigger.FinishAtCount;
 import org.flinkspector.dataset.DataSetTestBase;
@@ -77,7 +77,7 @@ public void testMap() throws Throwable {
 		 */
 		OutputMatcher> matcher =
 				//name the values in your tuple with keys:
-				new MatchTuples>("name", "value")
+				new MatchFields>("name", "value")
 						//add an assertion using a value and hamcrest matchers
 						.assertThat("name", isA(String.class))
 						.assertThat("value", lessThan(5))
diff --git a/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/DataStreamTestEnvironment.scala b/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/DataStreamTestEnvironment.scala
index 2eba485..c615720 100644
--- a/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/DataStreamTestEnvironment.scala
+++ b/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/DataStreamTestEnvironment.scala
@@ -32,7 +32,8 @@ import _root_.scala.language.implicitConversions
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamTestEnvironment) extends StreamExecutionEnvironment(testEnv) {
+class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamTestEnvironment)
+  extends StreamExecutionEnvironment(testEnv) {
 
 
   @throws(classOf[Throwable])
diff --git a/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/FlinkDataStream.scala b/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/FlinkDataStream.scala
index 0a571f4..0efb65d 100644
--- a/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/FlinkDataStream.scala
+++ b/flinkspector-datastream-scala_2.11/src/main/scala/org/flinkspector/scala/datastream/FlinkDataStream.scala
@@ -43,8 +43,6 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
    */
   private var testEnv: DataStreamTestEnvironment = _
 
-  private var executed = false
-
   override def beforeEach() {
     testEnv = DataStreamTestEnvironment.createTestEnvironment(1)
     testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -55,16 +53,12 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
     try super.afterEach() // To be stackable, must call super.afterEach
     finally {
       testEnv.close()
-      executed = false
     }
   }
 
 
   def executeTest(): Unit = {
-    if(!executed) {
-      executed = true
       testEnv.executeTest()
-    }
   }
 
   /**
@@ -172,7 +166,6 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
           case None =>
             stream.addSink(createTestSink(matcher))
         }
-        executeTest()
       }
   }
 
diff --git a/flinkspector-datastream-scala_2.11/src/test/scala/org/flinkspector/scala/datastream/DataStreamSpec.scala b/flinkspector-datastream-scala_2.11/src/test/scala/org/flinkspector/scala/datastream/DataStreamSpec.scala
index 75e7e9c..aa1b8e6 100644
--- a/flinkspector-datastream-scala_2.11/src/test/scala/org/flinkspector/scala/datastream/DataStreamSpec.scala
+++ b/flinkspector-datastream-scala_2.11/src/test/scala/org/flinkspector/scala/datastream/DataStreamSpec.scala
@@ -15,6 +15,7 @@ package org.flinkspector.scala.datastream
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 import org.apache.flink.api.scala._
 import org.flinkspector.core.collection.ExpectedRecords
 import org.flinkspector.core.input.InputBuilder
@@ -38,6 +39,8 @@ class DataStreamSpec extends CoreSpec with FlinkDataStream{
 
     //use the matcher on the datastream
     stream should fulfill(expected)
+    executeTest()
+
   }
 
 
@@ -58,6 +61,7 @@ class DataStreamSpec extends CoreSpec with FlinkDataStream{
         field(v.value should be > 4)
       }
     }
+    executeTest()
   }
 }
 
diff --git a/flinkspector-datastream/src/test/java/org/flinkspector/datastream/examples/WindowingTest.java b/flinkspector-datastream/src/test/java/org/flinkspector/datastream/examples/WindowingTest.java
index bfdeaa6..89557e2 100644
--- a/flinkspector-datastream/src/test/java/org/flinkspector/datastream/examples/WindowingTest.java
+++ b/flinkspector-datastream/src/test/java/org/flinkspector/datastream/examples/WindowingTest.java
@@ -18,7 +18,8 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.flinkspector.core.quantify.MatchTuples;
+import org.flinkspector.core.quantify.MatchFields;
+import org.flinkspector.core.quantify.MatchRecords;
 import org.flinkspector.core.quantify.OutputMatcher;
 import org.flinkspector.datastream.StreamTestBase;
 
@@ -31,7 +32,7 @@
 
 /**
  * This example shows how to startWith test input with time characteristics.
- * And the usage of {@link MatchTuples} to build an {@link OutputMatcher}.
+ * And the usage of {@link MatchFields} to build an {@link OutputMatcher}.
  * 

* To ensure test cases run in a few seconds the framework sets the time characteristic of the data flow, to * EventTime. The test source emitting the input, calculates and emits watermarks based on the timestamped input. @@ -79,14 +80,14 @@ public void testWindowing() { .close(); /* - * Creates an OutputMatcher using MatchTuples. - * MatchTuples builds an OutputMatcher working on Tuples. + * Creates an OutputMatcher using MatchFields. + * MatchFields builds an OutputMatcher working on Tuples. * You assign String identifiers to your Tuple, * and add hamcrest matchers testing the values. */ OutputMatcher> matcher = //name the values in your tuple with keys: - new MatchTuples>("value", "name") + new MatchFields>("value", "name") //add an assertion using a value and hamcrest matchers .assertThat("value", is(3)) .assertThat("name", either(is("fritz")).or(is("peter"))) @@ -95,6 +96,11 @@ public void testWindowing() { //define how many records need to fulfill the .onEachRecord(); + OutputMatcher> records = + new MatchRecords>() + .assertThat(is(Tuple2.of(3, "fritz"))) + .onAnyRecord(); + /* * Use assertStream to map DataStream to an OutputMatcher. * You're also able to combine OutputMatchers with any