diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java index d0808053b2..2240f36630 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.date; +import io.micrometer.core.instrument.Counter; +import org.apache.commons.lang3.tuple.Pair; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; @@ -13,7 +15,6 @@ import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; -import io.micrometer.core.instrument.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +28,6 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; @DataPrepperPlugin(name = "date", pluginType = Processor.class, pluginConfigurationType = DateProcessorConfig.class) public class DateProcessor extends AbstractProcessor, Record> { @@ -62,31 +62,36 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date public Collection> doExecute(Collection> records) { for(final Record record : records) { - if (Objects.nonNull(dateProcessorConfig.getDateWhen()) && !expressionEvaluator.evaluateConditional(dateProcessorConfig.getDateWhen(), record.getData())) { - continue; - } - - String zonedDateTime = null; - - if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) - zonedDateTime = getDateTimeFromTimeReceived(record); + try { + if (Objects.nonNull(dateProcessorConfig.getDateWhen()) && !expressionEvaluator.evaluateConditional(dateProcessorConfig.getDateWhen(), record.getData())) { + continue; + } - else if (keyToParse != null && !keyToParse.isEmpty()) { - Pair result = getDateTimeFromMatch(record); - if (result != null) { - zonedDateTime = result.getLeft(); - Instant timeStamp = result.getRight(); - if (dateProcessorConfig.getToOriginationMetadata()) { - Event event = (Event)record.getData(); - event.getMetadata().setExternalOriginationTime(timeStamp); - event.getEventHandle().setExternalOriginationTime(timeStamp); + String zonedDateTime = null; + + if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) { + zonedDateTime = getDateTimeFromTimeReceived(record); + + } else if (keyToParse != null && !keyToParse.isEmpty()) { + Pair result = getDateTimeFromMatch(record); + if (result != null) { + zonedDateTime = result.getLeft(); + Instant timeStamp = result.getRight(); + if (dateProcessorConfig.getToOriginationMetadata()) { + Event event = (Event)record.getData(); + event.getMetadata().setExternalOriginationTime(timeStamp); + event.getEventHandle().setExternalOriginationTime(timeStamp); + } } + populateDateProcessorMetrics(zonedDateTime); } - populateDateProcessorMetrics(zonedDateTime); - } - if (zonedDateTime != null) - record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime); + if (zonedDateTime != null) { + record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime); + } + } catch (final Exception e) { + LOG.error("An exception occurred while attempting to process Event: ", e); + } } return records; } diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java index ce3906a635..7299aacb35 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java @@ -5,11 +5,6 @@ package org.opensearch.dataprepper.plugins.processor.date; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import io.micrometer.core.instrument.Counter; import org.apache.commons.lang3.LocaleUtils; import org.junit.jupiter.api.AfterEach; @@ -21,6 +16,11 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; import java.time.Instant; import java.time.LocalDate; @@ -38,6 +38,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -419,6 +421,24 @@ void match_without_year_test(String pattern) { verify(dateProcessingMatchSuccessCounter, times(1)).increment(); } + @Test + void date_processor_catches_exceptions_instead_of_throwing() { + when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString()); + when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class))) + .thenThrow(RuntimeException.class); + + dateProcessor = createObjectUnderTest(); + + final Record record = buildRecordWithEvent(testData); + final List> processedRecords = (List>) dateProcessor.doExecute(Collections.singletonList(record)); + + assertThat(processedRecords, notNullValue()); + assertThat(processedRecords.size(), equalTo(1)); + assertThat(processedRecords.get(0), notNullValue()); + assertThat(processedRecords.get(0).getData(), notNullValue()); + assertThat(processedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap())); + } + static Record buildRecordWithEvent(final Map data) { return new Record<>(JacksonEvent.builder() .withData(data)