Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Event Json input and output codecs #4436

Merged
merged 14 commits into from
Apr 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.HashSet;
import java.util.List;
Expand All @@ -23,16 +25,23 @@
*/
public class DefaultEventMetadata implements EventMetadata {

private final String eventType;
@JsonProperty("event_type")
private String eventType;

private final Instant timeReceived;
@JsonProperty("time_received")
private Instant timeReceived;

@JsonProperty("external_origination_time")
private Instant externalOriginationTime;

@JsonProperty("attributes")
private Map<String, Object> attributes;

@JsonProperty("tags")
private Set<String> tags;

private DefaultEventMetadata() {}

private DefaultEventMetadata(final Builder builder) {

checkNotNull(builder.eventType, "eventType cannot be null");
Expand All @@ -45,7 +54,8 @@ private DefaultEventMetadata(final Builder builder) {
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
this.externalOriginationTime = null;

this.externalOriginationTime = builder.externalOriginationTime;
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
Expand Down Expand Up @@ -163,6 +173,7 @@ static EventMetadata fromEventMetadata(final EventMetadata eventMetadata) {
public static class Builder {
private String eventType;
private Instant timeReceived;
private Instant externalOriginationTime;
private Map<String, Object> attributes;
private Set<String> tags;

Expand All @@ -188,6 +199,17 @@ public Builder withTimeReceived(final Instant timeReceived) {
return this;
}

/**
* Sets the external origination Time.
* @param externalOriginationTime the time an event was received
* @return returns the builder
* @since 2.8
*/
public Builder withExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
return this;
}

/**
* Sets the attributes. An empty immutable map is the default value.
* @param attributes a map of key-value pair attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ public void testSetAttribute(String key, final Object value) {
assertThat(eventMetadata.getAttribute(key), equalTo(value));
}

@Test
public void test_with_ExternalOriginationTime() {
Instant now = Instant.now();
eventMetadata = DefaultEventMetadata.builder()
.withEventType(testEventType)
.withTimeReceived(testTimeReceived)
.withExternalOriginationTime(now)
.build();
assertThat(eventMetadata.getExternalOriginationTime(), equalTo(now));
}

@Test
public void testAttributes_without_attributes_is_empty() {
eventMetadata = DefaultEventMetadata.builder()
Expand Down
38 changes: 38 additions & 0 deletions data-prepper-plugins/event-json-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
implementation 'org.apache.parquet:parquet-common:1.13.1'
testImplementation project(':data-prepper-test-common')
}

test {
useJUnitPlatform()
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 0.9
}
}
}
}

check.dependsOn jacocoTestCoverageVerification

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

public class EventJsonDefines {
public static final String VERSION = "version";
public static final String EVENTS = "events";
public static final String DATA = "data";
public static final String METADATA = "metadata";
public static final String ATTRIBUTES = "attributes";
public static final String TAGS = "tags";
public static final String TIME_RECEIVED = "timeReceived";
public static final String EXTERNAL_ORIGINATION_TIME = "externalOriginationTime";
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.Map;
import java.util.List;
import java.util.Objects;

/**
* An implementation of {@link InputCodec} which parses JSON Objects for arrays.
*/
@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class)
public class EventJsonInputCodec implements InputCodec {
private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class);
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private final JsonFactory jsonFactory = new JsonFactory();
private final Boolean overrideTimeReceived;

@DataPrepperPluginConstructor
public EventJsonInputCodec(final EventJsonInputCodecConfig config) {
this.overrideTimeReceived = config.getOverrideTimeReceived();
}

private boolean isCompatibleVersion(Map<String, Object> json) {
final String versionStr = (String)json.get(EventJsonDefines.VERSION);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DataPrepperVersion class provides a lot of this.

final String versionStr = (String)json.get(EventJsonDefines.VERSION);
final DataPrepperVersion definedVersion = DataPrepperVersion.parse(versionStr);
if(definedVersion.getMajorVersion() != DataPrepperVersion.getCurrentVersion().getMajorVersion()) {
  LOG.error("Version mismatch! Current version {} Received data version {}", DataPrepperVersion.getCurrentVersion(), definedVersion);
            return false;
}

final DataPrepperVersion version = DataPrepperVersion.parse(versionStr);

final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion();
return currentVersion.compatibleWith(version);
}

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
if (!isCompatibleVersion(innerJson)) {
return;
}
final List<Map<String, Object>> events = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS);
for (Map<String, Object> eventMap: events) {
final Record<Event> record = createRecord(eventMap);
if (record != null) {
eventConsumer.accept(record);
}
}
}
}
}

private Record<Event> createRecord(final Map<String, Object> innerJson) {
Map<String, Object> metadata = (Map<String, Object>)innerJson.get(EventJsonDefines.METADATA);
EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class);
Map<String, Object> data = (Map<String, Object>)innerJson.get(EventJsonDefines.DATA);
if (data == null) {
return null;
}
if (!overrideTimeReceived) {
eventMetadata = new DefaultEventMetadata.Builder()
.withEventType(EventType.LOG.toString())
.withAttributes(eventMetadata.getAttributes())
.withTimeReceived(Instant.now())
.withTags(eventMetadata.getTags())
.withExternalOriginationTime(eventMetadata.getExternalOriginationTime())
.build();
}
final JacksonLog.Builder logBuilder = JacksonLog.builder()
.withData(data)
.withEventMetadata(eventMetadata)
.getThis();
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final Record<Event> record = new Record<>(event);
return record;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.annotation.JsonProperty;

public class EventJsonInputCodecConfig {
@JsonProperty("override_time_received")
private Boolean overrideTimeReceived = false;

public Boolean getOverrideTimeReceived() {
return overrideTimeReceived;
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;

@DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class)
public class EventJsonOutputCodec implements OutputCodec {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
static final String EVENT_JSON = "event_json";
private static final JsonFactory factory = new JsonFactory();
private final EventJsonOutputCodecConfig config;
private JsonGenerator generator;
private OutputCodecContext codecContext;

@DataPrepperPluginConstructor
public EventJsonOutputCodec(final EventJsonOutputCodecConfig config) {
this.config = config;
}

@Override
public String getExtension() {
return EVENT_JSON;
}

@Override
public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException {
Objects.requireNonNull(outputStream);
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
generator.writeStartObject();
generator.writeFieldName(EventJsonDefines.VERSION);
objectMapper.writeValue(generator, DataPrepperVersion.getCurrentVersion().toString());
generator.writeFieldName(EventJsonDefines.EVENTS);
generator.writeStartArray();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep the writeStartObject.

This could allow us to also include metadata such as the version.

e.g.

{
  "metadata" : {"version" : "2.8"},
  "events" : [
    {"data":...,"metadata"},
    {"data":...,"metadata"},
    {"data":...,"metadata"},
  ]
}

See this code:

generator.writeStartObject();
generator.writeFieldName(config.getKeyName());
generator.writeStartArray();

Though, I don't think we need to make the events key configurable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. But the following should be enough.

{
  "version" : "2.8",
  "events" : [
    {"data":...,"metadata"},
    {"data":...,"metadata"},
    {"data":...,"metadata"},
  ]
}

}

@Override
public void complete(final OutputStream outputStream) throws IOException {
generator.writeEndArray();
generator.writeEndObject();
generator.close();
outputStream.flush();
outputStream.close();
}

@Override
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
generator.writeStartObject();
Objects.requireNonNull(event);
getDataMapToSerialize(event);
generator.flush();
generator.writeEndObject();
}

private Map<String, Object> getDataMapToSerialize(Event event) throws IOException {
Map<String, Object> dataMap = event.toMap();
generator.writeFieldName(EventJsonDefines.DATA);
objectMapper.writeValue(generator, dataMap);
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
generator.writeFieldName(EventJsonDefines.METADATA);
objectMapper.writeValue(generator, metadataMap);
return dataMap;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

public class EventJsonOutputCodecConfig {
}

Loading
Loading