Skip to content

Commit

Permalink
Catch exception instead of shutting down in date processor (opensearc…
Browse files Browse the repository at this point in the history
…h-project#4108) (opensearch-project#4117)

Signed-off-by: Taylor Gray <[email protected]>
(cherry picked from commit 0841ac1)
  • Loading branch information
graytaylor0 authored Feb 12, 2024
1 parent 57911c9 commit 8c9025e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.date;

import io.micrometer.core.instrument.Counter;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -13,7 +15,6 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,7 +28,6 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

@DataPrepperPlugin(name = "date", pluginType = Processor.class, pluginConfigurationType = DateProcessorConfig.class)
public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand Down Expand Up @@ -62,31 +62,36 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for(final Record<Event> record : records) {

if (Objects.nonNull(dateProcessorConfig.getDateWhen()) && !expressionEvaluator.evaluateConditional(dateProcessorConfig.getDateWhen(), record.getData())) {
continue;
}

String zonedDateTime = null;

if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived()))
zonedDateTime = getDateTimeFromTimeReceived(record);
try {
if (Objects.nonNull(dateProcessorConfig.getDateWhen()) && !expressionEvaluator.evaluateConditional(dateProcessorConfig.getDateWhen(), record.getData())) {
continue;
}

else if (keyToParse != null && !keyToParse.isEmpty()) {
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getMetadata().setExternalOriginationTime(timeStamp);
event.getEventHandle().setExternalOriginationTime(timeStamp);
String zonedDateTime = null;

if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) {
zonedDateTime = getDateTimeFromTimeReceived(record);

} else if (keyToParse != null && !keyToParse.isEmpty()) {
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getMetadata().setExternalOriginationTime(timeStamp);
event.getEventHandle().setExternalOriginationTime(timeStamp);
}
}
populateDateProcessorMetrics(zonedDateTime);
}
populateDateProcessorMetrics(zonedDateTime);
}

if (zonedDateTime != null)
record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime);
if (zonedDateTime != null) {
record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime);
}
} catch (final Exception e) {
LOG.error("An exception occurred while attempting to process Event: ", e);
}
}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package org.opensearch.dataprepper.plugins.processor.date;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.apache.commons.lang3.LocaleUtils;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -21,6 +16,11 @@
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Instant;
import java.time.LocalDate;
Expand All @@ -38,6 +38,8 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -419,6 +421,24 @@ void match_without_year_test(String pattern) {
verify(dateProcessingMatchSuccessCounter, times(1)).increment();
}

@Test
void date_processor_catches_exceptions_instead_of_throwing() {
when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString());
when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class)))
.thenThrow(RuntimeException.class);

dateProcessor = createObjectUnderTest();

final Record<Event> record = buildRecordWithEvent(testData);
final List<Record<Event>> processedRecords = (List<Record<Event>>) dateProcessor.doExecute(Collections.singletonList(record));

assertThat(processedRecords, notNullValue());
assertThat(processedRecords.size(), equalTo(1));
assertThat(processedRecords.get(0), notNullValue());
assertThat(processedRecords.get(0).getData(), notNullValue());
assertThat(processedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap()));
}

static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
Expand Down

0 comments on commit 8c9025e

Please sign in to comment.