Skip to content

Commit

Permalink
Add join function (opensearch-project#4075)
Browse files Browse the repository at this point in the history
* Add join function

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Feb 5, 2024
1 parent 2f4c8c9 commit 5df1621
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.expression;

import org.opensearch.dataprepper.model.event.Event;

import java.util.HashMap;
import java.util.List;
import javax.inject.Named;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Expression function {@code join()} that concatenates the elements of a list stored in an
 * {@link Event} into a single string.
 *
 * <p>Accepted forms:
 * <ul>
 *   <li>{@code join(<key>)} joins with the default delimiter {@code ","}.</li>
 *   <li>{@code join("<delimiter>", <key>)} joins with the given delimiter. The delimiter argument
 *       is expected to arrive still wrapped in its double quotes.</li>
 * </ul>
 *
 * <p>If the value at the key is a list, the result is a {@code String}. If it is a map, the result
 * is a new map in which every list value is replaced by its joined string and every other value is
 * copied unchanged.
 */
@Named
public class JoinExpressionFunction implements ExpressionFunction {

    private static final String FUNCTION_NAME = "join";
    private static final String DEFAULT_DELIMITER = ",";

    @Override
    public String getFunctionName() {
        return FUNCTION_NAME;
    }

    /**
     * Joins the list (or every list inside the map) found at the source key of the event.
     *
     * @param args one argument (source key) or two arguments (quoted delimiter, source key);
     *             every argument must be a {@code String}
     * @param event the event the source key is read from
     * @param convertLiteralType not used by this function
     * @return a {@code String} when the source value is a list, or a {@code Map<String, Object>}
     *         when the source value is a map
     * @throws IllegalArgumentException if the number of arguments is not one or two, if an argument
     *         is not a {@code String}, or if the delimiter argument is too short to be a quoted string
     * @throws RuntimeException if the source value can be read neither as a list nor as a map
     */
    @Override
    public Object evaluate(final List<Object> args, Event event, Function<Object, Object> convertLiteralType) {
        if (args.isEmpty() || args.size() > 2) {
            throw new IllegalArgumentException(FUNCTION_NAME + "() takes one or two arguments");
        }

        final List<String> argStrings;
        try {
            argStrings = args.stream()
                    .map(arg -> ((String) arg).trim())
                    .collect(Collectors.toList());
        } catch (final ClassCastException | NullPointerException e) {
            // Keep the cause so the offending argument can be diagnosed.
            throw new IllegalArgumentException(
                    "Arguments in " + FUNCTION_NAME + "() function should be of Json Pointer type or String type", e);
        }

        final String delimiter;
        final String sourceKey;
        if (argStrings.size() == 2) {
            delimiter = parseDelimiter(argStrings.get(0));
            sourceKey = argStrings.get(1);
        } else {
            delimiter = DEFAULT_DELIMITER;
            sourceKey = argStrings.get(0);
        }

        // First attempt: treat the source value as a list.
        Exception listFailure = null;
        try {
            final List<?> sourceList = event.get(sourceKey, List.class);
            if (sourceList != null) {
                return joinList(sourceList, delimiter);
            }
        } catch (final Exception e) {
            // Not readable as a list; remember why and fall back to the map interpretation.
            listFailure = e;
        }

        // Second attempt: treat the source value as a map whose values may be lists.
        try {
            @SuppressWarnings("unchecked")
            final Map<String, Object> sourceMap = event.get(sourceKey, Map.class);
            if (sourceMap == null) {
                throw new IllegalArgumentException("No list or map found at " + sourceKey);
            }
            return joinListsInMap(sourceMap, delimiter);
        } catch (final Exception mapFailure) {
            final RuntimeException failure =
                    new RuntimeException("Unable to perform join function on " + sourceKey, mapFailure);
            if (listFailure != null) {
                failure.addSuppressed(listFailure);
            }
            throw failure;
        }
    }

    /**
     * Strips the first and last character (the enclosing quotes) from the delimiter argument and
     * removes the two backslashes that escape a comma.
     */
    private String parseDelimiter(final String quotedDelimiter) {
        if (quotedDelimiter.length() < 2) {
            throw new IllegalArgumentException(
                    "Delimiter in " + FUNCTION_NAME + "() function should be a string enclosed in double quotes");
        }
        final String unquoted = quotedDelimiter.substring(1, quotedDelimiter.length() - 1);

        // remove slashes used to escape comma
        return unquoted.replace("\\\\,", ",");
    }

    /** Joins the string form of every element; a {@code null} element is rendered as "null". */
    private String joinList(final List<?> sourceList, final String delimiter) {
        return sourceList.stream()
                .map(element -> String.valueOf(element))
                .collect(Collectors.joining(delimiter));
    }

    /** Returns a copy of the map in which each list value is joined and other values are kept as-is. */
    private Map<String, Object> joinListsInMap(final Map<String, Object> sourceMap, final String delimiter) {
        final Map<String, Object> resultMap = new HashMap<>();
        for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
            final Object value = entry.getValue();
            resultMap.put(entry.getKey(), value instanceof List ? joinList((List<?>) value, delimiter) : value);
        }
        return resultMap;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ public Object coercePrimaryTerminalNode(final TerminalNode node, final Event eve
final String functionName = nodeStringValue.substring(0, funcNameIndex);
final int argsEndIndex = nodeStringValue.indexOf(")", funcNameIndex);
final String argsStr = nodeStringValue.substring(funcNameIndex+1, argsEndIndex);
final String[] args = argsStr.split(",");
// Split at commas if there's no backslash before the commas, because commas can be part of a function parameter
final String[] args = argsStr.split("(?<!\\\\),");
List<Object> argList = new ArrayList<>();
for (final String arg: args) {
String trimmedArg = arg.trim();
if (trimmedArg.charAt(0) == '/') {
argList.add(trimmedArg);
} else if (trimmedArg.charAt(0) == '"') {
if (trimmedArg.length() < 2 || trimmedArg.charAt(trimmedArg.length()-1) != '"') {
throw new RuntimeException("Invalid string argument. Missing double quote at the end");
throw new RuntimeException("Invalid string argument: check if any argument is missing a closing double quote or contains comma that's not escaped with `\\`.");
}
argList.add(trimmedArg);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.dataprepper.expression;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -28,14 +27,12 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.commons.lang3.RandomStringUtils;

class GenericExpressionEvaluator_StringIT {
class GenericExpressionEvaluator_MultiTypeIT {

private AnnotationConfigApplicationContext applicationContext;

Expand All @@ -46,45 +43,29 @@ void beforeEach() {
applicationContext.refresh();
}

@Test
void testStringExpressionEvaluatorBeanAvailable() {
@ParameterizedTest
@MethodSource("validStringExpressionArguments")
void testStringExpressionEvaluator(final String expression, final Event event, final String expected, final Class expectedClass) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);
assertThat(evaluator, isA(GenericExpressionEvaluator.class));
}

@Test
void testStringExpressionEvaluatorBeanSingleton() {
final GenericExpressionEvaluator instanceA = applicationContext.getBean(GenericExpressionEvaluator.class);
final GenericExpressionEvaluator instanceB = applicationContext.getBean(GenericExpressionEvaluator.class);
assertThat(instanceA, sameInstance(instanceB));
}

@Test
void testParserBeanInstanceOfMultiThreadParser() {
final Parser instance = applicationContext.getBean(Parser.class);
assertThat(instance, instanceOf(MultiThreadParser.class));
}
final String actual = (String)evaluator.evaluate(expression, event);

@Test
void testSingleThreadParserBeanNotSingleton() {
final Parser instanceA = applicationContext.getBean(ParseTreeParser.SINGLE_THREAD_PARSER_NAME, Parser.class);
final Parser instanceB = applicationContext.getBean(ParseTreeParser.SINGLE_THREAD_PARSER_NAME, Parser.class);
assertThat(instanceA, not(sameInstance(instanceB)));
assertThat(actual, is(expected));
assertThat(actual, instanceOf(expectedClass));
}

@ParameterizedTest
@MethodSource("validExpressionArguments")
void testStringExpressionEvaluator(final String expression, final Event event, final String expected, final Class expectedClass) {
@MethodSource("validMapExpressionArguments")
void testMapExpressionEvaluator(final String expression, final Event event, final Map<String, Object> expected) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);

final String actual = (String)evaluator.evaluate(expression, event);
final Map<String, Object> actual = (Map<String, Object>)evaluator.evaluate(expression, event);

assertThat(actual, is(expected));
assertThat(actual, instanceOf(expectedClass));
}

@ParameterizedTest
@MethodSource("validExpressionArguments")
@MethodSource("validStringExpressionArguments")
void testStringExpressionEvaluatorWithMultipleThreads(final String expression, final Event event, final String expected, final Class expectedClass) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);

Expand All @@ -107,9 +88,32 @@ void testStringExpressionEvaluatorWithMultipleThreads(final String expression, f
}
}

/**
 * Evaluates the same map-producing expression concurrently from 50 threads and checks that
 * every evaluation yields the expected map.
 */
@ParameterizedTest
@MethodSource("validMapExpressionArguments")
void testMapExpressionEvaluatorWithMultipleThreads(final String expression, final Event event, final Map<String, Object> expected) {
    final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);

    final int numberOfThreads = 50;
    final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

    final List<Map<String, Object>> evaluationResults = Collections.synchronizedList(new ArrayList<>());

    try {
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(() -> evaluationResults.add((Map<String, Object>)evaluator.evaluate(expression, event)));
        }

        await().atMost(5, TimeUnit.SECONDS)
                .until(() -> evaluationResults.size() == numberOfThreads);
    } finally {
        // Release the pool's threads even when the wait above times out.
        executorService.shutdownNow();
    }

    assertThat(evaluationResults.size(), equalTo(numberOfThreads));
    for (Map<String, Object> evaluationResult : evaluationResults) {
        assertThat(evaluationResult, equalTo(expected));
    }
}

@ParameterizedTest
@MethodSource("exceptionExpressionArguments")
void testArithmeticExpressionEvaluatorInvalidInput(final String expression, final Event event) {
void testExpressionEvaluatorCausesException(final String expression, final Event event) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);
assertThrows(ExpressionEvaluationException.class, () -> evaluator.evaluate(expression, event));
}
Expand All @@ -123,20 +127,44 @@ void testStringExpressionEvaluatorInvalidInput(final String expression, final Ev
assertThat(result, not(instanceOf(expectedClass)));
}

private static Stream<Arguments> validExpressionArguments() {
private static Stream<Arguments> validStringExpressionArguments() {
Random random = new Random();
int testStringLength = random.nextInt(30);
String testString = RandomStringUtils.randomAlphabetic(testStringLength);
String testString2 = RandomStringUtils.randomAlphabetic(testStringLength);
Map<String, Object> attributes = Map.of("strAttr", testString);
String testData = "{\"key\": \"value\"}";
JacksonEvent testEvent = JacksonEvent.builder().withEventType("event").withEventMetadataAttributes(attributes).withData(testData).build();
String testData = "{\"key\": \"value\", \"list\":[\"string\", 1, true]}";
JacksonEvent testEvent = JacksonEvent.builder().withEventType("event").withEventMetadataAttributes(attributes).withData(testData).build();
return Stream.of(
Arguments.of("\""+testString+"\"", event("{}"), testString, String.class),
Arguments.of("/status_message", event("{\"status_message\": \""+testString+"\"}"), testString, String.class),
Arguments.of("\""+testString+"\"+\""+testString2+"\"", event("{}"), testString+testString2, String.class),
Arguments.of("/status_message+/message", event("{\"status_message\": \""+testString+"\", \"message\":\""+testString2+"\"}"), testString+testString2, String.class),
Arguments.of("getMetadata(\"strAttr\")+\""+testString2+"\"+/key", testEvent, testString+testString2+"value", String.class)
Arguments.of("getMetadata(\"strAttr\")+\""+testString2+"\"+/key", testEvent, testString+testString2+"value", String.class),
Arguments.of("join(/list)", testEvent, "string,1,true", String.class),
Arguments.of("join(\"\\\\, \", /list)", testEvent, "string, 1, true", String.class),
Arguments.of("join(\" \", /list)", testEvent, "string 1 true", String.class)
);
}

/**
 * Supplies (expression, event, expected map) cases in which {@code /list} holds an object,
 * so {@code join()} is expected to produce a map.
 */
private static Stream<Arguments> validMapExpressionArguments() {
    // Object with a single list-valued key.
    final Event singleListEvent = event("{\"list\":{\"key\": [\"string\", 1, true]}}");
    // Object mixing two list values with one plain string value.
    final Event mixedValuesEvent = event("{\"list\":{\"key1\": [\"string\", 1, true], \"key2\": [1,2,3], \"key3\": \"value3\"}}");

    final Arguments defaultDelimiter =
            Arguments.of("join(/list)", singleListEvent, Map.of("key", "string,1,true"));
    final Arguments escapedCommaDelimiter =
            Arguments.of("join(\"\\\\, \", /list)", singleListEvent, Map.of("key", "string, 1, true"));
    final Arguments spaceDelimiter =
            Arguments.of("join(\" \", /list)", singleListEvent, Map.of("key", "string 1 true"));
    final Arguments mixedValues =
            Arguments.of("join(/list)", mixedValuesEvent, Map.of("key1", "string,1,true", "key2", "1,2,3", "key3", "value3"));

    return Stream.of(defaultDelimiter, escapedCommaDelimiter, spaceDelimiter, mixedValues);
}

/**
 * Supplies (expression, event) cases whose evaluation is expected to throw.
 */
private static Stream<Arguments> exceptionExpressionArguments() {
    final String listJson = "{\"list\":[\"string\", 1, true]}";

    // Can't mix Numbers and Strings when using operators
    final Arguments mixedOperandTypes =
            Arguments.of("/status + /message", event("{\"status\": 200, \"message\":\"msg\"}"));
    // Wrong number of arguments
    final Arguments joinWithoutArguments = Arguments.of("join()", event(listJson));
    final Arguments joinWithThreeArguments = Arguments.of("join(/list, \" \", \"third_arg\")", event(listJson));

    return Stream.of(mixedOperandTypes, joinWithoutArguments, joinWithThreeArguments);
}

Expand All @@ -148,14 +176,8 @@ private static Stream<Arguments> invalidExpressionArguments() {
return Stream.of(
Arguments.of("/missing", event("{}"), String.class),
Arguments.of("/value", event("{\"value\": "+randomInt+"}"), String.class),
Arguments.of("length(/message)", event("{\"message\": \""+testString+"\"}"), String.class)
);
}

private static Stream<Arguments> exceptionExpressionArguments() {
return Stream.of(
// Can't mix Numbers and Strings when using operators
Arguments.of("/status + /message", event("{\"status\": 200, \"message\":\"msg\"}"))
Arguments.of("length(/message)", event("{\"message\": \""+testString+"\"}"), String.class),
Arguments.of("join(/list)", event("{\"list\":{\"key\": [\"string\", 1, true]}}"), String.class)
);
}

Expand Down
Loading

0 comments on commit 5df1621

Please sign in to comment.