From 8e8bdb9c0b6b32147654a2873648a3e5e370783e Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 17 Apr 2024 23:44:48 +0000 Subject: [PATCH 01/14] Event Json input and output codecs Signed-off-by: Krishna Kondaka --- .../event-json-codecs/build.gradle | 36 ++++++ .../codec/eventjson/EventJsonDefines.java | 16 +++ .../codec/eventjson/EventJsonInputCodec.java | 68 +++++++++++ .../codec/eventjson/EventJsonOutputCodec.java | 86 +++++++++++++ .../eventjson/EventJsonOutputCodecConfig.java | 9 ++ .../EventJsonInputOutputCodecTest.java | 113 ++++++++++++++++++ settings.gradle | 3 +- 7 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/event-json-codecs/build.gradle create mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java create mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java create mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java create mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java create mode 100644 data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java diff --git a/data-prepper-plugins/event-json-codecs/build.gradle b/data-prepper-plugins/event-json-codecs/build.gradle new file mode 100644 index 0000000000..b52eaf24ec --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/build.gradle @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' + implementation 'org.apache.parquet:parquet-common:1.13.1' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { + limit { + minimum = 1.0 + } + } + } +} + +check.dependsOn jacocoTestCoverageVerification + diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java new file mode 100644 index 0000000000..1da0df760b --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.eventjson; + +public class EventJsonDefines { + public static final String DATA = "data"; + public static final String METADATA = "metadata"; + public static final String ATTRIBUTES = "attributes"; + public static final String TAGS = "tags"; + public static final String TIME_RECEIVED = "timeReceived"; + public static final String EXTERNAL_ORIGINATION_TIME = "externalOriginationTime"; +} + + diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java new file mode 100644 index 0000000000..bd5a006296 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.eventjson; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.function.Consumer; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * An implementation of {@link InputCodec} which parses JSON Objects for arrays. + */ +@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class) +public class EventJsonInputCodec implements InputCodec { + private final ObjectMapper objectMapper = new ObjectMapper(); + private final JsonFactory jsonFactory = new JsonFactory(); + + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream); + Objects.requireNonNull(eventConsumer); + + final JsonParser jsonParser = jsonFactory.createParser(inputStream); + while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { + if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { + final Map innerJson = objectMapper.readValue(jsonParser, Map.class); + + Map metadata = (Map)innerJson.get(EventJsonDefines.METADATA); + Map data = (Map)innerJson.get(EventJsonDefines.DATA); + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(data) + .withEventMetadataAttributes((Map)metadata.get(EventJsonDefines.ATTRIBUTES)) + .withTimeReceived(Instant.parse((String)metadata.get(EventJsonDefines.TIME_RECEIVED))) + .getThis(); + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + final Record record = new Record<>(event); + final String externalOriginationTime = (String)metadata.get(EventJsonDefines.EXTERNAL_ORIGINATION_TIME); + final List tags = (List)metadata.get(EventJsonDefines.TAGS); + if (tags.size() > 0) { + event.getMetadata().addTags(tags); + } + if (externalOriginationTime != null) { + event.getMetadata().setExternalOriginationTime(Instant.parse(externalOriginationTime)); + } + eventConsumer.accept(record); + } + } + } + +} + diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java new file mode 100644 index 0000000000..c0b66d3920 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.eventjson; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +@DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class) +public class EventJsonOutputCodec implements OutputCodec { + private final ObjectMapper objectMapper = new ObjectMapper(); + private static final String EVENT_JSON = "event_json"; + private static final JsonFactory factory = new JsonFactory(); + private final EventJsonOutputCodecConfig config; + private JsonGenerator generator; + private OutputCodecContext codecContext; + + @DataPrepperPluginConstructor + public EventJsonOutputCodec(final EventJsonOutputCodecConfig config) { + this.config = config; + } + + @Override + public String getExtension() { + return EVENT_JSON; + } + + @Override + public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException { + Objects.requireNonNull(outputStream); + generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); + generator.writeStartObject(); + } + + @Override + public void complete(final OutputStream outputStream) throws IOException { + generator.writeEndObject(); + generator.close(); + outputStream.flush(); + outputStream.close(); + } + + @Override + public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { + Objects.requireNonNull(event); + try { + getDataMapToSerialize(event); + } catch (Exception e){ + } + generator.flush(); + } + + private Map getDataMapToSerialize(Event event) throws Exception { + Map dataMap = event.toMap(); + generator.writeFieldName(EventJsonDefines.DATA); + objectMapper.writeValue(generator, dataMap); + Map metadataMap = new HashMap<>(); + EventMetadata metadata = event.getMetadata(); + metadataMap.put(EventJsonDefines.ATTRIBUTES, metadata.getAttributes()); + metadataMap.put(EventJsonDefines.TAGS, metadata.getTags()); + metadataMap.put(EventJsonDefines.TIME_RECEIVED, metadata.getTimeReceived().toString()); + if (metadata.getExternalOriginationTime() != null) { + metadataMap.put(EventJsonDefines.EXTERNAL_ORIGINATION_TIME, metadata.getExternalOriginationTime().toString()); + } + generator.writeFieldName(EventJsonDefines.METADATA); + objectMapper.writeValue(generator, metadataMap); + return dataMap; + } + +} diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java new file mode 100644 index 0000000000..5e18481e69 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java @@ -0,0 +1,9 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.eventjson; + +public class EventJsonOutputCodecConfig { +} + diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java new file mode 100644 index 0000000000..4dab05a084 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.eventjson; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +public class EventJsonInputOutputCodecTest { + private static final Integer BYTEBUFFER_SIZE = 1024; + + private ByteArrayOutputStream outputStream; + + @Mock + private EventJsonOutputCodecConfig eventJsonOutputCodecConfig; + + private EventJsonOutputCodec outputCodec; + private EventJsonInputCodec inputCodec; + + @BeforeEach + public void setup() { + outputStream = new ByteArrayOutputStream(BYTEBUFFER_SIZE); + } + + public EventJsonOutputCodec createOutputCodec() { + return new EventJsonOutputCodec(eventJsonOutputCodecConfig); + } + + public EventJsonInputCodec createInputCodec() { + return new EventJsonInputCodec(); + } + + @Test + public void basicTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + outputCodec = createOutputCodec(); + inputCodec = createInputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> { + Event e = record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + }); + } + + @Test + public void extendedTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Set tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + List tagsList = tags.stream().collect(Collectors.toList()); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + Instant origTime = startTime.minusSeconds(5); + event.getMetadata().setExternalOriginationTime(origTime); + event.getMetadata().addTags(tagsList); + outputCodec = createOutputCodec(); + inputCodec = createInputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> { + Event e = record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags(), equalTo(tags)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(origTime)); + }); + } + + + private Event createEvent(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return event; + } +} + diff --git a/settings.gradle b/settings.gradle index 85727b0f81..ffe34260ee 100644 --- a/settings.gradle +++ b/settings.gradle @@ -116,6 +116,7 @@ include 'data-prepper-plugins:otel-logs-source' include 'data-prepper-plugins:blocking-buffer' include 'data-prepper-plugins:http-source' include 'data-prepper-plugins:drop-events-processor' +include 'data-prepper-plugins:event-json-codecs' include 'data-prepper-plugins:key-value-processor' include 'data-prepper-plugins:mutate-event-processors' include 'data-prepper-plugins:geoip-processor' @@ -169,4 +170,4 @@ include 'data-prepper-plugins:decompress-processor' include 'data-prepper-plugins:split-event-processor' include 'data-prepper-plugins:http-common' include 'data-prepper-plugins:flatten-processor' -include 'data-prepper-plugins:mongodb' \ No newline at end of file +include 'data-prepper-plugins:mongodb' From f255900c7326076534cc6f687d228e5e318765e5 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 18 Apr 2024 00:36:19 +0000 Subject: [PATCH 02/14] Modified test case to check for event metadata attributes Signed-off-by: Krishna Kondaka --- .../codec/eventjson/EventJsonInputOutputCodecTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java index 4dab05a084..fb8141ff20 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java @@ -74,6 +74,8 @@ public void basicTest() throws Exception { public void extendedTest() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); + final String attrKey = UUID.randomUUID().toString(); + final String attrValue = UUID.randomUUID().toString(); Map data = Map.of(key, value); Set tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); @@ -83,6 +85,7 @@ public void extendedTest() throws Exception { Instant origTime = startTime.minusSeconds(5); event.getMetadata().setExternalOriginationTime(origTime); event.getMetadata().addTags(tagsList); + event.getMetadata().setAttribute(attrKey, attrValue); outputCodec = createOutputCodec(); inputCodec = createInputCodec(); outputCodec.start(outputStream, null, null); @@ -93,6 +96,7 @@ public void extendedTest() throws Exception { assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags(), equalTo(tags)); + assertThat(e.getMetadata().getAttributes(), equalTo(Map.of(attrKey, attrValue))); assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(origTime)); }); } From 26e2674c84da5265e8e4d019bd8f0b371c5ce015 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 18 Apr 2024 16:48:04 +0000 Subject: [PATCH 03/14] Modified the coverage to 0.9 Signed-off-by: Krishna Kondaka --- data-prepper-plugins/event-json-codecs/build.gradle | 2 +- .../plugins/codec/eventjson/EventJsonOutputCodec.java | 9 +++------ .../codec/eventjson/EventJsonInputOutputCodecTest.java | 1 + 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/build.gradle b/data-prepper-plugins/event-json-codecs/build.gradle index b52eaf24ec..2a4aee9005 100644 --- a/data-prepper-plugins/event-json-codecs/build.gradle +++ b/data-prepper-plugins/event-json-codecs/build.gradle @@ -26,7 +26,7 @@ jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = 1.0 + minimum = 0.9 } } } diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java index c0b66d3920..922c260f6a 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java @@ -25,7 +25,7 @@ @DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class) public class EventJsonOutputCodec implements OutputCodec { private final ObjectMapper objectMapper = new ObjectMapper(); - private static final String EVENT_JSON = "event_json"; + static final String EVENT_JSON = "event_json"; private static final JsonFactory factory = new JsonFactory(); private final EventJsonOutputCodecConfig config; private JsonGenerator generator; @@ -59,14 +59,11 @@ public void complete(final OutputStream outputStream) throws IOException { @Override public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { Objects.requireNonNull(event); - try { - getDataMapToSerialize(event); - } catch (Exception e){ - } + getDataMapToSerialize(event); generator.flush(); } - private Map getDataMapToSerialize(Event event) throws Exception { + private Map getDataMapToSerialize(Event event) throws IOException { Map dataMap = event.toMap(); generator.writeFieldName(EventJsonDefines.DATA); objectMapper.writeValue(generator, dataMap); diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java index fb8141ff20..19c0e7b2ce 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java @@ -91,6 +91,7 @@ public void extendedTest() throws Exception { outputCodec.start(outputStream, null, null); outputCodec.writeEvent(event, outputStream); outputCodec.complete(outputStream); + assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON)); inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> { Event e = record.getData(); assertThat(e.get(key, String.class), equalTo(value)); From 1f74b6d2284658a24d9842e195078c91c543d214 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 18 Apr 2024 21:05:23 +0000 Subject: [PATCH 04/14] Addressed review comments Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventMetadata.java | 31 +++++++++++++++++-- .../event-json-codecs/build.gradle | 2 ++ .../codec/eventjson/EventJsonInputCodec.java | 19 ++++-------- .../codec/eventjson/EventJsonOutputCodec.java | 14 ++------- 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index 883297d567..553d9b46ce 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.model.event; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.time.Instant; import java.util.HashSet; import java.util.List; @@ -23,16 +25,23 @@ */ public class DefaultEventMetadata implements EventMetadata { - private final String eventType; + @JsonProperty("event_type") + private String eventType; - private final Instant timeReceived; + @JsonProperty("time_received") + private Instant timeReceived; + @JsonProperty("external_origination_time") private Instant externalOriginationTime; + @JsonProperty("attributes") private Map attributes; + @JsonProperty("tags") private Set tags; + private DefaultEventMetadata() {} + private DefaultEventMetadata(final Builder builder) { checkNotNull(builder.eventType, "eventType cannot be null"); @@ -71,6 +80,16 @@ public Instant getExternalOriginationTime() { return externalOriginationTime; } +/* + public void setEventType(String eType) { + eventType = eType; + } + + public void setTimeReceived(Instant receivedTime) { + timeReceived = receivedTime; + } +*/ + @Override public void setExternalOriginationTime(Instant externalOriginationTime) { this.externalOriginationTime = externalOriginationTime; @@ -86,6 +105,10 @@ public void setAttribute(final String key, final Object value) { attributes.put(key, value); } + public void setAttributes(Map attrs) { + attributes = attrs; + } + @Override public Object getAttribute(final String attributeKey) { String key = (attributeKey.charAt(0) == '/') ? attributeKey.substring(1) : attributeKey; @@ -116,6 +139,10 @@ public void addTags(final List newTags) { } } + public void setTags(final Set newTags) { + tags = newTags; + } + @Override public boolean equals(final Object o) { if (this == o) return true; diff --git a/data-prepper-plugins/event-json-codecs/build.gradle b/data-prepper-plugins/event-json-codecs/build.gradle index 2a4aee9005..da900c6a1a 100644 --- a/data-prepper-plugins/event-json-codecs/build.gradle +++ b/data-prepper-plugins/event-json-codecs/build.gradle @@ -13,6 +13,8 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' implementation 'org.apache.parquet:parquet-common:1.13.1' testImplementation project(':data-prepper-test-common') } diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java index bd5a006296..0ae87ffbb7 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java @@ -9,19 +9,20 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.DefaultEventMetadata; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; import java.io.IOException; import java.io.InputStream; -import java.time.Instant; import java.util.function.Consumer; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -30,7 +31,7 @@ */ @DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class) public class EventJsonInputCodec implements InputCodec { - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); private final JsonFactory jsonFactory = new JsonFactory(); public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { @@ -43,22 +44,14 @@ public void parse(InputStream inputStream, Consumer> eventConsumer final Map innerJson = objectMapper.readValue(jsonParser, Map.class); Map metadata = (Map)innerJson.get(EventJsonDefines.METADATA); + EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class); Map data = (Map)innerJson.get(EventJsonDefines.DATA); final JacksonLog.Builder logBuilder = JacksonLog.builder() .withData(data) - .withEventMetadataAttributes((Map)metadata.get(EventJsonDefines.ATTRIBUTES)) - .withTimeReceived(Instant.parse((String)metadata.get(EventJsonDefines.TIME_RECEIVED))) + .withEventMetadata(eventMetadata) .getThis(); final JacksonEvent event = (JacksonEvent)logBuilder.build(); final Record record = new Record<>(event); - final String externalOriginationTime = (String)metadata.get(EventJsonDefines.EXTERNAL_ORIGINATION_TIME); - final List tags = (List)metadata.get(EventJsonDefines.TAGS); - if (tags.size() > 0) { - event.getMetadata().addTags(tags); - } - if (externalOriginationTime != null) { - event.getMetadata().setExternalOriginationTime(Instant.parse(externalOriginationTime)); - } eventConsumer.accept(record); } } diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java index 922c260f6a..5c9189e470 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java @@ -8,23 +8,22 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.IOException; import java.io.OutputStream; -import java.util.HashMap; import java.util.Map; import java.util.Objects; @DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class) public class EventJsonOutputCodec implements OutputCodec { - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); static final String EVENT_JSON = "event_json"; private static final JsonFactory factory = new JsonFactory(); private final EventJsonOutputCodecConfig config; @@ -67,14 +66,7 @@ private Map getDataMapToSerialize(Event event) throws IOExceptio Map dataMap = event.toMap(); generator.writeFieldName(EventJsonDefines.DATA); objectMapper.writeValue(generator, dataMap); - Map metadataMap = new HashMap<>(); - EventMetadata metadata = event.getMetadata(); - metadataMap.put(EventJsonDefines.ATTRIBUTES, metadata.getAttributes()); - metadataMap.put(EventJsonDefines.TAGS, metadata.getTags()); - metadataMap.put(EventJsonDefines.TIME_RECEIVED, metadata.getTimeReceived().toString()); - if (metadata.getExternalOriginationTime() != null) { - metadataMap.put(EventJsonDefines.EXTERNAL_ORIGINATION_TIME, metadata.getExternalOriginationTime().toString()); - } + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); generator.writeFieldName(EventJsonDefines.METADATA); objectMapper.writeValue(generator, metadataMap); return dataMap; From b36af731b884bb4b8ed99015badc892f79261db1 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 18 Apr 2024 22:33:21 +0000 Subject: [PATCH 05/14] Fixes for failing coverage tests Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventMetadata.java | 18 ------------------ .../EventJsonInputOutputCodecTest.java | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index 553d9b46ce..2c34be6e83 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -80,16 +80,6 @@ public Instant getExternalOriginationTime() { return externalOriginationTime; } -/* - public void setEventType(String eType) { - eventType = eType; - } - - public void setTimeReceived(Instant receivedTime) { - timeReceived = receivedTime; - } -*/ - @Override public void setExternalOriginationTime(Instant externalOriginationTime) { this.externalOriginationTime = externalOriginationTime; @@ -105,10 +95,6 @@ public void setAttribute(final String key, final Object value) { attributes.put(key, value); } - public void setAttributes(Map attrs) { - attributes = attrs; - } - @Override public Object getAttribute(final String attributeKey) { String key = (attributeKey.charAt(0) == '/') ? attributeKey.substring(1) : attributeKey; @@ -139,10 +125,6 @@ public void addTags(final List newTags) { } } - public void setTags(final Set newTags) { - tags = newTags; - } - @Override public boolean equals(final Object o) { if (this == o) return true; diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java index 19c0e7b2ce..781df4ccc8 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java @@ -48,6 +48,20 @@ public EventJsonInputCodec createInputCodec() { return new EventJsonInputCodec(); } + @Test + public void emptyTest() throws Exception { + ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes()); + inputCodec = createInputCodec(); + inputCodec.parse(inputStream, record -> { }); + inputStream = new ByteArrayInputStream("[]".getBytes()); + inputCodec.parse(inputStream, record -> { }); + inputStream = new ByteArrayInputStream("[{}]".getBytes()); + inputCodec.parse(inputStream, record -> { }); + inputStream = new ByteArrayInputStream("{}".getBytes()); + inputCodec.parse(inputStream, record -> { }); + + } + @Test public void basicTest() throws Exception { final String key = UUID.randomUUID().toString(); From c8c6d3d75844943d98b93ccfc759e9c205769839 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 19 Apr 2024 05:26:08 +0000 Subject: [PATCH 06/14] Addressed review comments Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventMetadata.java | 15 ++- .../EventJsonDefines.java | 2 +- .../codec/event_json/EventJsonInputCodec.java | 92 +++++++++++++++++++ .../EventJsonOutputCodec.java | 8 +- .../EventJsonOutputCodecConfig.java | 2 +- .../codec/eventjson/EventJsonInputCodec.java | 61 ------------ .../EventJsonInputOutputCodecTest.java | 83 +++++++++++++---- 7 files changed, 178 insertions(+), 85 deletions(-) rename data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/{eventjson => event_json}/EventJsonDefines.java (88%) create mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java rename data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/{eventjson => event_json}/EventJsonOutputCodec.java (95%) rename data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/{eventjson => event_json}/EventJsonOutputCodecConfig.java (67%) delete mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java rename data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/{eventjson => event_json}/EventJsonInputOutputCodecTest.java (61%) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index 2c34be6e83..fa53cd6ad1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -54,7 +54,8 @@ private DefaultEventMetadata(final Builder builder) { this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes); this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags); - this.externalOriginationTime = null; + + this.externalOriginationTime = builder.externalOriginationTime; } private DefaultEventMetadata(final EventMetadata eventMetadata) { @@ -172,6 +173,7 @@ static EventMetadata fromEventMetadata(final EventMetadata eventMetadata) { public static class Builder { private String eventType; private Instant timeReceived; + private Instant externalOriginationTime; private Map attributes; private Set tags; @@ -197,6 +199,17 @@ public Builder withTimeReceived(final Instant timeReceived) { return this; } + /** + * Sets the external origination Time. + * @param externalOriginationTime the time an event was received + * @return returns the builder + * @since 2.8 + */ + public Builder withExternalOriginationTime(final Instant externalOriginationTime) { + this.externalOriginationTime = externalOriginationTime; + return this; + } + /** * Sets the attributes. An empty immutable map is the default value. * @param attributes a map of key-value pair attributes diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java similarity index 88% rename from data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java rename to data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java index 1da0df760b..2601e390b9 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonDefines.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.codec.eventjson; +package org.opensearch.dataprepper.plugins.codec.event_json; public class EventJsonDefines { public static final String DATA = "data"; diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java new file mode 100644 index 0000000000..8fb7ce1e48 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.DefaultEventMetadata; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.function.Consumer; +import java.util.Map; +import java.util.Objects; + +/** + * An implementation of {@link InputCodec} which parses JSON Objects for arrays. + */ +@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class) +public class EventJsonInputCodec implements InputCodec { + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + private final JsonFactory jsonFactory = new JsonFactory(); + private final Boolean overrideTimeReceived; + + @DataPrepperPluginConstructor + public EventJsonInputCodec(final EventJsonInputCodecConfig config) { + this.overrideTimeReceived = config.getOverrideTimeReceived(); + } + + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream); + Objects.requireNonNull(eventConsumer); + + final JsonParser jsonParser = jsonFactory.createParser(inputStream); + + while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { + if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) { + parseRecordsArray(jsonParser, eventConsumer); + } + } + } + + private void parseRecordsArray(final JsonParser jsonParser, final Consumer> eventConsumer) throws IOException { + while (jsonParser.nextToken() != JsonToken.END_ARRAY) { + final Map innerJson = objectMapper.readValue(jsonParser, Map.class); + + final Record record = createRecord(innerJson); + if (record != null) { + eventConsumer.accept(record); + } + } + } + + private Record createRecord(final Map innerJson) { + Map metadata = (Map)innerJson.get(EventJsonDefines.METADATA); + EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class); + Map data = (Map)innerJson.get(EventJsonDefines.DATA); + if (data == null) { + return null; + } + if (overrideTimeReceived) { + eventMetadata = new DefaultEventMetadata.Builder() + .withAttributes(eventMetadata.getAttributes()) + .withTimeReceived(Instant.now()) + .withTags(eventMetadata.getTags()) + .withExternalOriginationTime(eventMetadata.getExternalOriginationTime()) + .build(); + } + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(data) + .withEventMetadata(eventMetadata) + .getThis(); + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + final Record record = new Record<>(event); + return record; + } +} diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java similarity index 95% rename from data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java rename to data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java index 5c9189e470..18f617fa01 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.codec.eventjson; +package org.opensearch.dataprepper.plugins.codec.event_json; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; @@ -44,12 +44,12 @@ public String getExtension() { public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException { Objects.requireNonNull(outputStream); generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); - generator.writeStartObject(); + generator.writeStartArray(); } @Override public void complete(final OutputStream outputStream) throws IOException { - generator.writeEndObject(); + generator.writeEndArray(); generator.close(); outputStream.flush(); outputStream.close(); @@ -57,9 +57,11 @@ public void complete(final OutputStream outputStream) throws IOException { @Override public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { + generator.writeStartObject(); Objects.requireNonNull(event); getDataMapToSerialize(event); generator.flush(); + generator.writeEndObject(); } private Map getDataMapToSerialize(Event event) throws IOException { diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java similarity index 67% rename from data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java rename to data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java index 5e18481e69..352580737e 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonOutputCodecConfig.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.codec.eventjson; +package org.opensearch.dataprepper.plugins.codec.event_json; public class EventJsonOutputCodecConfig { } diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java deleted file mode 100644 index 0ae87ffbb7..0000000000 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputCodec.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.codec.eventjson; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; - -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.codec.InputCodec; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventMetadata; -import org.opensearch.dataprepper.model.event.DefaultEventMetadata; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.model.record.Record; - -import java.io.IOException; -import java.io.InputStream; -import java.util.function.Consumer; -import java.util.Map; -import java.util.Objects; - -/** - * An implementation of {@link InputCodec} which parses JSON Objects for arrays. - */ -@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class) -public class EventJsonInputCodec implements InputCodec { - private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); - private final JsonFactory jsonFactory = new JsonFactory(); - - public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { - Objects.requireNonNull(inputStream); - Objects.requireNonNull(eventConsumer); - - final JsonParser jsonParser = jsonFactory.createParser(inputStream); - while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { - if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { - final Map innerJson = objectMapper.readValue(jsonParser, Map.class); - - Map metadata = (Map)innerJson.get(EventJsonDefines.METADATA); - EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class); - Map data = (Map)innerJson.get(EventJsonDefines.DATA); - final JacksonLog.Builder logBuilder = JacksonLog.builder() - .withData(data) - .withEventMetadata(eventMetadata) - .getThis(); - final JacksonEvent event = (JacksonEvent)logBuilder.build(); - final Record record = new Record<>(event); - eventConsumer.accept(record); - } - } - } - -} - diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java similarity index 61% rename from data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java rename to data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java index 781df4ccc8..2f929cd5a0 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/eventjson/EventJsonInputOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java @@ -2,26 +2,35 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.codec.eventjson; +package org.opensearch.dataprepper.plugins.codec.event_json; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; import org.mockito.Mock; +import static org.mockito.Mockito.verifyNoInteractions; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; + import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import java.time.Instant; import java.util.List; +import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.Consumer; import java.util.stream.Collectors; public class EventJsonInputOutputCodecTest { @@ -32,12 +41,16 @@ public class EventJsonInputOutputCodecTest { @Mock private EventJsonOutputCodecConfig eventJsonOutputCodecConfig; + @Mock + private EventJsonInputCodecConfig eventJsonInputCodecConfig; + private EventJsonOutputCodec outputCodec; private EventJsonInputCodec inputCodec; @BeforeEach public void setup() { outputStream = new ByteArrayOutputStream(BYTEBUFFER_SIZE); + eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); } public EventJsonOutputCodec createOutputCodec() { @@ -45,20 +58,18 @@ public EventJsonOutputCodec createOutputCodec() { } public EventJsonInputCodec createInputCodec() { - return new EventJsonInputCodec(); + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(false); + return new EventJsonInputCodec(eventJsonInputCodecConfig); } - @Test - public void emptyTest() throws Exception { - ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes()); + @ParameterizedTest + @ValueSource(strings = {"", "[]", "[{}]", "{}"}) + public void emptyTest(String input) throws Exception { + ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes()); inputCodec = createInputCodec(); - inputCodec.parse(inputStream, record -> { }); - inputStream = new ByteArrayInputStream("[]".getBytes()); - inputCodec.parse(inputStream, record -> { }); - inputStream = new ByteArrayInputStream("[{}]".getBytes()); - inputCodec.parse(inputStream, record -> { }); - inputStream = new ByteArrayInputStream("{}".getBytes()); - inputCodec.parse(inputStream, record -> { }); + Consumer> consumer = mock(Consumer.class); + inputCodec.parse(inputStream, consumer); + verifyNoInteractions(consumer); } @@ -75,13 +86,45 @@ public void basicTest() throws Exception { outputCodec.start(outputStream, null, null); outputCodec.writeEvent(event, outputStream); outputCodec.complete(outputStream); - inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> { - Event e = record.getData(); + List> records = new LinkedList<>(); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + + assertThat(records.size(), equalTo(1)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + @Test + public void multipleEventsTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + outputCodec = createOutputCodec(); + inputCodec = createInputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.writeEvent(event, outputStream); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + List> records = new LinkedList<>(); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + + assertThat(records.size(), equalTo(3)); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); - }); + } } @Test @@ -106,14 +149,18 @@ public void extendedTest() throws Exception { outputCodec.writeEvent(event, outputStream); outputCodec.complete(outputStream); assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON)); - inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), record -> { - Event e = record.getData(); + List> records = new LinkedList<>(); +inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + + assertThat(records.size(), equalTo(1)); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags(), equalTo(tags)); assertThat(e.getMetadata().getAttributes(), equalTo(Map.of(attrKey, attrValue))); assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(origTime)); - }); + } } From 6ce0c8be31e320ca517dc29b66c928b01b691be7 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 19 Apr 2024 06:11:32 +0000 Subject: [PATCH 07/14] Fixed test coverage Signed-off-by: Krishna Kondaka --- .../model/event/DefaultEventMetadataTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java index 479e7be0c2..c87bf1a101 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java @@ -134,6 +134,17 @@ public void testSetAttribute(String key, final Object value) { assertThat(eventMetadata.getAttribute(key), equalTo(value)); } + @Test + public void test_with_ExternalOriginationTime() { + Instant now = Instant.now(); + eventMetadata = DefaultEventMetadata.builder() + .withEventType(testEventType) + .withTimeReceived(testTimeReceived) + .withExternalOriginationTime(now) + .build(); + assertThat(eventMetadata.getExternalOriginationTime(), equalTo(now)); + } + @Test public void testAttributes_without_attributes_is_empty() { eventMetadata = DefaultEventMetadata.builder() From fe5c7fcf91b01cc1787672f5606a24ec2bc41f02 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 19 Apr 2024 06:14:03 +0000 Subject: [PATCH 08/14] Added more tests Signed-off-by: Krishna Kondaka --- .../event_json/EventJsonInputCodecConfig.java | 18 ++++ .../event_json/EventJsonInputCodecTest.java | 92 +++++++++++++++++++ .../event_json/EventJsonOutputCodecTest.java | 83 +++++++++++++++++ 3 files changed, 193 insertions(+) create mode 100644 data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java create mode 100644 data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java create mode 100644 data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java new file mode 100644 index 0000000000..e1ffc37fb3 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class EventJsonInputCodecConfig { + @JsonProperty("override_time_received") + private Boolean overrideTimeReceived = false; + + public Boolean getOverrideTimeReceived() { + return overrideTimeReceived; + } +} + + diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java new file mode 100644 index 0000000000..58d80624ac --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +//import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import org.mockito.Mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; + +import java.io.ByteArrayInputStream; + +import java.time.Instant; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; + +public class EventJsonInputCodecTest { + private static final Integer BYTEBUFFER_SIZE = 1024; + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + @Mock + private EventJsonInputCodecConfig eventJsonInputCodecConfig; + + private EventJsonInputCodec inputCodec; + private ByteArrayInputStream inputStream; + + @BeforeEach + public void setup() { + inputCodec = createInputCodec(); + } + + public EventJsonInputCodec createInputCodec() { + eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); + return new EventJsonInputCodec(eventJsonInputCodecConfig); + } + + @Test + public void basicTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(2)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + private Event createEvent(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return event; + } +} + diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java new file mode 100644 index 0000000000..6e54a103eb --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + + +import java.io.ByteArrayOutputStream; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; + +import java.time.Instant; +import java.util.Map; +import java.util.UUID; + +public class EventJsonOutputCodecTest { + private static final Integer BYTEBUFFER_SIZE = 1024; + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + @Mock + private EventJsonOutputCodecConfig eventJsonOutputCodecConfig; + + private EventJsonOutputCodec outputCodec; + private ByteArrayOutputStream outputStream; + + @BeforeEach + public void setup() { + outputStream = new ByteArrayOutputStream(BYTEBUFFER_SIZE); + } + + public EventJsonOutputCodec createOutputCodec() { + return new EventJsonOutputCodec(eventJsonOutputCodecConfig); + } + + @Test + public void basicTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + outputCodec = createOutputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String expectedOutput = "["; + String comma = ""; + for (int i = 0; i < 2; i++) { + expectedOutput += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + expectedOutput += "]"; + String output = outputStream.toString(); + assertThat(output, equalTo(expectedOutput)); + + } + + private Event createEvent(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return event; + } +} From 24c6d7959ed140d998212bde340b13b946af8d23 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 19 Apr 2024 07:21:55 +0000 Subject: [PATCH 09/14] Added more tests for coverage Signed-off-by: Krishna Kondaka --- .../codec/event_json/EventJsonInputCodec.java | 2 + .../event_json/EventJsonInputCodecTest.java | 40 +++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java index 8fb7ce1e48..180085bdd8 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.DefaultEventMetadata; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -75,6 +76,7 @@ private Record createRecord(final Map innerJson) { } if (overrideTimeReceived) { eventMetadata = new DefaultEventMetadata.Builder() + .withEventType(EventType.LOG.toString()) .withAttributes(eventMetadata.getAttributes()) .withTimeReceived(Instant.now()) .withTags(eventMetadata.getTags()) diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index 58d80624ac..972d1197dd 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -9,11 +9,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -//import static org.mockito.Mockito.when; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; import org.mockito.Mock; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.CoreMatchers.not; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -39,16 +40,16 @@ public class EventJsonInputCodecTest { @BeforeEach public void setup() { - inputCodec = createInputCodec(); + eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); } public EventJsonInputCodec createInputCodec() { - eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); return new EventJsonInputCodec(eventJsonInputCodecConfig); } @Test public void basicTest() throws Exception { + inputCodec = createInputCodec(); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); @@ -77,6 +78,39 @@ public void basicTest() throws Exception { } } + @Test + public void test_with_timeReceivedOverridden() throws Exception { + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); + inputCodec = createInputCodec(); + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now().minusSeconds(5); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(2)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime))); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + private Event createEvent(final Map json, final Instant timeReceived) { final JacksonLog.Builder logBuilder = JacksonLog.builder() .withData(json) From 8fb9d504b18d5cad75a7be5a88ae9a65f84987e5 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 20 Apr 2024 01:59:05 +0000 Subject: [PATCH 10/14] Addressed review comments Signed-off-by: Krishna Kondaka --- .../codec/event_json/EventJsonDefines.java | 2 ++ .../codec/event_json/EventJsonInputCodec.java | 23 +++++++++++++--- .../event_json/EventJsonOutputCodec.java | 6 +++++ .../event_json/EventJsonInputCodecTest.java | 26 +++++++++++++++---- .../EventJsonInputOutputCodecTest.java | 17 +----------- .../event_json/EventJsonOutputCodecTest.java | 8 +++--- 6 files changed, 55 insertions(+), 27 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java index 2601e390b9..c4d2a25dc0 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.codec.event_json; public class EventJsonDefines { + public static final String VERSION = "version"; + public static final String EVENTS = "events"; public static final String DATA = "data"; public static final String METADATA = "metadata"; public static final String ATTRIBUTES = "attributes"; diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java index 180085bdd8..66672b2d70 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.InputCodec; @@ -21,12 +22,15 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.time.Instant; import java.util.function.Consumer; import java.util.Map; +import java.util.List; import java.util.Objects; /** @@ -34,6 +38,7 @@ */ @DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class) public class EventJsonInputCodec implements InputCodec { + private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class); private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); private final JsonFactory jsonFactory = new JsonFactory(); private final Boolean overrideTimeReceived; @@ -50,8 +55,20 @@ public void parse(InputStream inputStream, Consumer> eventConsumer final JsonParser jsonParser = jsonFactory.createParser(inputStream); while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { - if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) { - parseRecordsArray(jsonParser, eventConsumer); + if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { + final Map innerJson = objectMapper.readValue(jsonParser, Map.class); + final List> maps = (List>)innerJson.get(EventJsonDefines.EVENTS); + final String version = (String)innerJson.get(EventJsonDefines.VERSION); + if (!version.equals(DataPrepperVersion.getCurrentVersion().toString())) { + LOG.error("Version mismatch! Current version {} Received data version {}", DataPrepperVersion.getCurrentVersion().toString(), version); + return; + } + for (Map map: maps) { + final Record record = createRecord(map); + if (record != null) { + eventConsumer.accept(record); + } + } } } } @@ -74,7 +91,7 @@ private Record createRecord(final Map innerJson) { if (data == null) { return null; } - if (overrideTimeReceived) { + if (!overrideTimeReceived) { eventMetadata = new DefaultEventMetadata.Builder() .withEventType(EventType.LOG.toString()) .withAttributes(eventMetadata.getAttributes()) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java index 18f617fa01..f444cd79e6 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; @@ -44,12 +45,17 @@ public String getExtension() { public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException { Objects.requireNonNull(outputStream); generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); + generator.writeStartObject(); + generator.writeFieldName(EventJsonDefines.VERSION); + objectMapper.writeValue(generator, DataPrepperVersion.getCurrentVersion().toString()); + generator.writeFieldName(EventJsonDefines.EVENTS); generator.writeStartArray(); } @Override public void complete(final OutputStream outputStream) throws IOException { generator.writeEndArray(); + generator.writeEndObject(); generator.close(); outputStream.flush(); outputStream.close(); diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index 972d1197dd..03d4469cd4 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -9,13 +9,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; import org.mockito.Mock; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.mockito.Mockito.verifyNoInteractions; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -28,6 +32,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.UUID; +import java.util.function.Consumer; public class EventJsonInputCodecTest { private static final Integer BYTEBUFFER_SIZE = 1024; @@ -41,14 +46,26 @@ public class EventJsonInputCodecTest { @BeforeEach public void setup() { eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(false); } public EventJsonInputCodec createInputCodec() { return new EventJsonInputCodec(eventJsonInputCodecConfig); } + @ParameterizedTest + @ValueSource(strings = {"", "{}"}) + public void emptyTest(String input) throws Exception { + input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes()); + inputCodec = createInputCodec(); + Consumer> consumer = mock(Consumer.class); + inputCodec.parse(inputStream, consumer); + verifyNoInteractions(consumer); + } @Test public void basicTest() throws Exception { + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); inputCodec = createInputCodec(); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); @@ -58,13 +75,13 @@ public void basicTest() throws Exception { Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "["; + String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } - input += "]"; + input += "]}"; inputStream = new ByteArrayInputStream(input.getBytes()); List> records = new LinkedList<>(); inputCodec.parse(inputStream, records::add); @@ -80,7 +97,6 @@ public void basicTest() throws Exception { @Test public void test_with_timeReceivedOverridden() throws Exception { - when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); inputCodec = createInputCodec(); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); @@ -90,13 +106,13 @@ public void test_with_timeReceivedOverridden() throws Exception { Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "["; + String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } - input += "]"; + input += "]}"; inputStream = new ByteArrayInputStream(input.getBytes()); List> records = new LinkedList<>(); inputCodec.parse(inputStream, records::add); diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java index 2f929cd5a0..b0224454a5 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java @@ -6,12 +6,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; import org.mockito.Mock; -import static org.mockito.Mockito.verifyNoInteractions; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -30,7 +27,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.function.Consumer; import java.util.stream.Collectors; public class EventJsonInputOutputCodecTest { @@ -58,21 +54,10 @@ public EventJsonOutputCodec createOutputCodec() { } public EventJsonInputCodec createInputCodec() { - when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(false); + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); return new EventJsonInputCodec(eventJsonInputCodecConfig); } - @ParameterizedTest - @ValueSource(strings = {"", "[]", "[{}]", "{}"}) - public void emptyTest(String input) throws Exception { - ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes()); - inputCodec = createInputCodec(); - Consumer> consumer = mock(Consumer.class); - inputCodec.parse(inputStream, consumer); - verifyNoInteractions(consumer); - - } - @Test public void basicTest() throws Exception { final String key = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java index 6e54a103eb..51dda545cb 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.codec.event_json; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; @@ -57,13 +58,14 @@ public void basicTest() throws Exception { outputCodec.complete(outputStream); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String expectedOutput = "["; + //String expectedOutput = "{\"version\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; + String expectedOutput = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { - expectedOutput += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + expectedOutput += comma+"{\""+EventJsonDefines.DATA+"\":"+objectMapper.writeValueAsString(dataMap)+","+"\""+EventJsonDefines.METADATA+"\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } - expectedOutput += "]"; + expectedOutput += "]}"; String output = outputStream.toString(); assertThat(output, equalTo(expectedOutput)); From 4f3b17e097578713d88d54c1b648bcb384e24dc2 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 20 Apr 2024 04:55:17 +0000 Subject: [PATCH 11/14] Fixed code coverage failure Signed-off-by: Krishna Kondaka --- .../codec/event_json/EventJsonInputCodec.java | 11 -------- .../event_json/EventJsonInputCodecTest.java | 25 +++++++++++++++++++ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java index 66672b2d70..c72188f20c 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -73,17 +73,6 @@ public void parse(InputStream inputStream, Consumer> eventConsumer } } - private void parseRecordsArray(final JsonParser jsonParser, final Consumer> eventConsumer) throws IOException { - while (jsonParser.nextToken() != JsonToken.END_ARRAY) { - final Map innerJson = objectMapper.readValue(jsonParser, Map.class); - - final Record record = createRecord(innerJson); - if (record != null) { - eventConsumer.accept(record); - } - } - } - private Record createRecord(final Map innerJson) { Map metadata = (Map)innerJson.get(EventJsonDefines.METADATA); EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class); diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index 03d4469cd4..6c8f66e7d3 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -63,6 +63,31 @@ public void emptyTest(String input) throws Exception { inputCodec.parse(inputStream, consumer); verifyNoInteractions(consumer); } + + @Test + public void invalidVersionTest() throws Exception { + inputCodec = createInputCodec(); + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "{\""+EventJsonDefines.VERSION+"\":\"2.0\", \""+EventJsonDefines.EVENTS+"\":["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]}"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(0)); + } + @Test public void basicTest() throws Exception { when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); From cf1ae561788d977e544fddb877ad41b769165754 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 22 Apr 2024 17:50:25 +0000 Subject: [PATCH 12/14] Addressed review comments Signed-off-by: Krishna Kondaka --- .../codec/event_json/EventJsonInputCodec.java | 26 +++++++++++++----- .../event_json/EventJsonInputCodecTest.java | 27 ++++++++++++++++++- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java index c72188f20c..cc807670ef 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -48,6 +48,22 @@ public EventJsonInputCodec(final EventJsonInputCodecConfig config) { this.overrideTimeReceived = config.getOverrideTimeReceived(); } + private boolean isCompatibleVersion(Map json) { + final String versionStr = (String)json.get(EventJsonDefines.VERSION); + final String[] version = versionStr.split("[.]"); + final String currentVersionStr = DataPrepperVersion.getCurrentVersion().toString(); + final String[] currentVersion = currentVersionStr.split("[.]"); + if (version.length < 2 || currentVersion.length < 2) { + LOG.error("Invalid Version! Current version {} Received data version {}", currentVersionStr, versionStr); + return false; + } + if (!version[0].equals(currentVersion[0])) { + LOG.error("Version mismatch! Current version {} Received data version {}", currentVersionStr, versionStr); + return false; + } + return true; + } + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { Objects.requireNonNull(inputStream); Objects.requireNonNull(eventConsumer); @@ -57,14 +73,12 @@ public void parse(InputStream inputStream, Consumer> eventConsumer while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { final Map innerJson = objectMapper.readValue(jsonParser, Map.class); - final List> maps = (List>)innerJson.get(EventJsonDefines.EVENTS); - final String version = (String)innerJson.get(EventJsonDefines.VERSION); - if (!version.equals(DataPrepperVersion.getCurrentVersion().toString())) { - LOG.error("Version mismatch! Current version {} Received data version {}", DataPrepperVersion.getCurrentVersion().toString(), version); + if (!isCompatibleVersion(innerJson)) { return; } - for (Map map: maps) { - final Record record = createRecord(map); + final List> events = (List>)innerJson.get(EventJsonDefines.EVENTS); + for (Map eventMap: events) { + final Record record = createRecord(eventMap); if (record != null) { eventConsumer.accept(record); } diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index 6c8f66e7d3..1d823cd13d 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -75,7 +75,32 @@ public void invalidVersionTest() throws Exception { Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\""+EventJsonDefines.VERSION+"\":\"2.0\", \""+EventJsonDefines.EVENTS+"\":["; + String input = "{\""+EventJsonDefines.VERSION+"\":\"2\", \""+EventJsonDefines.EVENTS+"\":["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]}"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(0)); + } + + + @Test + public void inCompatibleVersionTest() throws Exception { + inputCodec = createInputCodec(); + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "{\""+EventJsonDefines.VERSION+"\":\"1.0\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; From 8957ec6722e5d71c91bc8cc86de82c7f2786ee22 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 22 Apr 2024 20:32:27 +0000 Subject: [PATCH 13/14] Addressed review comments Signed-off-by: Krishna Kondaka --- .../codec/event_json/EventJsonInputCodec.java | 14 +++++------ .../event_json/EventJsonInputCodecTest.java | 25 ------------------- 2 files changed, 6 insertions(+), 33 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java index cc807670ef..b440e2ef09 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -51,16 +51,14 @@ public EventJsonInputCodec(final EventJsonInputCodecConfig config) { private boolean isCompatibleVersion(Map json) { final String versionStr = (String)json.get(EventJsonDefines.VERSION); final String[] version = versionStr.split("[.]"); - final String currentVersionStr = DataPrepperVersion.getCurrentVersion().toString(); - final String[] currentVersion = currentVersionStr.split("[.]"); - if (version.length < 2 || currentVersion.length < 2) { - LOG.error("Invalid Version! Current version {} Received data version {}", currentVersionStr, versionStr); - return false; - } - if (!version[0].equals(currentVersion[0])) { - LOG.error("Version mismatch! Current version {} Received data version {}", currentVersionStr, versionStr); + final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion(); + + final DataPrepperVersion definedVersion = DataPrepperVersion.parse(versionStr); + if(definedVersion.getMajorVersion() != currentVersion.getMajorVersion()) { + LOG.error("Version mismatch! Current version {} Received data version {}", currentVersion, versionStr); return false; } + return true; } diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index 1d823cd13d..67ec486cfa 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -64,31 +64,6 @@ public void emptyTest(String input) throws Exception { verifyNoInteractions(consumer); } - @Test - public void invalidVersionTest() throws Exception { - inputCodec = createInputCodec(); - final String key = UUID.randomUUID().toString(); - final String value = UUID.randomUUID().toString(); - Map data = Map.of(key, value); - Instant startTime = Instant.now(); - Event event = createEvent(data, startTime); - - Map dataMap = event.toMap(); - Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\""+EventJsonDefines.VERSION+"\":\"2\", \""+EventJsonDefines.EVENTS+"\":["; - String comma = ""; - for (int i = 0; i < 2; i++) { - input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; - comma = ","; - } - input += "]}"; - inputStream = new ByteArrayInputStream(input.getBytes()); - List> records = new LinkedList<>(); - inputCodec.parse(inputStream, records::add); - assertThat(records.size(), equalTo(0)); - } - - @Test public void inCompatibleVersionTest() throws Exception { inputCodec = createInputCodec(); From d47a923546e9d1abe845dc44952a050fdcc3cf2c Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 22 Apr 2024 20:46:39 +0000 Subject: [PATCH 14/14] Addressed review comments Signed-off-by: Krishna Kondaka --- .../codec/event_json/EventJsonInputCodec.java | 12 +++--------- .../codec/event_json/EventJsonInputCodecTest.java | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java index b440e2ef09..229d6f46c7 100644 --- a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -50,16 +50,10 @@ public EventJsonInputCodec(final EventJsonInputCodecConfig config) { private boolean isCompatibleVersion(Map json) { final String versionStr = (String)json.get(EventJsonDefines.VERSION); - final String[] version = versionStr.split("[.]"); - final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion(); - - final DataPrepperVersion definedVersion = DataPrepperVersion.parse(versionStr); - if(definedVersion.getMajorVersion() != currentVersion.getMajorVersion()) { - LOG.error("Version mismatch! Current version {} Received data version {}", currentVersion, versionStr); - return false; - } + final DataPrepperVersion version = DataPrepperVersion.parse(versionStr); - return true; + final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion(); + return currentVersion.compatibleWith(version); } public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index 67ec486cfa..f85d1c6605 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -75,7 +75,7 @@ public void inCompatibleVersionTest() throws Exception { Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\""+EventJsonDefines.VERSION+"\":\"1.0\", \""+EventJsonDefines.EVENTS+"\":["; + String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";