Skip to content

Commit

Permalink
handle errors when evaluating if conditions in processors (elastic#52543
Browse files Browse the repository at this point in the history
)
  • Loading branch information
danhermann committed Feb 21, 2020
1 parent c5d7d0b commit bb4fc3c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
Expand All @@ -31,12 +33,13 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

// Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
// ends up being copied into this test.
Expand All @@ -56,13 +59,52 @@ protected boolean ignoreExternalCluster() {
public static class CustomScriptPlugin extends MockScriptPlugin {
@Override
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
return Collections.singletonMap("my_script", ctx -> {
return Map.of("my_script", ctx -> {
ctx.put("z", 0);
return null;
}, "throwing_script", ctx -> {
throw new RuntimeException("this script always fails");
});
}
}

public void testFailureInConditionalProcessor() {
internalCluster().ensureAtLeastNumDataNodes(1);
internalCluster().startMasterOnlyNode();
final String pipelineId = "foo";
client().admin().cluster().preparePutPipeline(pipelineId,
new BytesArray("{\n" +
" \"processors\" : [\n" +
" {\"set\" : {\"field\": \"any_field\", \"value\": \"any_value\"}},\n" +
" {\"set\" : {" + "" +
" \"if\" : " + "{\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"throwing_script\"}," +
" \"field\": \"any_field2\"," +
" \"value\": \"any_value2\"}" +
" }\n" +
" ]\n" +
"}"), XContentType.JSON).get();

Exception e = expectThrows(
Exception.class,
() ->
client().prepareIndex("index").setId("1")
.setSource("x", 0)
.setPipeline(pipelineId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
);
assertTrue(e.getMessage().contains("this script always fails"));

NodesStatsResponse r = client().admin().cluster().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get();
int nodeCount = r.getNodes().size();
for (int k = 0; k < nodeCount; k++) {
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
for (IngestStats.ProcessorStat st : stats) {
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
}
}
}

public void testScriptDisabled() throws Exception {
String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10);
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (evaluate(ingestDocument)) {
final boolean matches;
try {
matches = evaluate(ingestDocument);
} catch (Exception e) {
handler.accept(null, e);
return;
}

if (matches) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public interface Processor {
* otherwise just overwrite {@link #execute(IngestDocument)}.
*/
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
final IngestDocument result;
try {
IngestDocument result = execute(ingestDocument);
handler.accept(result, null);
result = execute(ingestDocument);
} catch (Exception e) {
handler.accept(null, e);
return;
}
handler.accept(result, null);
}

/**
Expand Down

0 comments on commit bb4fc3c

Please sign in to comment.