Skip to content

Commit

Permalink
Tag events that fail for all exceptions in the grok processor. Resolves
Browse files Browse the repository at this point in the history
opensearch-project#4031 (opensearch-project#4032)

Tags events that fail for all exceptions. Resolves opensearch-project#4031

Adds a tags_on_timeout configuration which tags events that timeout differently from those that fail for other reasons. Configure the default behavior of tags_on_timeout to take on the value of tags_on_match_failure.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Jan 30, 2024
1 parent ef103d1 commit 96007d6
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class GrokProcessor extends AbstractProcessor<Record<Event>, Record<Event
private final Set<String> keysToOverwrite;
private final ExecutorService executorService;
private final List<String> tagsOnMatchFailure;

private final List<String> tagsOnTimeout;
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
Expand All @@ -99,6 +99,7 @@ public GrokProcessor(final PluginSetting pluginSetting, final ExpressionEvaluato
this.executorService = executorService;
this.expressionEvaluator = expressionEvaluator;
this.tagsOnMatchFailure = grokProcessorConfig.getTagsOnMatchFailure();
this.tagsOnTimeout = grokProcessorConfig.getTagsOnTimeout();
grokProcessingMatchCounter = pluginMetrics.counter(GROK_PROCESSING_MATCH);
grokProcessingMismatchCounter = pluginMetrics.counter(GROK_PROCESSING_MISMATCH);
grokProcessingErrorsCounter = pluginMetrics.counter(GROK_PROCESSING_ERRORS);
Expand Down Expand Up @@ -131,18 +132,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
runWithTimeout(() -> grokProcessingTime.record(() -> matchAndMerge(event)));
}

} catch (TimeoutException e) {
} catch (final TimeoutException e) {
event.getMetadata().addTags(tagsOnTimeout);
LOG.error(EVENT, "Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokProcessorConfig.getTimeoutMillis());
grokProcessingTimeoutsCounter.increment();
} catch (ExecutionException e) {
LOG.error(EVENT, "An exception occurred while matching on record [{}]", record.getData(), e);
grokProcessingErrorsCounter.increment();
} catch (InterruptedException e) {
LOG.error(EVENT, "Matching on record [{}] was interrupted", record.getData(), e);
grokProcessingErrorsCounter.increment();
} catch (RuntimeException e) {
} catch (final ExecutionException | InterruptedException | RuntimeException e) {
event.getMetadata().addTags(tagsOnMatchFailure);
LOG.error(EVENT, "Unknown exception occurred when matching record [{}]", record.getData(), e);
LOG.error(EVENT, "An exception occurred when matching record [{}]", record.getData(), e);
grokProcessingErrorsCounter.increment();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class GrokProcessorConfig {
static final String TARGET_KEY = "target_key";
static final String GROK_WHEN = "grok_when";
static final String TAGS_ON_MATCH_FAILURE = "tags_on_match_failure";
static final String TAGS_ON_TIMEOUT = "tags_on_timeout";

static final boolean DEFAULT_BREAK_ON_MATCH = true;
static final boolean DEFAULT_KEEP_EMPTY_CAPTURES = false;
Expand All @@ -43,6 +44,7 @@ public class GrokProcessorConfig {
private final String targetKey;
private final String grokWhen;
private final List<String> tagsOnMatchFailure;
private final List<String> tagsOnTimeout;

private GrokProcessorConfig(final boolean breakOnMatch,
final boolean keepEmptyCaptures,
Expand All @@ -55,7 +57,8 @@ private GrokProcessorConfig(final boolean breakOnMatch,
final int timeoutMillis,
final String targetKey,
final String grokWhen,
final List<String> tagsOnMatchFailure) {
final List<String> tagsOnMatchFailure,
final List<String> tagsOnTimeout) {

this.breakOnMatch = breakOnMatch;
this.keepEmptyCaptures = keepEmptyCaptures;
Expand All @@ -69,6 +72,7 @@ private GrokProcessorConfig(final boolean breakOnMatch,
this.targetKey = targetKey;
this.grokWhen = grokWhen;
this.tagsOnMatchFailure = tagsOnMatchFailure;
this.tagsOnTimeout = tagsOnTimeout.isEmpty() ? tagsOnMatchFailure : tagsOnTimeout;
}

public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) {
Expand All @@ -83,7 +87,8 @@ public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting)
pluginSetting.getIntegerOrDefault(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS),
pluginSetting.getStringOrDefault(TARGET_KEY, DEFAULT_TARGET_KEY),
pluginSetting.getStringOrDefault(GROK_WHEN, null),
pluginSetting.getTypedList(TAGS_ON_MATCH_FAILURE, String.class));
pluginSetting.getTypedList(TAGS_ON_MATCH_FAILURE, String.class),
pluginSetting.getTypedList(TAGS_ON_TIMEOUT, String.class));
}

public boolean isBreakOnMatch() {
Expand Down Expand Up @@ -132,4 +137,7 @@ public List<String> getTagsOnMatchFailure() {
return tagsOnMatchFailure;
}

public List<String> getTagsOnTimeout() {
return tagsOnTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -75,6 +76,7 @@ public void testDefault() {
assertThat(grokProcessorConfig.getTimeoutMillis(), equalTo(DEFAULT_TIMEOUT_MILLIS));
assertThat(grokProcessorConfig.getGrokWhen(), equalTo(null));
assertThat(grokProcessorConfig.getTagsOnMatchFailure(), equalTo(Collections.emptyList()));
assertThat(grokProcessorConfig.getTagsOnTimeout(), equalTo(Collections.emptyList()));
}

@Test
Expand Down Expand Up @@ -148,4 +150,48 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO

return new PluginSetting(PLUGIN_NAME, settings);
}

@Test
void getTagsOnMatchFailure_returns_tagOnMatch() {
final List<String> tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch)
));

assertThat(objectUnderTest.getTagsOnMatchFailure(), equalTo(tagsOnMatch));
}

@Test
void getTagsOnTimeout_returns_tagsOnMatch_if_no_tagsOnTimeout() {
final List<String> tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch)
));

assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnMatch));
}

@Test
void getTagsOnTimeout_returns_tagsOnTimeout_if_present() {
final List<String> tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final List<String> tagsOnTimeout = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(
GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch,
GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout
)
));

assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnTimeout));
}

@Test
void getTagsOnTimeout_returns_tagsOnTimeout_if_present_and_no_tagsOnMatch() {
final List<String> tagsOnTimeout = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout)
));

assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnTimeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

package org.opensearch.dataprepper.plugins.processor.grok;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -23,9 +17,17 @@
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -41,9 +43,10 @@
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -513,30 +516,101 @@ public void testNoCaptures() throws JsonProcessingException {
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

@Test
public void testNoCapturesWithTag() throws JsonProcessingException {
final String tagOnMatchFailure1 = UUID.randomUUID().toString();
final String tagOnMatchFailure2 = UUID.randomUUID().toString();
pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2));

grokProcessor = createObjectUnderTest();
lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch);
lenient().when(secondMatch.capture()).thenReturn(secondCapture);

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure1));
assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure2));
verify(grokProcessingMismatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
@Nested
class WithTags {
private String tagOnMatchFailure1;
private String tagOnMatchFailure2;
private String tagOnTimeout1;
private String tagOnTimeout2;

@BeforeEach
void setUp() {
tagOnMatchFailure1 = UUID.randomUUID().toString();
tagOnMatchFailure2 = UUID.randomUUID().toString();
tagOnTimeout1 = UUID.randomUUID().toString();
tagOnTimeout2 = UUID.randomUUID().toString();
pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2));
pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_TIMEOUT, List.of(tagOnTimeout1, tagOnTimeout2));
}

@Test
public void testNoCapturesWithTag() throws JsonProcessingException {
grokProcessor = createObjectUnderTest();
lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch);
lenient().when(secondMatch.capture()).thenReturn(secondCapture);

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1));
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2));
assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnTimeout1)));
assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnTimeout2)));
verify(grokProcessingMismatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

@Test
public void timeout_exception_tags_the_event() throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException {
when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(TimeoutException.class);

grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
capture.put("key_capture_2", "value_capture_2");
capture.put("key_capture_3", "value_capture_3");

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnTimeout1));
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnTimeout2));
assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnMatchFailure1)));
assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnMatchFailure2)));
verify(grokProcessingTimeoutsCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter);
}

@ParameterizedTest
@ValueSource(classes = {ExecutionException.class, InterruptedException.class, RuntimeException.class})
public void execution_exception_tags_the_event(Class<Exception> exceptionClass) throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException {
when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(exceptionClass);

grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
capture.put("key_capture_2", "value_capture_2");
capture.put("key_capture_3", "value_capture_3");

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1));
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2));
verify(grokProcessingErrorsCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingTimeoutsCounter, grokProcessingMismatchCounter);
}
}

@Test
Expand Down

0 comments on commit 96007d6

Please sign in to comment.