-
Notifications
You must be signed in to change notification settings - Fork 30
/
UserReportSystem.java
122 lines (96 loc) · 4.88 KB
/
UserReportSystem.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// camel-k: language=java property=file:application.properties
// camel-k: dependency=github:openshift-integration:camel-k-example-event-streaming
// camel-k: config=secret:example-event-streaming-user-reporting
import java.util.Arrays;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.PropertyInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import com.redhat.integration.common.Data;
public class UserReportSystem extends RouteBuilder {
@PropertyInject("users.allowed")
private String usersAllowed;
public void configure() throws Exception {
final String AUTH_HEADER = "authorized";
final String VALID_HEADER = "valid";
final String REPORT_TYPE_HEADER = "type";
rest("/")
.get("/report/list").to("direct:report-list")
.put("/report/new").to("direct:report-new");
from("direct:report-list")
.transform().constant("Not implemented");
from("direct:report-new")
.streamCaching()
.wireTap("direct:audit")
.unmarshal().json(JsonLibrary.Jackson, Data.class)
.step()
.to("direct:authenticate")
.choice()
.when(header(AUTH_HEADER).isEqualTo(true))
.to("direct:publish")
.end();
from("direct:audit")
.to("knative:channel/audit");
from("direct:log")
.convertBodyTo(String.class)
.to("log:info");
from("direct:authenticate")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
String[] userList = usersAllowed.split(",");
Data data = exchange.getMessage().getBody(Data.class);
if (Arrays.asList(userList).contains(data.getUser().getName())) {
exchange.getMessage().setHeader(AUTH_HEADER, true);
exchange.getMessage().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
}
else {
exchange.getMessage().setHeader(AUTH_HEADER, false);
exchange.getMessage().setBody("Unauthorized");
exchange.getMessage().setHeader(Exchange.HTTP_RESPONSE_CODE, 401);
}
}
});
from("direct:publish")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
Data data = exchange.getMessage().getBody(Data.class);
Data.Report report = data.getReport();
if (report == null || report.getType() == null || report.getType().isEmpty()) {
exchange.getMessage().setHeader(VALID_HEADER, false);
exchange.getMessage().setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
exchange.getMessage().setBody("Invalid report data: empty or null");
}
else {
ObjectMapper mapper = new ObjectMapper();
switch (report.getType()) {
case "health":
case "crime": {
exchange.getMessage().setHeader(VALID_HEADER, true);
exchange.getMessage().setHeader(REPORT_TYPE_HEADER, report.getType());
String body = mapper.writeValueAsString(data);
exchange.getMessage().setBody(body);
break;
}
default: {
exchange.getMessage().setHeader(VALID_HEADER, true);
exchange.getMessage().setBody("Invalid report data: unsupported report data");
}
}
}
}
})
.choice()
.when(header(VALID_HEADER).isEqualTo(false))
.stop()
.when(header(VALID_HEADER).isEqualTo(true))
.choice()
.when(header(REPORT_TYPE_HEADER).isEqualTo("crime"))
.to("kafka:crime-data?brokers={{kafka.bootstrap.address}}")
.transform().constant("OK").stop()
.when(header(REPORT_TYPE_HEADER).isEqualTo("health"))
.to("kafka:health-data?brokers={{kafka.bootstrap.address}}")
.transform().constant("OK");
}
}