-
Notifications
You must be signed in to change notification settings - Fork 207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write json processor #4514
Write json processor #4514
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-plugins:common') | ||
implementation 'io.micrometer:micrometer-core' | ||
implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' | ||
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' | ||
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' | ||
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0' | ||
testImplementation project(path: ':data-prepper-test-common') | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,78 @@ | ||||||||||||
/* | ||||||||||||
* Copyright OpenSearch Contributors | ||||||||||||
* SPDX-License-Identifier: Apache-2.0 | ||||||||||||
*/ | ||||||||||||
|
||||||||||||
package org.opensearch.dataprepper.plugins.processor.write_json; | ||||||||||||
|
||||||||||||
import com.fasterxml.jackson.databind.ObjectMapper; | ||||||||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | ||||||||||||
import io.micrometer.core.instrument.Counter; | ||||||||||||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||||||||||||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||||||||||||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||||||||||||
import org.opensearch.dataprepper.model.plugin.PluginFactory; | ||||||||||||
|
||||||||||||
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; | ||||||||||||
import org.opensearch.dataprepper.model.event.Event; | ||||||||||||
import org.opensearch.dataprepper.model.record.Record; | ||||||||||||
import org.opensearch.dataprepper.model.processor.AbstractProcessor; | ||||||||||||
import org.opensearch.dataprepper.model.processor.Processor; | ||||||||||||
|
||||||||||||
import org.slf4j.Logger; | ||||||||||||
import org.slf4j.LoggerFactory; | ||||||||||||
|
||||||||||||
import java.util.Collection; | ||||||||||||
|
||||||||||||
@DataPrepperPlugin(name = "write_json", pluginType = Processor.class, pluginConfigurationType = WriteJsonProcessorConfig.class) | ||||||||||||
public class WriteJsonProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||||||||||||
private static final Logger LOG = LoggerFactory.getLogger(WriteJsonProcessor.class); | ||||||||||||
private static final String WRITE_JSON_FAILED_COUNTER = "writeJsonFailedCounter"; | ||||||||||||
private final String source; | ||||||||||||
private final Counter writeJsonFailedCounter; | ||||||||||||
private String target; | ||||||||||||
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); | ||||||||||||
|
||||||||||||
@DataPrepperPluginConstructor | ||||||||||||
public WriteJsonProcessor(final WriteJsonProcessorConfig writeJsonProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { | ||||||||||||
super(pluginMetrics); | ||||||||||||
source = writeJsonProcessorConfig.getSource(); | ||||||||||||
target = writeJsonProcessorConfig.getTarget(); | ||||||||||||
if (target == null) { | ||||||||||||
target = source; | ||||||||||||
} | ||||||||||||
Comment on lines
+39
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using Java 9
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @KarstenSchnitter, I will keep that in mind for future. For now, I am keeping it as is. |
||||||||||||
writeJsonFailedCounter = pluginMetrics.counter(WRITE_JSON_FAILED_COUNTER); | ||||||||||||
} | ||||||||||||
|
||||||||||||
@Override | ||||||||||||
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) { | ||||||||||||
for (final Record<Event> record : records) { | ||||||||||||
final Event event = record.getData(); | ||||||||||||
Object value = event.get(source, Object.class); | ||||||||||||
if (value != null) { | ||||||||||||
try { | ||||||||||||
event.put(target, objectMapper.writeValueAsString(value)); | ||||||||||||
} catch (Exception e) { | ||||||||||||
LOG.error(EVENT, "Failed to convert source to json string", e); | ||||||||||||
chenqi0805 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
writeJsonFailedCounter.increment(); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could add a unit test to cover this case and verify metric is incremented |
||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
return records; | ||||||||||||
} | ||||||||||||
|
||||||||||||
@Override | ||||||||||||
public void prepareForShutdown() { | ||||||||||||
|
||||||||||||
} | ||||||||||||
|
||||||||||||
@Override | ||||||||||||
public boolean isReadyForShutdown() { | ||||||||||||
return true; | ||||||||||||
} | ||||||||||||
|
||||||||||||
@Override | ||||||||||||
public void shutdown() { | ||||||||||||
|
||||||||||||
} | ||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.processor.write_json; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import jakarta.validation.constraints.NotNull; | ||
|
||
public class WriteJsonProcessorConfig { | ||
@JsonProperty("source") | ||
@NotNull | ||
private String source; | ||
|
||
@JsonProperty("target") | ||
private String target; | ||
|
||
public String getSource() { | ||
return source; | ||
} | ||
|
||
public String getTarget() { | ||
return target; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.processor.write_json; | ||
|
||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.opensearch.dataprepper.test.helper.ReflectivelySetField; | ||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
|
||
import java.util.UUID; | ||
|
||
public class WriteJsonProcessorConfigTest { | ||
WriteJsonProcessorConfig writeJsonProcessorConfig; | ||
|
||
@BeforeEach | ||
void setup() { | ||
writeJsonProcessorConfig = new WriteJsonProcessorConfig(); | ||
} | ||
|
||
@Test | ||
public void testDefaults() { | ||
assertThat(writeJsonProcessorConfig.getTarget(), equalTo(null)); | ||
} | ||
|
||
@Test | ||
public void testParameterSets() throws Exception { | ||
String sourceKey = UUID.randomUUID().toString(); | ||
String targetKey = UUID.randomUUID().toString(); | ||
ReflectivelySetField.setField(WriteJsonProcessorConfig.class, writeJsonProcessorConfig, "source", sourceKey); | ||
ReflectivelySetField.setField(WriteJsonProcessorConfig.class, writeJsonProcessorConfig, "target", targetKey); | ||
assertThat(writeJsonProcessorConfig.getSource(), equalTo(sourceKey)); | ||
assertThat(writeJsonProcessorConfig.getTarget(), equalTo(targetKey)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.processor.write_json; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.ValueSource; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.model.event.JacksonEvent; | ||
import org.opensearch.dataprepper.model.plugin.PluginFactory; | ||
|
||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
import static org.mockito.Mockito.when; | ||
import static org.mockito.Mockito.mock; | ||
import org.mockito.Mock; | ||
|
||
import java.util.Map; | ||
import java.util.List; | ||
import java.util.UUID; | ||
|
||
public class WriteJsonProcessorTest { | ||
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); | ||
|
||
@Mock | ||
private WriteJsonProcessorConfig mockConfig; | ||
|
||
@Mock | ||
private PluginFactory pluginFactory; | ||
|
||
@Mock | ||
private PluginMetrics pluginMetrics; | ||
|
||
private WriteJsonProcessor writeJsonProcessor; | ||
|
||
private String sourceKey; | ||
|
||
@BeforeEach | ||
void setup() { | ||
pluginMetrics = mock(PluginMetrics.class); | ||
pluginFactory = mock(PluginFactory.class); | ||
sourceKey = UUID.randomUUID().toString(); | ||
mockConfig = mock(WriteJsonProcessorConfig.class); | ||
when(mockConfig.getSource()).thenReturn(sourceKey); | ||
} | ||
|
||
WriteJsonProcessor createObjectUnderTest() { | ||
return new WriteJsonProcessor(mockConfig, pluginMetrics, pluginFactory); | ||
} | ||
|
||
@ParameterizedTest | ||
@ValueSource(strings = {"", "targetKey"}) | ||
public void testBasic(String target) throws Exception { | ||
String targetKey = (target.equals("")) ? sourceKey : target; | ||
when(mockConfig.getTarget()).thenReturn(targetKey); | ||
Map<String, Object> value1 = Map.of("stringKey", "testString", "intKey", 10); | ||
Map<String, Object> value = Map.of("mapKey", value1, "boolKey", true); | ||
String expectedString1 = "{\"mapKey\":{\"stringKey\":\"testString\",\"intKey\":10},\"boolKey\":true}"; | ||
String expectedString2 = "{\"boolKey\":true,\"mapKey\":{\"stringKey\":\"testString\",\"intKey\":10}}"; | ||
String expectedString3 = "{\"boolKey\":true,\"mapKey\":{\"intKey\":10,\"stringKey\":\"testString\"}}"; | ||
String expectedString4 = "{\"boolKey\":true,\"mapKey\":{\"intKey\":10,\"stringKey\":\"testString\"}}"; | ||
|
||
|
||
Map<String, Object> data = Map.of(sourceKey, value); | ||
Record<Event> record = createRecord(data); | ||
|
||
writeJsonProcessor = createObjectUnderTest(); | ||
final List<Record<Event>> outputRecords = (List<Record<Event>>) writeJsonProcessor.doExecute(List.of(record)); | ||
assertThat(outputRecords.size(), equalTo(1)); | ||
Event event = outputRecords.get(0).getData(); | ||
String expectedResult = objectMapper.writeValueAsString(value); | ||
String targetInEvent = event.get(targetKey, String.class); | ||
assertThat(targetInEvent, equalTo(expectedResult)); | ||
assertTrue(expectedString1.equals(targetInEvent)||expectedString2.equals(targetInEvent)||expectedString3.equals(targetInEvent)||expectedString4.equals(targetInEvent)); | ||
|
||
} | ||
|
||
static Record<Event> createRecord(final Map<String, Object> data) { | ||
return new Record<>(JacksonEvent.builder() | ||
.withData(data) | ||
.withEventType("event") | ||
.build()); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: could just name this
writeJsonEventsFailed
. Counter is a bit redundant.