diff --git a/flinkspector-core/src/main/java/org/flinkspector/core/runtime/Runner.java b/flinkspector-core/src/main/java/org/flinkspector/core/runtime/Runner.java index 4fbb7c4..1b183b6 100644 --- a/flinkspector-core/src/main/java/org/flinkspector/core/runtime/Runner.java +++ b/flinkspector-core/src/main/java/org/flinkspector/core/runtime/Runner.java @@ -268,10 +268,8 @@ public int registerListener(OutputVerifier verifier, @Override public void onSuccess(ResultState state) { - System.out.println("Success"); if(state != ResultState.SUCCESS) { if (runningListeners.decrementAndGet() == 0) { - System.out.println("kill-s"); stopExecution(); } } @@ -280,9 +278,7 @@ public void onSuccess(ResultState state) { @Override public void onFailure(Throwable throwable) { //check if other sockets are still running - System.out.println("Failure"); if (runningListeners.decrementAndGet() == 0) { - System.out.println("kill-f"); stopExecution(); } } diff --git a/flinkspector-dataset/src/test/scala/org/flinkspector/dataset/DataSetEnvironmentSpec.scala b/flinkspector-dataset/src/test/scala/org/flinkspector/dataset/DataSetEnvironmentSpec.scala index bacf452..4e3543e 100644 --- a/flinkspector-dataset/src/test/scala/org/flinkspector/dataset/DataSetEnvironmentSpec.scala +++ b/flinkspector-dataset/src/test/scala/org/flinkspector/dataset/DataSetEnvironmentSpec.scala @@ -26,6 +26,7 @@ import org.flinkspector.core.trigger.VerifyFinishedTrigger import org.scalatest.exceptions.TestFailedException import scala.collection.JavaConversions._ +import org.scalatest.time.SpanSugar._ class DataSetEnvironmentSpec extends CoreSpec { @@ -107,7 +108,8 @@ class DataSetEnvironmentSpec extends CoreSpec { }) val outputFormat = env.createTestOutputFormat(happyVerifier) dataSet.output(outputFormat) - env.executeTest() + + env.executeTest() //check if a forceful stop was invoked env.hasBeenStopped shouldBe true @@ -135,7 +137,10 @@ class DataSetEnvironmentSpec extends CoreSpec { val outputFormat = env.createTestOutputFormat(sadVerifier) dataSet.output(outputFormat) // failAfter(Span(10,Seconds)) { + an [FlinkTestFailedException] shouldBe thrownBy (env.executeTest()) + + // } //check if a forceful stop was invoked env.hasBeenStopped shouldBe true diff --git a/flinkspector-datastream/src/test/scala/org/flinkspector/datastream/StreamTestEnvironmentSpec.scala b/flinkspector-datastream/src/test/scala/org/flinkspector/datastream/StreamTestEnvironmentSpec.scala index bec099c..ea8cb43 100644 --- a/flinkspector-datastream/src/test/scala/org/flinkspector/datastream/StreamTestEnvironmentSpec.scala +++ b/flinkspector-datastream/src/test/scala/org/flinkspector/datastream/StreamTestEnvironmentSpec.scala @@ -113,7 +113,7 @@ class StreamTestEnvironmentSpec extends CoreSpec { val sink = env.createTestSink(happyVerifier) source.addSink(sink) - env.executeTest() + env.executeTest() //check if a forceful stop was invoked env.hasBeenStopped shouldBe true @@ -139,9 +139,9 @@ class StreamTestEnvironmentSpec extends CoreSpec { }) val sink = env.createTestSink(sadVerifier) source.addSink(sink) - failAfter(Span(3,Seconds)) { + an [FlinkTestFailedException] shouldBe thrownBy (env.executeTest()) - } + //check if a forceful stop was invoked env.hasBeenStopped shouldBe true