This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Defining Input

Oytun Tez edited this page Mar 22, 2019 · 21 revisions

The shortest way of getting a DataSet or DataStream is to pass a Collection of input records:

import static java.util.Arrays.asList;
//...
DataSet<String> dataSet = 
			createTestDataSet(asList("fritz", "peter", "hans"));
DataStream<String> dataStream =
			createTestStreamWith(asList("fritz", "peter", "hans"));

Using InputBuilder

To specify Input for tests, you can use the InputBuilder:

Input<Tuple2<String,Integer>> input = InputBuilder
		.startWith(Tuple2.of("one",1))
		.emit(Tuple2.of("two",2))
		.emit(Tuple2.of("three",3))
		.repeatAll(times(2))
		.emit(Tuple2.of("four",3),times(3));
  • .startWith(record) provides you with an instance of InputBuilder that has the generic type of the passed record.
  • .emit(record) adds a record to the list of input.
  • .repeatAll(int times) repeats all records added to the input up to this point a number of times.
  • .emit(record, int times) adds the record a number of times to the end of the input.
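The following plain-Java sketch models only the ordering rules from the list above, so the effect of a builder chain can be checked without Flink. SimpleInputBuilder is a hypothetical class, not flink-spector's InputBuilder, and counts are passed as plain ints instead of times(...). It assumes that repeatAll(n) appends n further copies of everything added so far; the list above does not say whether n includes the original copy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for InputBuilder: only models the order of records.
class SimpleInputBuilder<T> {
    private final List<T> records = new ArrayList<>();

    private SimpleInputBuilder() {}

    // The element type is inferred from the first record.
    static <T> SimpleInputBuilder<T> startWith(T record) {
        SimpleInputBuilder<T> builder = new SimpleInputBuilder<>();
        builder.records.add(record);
        return builder;
    }

    // Appends one record to the end of the input.
    SimpleInputBuilder<T> emit(T record) {
        records.add(record);
        return this;
    }

    // Appends the same record `times` times.
    SimpleInputBuilder<T> emit(T record, int times) {
        for (int i = 0; i < times; i++) {
            records.add(record);
        }
        return this;
    }

    // Assumption: appends `times` further copies of everything added so far.
    SimpleInputBuilder<T> repeatAll(int times) {
        List<T> snapshot = new ArrayList<>(records);
        for (int i = 0; i < times; i++) {
            records.addAll(snapshot);
        }
        return this;
    }

    List<T> getInput() {
        return Collections.unmodifiableList(records);
    }

    public static void main(String[] args) {
        List<String> input = SimpleInputBuilder.startWith("one")
            .emit("two")
            .emit("three")
            .repeatAll(2)
            .emit("four", 3)
            .getInput();
        System.out.println(input);
        // prints [one, two, three, one, two, three, one, two, three, four, four, four]
    }
}
```

Under the other reading (n counts the original), repeatAll(2) would yield only two copies of one, two, three; checking a real run of the test harness is the reliable way to settle it.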

The Input object can then be used to produce a DataSet or DataStream:

DataSet<Tuple2<String,Integer>> dataSet = createTestDataSet(input);
DataStream<Tuple2<String,Integer>> dataStream = createTestDataStream(input);

If you're extending the DataSetTestBase or StreamTestBase, you can create a DataSet or DataStream in one step, in a fluent way, by using .createTestDataSetWith(record) or .createTestDataStreamWith(record):

DataSet<Tuple2<String,Integer>> dataSet =
		createTestDataSetWith(Tuple2.of("one",1))
				.emit(Tuple2.of("two",2))
				.emit(Tuple2.of("three",3))
				.repeatAll(times(2))
				.emit(Tuple2.of("four", 3), times(3))
				.close();

Note that you have to call .close() at the end to convert the SourceBuilder into a DataSet.
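The role of .close() can be illustrated with a small fluent builder in plain Java. FluentSourceBuilder and its factory function are hypothetical: the factory stands in for whatever the test base uses to turn the collected records into a DataSet or DataStream. Until close() is called, the chain only yields the builder itself, not the finished source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical fluent builder: collects records and only produces its
// result (standing in for a DataSet or DataStream) when close() is called.
class FluentSourceBuilder<T, R> {
    private final List<T> records = new ArrayList<>();
    private final Function<List<T>, R> factory;

    FluentSourceBuilder(T first, Function<List<T>, R> factory) {
        this.records.add(first);
        this.factory = factory;
    }

    FluentSourceBuilder<T, R> emit(T record) {
        records.add(record);
        return this;
    }

    // Terminal call: hands an immutable copy of the records to the factory.
    R close() {
        return factory.apply(List.copyOf(records));
    }

    public static void main(String[] args) {
        // Here the "environment" simply joins the records into a String.
        String result = new FluentSourceBuilder<String, String>("one", rs -> String.join(",", rs))
            .emit("two")
            .emit("three")
            .close();
        System.out.println(result); // prints one,two,three
    }
}
```

Keeping construction and conversion separate is what lets the chain stay fluent: every emit returns the builder, and only the final call changes the type.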

Using EventTimeInputBuilder

To make it possible to test streaming data flows that use windowing, the framework sets the TimeCharacteristic of the wrapped StreamExecutionEnvironment to EventTime. The ParallelFromStreamRecordsFunction used in the background emits a DataStream with timestamps and watermarks, so long-running windows can be tested in a matter of seconds.
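Why event time lets long windows be tested quickly can be seen in a small simulation: a window is complete as soon as a watermark passes its end, no matter how much wall-clock time has elapsed. This is plain Java, not Flink. EventTimeWindowDemo is hypothetical, the input is assumed to arrive in timestamp order, and the watermark is assumed to trail the latest timestamp by 1 ms. The firing rule (a window [start, start + size) closes once the watermark reaches start + size - 1) is modeled on Flink's event-time trigger, but no Flink API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of event-time tumbling windows.
class EventTimeWindowDemo {
    record Event(String value, long timestamp) {}

    static final long HOUR = 60 * 60 * 1000L;

    // Start of the tumbling window of length `size` that contains `ts` (ts >= 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Emits every open window whose last millisecond is covered by the watermark.
    static void fire(TreeMap<Long, List<String>> open, List<String> fired,
                     long watermark, long size) {
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, List<String>> w = open.pollFirstEntry();
            fired.add("window@" + w.getKey() + "=" + w.getValue());
        }
    }

    // Assigns events to windows and fires windows as the watermark advances.
    static List<String> run(List<Event> events, long size) {
        TreeMap<Long, List<String>> open = new TreeMap<>();
        List<String> fired = new ArrayList<>();
        for (Event e : events) {
            open.computeIfAbsent(windowStart(e.timestamp(), size), k -> new ArrayList<>())
                .add(e.value());
            // Assumes in-order input: the watermark trails the latest timestamp by 1 ms.
            fire(open, fired, e.timestamp() - 1, size);
        }
        // End of input: a maximal watermark flushes all remaining windows.
        fire(open, fired, Long.MAX_VALUE, size);
        return fired;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("fritz", 0),
            new Event("hans", 30 * 60 * 1000L), // 30 minutes in
            new Event("peter", 2 * HOUR));      // two hours in
        // Both one-hour windows complete immediately; nothing waits on the clock.
        System.out.println(run(events, HOUR));
        // prints [window@0=[fritz, hans], window@7200000=[peter]]
    }
}
```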

To use this feature, create an EventTimeInput using the EventTimeInputBuilder:

EventTimeInput<Tuple2<Integer, String>> etInput =
		EventTimeInputBuilder.startWith(Tuple2.of(1, "fritz"))
				.emit(Tuple2.of(1, "hans"), after(15, seconds))
				.emit(Tuple2.of(1, "heidi"), before(5, seconds))
				.emit(Tuple2.of(3, "peter"), intoWindow(20, seconds), times(10))
				.repeatAll(after(10, seconds), times(1))
				.close();
  • .startWith(record) provides you with an instance of EventTimeInputBuilder that has the generic type of the passed record.

  • .emit(record, TimeSpan span) adds a record to the input, with the time gap between the previous record and this one set by the TimeSpan. You can use after, before, or intoWindow: after adds the span to the previous record's timestamp and before subtracts it. Records are emitted in the order they are added to the input, so using before creates input whose timestamps do not rise continuously. The framework still calculates and emits ideal watermarks in this case. intoWindow fits the record into a window up to the defined time; it does this by taking that time and subtracting 1 millisecond from it.

  • .emit(record, TimeSpan span, int times) adds the record a number of times to the end of the input, with the TimeSpan as the gap between repetitions. With intoWindow, every repetition is emitted into the same window.

  • .repeatAll(TimeSpan span, int times) repeats all records added to the input up to this point a number of times. The TimeSpan defines the gap between consecutive repetitions of the whole input, and the relative time spans between the records already defined are kept.
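The timestamp rules for after, before and repeatAll, together with one way to compute ideal watermarks for out-of-order input, can be sketched in plain Java. RelativeTimeInput is hypothetical and much simpler than EventTimeInputBuilder; intoWindow and the repeated emit are left out. Two points are assumptions rather than documented behavior: the first record gets timestamp 0, and repeatAll places the first record of each repetition one TimeSpan after the last record emitted so far. Ideal watermarks are computed here as the smallest timestamp still to come minus 1 ms, which is the highest watermark that leaves no later record late; flink-spector's own calculation may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of relative event-time input; not flink-spector code.
class RelativeTimeInput {
    record Timed(String value, long ts) {}

    // A TimeSpan derives a record's timestamp from the previous record's.
    interface TimeSpan { long next(long previous); }

    static TimeSpan after(long ms)  { return prev -> prev + ms; }
    static TimeSpan before(long ms) { return prev -> prev - ms; }

    static final long SEC = 1000L;

    private final List<Timed> records = new ArrayList<>();

    // Assumption: the first record sits at timestamp 0.
    RelativeTimeInput(String first) {
        records.add(new Timed(first, 0));
    }

    private long last() {
        return records.get(records.size() - 1).ts();
    }

    RelativeTimeInput emit(String value, TimeSpan span) {
        records.add(new Timed(value, span.next(last())));
        return this;
    }

    // Assumption: each repetition starts one `span` after the last record so far;
    // gaps between records inside the repeated block are preserved.
    RelativeTimeInput repeatAll(TimeSpan span, int times) {
        List<Timed> block = new ArrayList<>(records);
        long blockStart = block.get(0).ts();
        for (int i = 0; i < times; i++) {
            long shift = span.next(last()) - blockStart;
            for (Timed t : block) {
                records.add(new Timed(t.value(), t.ts() + shift));
            }
        }
        return this;
    }

    List<Long> timestamps() {
        return records.stream().map(Timed::ts).toList();
    }

    // Watermark after record i = (smallest timestamp among later records) - 1,
    // or Long.MAX_VALUE after the last record, so no later record is late.
    List<Long> idealWatermarks() {
        Long[] marks = new Long[records.size()];
        long minLater = Long.MAX_VALUE;
        for (int i = records.size() - 1; i >= 0; i--) {
            marks[i] = (minLater == Long.MAX_VALUE) ? Long.MAX_VALUE : minLater - 1;
            minLater = Math.min(minLater, records.get(i).ts());
        }
        return List.of(marks);
    }

    public static void main(String[] args) {
        RelativeTimeInput in = new RelativeTimeInput("fritz")
            .emit("hans", after(15 * SEC))
            .emit("heidi", before(5 * SEC)) // out of order: earlier than hans
            .repeatAll(after(10 * SEC), 1);
        System.out.println(in.timestamps());
        // prints [0, 15000, 10000, 20000, 35000, 30000]
        System.out.println(in.idealWatermarks());
        // prints [9999, 9999, 19999, 29999, 29999, 9223372036854775807]
    }
}
```

Although heidi's timestamp (10000) is below hans's (15000), the watermark after hans stays at 9999, so heidi does not arrive late.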

EventTimeInput is passed to the framework in the same way as Input:

DataStream<Tuple2<Integer, String>> dataStream = createTestDataStream(etInput);