Skip to content

Commit

Permalink
[Transform] Report _preview warning in the face of pipeline failure. (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Jan 4, 2022
1 parent 9790afc commit 029046b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -28,7 +30,9 @@
import java.util.Set;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -1013,6 +1017,74 @@ public void testPreviewTransformWithPipeline() throws Exception {
});
}

@SuppressWarnings("unchecked")
public void testPreviewTransformWithPipelineScript() throws Exception {
String pipelineId = "my-preview-pivot-pipeline-script";
Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
pipelineRequest.setJsonEntity("""
{
"description": "my pivot preview pipeline",
"processors": [
{
"script": {
"lang": "painless",
"source": "ctx._id = ctx['non']['existing'];"
}
}
]
}
""");
client().performRequest(pipelineRequest);

setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview", null);
createPreviewRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(WarningsHandler.PERMISSIVE));

String config = """
{
"source": {
"index": "%s"
},
"dest": {
"pipeline": "%s"
},
"pivot": {
"group_by": {
"user.id": {
"terms": {
"field": "user_id"
}
},
"by_day": {
"date_histogram": {
"fixed_interval": "1d",
"field": "timestamp"
}
}
},
"aggregations": {
"user.avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""".formatted(REVIEWS_INDEX_NAME, pipelineId);
createPreviewRequest.setJsonEntity(config);

Response createPreviewResponse = client().performRequest(createPreviewRequest);
Map<String, Object> previewTransformResponse = entityAsMap(createPreviewResponse);
List<Map<String, Object>> preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
// Pipeline failed for all the docs so the preview is empty
assertThat(preview, is(empty()));
assertThat(createPreviewResponse.getWarnings(), hasSize(1));
assertThat(
createPreviewResponse.getWarnings().get(0),
allOf(containsString("Pipeline returned 100 errors, first error:"), containsString("type=script_exception"))
);
}

public void testPivotWithMaxOnDateField() throws Exception {
String transformId = "simple_date_histogram_pivot_with_max_time";
String transformIndex = "pivot_reviews_via_date_histogram_with_max_time";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,24 @@ private void getPreview(

ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> {
List<Map<String, Object>> docs = new ArrayList<>(simulatePipelineResponse.getResults().size());
List<Map<String, Object>> errors = new ArrayList<>();
for (var simulateDocumentResult : simulatePipelineResponse.getResults()) {
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
Map<String, Object> tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
docs.add((Map<String, Object>) XContentMapValues.extractValue("doc._source", tempMap));
Map<String, Object> doc = (Map<String, Object>) XContentMapValues.extractValue("doc._source", tempMap);
if (doc != null) {
docs.add(doc);
}
Map<String, Object> error = (Map<String, Object>) XContentMapValues.extractValue("error", tempMap);
if (error != null) {
errors.add(error);
}
}
}
if (errors.isEmpty() == false) {
HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.get(0));
}
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
mappings.get(),
transformId,
Expand Down

0 comments on commit 029046b

Please sign in to comment.