-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathTumblingEventTimeUsingAscendingWM.java
61 lines (47 loc) · 2.6 KB
/
TumblingEventTimeUsingAscendingWM.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.manning.fia.c05;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import com.manning.fia.model.media.BaseNewsFeed;
import com.manning.fia.transformations.media.NewsFeedMapper3;
import com.manning.fia.utils.DataSourceFactory;
/**
* Created by hari on 6/26/16.
* expain the difference in line of using Assigner ..
* while writing in the book jst flip assignTimestampsAndWatermarks with TumblingEventTimeUsingApplyExample
*/
public class TumblingEventTimeUsingAscendingWM {
public void executeJob(ParameterTool parameterTool) throws Exception {
StreamExecutionEnvironment execEnv;
DataStream<String> dataStream;
DataStream<Tuple5<Long, String, String, String, String>> selectDS;
DataStream<Tuple5<Long, String, String, String, String>> timestampsAndWatermarksDS;
KeyedStream<Tuple5<Long, String, String, String, String>, Tuple> keyedDS;
WindowedStream<Tuple5<Long, String, String, String, String>, Tuple, TimeWindow> windowedStream;
DataStream<Tuple6<Long, Long, List<Long>, String, String, Long>> result;
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
dataStream = execEnv.addSource(DataSourceFactory.getDataSource(parameterTool));
selectDS = dataStream.map(new NewsFeedMapper3());
timestampsAndWatermarksDS = selectDS.assignTimestampsAndWatermarks(new AscendingTimestampAndWatermarkAssigner());
keyedDS = timestampsAndWatermarksDS.keyBy(1, 2);
windowedStream = keyedDS.timeWindow(Time.seconds(5));
result = windowedStream.apply(new ApplyFunction());
result.print();
execEnv.execute("Tumbling Event Time");
}
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
TumblingEventTimeUsingAscendingWM window = new TumblingEventTimeUsingAscendingWM();
window.executeJob(parameterTool);
}
}