diff --git a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java index 5174eb9c61..1c3c9e6d15 100644 --- a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java +++ b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java @@ -82,7 +82,7 @@ public class GrokProcessor extends AbstractProcessor, Record keysToOverwrite; private final ExecutorService executorService; private final List tagsOnMatchFailure; - + private final List tagsOnTimeout; private final ExpressionEvaluator expressionEvaluator; @DataPrepperPluginConstructor @@ -99,6 +99,7 @@ public GrokProcessor(final PluginSetting pluginSetting, final ExpressionEvaluato this.executorService = executorService; this.expressionEvaluator = expressionEvaluator; this.tagsOnMatchFailure = grokProcessorConfig.getTagsOnMatchFailure(); + this.tagsOnTimeout = grokProcessorConfig.getTagsOnTimeout(); grokProcessingMatchCounter = pluginMetrics.counter(GROK_PROCESSING_MATCH); grokProcessingMismatchCounter = pluginMetrics.counter(GROK_PROCESSING_MISMATCH); grokProcessingErrorsCounter = pluginMetrics.counter(GROK_PROCESSING_ERRORS); @@ -131,18 +132,13 @@ public Collection> doExecute(final Collection> recor runWithTimeout(() -> grokProcessingTime.record(() -> matchAndMerge(event))); } - } catch (TimeoutException e) { + } catch (final TimeoutException e) { + event.getMetadata().addTags(tagsOnTimeout); LOG.error(EVENT, "Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokProcessorConfig.getTimeoutMillis()); grokProcessingTimeoutsCounter.increment(); - } catch (ExecutionException e) { - LOG.error(EVENT, "An exception occurred while matching on record [{}]", record.getData(), e); - grokProcessingErrorsCounter.increment(); - } catch (InterruptedException e) { - LOG.error(EVENT, "Matching on record [{}] was interrupted", record.getData(), e); - grokProcessingErrorsCounter.increment(); - } catch (RuntimeException e) { + } catch (final ExecutionException | InterruptedException | RuntimeException e) { event.getMetadata().addTags(tagsOnMatchFailure); - LOG.error(EVENT, "Unknown exception occurred when matching record [{}]", record.getData(), e); + LOG.error(EVENT, "An exception occurred when matching record [{}]", record.getData(), e); grokProcessingErrorsCounter.increment(); } } diff --git a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java index c8ec705ec3..cea6f58c04 100644 --- a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java +++ b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java @@ -23,6 +23,7 @@ public class GrokProcessorConfig { static final String TARGET_KEY = "target_key"; static final String GROK_WHEN = "grok_when"; static final String TAGS_ON_MATCH_FAILURE = "tags_on_match_failure"; + static final String TAGS_ON_TIMEOUT = "tags_on_timeout"; static final boolean DEFAULT_BREAK_ON_MATCH = true; static final boolean DEFAULT_KEEP_EMPTY_CAPTURES = false; @@ -43,6 +44,7 @@ public class GrokProcessorConfig { private final String targetKey; private final String grokWhen; private final List tagsOnMatchFailure; + private final List tagsOnTimeout; private GrokProcessorConfig(final boolean breakOnMatch, final boolean keepEmptyCaptures, @@ -55,7 +57,8 @@ private GrokProcessorConfig(final boolean breakOnMatch, final int timeoutMillis, final String targetKey, final String grokWhen, - final List tagsOnMatchFailure) { + final List tagsOnMatchFailure, + final List tagsOnTimeout) { this.breakOnMatch = breakOnMatch; this.keepEmptyCaptures = keepEmptyCaptures; @@ -69,6 +72,7 @@ private GrokProcessorConfig(final boolean breakOnMatch, this.targetKey = targetKey; this.grokWhen = grokWhen; this.tagsOnMatchFailure = tagsOnMatchFailure; + this.tagsOnTimeout = tagsOnTimeout.isEmpty() ? tagsOnMatchFailure : tagsOnTimeout; } public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) { @@ -83,7 +87,8 @@ public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) pluginSetting.getIntegerOrDefault(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), pluginSetting.getStringOrDefault(TARGET_KEY, DEFAULT_TARGET_KEY), pluginSetting.getStringOrDefault(GROK_WHEN, null), - pluginSetting.getTypedList(TAGS_ON_MATCH_FAILURE, String.class)); + pluginSetting.getTypedList(TAGS_ON_MATCH_FAILURE, String.class), + pluginSetting.getTypedList(TAGS_ON_TIMEOUT, String.class)); } public boolean isBreakOnMatch() { @@ -132,4 +137,7 @@ public List getTagsOnMatchFailure() { return tagsOnMatchFailure; } + public List getTagsOnTimeout() { + return tagsOnTimeout; + } } diff --git a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfigTests.java b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfigTests.java index 544ba7b185..7bd3ac662e 100644 --- a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfigTests.java +++ b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfigTests.java @@ -14,6 +14,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -75,6 +76,7 @@ public void testDefault() { assertThat(grokProcessorConfig.getTimeoutMillis(), equalTo(DEFAULT_TIMEOUT_MILLIS)); assertThat(grokProcessorConfig.getGrokWhen(), equalTo(null)); assertThat(grokProcessorConfig.getTagsOnMatchFailure(), equalTo(Collections.emptyList())); + assertThat(grokProcessorConfig.getTagsOnTimeout(), equalTo(Collections.emptyList())); } @Test @@ -148,4 +150,48 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO return new PluginSetting(PLUGIN_NAME, settings); } + + @Test + void getTagsOnMatchFailure_returns_tagOnMatch() { + final List tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME, + Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch) + )); + + assertThat(objectUnderTest.getTagsOnMatchFailure(), equalTo(tagsOnMatch)); + } + + @Test + void getTagsOnTimeout_returns_tagsOnMatch_if_no_tagsOnTimeout() { + final List tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME, + Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch) + )); + + assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnMatch)); + } + + @Test + void getTagsOnTimeout_returns_tagsOnTimeout_if_present() { + final List tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final List tagsOnTimeout = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME, + Map.of( + GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch, + GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout + ) + )); + + assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnTimeout)); + } + + @Test + void getTagsOnTimeout_returns_tagsOnTimeout_if_present_and_no_tagsOnMatch() { + final List tagsOnTimeout = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME, + Map.of(GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout) + )); + + assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnTimeout)); + } } diff --git a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java index e8f7e49982..bb3b6857c7 100644 --- a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java +++ b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java @@ -5,12 +5,6 @@ package org.opensearch.dataprepper.plugins.processor.grok; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -23,9 +17,17 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; import java.util.ArrayList; import java.util.Arrays; @@ -41,9 +43,10 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; @@ -513,30 +516,101 @@ public void testNoCaptures() throws JsonProcessingException { verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter); } - @Test - public void testNoCapturesWithTag() throws JsonProcessingException { - final String tagOnMatchFailure1 = UUID.randomUUID().toString(); - final String tagOnMatchFailure2 = UUID.randomUUID().toString(); - pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2)); - - grokProcessor = createObjectUnderTest(); - lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch); - lenient().when(secondMatch.capture()).thenReturn(secondCapture); - - final Map testData = new HashMap(); - testData.put("message", messageInput); - final Record record = buildRecordWithEvent(testData); - - final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); - - assertThat(grokkedRecords.size(), equalTo(1)); - assertThat(grokkedRecords.get(0), notNullValue()); - assertRecordsAreEqual(grokkedRecords.get(0), record); - assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure1)); - assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure2)); - verify(grokProcessingMismatchCounter, times(1)).increment(); - verify(grokProcessingTime, times(1)).record(any(Runnable.class)); - verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter); + @Nested + class WithTags { + private String tagOnMatchFailure1; + private String tagOnMatchFailure2; + private String tagOnTimeout1; + private String tagOnTimeout2; + + @BeforeEach + void setUp() { + tagOnMatchFailure1 = UUID.randomUUID().toString(); + tagOnMatchFailure2 = UUID.randomUUID().toString(); + tagOnTimeout1 = UUID.randomUUID().toString(); + tagOnTimeout2 = UUID.randomUUID().toString(); + pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2)); + pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_TIMEOUT, List.of(tagOnTimeout1, tagOnTimeout2)); + } + + @Test + public void testNoCapturesWithTag() throws JsonProcessingException { + grokProcessor = createObjectUnderTest(); + lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch); + lenient().when(secondMatch.capture()).thenReturn(secondCapture); + + final Map testData = new HashMap(); + testData.put("message", messageInput); + final Record record = buildRecordWithEvent(testData); + + final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); + + assertThat(grokkedRecords.size(), equalTo(1)); + assertThat(grokkedRecords.get(0), notNullValue()); + assertRecordsAreEqual(grokkedRecords.get(0), record); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1)); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2)); + assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnTimeout1))); + assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnTimeout2))); + verify(grokProcessingMismatchCounter, times(1)).increment(); + verify(grokProcessingTime, times(1)).record(any(Runnable.class)); + verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter); + } + + @Test + public void timeout_exception_tags_the_event() throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException { + when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(TimeoutException.class); + + grokProcessor = createObjectUnderTest(); + + capture.put("key_capture_1", "value_capture_1"); + capture.put("key_capture_2", "value_capture_2"); + capture.put("key_capture_3", "value_capture_3"); + + final Map testData = new HashMap(); + testData.put("message", messageInput); + final Record record = buildRecordWithEvent(testData); + + final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); + + assertThat(grokkedRecords.size(), equalTo(1)); + assertThat(grokkedRecords.get(0), notNullValue()); + assertRecordsAreEqual(grokkedRecords.get(0), record); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnTimeout1)); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnTimeout2)); + assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnMatchFailure1))); + assertThat(record.getData().getMetadata().getTags(), not(hasItem(tagOnMatchFailure2))); + verify(grokProcessingTimeoutsCounter, times(1)).increment(); + verify(grokProcessingTime, times(1)).record(any(Runnable.class)); + verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter); + } + + @ParameterizedTest + @ValueSource(classes = {ExecutionException.class, InterruptedException.class, RuntimeException.class}) + public void execution_exception_tags_the_event(Class exceptionClass) throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException { + when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(exceptionClass); + + grokProcessor = createObjectUnderTest(); + + capture.put("key_capture_1", "value_capture_1"); + capture.put("key_capture_2", "value_capture_2"); + capture.put("key_capture_3", "value_capture_3"); + + final Map testData = new HashMap(); + testData.put("message", messageInput); + final Record record = buildRecordWithEvent(testData); + + final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); + + assertThat(grokkedRecords.size(), equalTo(1)); + assertThat(grokkedRecords.get(0), notNullValue()); + assertRecordsAreEqual(grokkedRecords.get(0), record); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1)); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2)); + verify(grokProcessingErrorsCounter, times(1)).increment(); + verify(grokProcessingTime, times(1)).record(any(Runnable.class)); + verifyNoInteractions(grokProcessingTimeoutsCounter, grokProcessingMismatchCounter); + } } @Test