diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java index 883297d567..fa53cd6ad1 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.model.event; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.time.Instant; import java.util.HashSet; import java.util.List; @@ -23,16 +25,23 @@ */ public class DefaultEventMetadata implements EventMetadata { - private final String eventType; + @JsonProperty("event_type") + private String eventType; - private final Instant timeReceived; + @JsonProperty("time_received") + private Instant timeReceived; + @JsonProperty("external_origination_time") private Instant externalOriginationTime; + @JsonProperty("attributes") private Map attributes; + @JsonProperty("tags") private Set tags; + private DefaultEventMetadata() {} + private DefaultEventMetadata(final Builder builder) { checkNotNull(builder.eventType, "eventType cannot be null"); @@ -45,7 +54,8 @@ private DefaultEventMetadata(final Builder builder) { this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes); this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags); - this.externalOriginationTime = null; + + this.externalOriginationTime = builder.externalOriginationTime; } private DefaultEventMetadata(final EventMetadata eventMetadata) { @@ -163,6 +173,7 @@ static EventMetadata fromEventMetadata(final EventMetadata eventMetadata) { public static class Builder { private String eventType; private Instant timeReceived; + private Instant externalOriginationTime; private Map attributes; private Set tags; @@ -188,6 +199,17 @@ public Builder withTimeReceived(final Instant timeReceived) { return this; } + /** + * Sets the external origination Time. + * @param externalOriginationTime the time an event was received + * @return returns the builder + * @since 2.8 + */ + public Builder withExternalOriginationTime(final Instant externalOriginationTime) { + this.externalOriginationTime = externalOriginationTime; + return this; + } + /** * Sets the attributes. An empty immutable map is the default value. * @param attributes a map of key-value pair attributes diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java index 479e7be0c2..c87bf1a101 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java @@ -134,6 +134,17 @@ public void testSetAttribute(String key, final Object value) { assertThat(eventMetadata.getAttribute(key), equalTo(value)); } + @Test + public void test_with_ExternalOriginationTime() { + Instant now = Instant.now(); + eventMetadata = DefaultEventMetadata.builder() + .withEventType(testEventType) + .withTimeReceived(testTimeReceived) + .withExternalOriginationTime(now) + .build(); + assertThat(eventMetadata.getExternalOriginationTime(), equalTo(now)); + } + @Test public void testAttributes_without_attributes_is_empty() { eventMetadata = DefaultEventMetadata.builder() diff --git a/data-prepper-plugins/event-json-codecs/build.gradle b/data-prepper-plugins/event-json-codecs/build.gradle new file mode 100644 index 0000000000..da900c6a1a --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/build.gradle @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' + implementation 'org.apache.parquet:parquet-common:1.13.1' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { + limit { + minimum = 0.9 + } + } + } +} + +check.dependsOn jacocoTestCoverageVerification + diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java new file mode 100644 index 0000000000..c4d2a25dc0 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +public class EventJsonDefines { + public static final String VERSION = "version"; + public static final String EVENTS = "events"; + public static final String DATA = "data"; + public static final String METADATA = "metadata"; + public static final String ATTRIBUTES = "attributes"; + public static final String TAGS = "tags"; + public static final String TIME_RECEIVED = "timeReceived"; + public static final String EXTERNAL_ORIGINATION_TIME = "externalOriginationTime"; +} + + diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java new file mode 100644 index 0000000000..229d6f46c7 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.DefaultEventMetadata; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.function.Consumer; +import java.util.Map; +import java.util.List; +import java.util.Objects; + +/** + * An implementation of {@link InputCodec} which parses JSON Objects for arrays. + */ +@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class) +public class EventJsonInputCodec implements InputCodec { + private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class); + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + private final JsonFactory jsonFactory = new JsonFactory(); + private final Boolean overrideTimeReceived; + + @DataPrepperPluginConstructor + public EventJsonInputCodec(final EventJsonInputCodecConfig config) { + this.overrideTimeReceived = config.getOverrideTimeReceived(); + } + + private boolean isCompatibleVersion(Map json) { + final String versionStr = (String)json.get(EventJsonDefines.VERSION); + final DataPrepperVersion version = DataPrepperVersion.parse(versionStr); + + final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion(); + return currentVersion.compatibleWith(version); + } + + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream); + Objects.requireNonNull(eventConsumer); + + final JsonParser jsonParser = jsonFactory.createParser(inputStream); + + while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { + if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { + final Map innerJson = objectMapper.readValue(jsonParser, Map.class); + if (!isCompatibleVersion(innerJson)) { + return; + } + final List> events = (List>)innerJson.get(EventJsonDefines.EVENTS); + for (Map eventMap: events) { + final Record record = createRecord(eventMap); + if (record != null) { + eventConsumer.accept(record); + } + } + } + } + } + + private Record createRecord(final Map innerJson) { + Map metadata = (Map)innerJson.get(EventJsonDefines.METADATA); + EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class); + Map data = (Map)innerJson.get(EventJsonDefines.DATA); + if (data == null) { + return null; + } + if (!overrideTimeReceived) { + eventMetadata = new DefaultEventMetadata.Builder() + .withEventType(EventType.LOG.toString()) + .withAttributes(eventMetadata.getAttributes()) + .withTimeReceived(Instant.now()) + .withTags(eventMetadata.getTags()) + .withExternalOriginationTime(eventMetadata.getExternalOriginationTime()) + .build(); + } + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(data) + .withEventMetadata(eventMetadata) + .getThis(); + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + final Record record = new Record<>(event); + return record; + } +} diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java new file mode 100644 index 0000000000..e1ffc37fb3 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class EventJsonInputCodecConfig { + @JsonProperty("override_time_received") + private Boolean overrideTimeReceived = false; + + public Boolean getOverrideTimeReceived() { + return overrideTimeReceived; + } +} + + diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java new file mode 100644 index 0000000000..f444cd79e6 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Objects; + +@DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class) +public class EventJsonOutputCodec implements OutputCodec { + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + static final String EVENT_JSON = "event_json"; + private static final JsonFactory factory = new JsonFactory(); + private final EventJsonOutputCodecConfig config; + private JsonGenerator generator; + private OutputCodecContext codecContext; + + @DataPrepperPluginConstructor + public EventJsonOutputCodec(final EventJsonOutputCodecConfig config) { + this.config = config; + } + + @Override + public String getExtension() { + return EVENT_JSON; + } + + @Override + public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException { + Objects.requireNonNull(outputStream); + generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); + generator.writeStartObject(); + generator.writeFieldName(EventJsonDefines.VERSION); + objectMapper.writeValue(generator, DataPrepperVersion.getCurrentVersion().toString()); + generator.writeFieldName(EventJsonDefines.EVENTS); + generator.writeStartArray(); + } + + @Override + public void complete(final OutputStream outputStream) throws IOException { + generator.writeEndArray(); + generator.writeEndObject(); + generator.close(); + outputStream.flush(); + outputStream.close(); + } + + @Override + public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { + generator.writeStartObject(); + Objects.requireNonNull(event); + getDataMapToSerialize(event); + generator.flush(); + generator.writeEndObject(); + } + + private Map getDataMapToSerialize(Event event) throws IOException { + Map dataMap = event.toMap(); + generator.writeFieldName(EventJsonDefines.DATA); + objectMapper.writeValue(generator, dataMap); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + generator.writeFieldName(EventJsonDefines.METADATA); + objectMapper.writeValue(generator, metadataMap); + return dataMap; + } + +} diff --git a/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java new file mode 100644 index 0000000000..352580737e --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java @@ -0,0 +1,9 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +public class EventJsonOutputCodecConfig { +} + diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java new file mode 100644 index 0000000000..f85d1c6605 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -0,0 +1,167 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import org.mockito.Mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.mockito.Mockito.verifyNoInteractions; + +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; + +import java.io.ByteArrayInputStream; + +import java.time.Instant; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +public class EventJsonInputCodecTest { + private static final Integer BYTEBUFFER_SIZE = 1024; + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + @Mock + private EventJsonInputCodecConfig eventJsonInputCodecConfig; + + private EventJsonInputCodec inputCodec; + private ByteArrayInputStream inputStream; + + @BeforeEach + public void setup() { + eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(false); + } + + public EventJsonInputCodec createInputCodec() { + return new EventJsonInputCodec(eventJsonInputCodecConfig); + } + + @ParameterizedTest + @ValueSource(strings = {"", "{}"}) + public void emptyTest(String input) throws Exception { + input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes()); + inputCodec = createInputCodec(); + Consumer> consumer = mock(Consumer.class); + inputCodec.parse(inputStream, consumer); + verifyNoInteractions(consumer); + } + + @Test + public void inCompatibleVersionTest() throws Exception { + inputCodec = createInputCodec(); + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]}"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(0)); + } + + @Test + public void basicTest() throws Exception { + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); + inputCodec = createInputCodec(); + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]}"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(2)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + @Test + public void test_with_timeReceivedOverridden() throws Exception { + inputCodec = createInputCodec(); + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + Instant startTime = Instant.now().minusSeconds(5); + Event event = createEvent(data, startTime); + + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; + String comma = ""; + for (int i = 0; i < 2; i++) { + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + input += "]}"; + inputStream = new ByteArrayInputStream(input.getBytes()); + List> records = new LinkedList<>(); + inputCodec.parse(inputStream, records::add); + assertThat(records.size(), equalTo(2)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime))); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + + private Event createEvent(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return event; + } +} + diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java new file mode 100644 index 0000000000..b0224454a5 --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import org.mockito.Mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; + +import java.time.Instant; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +public class EventJsonInputOutputCodecTest { + private static final Integer BYTEBUFFER_SIZE = 1024; + + private ByteArrayOutputStream outputStream; + + @Mock + private EventJsonOutputCodecConfig eventJsonOutputCodecConfig; + + @Mock + private EventJsonInputCodecConfig eventJsonInputCodecConfig; + + private EventJsonOutputCodec outputCodec; + private EventJsonInputCodec inputCodec; + + @BeforeEach + public void setup() { + outputStream = new ByteArrayOutputStream(BYTEBUFFER_SIZE); + eventJsonInputCodecConfig = mock(EventJsonInputCodecConfig.class); + } + + public EventJsonOutputCodec createOutputCodec() { + return new EventJsonOutputCodec(eventJsonOutputCodecConfig); + } + + public EventJsonInputCodec createInputCodec() { + when(eventJsonInputCodecConfig.getOverrideTimeReceived()).thenReturn(true); + return new EventJsonInputCodec(eventJsonInputCodecConfig); + } + + @Test + public void basicTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + outputCodec = createOutputCodec(); + inputCodec = createInputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + List> records = new LinkedList<>(); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + + assertThat(records.size(), equalTo(1)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + @Test + public void multipleEventsTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + outputCodec = createOutputCodec(); + inputCodec = createInputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.writeEvent(event, outputStream); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + List> records = new LinkedList<>(); + inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + + assertThat(records.size(), equalTo(3)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags().size(), equalTo(0)); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(null)); + } + } + + @Test + public void extendedTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + final String attrKey = UUID.randomUUID().toString(); + final String attrValue = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Set tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + List tagsList = tags.stream().collect(Collectors.toList()); + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + Instant origTime = startTime.minusSeconds(5); + event.getMetadata().setExternalOriginationTime(origTime); + event.getMetadata().addTags(tagsList); + event.getMetadata().setAttribute(attrKey, attrValue); + outputCodec = createOutputCodec(); + inputCodec = createInputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON)); + List> records = new LinkedList<>(); +inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); + + assertThat(records.size(), equalTo(1)); + for(Record record : records) { + Event e = (Event)record.getData(); + assertThat(e.get(key, String.class), equalTo(value)); + assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); + assertThat(e.getMetadata().getTags(), equalTo(tags)); + assertThat(e.getMetadata().getAttributes(), equalTo(Map.of(attrKey, attrValue))); + assertThat(e.getMetadata().getExternalOriginationTime(), equalTo(origTime)); + } + } + + + private Event createEvent(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return event; + } +} + diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java new file mode 100644 index 0000000000..51dda545cb --- /dev/null +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.codec.event_json; + +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + + +import java.io.ByteArrayOutputStream; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; + +import java.time.Instant; +import java.util.Map; +import java.util.UUID; + +public class EventJsonOutputCodecTest { + private static final Integer BYTEBUFFER_SIZE = 1024; + private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + @Mock + private EventJsonOutputCodecConfig eventJsonOutputCodecConfig; + + private EventJsonOutputCodec outputCodec; + private ByteArrayOutputStream outputStream; + + @BeforeEach + public void setup() { + outputStream = new ByteArrayOutputStream(BYTEBUFFER_SIZE); + } + + public EventJsonOutputCodec createOutputCodec() { + return new EventJsonOutputCodec(eventJsonOutputCodecConfig); + } + + @Test + public void basicTest() throws Exception { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + Map data = Map.of(key, value); + + Instant startTime = Instant.now(); + Event event = createEvent(data, startTime); + outputCodec = createOutputCodec(); + outputCodec.start(outputStream, null, null); + outputCodec.writeEvent(event, outputStream); + outputCodec.writeEvent(event, outputStream); + outputCodec.complete(outputStream); + Map dataMap = event.toMap(); + Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); + //String expectedOutput = "{\"version\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; + String expectedOutput = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; + String comma = ""; + for (int i = 0; i < 2; i++) { + expectedOutput += comma+"{\""+EventJsonDefines.DATA+"\":"+objectMapper.writeValueAsString(dataMap)+","+"\""+EventJsonDefines.METADATA+"\":"+objectMapper.writeValueAsString(metadataMap)+"}"; + comma = ","; + } + expectedOutput += "]}"; + String output = outputStream.toString(); + assertThat(output, equalTo(expectedOutput)); + + } + + private Event createEvent(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() + .withData(json) + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); + + return event; + } +} diff --git a/settings.gradle b/settings.gradle index 85727b0f81..ffe34260ee 100644 --- a/settings.gradle +++ b/settings.gradle @@ -116,6 +116,7 @@ include 'data-prepper-plugins:otel-logs-source' include 'data-prepper-plugins:blocking-buffer' include 'data-prepper-plugins:http-source' include 'data-prepper-plugins:drop-events-processor' +include 'data-prepper-plugins:event-json-codecs' include 'data-prepper-plugins:key-value-processor' include 'data-prepper-plugins:mutate-event-processors' include 'data-prepper-plugins:geoip-processor' @@ -169,4 +170,4 @@ include 'data-prepper-plugins:decompress-processor' include 'data-prepper-plugins:split-event-processor' include 'data-prepper-plugins:http-common' include 'data-prepper-plugins:flatten-processor' -include 'data-prepper-plugins:mongodb' \ No newline at end of file +include 'data-prepper-plugins:mongodb'