Skip to content

Commit

Permalink
Modify Key Value processor to support string literal grouping (#4599)
Browse files Browse the repository at this point in the history
* Key Value Processor fixes

Signed-off-by: Krishna Kondaka <[email protected]>

* New options to KV processor

Signed-off-by: Krishna Kondaka <[email protected]>

* Add string literal support

Signed-off-by: Krishna Kondaka <[email protected]>

* Remove unnecessary changes

Signed-off-by: Krishna Kondaka <[email protected]>

* Remove unnecessary changes

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Jun 4, 2024
1 parent 7d15115 commit 530be53
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
// Regex character class that matches any single bracket character: [ ] ( ) < >.
final String delimiterBracketCheck = "[\\[\\]()<>]";
// The same six bracket characters as above, held as a set of individual characters.
private final Set<Character> bracketSet = Set.of('[', ']', '(', ')', '<', '>');
// Assigned in the constructor from keyValueProcessorConfig.getTagsOnFailure().
private final List<String> tagsOnFailure;
// Assigned in the constructor from keyValueProcessorConfig.getStringLiteralCharacter();
// may be null, so readers of this field must null-check before comparing against it.
private final Character stringLiteralCharacter;

@DataPrepperPluginConstructor
public KeyValueProcessor(final PluginMetrics pluginMetrics,
Expand All @@ -73,6 +74,8 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics,
super(pluginMetrics);
this.keyValueProcessorConfig = keyValueProcessorConfig;

this.stringLiteralCharacter = keyValueProcessorConfig.getStringLiteralCharacter();

tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure();

if (keyValueProcessorConfig.getFieldDelimiterRegex() != null
Expand Down Expand Up @@ -163,7 +166,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics,
}

validateKeySets(includeKeysSet, excludeKeysSet, defaultValuesSet);

if (!validTransformOptionSet.contains(keyValueProcessorConfig.getTransformKey())) {
throw new IllegalArgumentException(String.format("The transform_key value: %s is not a valid option", keyValueProcessorConfig.getTransformKey()));
}
Expand Down Expand Up @@ -261,11 +264,14 @@ public int skipGroup(final String str, int idx, final Character endChar) {
i++;
continue;
} else if (str.charAt(i) == endChar) {
return i-1;
return i;
} else
i++;
}
throw new RuntimeException("Bad Input, no end character found in "+str+" after index " + idx +", expected end char = "+endChar);
if (keyValueProcessorConfig.isStrictGroupingEnabled()) {
throw new RuntimeException("Bad Input, no end character found in "+str+" after index " + idx +", expected end char = "+endChar);
}
return i-1;
}

private void addPart(List<String> parts, final String str, final int start, final int end) {
Expand All @@ -279,7 +285,8 @@ public int findInStartGroup(final String str, int idx) {
for (int j = 0; j < startGroupStrings.length; j++) {
try {
if (startGroupStrings[j].equals(str.substring(idx, idx+startGroupStrings[j].length()))) {
if (j <= 1 && idx > 0 && str.charAt(idx-1) != '\\') {
// For " and ', make sure, it's not escaped
if (j <= 1 && (idx == 0 || str.charAt(idx-1) != '\\')) {
return j;
} else if (j > 1) {
return j;
Expand All @@ -291,7 +298,7 @@ public int findInStartGroup(final String str, int idx) {
}
return -1;
}

private List<String> parseWithValueGrouping(String str) {
String fieldDelimiter = keyValueProcessorConfig.getFieldSplitCharacters();
Set<Character> fieldDelimiterSet = new HashSet<>();
Expand All @@ -308,10 +315,22 @@ private List<String> parseWithValueGrouping(String str) {
i++;
continue;
}

int groupIndex = findInStartGroup(str, i);
boolean skippedGroup = false;
if (groupIndex >= 0) {
i = skipGroup(str, i+1, endGroupChars[groupIndex])+2;
} else if (fieldDelimiterSet.contains(str.charAt(i))) {
String[] s = keyValueDelimiterPattern.split(str.substring(start,i+1));
// Only handle Grouping patterns in the values, not keys
if (s.length > 1 || startGroupStrings[groupIndex].charAt(0) == stringLiteralCharacter) {
i = skipGroup(str, i+1, endGroupChars[groupIndex]);
skippedGroup = true;
}
}
if (fieldDelimiterSet.contains(str.charAt(i))) {
// If end of group character is same as field delimiter, then include that in the value if value grouping is done
if (skippedGroup) {
i++;
}
addPart(parts, str, start, i);
i++;
start = i;
Expand All @@ -322,7 +341,7 @@ private List<String> parseWithValueGrouping(String str) {
if (start != i) {
addPart(parts, str, start, i);
}

return parts;
}

Expand Down Expand Up @@ -460,7 +479,7 @@ private ObjectNode recurse(final String input, final ObjectMapper mapper) {
valueEnd = pair.length() - 1;
valueString = pair.substring(valueStart, valueEnd).stripLeading();
JsonNode child = ((ObjectNode) root).put(keyString, recurse(valueString, mapper));
}
}
} else {
valueString = pair.substring(valueStart).stripLeading();
ObjectNode child = ((ObjectNode)root).put(keyString, valueString);
Expand All @@ -484,15 +503,30 @@ private Map<String, Object> createRecursedMap(JsonNode node, ObjectMapper mapper
return mapper.convertValue(node, new TypeReference<HashMap<String, Object>>() {});
}

/**
 * Tells whether a group is a complete string literal, i.e. it both starts and
 * ends with the configured {@code stringLiteralCharacter}. Such a group is not
 * split into key and value; the whole group is treated as a key whose value is
 * {@code null}.
 *
 * @param group one group produced by splitting the source string
 * @return {@code true} only when a string literal character is configured and
 *         the non-empty group begins and ends with it; {@code false} otherwise
 */
private boolean isIgnoredGroup(String group) {
    // Without a configured literal character nothing is ever ignored. An empty
    // group has no first or last character, so it must be rejected before
    // charAt() is called (charAt(0) on "" throws StringIndexOutOfBoundsException).
    if (stringLiteralCharacter == null || group.isEmpty()) {
        return false;
    }

    final char literal = stringLiteralCharacter;
    return group.charAt(0) == literal
            && group.charAt(group.length() - 1) == literal;
}

private Map<String, Object> createNonRecursedMap(String[] groups) {
Map<String, Object> nonRecursedMap = new LinkedHashMap<>();
List<Object> valueList;

for(final String group : groups) {
if (isIgnoredGroup(group)) {
if (validKeyAndValue(group, null)) {
nonRecursedMap.put(group, null);
}
continue;
}

final String[] terms = keyValueDelimiterPattern.split(group, 2);
String key = terms[0];
Object value;

if (terms.length == 2) {
value = terms[1];
} else {
Expand All @@ -508,7 +542,9 @@ private Map<String, Object> createNonRecursedMap(String[] groups) {
} else {
valueList = new ArrayList<Object>();
valueList.add(existingValue);
nonRecursedMap.put(key, valueList);
if (validKeyAndValue(key, valueList)) {
nonRecursedMap.put(key, valueList);
}
}

if (keyValueProcessorConfig.getSkipDuplicateValues()) {
Expand All @@ -519,7 +555,9 @@ private Map<String, Object> createNonRecursedMap(String[] groups) {
valueList.add(value);
}
} else {
nonRecursedMap.put(key, value);
if (validKeyAndValue(key, value)) {
nonRecursedMap.put(key, value);
}
}
}

Expand Down Expand Up @@ -581,20 +619,19 @@ private Map<String, Object> executeConfigs(Map<String, Object> map) {
LOG.debug("Skipping already included default key: '{}'", pair.getKey());
continue;
}
processed.put(pair.getKey(), pair.getValue());
if (validKeyAndValue(pair.getKey(), pair.getValue())) {
processed.put(pair.getKey(), pair.getValue());
}
}

if (keyValueProcessorConfig.getDropKeysWithNoValue()) {
processed.entrySet().removeIf(entry -> entry.getValue() == null);
}
return processed;
}

/**
 * Strips the whitespace that sits between a key and its value.
 *
 * @param key   the key; its trailing whitespace is removed
 * @param value the value; its string form has leading whitespace removed
 * @return a two-element array: the stripped key followed by the stripped value string
 */
private String[] trimWhitespace(String key, Object value) {
    final String strippedKey = key.stripTrailing();
    final String strippedValue = value.toString().stripLeading();
    return new String[] {strippedKey, strippedValue};
}

private String transformKey(String key) {
if (keyValueProcessorConfig.getTransformKey().equals(lowercaseKey)) {
key = key.toLowerCase();
Expand All @@ -606,9 +643,24 @@ private String transformKey(String key) {
return key;
}

/**
 * Decides whether a key/value pair is acceptable for storing.
 *
 * A pair is rejected when the key is {@code null} or empty, or when the value
 * is {@code null} while {@code keyValueProcessorConfig.getDropKeysWithNoValue()}
 * is enabled.
 *
 * @param key   the candidate key
 * @param value the candidate value, possibly {@code null}
 * @return {@code true} when the pair may be kept, {@code false} otherwise
 */
private boolean validKeyAndValue(String key, Object value) {
    final boolean usableKey = key != null && !key.isEmpty();
    // The drop-keys setting is only consulted once the key itself has passed.
    return usableKey
            && !(keyValueProcessorConfig.getDropKeysWithNoValue() && value == null);
}

private void addKeyValueToMap(final Map<String, Object> parsedMap, final String key, Object value) {
Object processedValue = value;

if (!validKeyAndValue(key, value)) {
return;
}

if (value instanceof List) {
List<?> valueAsList = (List<?>) value;
if (valueAsList.size() == 1) {
Expand Down Expand Up @@ -646,8 +698,12 @@ private void addKeyValueToMap(final Map<String, Object> parsedMap, final String

/**
 * Copies every entry of the parsed map onto the event itself.
 *
 * An entry is written only when getOverwriteIfDestinationExists() is enabled
 * or the event does not already contain that key. In the try/catch form, an
 * IllegalArgumentException raised while checking or writing an entry is logged
 * at WARN level and the loop moves on to the next entry.
 *
 * @param event      the event receiving the entries
 * @param parsedJson the parsed key/value pairs to write
 */
// NOTE(review): this span appears to be diff output that shows both the old
// unguarded if/put pair and the new try/catch-wrapped version — confirm against
// the actual file before editing.
private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
try {
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
}
} catch (IllegalArgumentException e) {
// One entry that cannot be written is logged rather than propagated.
LOG.warn("Failed to put key: "+entry.getKey()+" value : "+entry.getValue()+" into event. ", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -111,15 +112,40 @@ public class KeyValueProcessorConfig {
// Bound to the "key_value_when" property.
// NOTE(review): presumably a conditional expression controlling when the processor runs — confirm.
@JsonProperty("key_value_when")
private String keyValueWhen;

// Bound to the "strict_grouping" property; defaults to false and is exposed
// through isStrictGroupingEnabled().
@JsonProperty("strict_grouping")
private boolean strictGrouping = false;

// Bound to the "string_literal_character" property; null (the default) means no
// string literal character is configured. Limited to at most one character here;
// isValidStringLiteralConfig() further restricts which characters are accepted.
@JsonProperty("string_literal_character")
@Size(min = 0, max = 1, message = "string_literal_character may only have a single character")
private String stringLiteralCharacter = null;

/**
 * Validation hook: value grouping and a field delimiter regex must not both be configured.
 *
 * @return {@code false} only when {@code valueGrouping} is on and
 *         {@code fieldDelimiterRegex} is non-null; {@code true} otherwise
 */
@AssertTrue(message = "Invalid Configuration. value_grouping option and field_delimiter_regex are mutually exclusive")
boolean isValidValueGroupingAndFieldDelimiterRegex() {
    final boolean bothConfigured = valueGrouping && fieldDelimiterRegex != null;
    return !bothConfigured;
}

/**
 * Validation hook for the string literal character setting.
 *
 * The setting is valid when it is absent, or when it is exactly a double quote
 * or a single quote and value grouping is enabled.
 *
 * @return {@code true} when {@code stringLiteralCharacter} is null; otherwise
 *         {@code true} only if it equals {@code "} or {@code '} and
 *         {@code valueGrouping} is on
 */
@AssertTrue(message = "Invalid Configuration. String literal character config is valid only when value_grouping is enabled, and only double quote (\") and single quote (') are valid string literal characters.")
boolean isValidStringLiteralConfig() {
    if (stringLiteralCharacter == null) {
        return true;
    }
    final boolean isSupportedQuote =
            stringLiteralCharacter.equals("\"") || stringLiteralCharacter.equals("'");
    if (!isSupportedQuote) {
        return false;
    }
    // A supported quote character is only meaningful together with value grouping.
    return valueGrouping;
}

/**
 * @return the value of the {@code source} field
 */
public String getSource() {
    return this.source;
}

/**
 * Returns the configured string literal character.
 *
 * @return the first character of the configured string, or {@code null} when
 *         nothing is configured. An empty string is treated as "not configured"
 *         because it has no first character to return.
 */
public Character getStringLiteralCharacter() {
    // The size constraint on the field permits an empty string, and charAt(0)
    // on "" would throw StringIndexOutOfBoundsException.
    if (stringLiteralCharacter == null || stringLiteralCharacter.isEmpty()) {
        return null;
    }
    return stringLiteralCharacter.charAt(0);
}

/**
 * @return the value of the {@code strictGrouping} flag
 */
public boolean isStrictGroupingEnabled() {
    return this.strictGrouping;
}

/**
 * @return the value of the {@code destination} field
 */
public String getDestination() {
    return this.destination;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
void setup() {
final KeyValueProcessorConfig defaultConfig = new KeyValueProcessorConfig();
lenient().when(mockConfig.getSource()).thenReturn(defaultConfig.getSource());
lenient().when(mockConfig.getStringLiteralCharacter()).thenReturn(null);
lenient().when(mockConfig.getDestination()).thenReturn(defaultConfig.getDestination());
lenient().when(mockConfig.getFieldDelimiterRegex()).thenReturn(defaultConfig.getFieldDelimiterRegex());
lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(defaultConfig.getFieldSplitCharacters());
Expand Down Expand Up @@ -187,6 +188,7 @@ void testDropKeysWithNoValue() {
@MethodSource("getKeyValueGroupingTestdata")
void testMultipleKvToObjectKeyValueProcessorWithValueGrouping(String fieldDelimiters, String input, Map<String, Object> expectedResultMap) {
lenient().when(mockConfig.getValueGrouping()).thenReturn(true);
lenient().when(mockConfig.getStringLiteralCharacter()).thenReturn('\"');
lenient().when(mockConfig.getDropKeysWithNoValue()).thenReturn(true);
lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(fieldDelimiters);
final KeyValueProcessor objectUnderTest = createObjectUnderTest();
Expand All @@ -208,17 +210,66 @@ private static Stream<Arguments> getKeyValueGroupingTestdata() {
Arguments.of(", ", "key1=value1, key2=value2", Map.of("key1", "value1", "key2", "value2")),
Arguments.of(", ", "key1=It\\'sValue1, key2=value2", Map.of("key1", "It\\'sValue1", "key2", "value2")),
Arguments.of(", ", "text1 text2 key1=value1, key2=value2 text3 text4", Map.of("key1", "value1", "key2", "value2")),
Arguments.of(", ", "text1 text2 foo key1=value1 url=http://foo.com?bar=text,text&foo=zoo bar k2=\"http://bar.com?a=b&c=foo bar\" barr", Map.of("key1", "value1", "url", "http://foo.com?bar=text,text&foo=zoo", "k2", "\"http://bar.com?a=b&c=foo bar\"")),
Arguments.of(", ", "text1 text2 foo key1=value1 url=http://foo.com?bar=text,text&foo=zoo bar k2=\"http://bar.com?a=b&c=foo bar\" barr", Map.of("key1", "value1", "url", "http://foo.com?bar=text,text&foo=zoo", "k2", "\"http://bar.com?a=b&c=foo bar\"")),
Arguments.of(", ", "vendorMessage=VendorMessage(uid=1847060493-1712778523223, feedValue=https://syosetu.org/novel/147705/15.html, bundleId=, linkType=URL, vendor=DOUBLEVERIFY, platform=DESKTOP, deviceTypeId=1, bidCount=6, appStoreTld=, feedSource=DSP, regions=[APAC], timestamp=1712778523223, externalId=)", Map.of("vendorMessage", "VendorMessage(uid=1847060493-1712778523223, feedValue=https://syosetu.org/novel/147705/15.html, bundleId=, linkType=URL, vendor=DOUBLEVERIFY, platform=DESKTOP, deviceTypeId=1, bidCount=6, appStoreTld=, feedSource=DSP, regions=[APAC], timestamp=1712778523223, externalId=)")),
Arguments.of(", ()", "foo bar(key1=value1, key2=value2, key3=)", Map.of("key1", "value1", "key2", "value2", "key3","")),
Arguments.of(", ", "foo bar(key1=value1, key2=value2, key3=)", Map.of("bar(key1", "value1", "key2", "value2", "key3",")")),
Arguments.of(", ", "foo bar[key1=value1, key2=value2, key3=]", Map.of("bar[key1", "value1", "key2", "value2", "key3","]")),
Arguments.of(", ", "foo bar{key1=value1, key2=value2, key3=}", Map.of("bar{key1", "value1", "key2", "value2", "key3","}")),
Arguments.of(", ", "key1 \"key2=val2\" key3=\"value3,value4\"", Map.of("key3", "\"value3,value4\"")),
Arguments.of(", ", "key1=[value1,value2], key3=value3", Map.of("key1", "[value1,value2]", "key3", "value3")),
Arguments.of(", ", "key1=(value1, value2), key3=value3", Map.of("key1", "(value1, value2)", "key3", "value3")),
Arguments.of(", ", "key1=<value1 ,value2>, key3=value3", Map.of("key1", "<value1 ,value2>", "key3", "value3")),
Arguments.of(", ", "key1={value1,value2}, key3=value3", Map.of("key1", "{value1,value2}", "key3", "value3")),
Arguments.of(", ", "key1='value1,value2', key3=value3", Map.of("key1", "'value1,value2'", "key3", "value3")),
Arguments.of(", ", "foo key1=val1, key2=val2,key3=val3 bar", Map.of("key1", "val1", "key2", "val2", "key3", "val3")),
Arguments.of(", ", "foo,key1=(val1,key2=val2,val3),key4=val4 bar", Map.of("key1", "(val1,key2=val2,val3)", "key4", "val4")),
Arguments.of(", ", "foo,key1=(val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "(val1,key2=val2,val3,key4=val4 bar")),

Arguments.of(", ", "foo,key1=[val1,key2=val2,val3],key4=val4 bar", Map.of("key1", "[val1,key2=val2,val3]", "key4", "val4")),
Arguments.of(", ", "foo,key1=[val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "[val1,key2=val2,val3,key4=val4 bar")),

Arguments.of(", ", "foo,key1={val1,key2=val2,val3},key4=val4 bar", Map.of("key1", "{val1,key2=val2,val3}", "key4", "val4")),
Arguments.of(", ", "foo,key1={val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "{val1,key2=val2,val3,key4=val4 bar")),

Arguments.of(", ", "foo,key1=<val1,key2=val2,val3>,key4=val4 bar", Map.of("key1", "<val1,key2=val2,val3>", "key4", "val4")),
Arguments.of(", ", "foo,key1=<val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "<val1,key2=val2,val3,key4=val4 bar")),

Arguments.of(", ", "foo,key1=\"val1,key2=val2,val3\",key4=val4 bar", Map.of("key1", "\"val1,key2=val2,val3\"", "key4", "val4")),
Arguments.of(", ", "foo,key1=\"val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "\"val1,key2=val2,val3,key4=val4 bar")),

Arguments.of(", ", "foo,key1='val1,key2=val2,val3',key4=val4 bar", Map.of("key1", "'val1,key2=val2,val3'", "key4", "val4")),
Arguments.of(", ", "foo,key1='val1,key2=val2,val3,key4=val4 bar", Map.of("key1", "'val1,key2=val2,val3,key4=val4 bar")),

Arguments.of(", ", "foo \"key1=key2 bar\" key2=val2 baz", Map.of("key2", "val2")),
Arguments.of(", ", "foo key1=https://bar.baz/?key2=val2&url=https://quz.fred/ bar", Map.of("key1","https://bar.baz/?key2=val2&url=https://quz.fred/")),
Arguments.of(", ", "foo key1=\"bar \" qux\" fred", Map.of("key1", "\"bar \"")),
Arguments.of(", ", "foo key1=\"bar \\\" qux\" fred", Map.of("key1", "\"bar \\\" qux\"")),

Arguments.of(", ", "key1=\"value1,value2\", key3=value3", Map.of("key1", "\"value1,value2\"", "key3", "value3"))
);
}

/**
 * Runs the processor with value grouping enabled, " &" as field split characters
 * and the given quote as the string literal character, on a message whose two
 * key/value pairs are surrounded by quoted segments. With a null destination,
 * it checks that "parsed_message" is absent, that key1 and key2 are present on
 * the event, and that key1 maps to "value1".
 */
@ParameterizedTest
@ValueSource(strings = {"\"", "'"})
void testStringLiteralCharacter(String literalString) {
    when(mockConfig.getDestination()).thenReturn(null);

    // Quoted segments placed before and after the real key/value pairs.
    final String leadingLiteral = literalString + "ignore this " + literalString;
    final String trailingLiteral = literalString + "ignore=this&too" + literalString;
    final String message = leadingLiteral + " key1=value1&key2=value2 " + trailingLiteral;

    lenient().when(mockConfig.getStringLiteralCharacter()).thenReturn(literalString.charAt(0));
    lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(" &");
    lenient().when(mockConfig.getValueGrouping()).thenReturn(true);

    final Record<Event> inputRecord = getMessage(message);
    keyValueProcessor = createObjectUnderTest();
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(inputRecord));

    final Event processedEvent = processedRecords.get(0).getData();

    // With a null destination, nothing is nested under "parsed_message".
    assertThat(processedEvent.containsKey("parsed_message"), is(false));

    assertThat(processedEvent.containsKey("key1"), is(true));
    assertThat(processedEvent.containsKey("key2"), is(true));
    assertThat(processedEvent.get("key1", Object.class), is("value1"));
}

@Test
void testWriteToRoot() {
when(mockConfig.getDestination()).thenReturn(null);
Expand Down

0 comments on commit 530be53

Please sign in to comment.