Skip to content

Commit

Permalink
Fix IngestServiceTests.testBulkRequestExecutionWithFailures
Browse files Browse the repository at this point in the history
The test would previously fail if the randomness led to only a single
indexing request being included in the bulk payload. This change
guarantees multiple indexing requests in order to ensure the batch logic
kicks in.

Also replace some unneeded mocks with real classes.

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Jul 23, 2024
1 parent 3497081 commit eb43971
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.hamcrest.MatcherAssert;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.Version;
Expand Down Expand Up @@ -104,8 +105,10 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -1106,27 +1109,23 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

public void testBulkRequestExecutionWithFailures() throws Exception {
public void testBulkRequestExecutionWithFailures() {
BulkRequest bulkRequest = new BulkRequest();
String pipelineId = "_id";

int numRequest = scaledRandomIntBetween(8, 64);
int numIndexRequests = 0;
for (int i = 0; i < numRequest; i++) {
DocWriteRequest request;
int numIndexRequests = scaledRandomIntBetween(4, 32);
for (int i = 0; i < numIndexRequests; i++) {
IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
bulkRequest.add(indexRequest);
}
int numOtherRequests = scaledRandomIntBetween(4, 32);
for (int i = 0; i < numOtherRequests; i++) {
if (randomBoolean()) {
if (randomBoolean()) {
request = new DeleteRequest("_index", "_id");
} else {
request = new UpdateRequest("_index", "_id");
}
bulkRequest.add(new DeleteRequest("_index", "_id"));
} else {
IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
request = indexRequest;
numIndexRequests++;
bulkRequest.add(new UpdateRequest("_index", "_id"));
}
bulkRequest.add(request);
}

CompoundProcessor processor = mock(CompoundProcessor.class);
Expand Down Expand Up @@ -1155,23 +1154,22 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

@SuppressWarnings("unchecked")
BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final Map<Integer, Exception> errorHandler = new HashMap<>();
final Map<Thread, Exception> completionHandler = new HashMap<>();
ingestService.executeBulkRequest(
numRequest,
numIndexRequests + numOtherRequests,
bulkRequest.requests(),
requestItemErrorHandler,
completionHandler,
errorHandler::put,
completionHandler::put,
indexReq -> {},
Names.WRITE,
bulkRequest
);

verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error)));
MatcherAssert.assertThat(errorHandler.entrySet(), hasSize(numIndexRequests));
errorHandler.values().forEach(e -> assertEquals(e.getCause(), error));

verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
MatcherAssert.assertThat(completionHandler.keySet(), contains(Thread.currentThread()));
}

public void testBulkRequestExecution() throws Exception {
Expand Down

0 comments on commit eb43971

Please sign in to comment.