Skip to content

Commit

Permalink
Fix merge
Browse files Browse the repository at this point in the history
carlosdelest committed Feb 7, 2024
1 parent 0ad9496 commit e5ee956
Showing 1 changed file with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -57,6 +57,8 @@
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.inference.InferenceServiceRegistry;
import org.elasticsearch.inference.ModelRegistry;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@@ -98,6 +100,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexingPressure indexingPressure;
private final SystemIndices systemIndices;
private final OriginSettingClient rolloverClient;
private final InferenceServiceRegistry inferenceServiceRegistry;
private final ModelRegistry modelRegistry;

@Inject
public TransportBulkAction(
@@ -110,7 +114,9 @@ public TransportBulkAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexingPressure indexingPressure,
SystemIndices systemIndices
SystemIndices systemIndices,
InferenceServiceRegistry inferenceServiceRegistry,
ModelRegistry modelRegistry
) {
this(
threadPool,
@@ -123,7 +129,9 @@ public TransportBulkAction(
indexNameExpressionResolver,
indexingPressure,
systemIndices,
System::nanoTime
System::nanoTime,
inferenceServiceRegistry,
modelRegistry
);
}

@@ -138,7 +146,9 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
IndexingPressure indexingPressure,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeProvider,
InferenceServiceRegistry inferenceServiceRegistry,
ModelRegistry modelRegistry
) {
this(
BulkAction.INSTANCE,
@@ -153,7 +163,9 @@ public TransportBulkAction(
indexNameExpressionResolver,
indexingPressure,
systemIndices,
relativeTimeProvider
relativeTimeProvider,
inferenceServiceRegistry,
modelRegistry
);
}

@@ -170,7 +182,9 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
IndexingPressure indexingPressure,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeProvider,
InferenceServiceRegistry inferenceServiceRegistry,
ModelRegistry modelRegistry
) {
super(bulkAction.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
Objects.requireNonNull(relativeTimeProvider);
@@ -185,6 +199,8 @@ public TransportBulkAction(
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressure = indexingPressure;
this.systemIndices = systemIndices;
this.inferenceServiceRegistry = inferenceServiceRegistry;
this.modelRegistry = modelRegistry;
clusterService.addStateApplier(this.ingestForwarder);
this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
}
@@ -406,13 +422,13 @@ protected void createMissingIndicesAndIndexData(
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
// Optimizing when there are no prerequisite actions
if (indicesToAutoCreate.isEmpty() && dataStreamsToBeRolledOver.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
executeBulk(task, bulkRequest, startTime, executorName, responses, indicesThatCannotBeCreated, listener);
return;
}
Runnable executeBulkRunnable = () -> threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
executeBulk(task, bulkRequest, startTime, executorName, responses, indicesThatCannotBeCreated, listener);
}
});
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
@@ -614,10 +630,10 @@ void executeBulk(
Task task,
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
String executorName,
AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
ActionListener<BulkResponse> listener
) {
new BulkOperation(
task,
@@ -631,6 +647,8 @@ void executeBulk(
indexNameExpressionResolver,
relativeTimeProvider,
startTimeNanos,
modelRegistry,
inferenceServiceRegistry,
listener
).run();
}

0 comments on commit e5ee956

Please sign in to comment.