forked from opensearch-project/data-prepper
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support for Event Json input and output codecs (opensearch-project#4436)
* Event Json input and output codecs Signed-off-by: Krishna Kondaka <[email protected]> * Modified test case to check for event metadata attributes Signed-off-by: Krishna Kondaka <[email protected]> * Modified the coverage to 0.9 Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Fixes for failing coverage tests Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Fixed test coverage Signed-off-by: Krishna Kondaka <[email protected]> * Added more tests Signed-off-by: Krishna Kondaka <[email protected]> * Added more tests for coverage Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Fixed code coverage failure Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> --------- Signed-off-by: Krishna Kondaka <[email protected]> Co-authored-by: Krishna Kondaka <[email protected]>
- Loading branch information
1 parent
9f01409
commit 9dffec5
Showing
12 changed files
with
725 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-plugins:common') | ||
implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' | ||
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' | ||
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' | ||
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' | ||
implementation 'org.apache.parquet:parquet-common:1.13.1' | ||
testImplementation project(':data-prepper-test-common') | ||
} | ||
|
||
test { | ||
useJUnitPlatform() | ||
} | ||
|
||
jacocoTestCoverageVerification { | ||
dependsOn jacocoTestReport | ||
violationRules { | ||
rule { | ||
limit { | ||
minimum = 0.9 | ||
} | ||
} | ||
} | ||
} | ||
|
||
check.dependsOn jacocoTestCoverageVerification | ||
|
18 changes: 18 additions & 0 deletions
18
...s/src/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonDefines.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.codec.event_json; | ||
|
||
public class EventJsonDefines { | ||
public static final String VERSION = "version"; | ||
public static final String EVENTS = "events"; | ||
public static final String DATA = "data"; | ||
public static final String METADATA = "metadata"; | ||
public static final String ATTRIBUTES = "attributes"; | ||
public static final String TAGS = "tags"; | ||
public static final String TIME_RECEIVED = "timeReceived"; | ||
public static final String EXTERNAL_ORIGINATION_TIME = "externalOriginationTime"; | ||
} | ||
|
||
|
106 changes: 106 additions & 0 deletions
106
...rc/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.codec.event_json; | ||
|
||
import com.fasterxml.jackson.core.JsonFactory; | ||
import com.fasterxml.jackson.core.JsonParser; | ||
import com.fasterxml.jackson.core.JsonToken; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | ||
|
||
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import org.opensearch.dataprepper.model.codec.InputCodec; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.event.EventType; | ||
import org.opensearch.dataprepper.model.event.EventMetadata; | ||
import org.opensearch.dataprepper.model.event.DefaultEventMetadata; | ||
import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
import org.opensearch.dataprepper.model.log.JacksonLog; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.time.Instant; | ||
import java.util.function.Consumer; | ||
import java.util.Map; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
/** | ||
* An implementation of {@link InputCodec} which parses JSON Objects for arrays. | ||
*/ | ||
@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class) | ||
public class EventJsonInputCodec implements InputCodec { | ||
private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class); | ||
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); | ||
private final JsonFactory jsonFactory = new JsonFactory(); | ||
private final Boolean overrideTimeReceived; | ||
|
||
@DataPrepperPluginConstructor | ||
public EventJsonInputCodec(final EventJsonInputCodecConfig config) { | ||
this.overrideTimeReceived = config.getOverrideTimeReceived(); | ||
} | ||
|
||
private boolean isCompatibleVersion(Map<String, Object> json) { | ||
final String versionStr = (String)json.get(EventJsonDefines.VERSION); | ||
final DataPrepperVersion version = DataPrepperVersion.parse(versionStr); | ||
|
||
final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion(); | ||
return currentVersion.compatibleWith(version); | ||
} | ||
|
||
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException { | ||
Objects.requireNonNull(inputStream); | ||
Objects.requireNonNull(eventConsumer); | ||
|
||
final JsonParser jsonParser = jsonFactory.createParser(inputStream); | ||
|
||
while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { | ||
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) { | ||
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class); | ||
if (!isCompatibleVersion(innerJson)) { | ||
return; | ||
} | ||
final List<Map<String, Object>> events = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS); | ||
for (Map<String, Object> eventMap: events) { | ||
final Record<Event> record = createRecord(eventMap); | ||
if (record != null) { | ||
eventConsumer.accept(record); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
private Record<Event> createRecord(final Map<String, Object> innerJson) { | ||
Map<String, Object> metadata = (Map<String, Object>)innerJson.get(EventJsonDefines.METADATA); | ||
EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class); | ||
Map<String, Object> data = (Map<String, Object>)innerJson.get(EventJsonDefines.DATA); | ||
if (data == null) { | ||
return null; | ||
} | ||
if (!overrideTimeReceived) { | ||
eventMetadata = new DefaultEventMetadata.Builder() | ||
.withEventType(EventType.LOG.toString()) | ||
.withAttributes(eventMetadata.getAttributes()) | ||
.withTimeReceived(Instant.now()) | ||
.withTags(eventMetadata.getTags()) | ||
.withExternalOriginationTime(eventMetadata.getExternalOriginationTime()) | ||
.build(); | ||
} | ||
final JacksonLog.Builder logBuilder = JacksonLog.builder() | ||
.withData(data) | ||
.withEventMetadata(eventMetadata) | ||
.getThis(); | ||
final JacksonEvent event = (JacksonEvent)logBuilder.build(); | ||
final Record<Event> record = new Record<>(event); | ||
return record; | ||
} | ||
} |
18 changes: 18 additions & 0 deletions
18
...n/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.codec.event_json; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
||
public class EventJsonInputCodecConfig { | ||
@JsonProperty("override_time_received") | ||
private Boolean overrideTimeReceived = false; | ||
|
||
public Boolean getOverrideTimeReceived() { | ||
return overrideTimeReceived; | ||
} | ||
} | ||
|
||
|
83 changes: 83 additions & 0 deletions
83
...c/main/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.codec.event_json; | ||
|
||
import com.fasterxml.jackson.core.JsonEncoding; | ||
import com.fasterxml.jackson.core.JsonFactory; | ||
import com.fasterxml.jackson.core.JsonGenerator; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | ||
|
||
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import org.opensearch.dataprepper.model.codec.OutputCodec; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.sink.OutputCodecContext; | ||
|
||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
@DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class) | ||
public class EventJsonOutputCodec implements OutputCodec { | ||
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); | ||
static final String EVENT_JSON = "event_json"; | ||
private static final JsonFactory factory = new JsonFactory(); | ||
private final EventJsonOutputCodecConfig config; | ||
private JsonGenerator generator; | ||
private OutputCodecContext codecContext; | ||
|
||
@DataPrepperPluginConstructor | ||
public EventJsonOutputCodec(final EventJsonOutputCodecConfig config) { | ||
this.config = config; | ||
} | ||
|
||
@Override | ||
public String getExtension() { | ||
return EVENT_JSON; | ||
} | ||
|
||
@Override | ||
public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException { | ||
Objects.requireNonNull(outputStream); | ||
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); | ||
generator.writeStartObject(); | ||
generator.writeFieldName(EventJsonDefines.VERSION); | ||
objectMapper.writeValue(generator, DataPrepperVersion.getCurrentVersion().toString()); | ||
generator.writeFieldName(EventJsonDefines.EVENTS); | ||
generator.writeStartArray(); | ||
} | ||
|
||
@Override | ||
public void complete(final OutputStream outputStream) throws IOException { | ||
generator.writeEndArray(); | ||
generator.writeEndObject(); | ||
generator.close(); | ||
outputStream.flush(); | ||
outputStream.close(); | ||
} | ||
|
||
@Override | ||
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { | ||
generator.writeStartObject(); | ||
Objects.requireNonNull(event); | ||
getDataMapToSerialize(event); | ||
generator.flush(); | ||
generator.writeEndObject(); | ||
} | ||
|
||
private Map<String, Object> getDataMapToSerialize(Event event) throws IOException { | ||
Map<String, Object> dataMap = event.toMap(); | ||
generator.writeFieldName(EventJsonDefines.DATA); | ||
objectMapper.writeValue(generator, dataMap); | ||
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); | ||
generator.writeFieldName(EventJsonDefines.METADATA); | ||
objectMapper.writeValue(generator, metadataMap); | ||
return dataMap; | ||
} | ||
|
||
} |
9 changes: 9 additions & 0 deletions
9
.../java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.codec.event_json; | ||
|
||
public class EventJsonOutputCodecConfig { | ||
} | ||
|
Oops, something went wrong.