Skip to content

Commit

Permalink
Catch exception instead of shutting down in date processor (#4108)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 12, 2024
1 parent 41c4f39 commit 0841ac1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.date;

import io.micrometer.core.instrument.Counter;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -13,23 +15,21 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

@DataPrepperPlugin(name = "date", pluginType = Processor.class, pluginConfigurationType = DateProcessorConfig.class)
public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand Down Expand Up @@ -69,32 +69,37 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for(final Record<Event> record : records) {

if (Objects.nonNull(dateProcessorConfig.getDateWhen()) && !expressionEvaluator.evaluateConditional(dateProcessorConfig.getDateWhen(), record.getData())) {
continue;
}

String zonedDateTime = null;

if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) {
zonedDateTime = getDateTimeFromTimeReceived(record);
try {
if (Objects.nonNull(dateProcessorConfig.getDateWhen()) && !expressionEvaluator.evaluateConditional(dateProcessorConfig.getDateWhen(), record.getData())) {
continue;
}

} else if (keyToParse != null && !keyToParse.isEmpty()) {
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getMetadata().setExternalOriginationTime(timeStamp);
event.getEventHandle().setExternalOriginationTime(timeStamp);
String zonedDateTime = null;

if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) {
zonedDateTime = getDateTimeFromTimeReceived(record);

} else if (keyToParse != null && !keyToParse.isEmpty()) {
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getMetadata().setExternalOriginationTime(timeStamp);
event.getEventHandle().setExternalOriginationTime(timeStamp);
}
}
populateDateProcessorMetrics(zonedDateTime);
}
populateDateProcessorMetrics(zonedDateTime);
}

if (zonedDateTime != null) {
record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime);
if (zonedDateTime != null) {
record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime);
}
} catch (final Exception e) {
LOG.error("An exception occurred while attempting to process Event: ", e);
}

}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package org.opensearch.dataprepper.plugins.processor.date;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.apache.commons.lang3.LocaleUtils;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -19,15 +14,20 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand All @@ -37,13 +37,15 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import java.util.Random;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -514,6 +516,24 @@ void match_without_year_test(String pattern) {
verify(dateProcessingMatchSuccessCounter, times(1)).increment();
}

@Test
void date_processor_catches_exceptions_instead_of_throwing() {
when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString());
when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class)))
.thenThrow(RuntimeException.class);

dateProcessor = createObjectUnderTest();

final Record<Event> record = buildRecordWithEvent(testData);
final List<Record<Event>> processedRecords = (List<Record<Event>>) dateProcessor.doExecute(Collections.singletonList(record));

assertThat(processedRecords, notNullValue());
assertThat(processedRecords.size(), equalTo(1));
assertThat(processedRecords.get(0), notNullValue());
assertThat(processedRecords.get(0).getData(), notNullValue());
assertThat(processedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap()));
}

static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
Expand Down

0 comments on commit 0841ac1

Please sign in to comment.