Skip to content

Commit

Permalink
Renamed some methods even though that implies tests get impacted
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest committed Oct 24, 2023
1 parent 8305723 commit 2adde5d
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 115 deletions.
83 changes: 0 additions & 83 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand All @@ -45,7 +44,6 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -719,87 +717,6 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

public void executeBulkRequest(
final int numberOfActionRequests,
final Iterable<DocWriteRequest<?>> actionRequests,
final IntConsumer onDropped,
final BiConsumer<Integer, Exception> onFailure,
final BiConsumer<Thread, Exception> onCompletion,
final String executorName
) {
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";

threadPool.executor(executorName).execute(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
onCompletion.accept(null, e);
}

@Override
protected void doRun() {
final Thread originalThread = Thread.currentThread();
try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) {
int slot = 0;
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
processIndexRequest(indexRequest, slot, refs, onDropped, onFailure);
}
slot++;
}
}
}
});
}

private void executePipelines(
DocWriteRequest<?> actionRequest,
final int slot,
final Releasable ref,
IndexRequest indexRequest,
PipelineIterator pipelines,
IntConsumer onDropped,
BiConsumer<Integer, Exception> onFailure
) {
// start the stopwatch and acquire a ref to indicate that we're working on this document
final long startTimeInNanos = System.nanoTime();
totalMetrics.preIngest();
// the document listener gives us three-way logic: a document can fail processing (1), or it can
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
final ActionListener<Boolean> documentListener = ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Boolean kept) {
assert kept != null;
if (kept == false) {
onDropped.accept(slot);
}
}

@Override
public void onFailure(Exception e) {
totalMetrics.ingestFailed();
onFailure.accept(slot, e);
}
}, () -> {
// regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
// that we're finished with this document
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
ref.close();
});
DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get();

IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver);

executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
indexRequest.setPipelinesHaveRun();

assert actionRequest.index() != null;
documentParsingObserver.setIndexName(actionRequest.index());
documentParsingObserver.close();
}

/**
* Returns the pipelines of the request, and updates the request so that it no longer references
* any pipelines (both the default and final pipeline are set to the noop pipeline).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ public void testIngestLocal() throws Exception {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -325,7 +326,8 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(1),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -371,7 +373,8 @@ public void testIngestSystemLocal() throws Exception {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -408,7 +411,7 @@ public void testIngestForward() throws Exception {
ActionTestUtils.execute(action, null, bulkRequest, listener);

// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
verify(ingestService, never()).processBulkRequest(any(), anyInt(), any(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
Expand Down Expand Up @@ -448,7 +451,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener);

// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
verify(ingestService, never()).processBulkRequest(any(), anyInt(), any(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
Expand Down Expand Up @@ -528,7 +531,8 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -576,7 +580,8 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
assertFalse(action.indexCreated); // no index yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(1),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -670,7 +675,8 @@ public void testFindDefaultPipelineFromTemplateMatch() {
);

assertEquals("pipeline2", indexRequest.getPipeline());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(1),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -714,7 +720,8 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
);

assertEquals("pipeline2", indexRequest.getPipeline());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(1),
bulkDocsItr.capture(),
any(),
Expand All @@ -741,7 +748,8 @@ public void testIngestCallbackExceptionHandled() throws Exception {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(bulkRequest.numberOfActions()),
bulkDocsItr.capture(),
any(),
Expand Down Expand Up @@ -778,7 +786,8 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(ingestService).executeBulkRequest(
verify(ingestService).processBulkRequest(
any(),
eq(1),
bulkDocsItr.capture(),
any(),
Expand Down
Loading

0 comments on commit 2adde5d

Please sign in to comment.