Skip to content

Commit

Permalink
Standardize error code when bulk body is invalid (elastic#114869)
Browse files Browse the repository at this point in the history
Currently the incremental and non-incremental bulk variations will
return different error codes when the json body provided is invalid.
This commit ensures both versions return status code 400. Additionally,
this renames the incremental rest tests to bulk tests and ensures that
all tests work with both bulk api versions. We set these tests to
randomize which version of the api we test each run.
  • Loading branch information
Tim-Brooks committed Oct 16, 2024
1 parent 575cd69 commit a9374a9
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 25 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/114869.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114869
summary: Standardize error code when bulk body is invalid
area: CRUD
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.http;

import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -19,24 +21,30 @@
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
public class BulkRestIT extends HttpSmokeTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    // Start from the base node settings, then randomly enable or disable the
    // incremental bulk path so both API implementations get test coverage.
    final Settings.Builder builder = Settings.builder();
    builder.put(super.nodeSettings(nodeOrdinal, otherSettings));
    builder.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), seventyFivePercentOfTheTime());
    return builder.build();
}

private static boolean seventyFivePercentOfTheTime() {
    // Two coin flips both land true with probability 25%, so this method
    // returns true the remaining 75% of the time.
    final boolean bothFlipsTrue = randomBoolean() && randomBoolean();
    return bothFlipsTrue == false;
}

public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException {
Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty");
Response response = getRestClient().performRequest(request);
Expand All @@ -51,6 +59,26 @@ public void testBulkMissingBody() throws IOException {
assertThat(responseException.getMessage(), containsString("request body is required"));
}

public void testBulkInvalidIndexNameString() throws IOException {
    Request request = new Request("POST", "/_bulk");

    // Assemble a bulk body whose index name contains bytes that are not valid
    // UTF-8, which must be rejected with a 400 rather than a 5xx.
    java.io.ByteArrayOutputStream bodyBuilder = new java.io.ByteArrayOutputStream();
    bodyBuilder.writeBytes("{\"create\":{\"_index\":\"".getBytes(StandardCharsets.UTF_8));
    bodyBuilder.writeBytes(new byte[] { (byte) 0xfe, (byte) 0xfe, (byte) 0xff, (byte) 0xff });
    bodyBuilder.writeBytes("\",\"_id\":\"1\"}}\n{\"field\":1}\n\r\n".getBytes(StandardCharsets.UTF_8));

    request.setEntity(new ByteArrayEntity(bodyBuilder.toByteArray(), ContentType.APPLICATION_JSON));

    ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
    assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(BAD_REQUEST.getStatus()));
    assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
    assertThat(responseException.getMessage(), containsString("json_parse_exception"));
    assertThat(responseException.getMessage(), containsString("Invalid UTF-8"));
}

public void testBulkRequestBodyImproperlyTerminated() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
// missing final line of the bulk body. cannot process
Expand All @@ -61,10 +89,10 @@ public void testBulkRequestBodyImproperlyTerminated() throws IOException {
);
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
}

public void testIncrementalBulk() throws IOException {
public void testBulkRequest() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
Expand All @@ -81,7 +109,6 @@ public void testIncrementalBulk() throws IOException {

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
Expand Down Expand Up @@ -113,7 +140,6 @@ public void testBulkWithIncrementalDisabled() throws IOException {

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
Expand All @@ -137,7 +163,7 @@ public void testBulkWithIncrementalDisabled() throws IOException {
}
}

public void testIncrementalMalformed() throws IOException {
public void testMalformedActionLineBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
Expand All @@ -154,7 +180,6 @@ public void testIncrementalMalformed() throws IOException {

Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":1}\n");
Expand All @@ -170,7 +195,6 @@ public void testIncrementalMalformed() throws IOException {
private static void sendLargeBulk() throws IOException {
Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,23 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);
try {
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);
} catch (Exception e) {
return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
}

return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
} else {
Expand All @@ -137,6 +141,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}
}

private static Exception parseFailureException(Exception e) {
    // An IllegalArgumentException already maps to a 400 response, so it is
    // propagated untouched; anything else is wrapped so the client sees a
    // consistent 400 parse failure instead of a 500.
    if (e instanceof IllegalArgumentException) {
        return e;
    }
    // TODO: Maybe improve in follow-up to be XContentParseException and include line number and column
    return new ElasticsearchParseException("could not parse bulk request body", e);
}

static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {

private final boolean allowExplicitIndex;
Expand Down Expand Up @@ -229,9 +242,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo

} catch (Exception e) {
shortCircuit();
new RestToXContentListener<>(channel).onFailure(
new ElasticsearchParseException("could not parse bulk request body", e)
);
new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
return;
}
}
Expand Down

0 comments on commit a9374a9

Please sign in to comment.