Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Event Json input and output codecs #4436

Merged
merged 14 commits into from
Apr 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ public EventJsonInputCodec(final EventJsonInputCodecConfig config) {
this.overrideTimeReceived = config.getOverrideTimeReceived();
}

private boolean isCompatibleVersion(Map<String, Object> json) {
final String versionStr = (String)json.get(EventJsonDefines.VERSION);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DataPrepperVersion class provides a lot of this.

final String versionStr = (String)json.get(EventJsonDefines.VERSION);
final DataPrepperVersion definedVersion = DataPrepperVersion.parse(versionStr);
if(definedVersion.getMajorVersion() != DataPrepperVersion.getCurrentVersion().getMajorVersion()) {
  LOG.error("Version mismatch! Current version {} Received data version {}", DataPrepperVersion.getCurrentVersion(), definedVersion);
            return false;
}

final String[] version = versionStr.split("[.]");
final String currentVersionStr = DataPrepperVersion.getCurrentVersion().toString();
final String[] currentVersion = currentVersionStr.split("[.]");
if (version.length < 2 || currentVersion.length < 2) {
LOG.error("Invalid Version! Current version {} Received data version {}", currentVersionStr, versionStr);
return false;
}
if (!version[0].equals(currentVersion[0])) {
LOG.error("Version mismatch! Current version {} Received data version {}", currentVersionStr, versionStr);
return false;
}
return true;
}

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);
Expand All @@ -57,14 +73,12 @@ public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer
while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
final List<Map<String, Object>> maps = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS);
final String version = (String)innerJson.get(EventJsonDefines.VERSION);
if (!version.equals(DataPrepperVersion.getCurrentVersion().toString())) {
LOG.error("Version mismatch! Current version {} Received data version {}", DataPrepperVersion.getCurrentVersion().toString(), version);
if (!isCompatibleVersion(innerJson)) {
return;
}
for (Map<String, Object> map: maps) {
final Record<Event> record = createRecord(map);
final List<Map<String, Object>> events = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS);
for (Map<String, Object> eventMap: events) {
final Record<Event> record = createRecord(eventMap);
if (record != null) {
eventConsumer.accept(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,32 @@ public void invalidVersionTest() throws Exception {

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\"2.0\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\""+EventJsonDefines.VERSION+"\":\"2\", \""+EventJsonDefines.EVENTS+"\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(0));
}


@Test
public void inCompatibleVersionTest() throws Exception {
inputCodec = createInputCodec();
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\"1.0\", \""+EventJsonDefines.EVENTS+"\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
Expand Down
Loading