Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Event Json input and output codecs #4436

Merged
merged 14 commits into from
Apr 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.HashSet;
import java.util.List;
Expand All @@ -23,16 +25,23 @@
*/
public class DefaultEventMetadata implements EventMetadata {

private final String eventType;
@JsonProperty("event_type")
private String eventType;

private final Instant timeReceived;
@JsonProperty("time_received")
private Instant timeReceived;

@JsonProperty("external_origination_time")
private Instant externalOriginationTime;

@JsonProperty("attributes")
private Map<String, Object> attributes;

@JsonProperty("tags")
private Set<String> tags;

private DefaultEventMetadata() {}

private DefaultEventMetadata(final Builder builder) {

checkNotNull(builder.eventType, "eventType cannot be null");
Expand All @@ -45,7 +54,8 @@ private DefaultEventMetadata(final Builder builder) {
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
this.externalOriginationTime = null;

this.externalOriginationTime = builder.externalOriginationTime;
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
Expand Down Expand Up @@ -163,6 +173,7 @@ static EventMetadata fromEventMetadata(final EventMetadata eventMetadata) {
public static class Builder {
private String eventType;
private Instant timeReceived;
private Instant externalOriginationTime;
private Map<String, Object> attributes;
private Set<String> tags;

Expand All @@ -188,6 +199,17 @@ public Builder withTimeReceived(final Instant timeReceived) {
return this;
}

/**
* Sets the external origination Time.
* @param externalOriginationTime the time an event was received
* @return returns the builder
* @since 2.8
*/
public Builder withExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
return this;
}

/**
* Sets the attributes. An empty immutable map is the default value.
* @param attributes a map of key-value pair attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ public void testSetAttribute(String key, final Object value) {
assertThat(eventMetadata.getAttribute(key), equalTo(value));
}

@Test
public void test_with_ExternalOriginationTime() {
Instant now = Instant.now();
eventMetadata = DefaultEventMetadata.builder()
.withEventType(testEventType)
.withTimeReceived(testTimeReceived)
.withExternalOriginationTime(now)
.build();
assertThat(eventMetadata.getExternalOriginationTime(), equalTo(now));
}

@Test
public void testAttributes_without_attributes_is_empty() {
eventMetadata = DefaultEventMetadata.builder()
Expand Down
38 changes: 38 additions & 0 deletions data-prepper-plugins/event-json-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
implementation 'org.apache.parquet:parquet-common:1.13.1'
testImplementation project(':data-prepper-test-common')
}

test {
useJUnitPlatform()
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 0.9
}
}
}
}

check.dependsOn jacocoTestCoverageVerification

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

public class EventJsonDefines {
public static final String DATA = "data";
public static final String METADATA = "metadata";
public static final String ATTRIBUTES = "attributes";
public static final String TAGS = "tags";
public static final String TIME_RECEIVED = "timeReceived";
public static final String EXTERNAL_ORIGINATION_TIME = "externalOriginationTime";
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.Map;
import java.util.Objects;

/**
* An implementation of {@link InputCodec} which parses JSON Objects for arrays.
*/
@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class)
public class EventJsonInputCodec implements InputCodec {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private final JsonFactory jsonFactory = new JsonFactory();
private final Boolean overrideTimeReceived;

@DataPrepperPluginConstructor
public EventJsonInputCodec(final EventJsonInputCodecConfig config) {
this.overrideTimeReceived = config.getOverrideTimeReceived();
}

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
parseRecordsArray(jsonParser, eventConsumer);
}
}
}

private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);

final Record<Event> record = createRecord(innerJson);
if (record != null) {
eventConsumer.accept(record);
}
}
}

private Record<Event> createRecord(final Map<String, Object> innerJson) {
Map<String, Object> metadata = (Map<String, Object>)innerJson.get(EventJsonDefines.METADATA);
EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class);
Map<String, Object> data = (Map<String, Object>)innerJson.get(EventJsonDefines.DATA);
if (data == null) {
return null;
}
if (overrideTimeReceived) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to run this part if override_time_received is false

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean, the overrideTimeReceived flag "true" means it will override with the value from the data and "false" means it will take Instant.now()?

eventMetadata = new DefaultEventMetadata.Builder()
.withEventType(EventType.LOG.toString())
.withAttributes(eventMetadata.getAttributes())
.withTimeReceived(Instant.now())
.withTags(eventMetadata.getTags())
.withExternalOriginationTime(eventMetadata.getExternalOriginationTime())
.build();
}
final JacksonLog.Builder logBuilder = JacksonLog.builder()
.withData(data)
.withEventMetadata(eventMetadata)
.getThis();
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final Record<Event> record = new Record<>(event);
return record;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.annotation.JsonProperty;

public class EventJsonInputCodecConfig {
@JsonProperty("override_time_received")
private Boolean overrideTimeReceived = false;

public Boolean getOverrideTimeReceived() {
return overrideTimeReceived;
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;

@DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class)
public class EventJsonOutputCodec implements OutputCodec {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
static final String EVENT_JSON = "event_json";
private static final JsonFactory factory = new JsonFactory();
private final EventJsonOutputCodecConfig config;
private JsonGenerator generator;
private OutputCodecContext codecContext;

@DataPrepperPluginConstructor
public EventJsonOutputCodec(final EventJsonOutputCodecConfig config) {
this.config = config;
}

@Override
public String getExtension() {
return EVENT_JSON;
}

@Override
public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException {
Objects.requireNonNull(outputStream);
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
generator.writeStartArray();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep the writeStartObject.

This could allow us to also include metadata such as the version.

e.g.

{
  "metadata" : {"version" : "2.8"},
  "events" : [
    {"data":...,"metadata"},
    {"data":...,"metadata"},
    {"data":...,"metadata"},
  ]
}

See this code:

generator.writeStartObject();
generator.writeFieldName(config.getKeyName());
generator.writeStartArray();

Though, I don't think we need to make the events key configurable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. But the following should be enough.

{
  "version" : "2.8",
  "events" : [
    {"data":...,"metadata"},
    {"data":...,"metadata"},
    {"data":...,"metadata"},
  ]
}

}

@Override
public void complete(final OutputStream outputStream) throws IOException {
generator.writeEndArray();
generator.close();
outputStream.flush();
outputStream.close();
}

@Override
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
generator.writeStartObject();
Objects.requireNonNull(event);
getDataMapToSerialize(event);
generator.flush();
generator.writeEndObject();
}

private Map<String, Object> getDataMapToSerialize(Event event) throws IOException {
Map<String, Object> dataMap = event.toMap();
generator.writeFieldName(EventJsonDefines.DATA);
objectMapper.writeValue(generator, dataMap);
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
generator.writeFieldName(EventJsonDefines.METADATA);
objectMapper.writeValue(generator, metadataMap);
return dataMap;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

public class EventJsonOutputCodecConfig {
}

Loading
Loading