
Testing Apache Flink's DataSet and DataStream APIs with Flinkspector is almost identical; the only difference is that data streams work with time characteristics. For that reason no separate documentation is provided for the two APIs. The chapter on input describes how to define timed input for streaming data flows. Most examples apply to both DataSet and DataStream.

##Introduction

The framework can be used directly through DataSetTestEnvironment or DataStreamTestEnvironment, or through the test base classes for JUnit.

    DataSetTestEnvironment env =
            DataSetTestEnvironment.createTestEnvironment(1);

    DataSet<Integer> dataSet = env.createTestDataSet(asList(1, 2, 3))
            .map((MapFunction<Integer, Integer>) (value) -> { return value + 1; });

    ExpectedOutput<Integer> expectedOutput =
            new ExpectedOutput<Integer>().expectAll(asList(2, 3, 4));

    OutputFormat<Integer> outputFormat =
            env.createTestOutputFormat(new HamcrestVerifier(expectedOutput));
    dataSet.output(outputFormat);
    env.executeTest();
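
The same flow works for the DataStream API. Below is a minimal sketch assuming that DataStreamTestEnvironment mirrors the DataSet variant; createTestStream and createTestSink are assumed counterparts of createTestDataSet and createTestOutputFormat, so check the API for the exact names:

    DataStreamTestEnvironment env =
            DataStreamTestEnvironment.createTestEnvironment(1);

    // createTestStream and createTestSink are assumed counterparts of the
    // DataSet methods above; consult the API for the exact signatures.
    DataStream<Integer> stream = env.createTestStream(asList(1, 2, 3))
            .map((MapFunction<Integer, Integer>) (value) -> { return value + 1; });

    ExpectedOutput<Integer> expectedOutput =
            new ExpectedOutput<Integer>().expectAll(asList(2, 3, 4));

    stream.addSink(env.createTestSink(new HamcrestVerifier(expectedOutput)));
    env.executeTest();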

Using one of the JUnit test bases is more convenient and more readable:

    class Test extends TestBase {

        @org.junit.Test
        public void myTest() {
            DataSet<Integer> dataSet = createTestDataSet(asList(1, 2, 3))
                    .map((MapFunction<Integer, Integer>) (value) -> { return value + 1; });

            ExpectedOutput<Integer> expectedOutput =
                    new ExpectedOutput<Integer>().expectAll(asList(2, 3, 4));

            assertDataSet(dataSet, expectedOutput);
        }

    }

##Philosophy

The concept of Flinkspector is to define a list of input records for each input of a transformation and to specify expectations for each endpoint.

The best tactic for testing a Flink job with Flinkspector is to divide the job's processing logic into smaller processing steps by bundling multiple transformations into a method:

    public static DataStream<Tuple2<Integer, String>> aggregateViews(
            DataStream<Tuple2<Integer, String>> stream) {
        return stream.timeWindowAll(Time.of(20, TimeUnit.SECONDS)).sum(0);
    }

You will be rewarded with the ability to test processing steps incrementally, even while the whole logic of the job has not yet been defined. Side effects include more comprehensible code and the possibility to easily recompose steps.
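
Such a step can then be verified in isolation. The following is a rough sketch assuming a streaming test base analogous to TestBase above; the helpers createTestStream and assertStream are illustrative names, not the verified API:

    @org.junit.Test
    public void testAggregateViews() {
        // createTestStream and assertStream are assumed streaming counterparts
        // of createTestDataSet and assertDataSet from the example above.
        DataStream<Tuple2<Integer, String>> stream =
                createTestStream(asList(Tuple2.of(1, "page"), Tuple2.of(2, "page")));

        // Both records fall into the same 20 second window, so sum(0) yields 3.
        ExpectedOutput<Tuple2<Integer, String>> expectedOutput =
                new ExpectedOutput<Tuple2<Integer, String>>()
                        .expectAll(asList(Tuple2.of(3, "page")));

        assertStream(aggregateViews(stream), expectedOutput);
    }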

Don't use anonymous or private inner classes for MapFunction, FilterFunction, etc., even though Flinkspector enables you to test them. Test all functions you have written in separate unit tests.
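
For example, a MapFunction extracted into its own class can be tested with plain JUnit, without starting a Flink job at all; the class IncrementMapper below is only illustrative:

    class IncrementMapper implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value + 1;
        }
    }

    public class IncrementMapperTest {
        @org.junit.Test
        public void incrementsValue() {
            // assertEquals is a static import from org.junit.Assert
            assertEquals(Integer.valueOf(2), new IncrementMapper().map(1));
        }
    }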

##Documentation