From f58ed21720d186a3af5ee9eabc1e96a83a0aa6ca Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 10 Aug 2018 10:15:01 +0200 Subject: [PATCH] Refactor TransportShardBulkAction to better support retries (#31821) Bulk requests are processed item by item. Sometimes during processing, we need to stop execution and wait for a new mapping update to be processed by the node. This is currently achieved by throwing a `RetryOnPrimaryException`, which is caught higher up. When the exception is caught, we wait for the next cluster state to arrive and process the request again. Sadly this is a problem because all operations that were already executed before the mapping change was required are applied again and get new sequence numbers. This in turn means that the previously issued sequence numbers are never replicated to the replicas. That causes the local checkpoint of those shards to be stuck and with it all the seq# based infrastructure. This commit refactors how we deal with retries with the goal of removing `RetryOnPrimaryException` and `RetryOnReplicaException` (not done yet). It achieves this by introducing a class `BulkPrimaryExecutionContext` that is used to capture the execution state and allows continuing from where the execution stopped. The class also formalizes the steps each item has to go through: 1) A translation phase for updates 2) Execution phase (always index/delete) 3) Waiting for a mapping update to come in, if needed 4) A retry, if required (for updates and cases where the mappings are still not available after the put mapping call returns) 5) A finalization phase which allows updates to convert the index/delete result to an update result. 
--- .../action/bulk/BulkItemResultHolder.java | 48 -- .../bulk/BulkPrimaryExecutionContext.java | 345 ++++++++ .../action/bulk/TransportShardBulkAction.java | 512 +++++------- .../replication/TransportWriteAction.java | 2 +- .../BulkPrimaryExecutionContextTests.java | 158 ++++ .../action/bulk/BulkWithUpdatesIT.java | 14 +- .../bulk/TransportShardBulkActionTests.java | 778 +++++++++--------- .../elasticsearch/document/ShardInfoIT.java | 1 + .../ESIndexLevelReplicationTestCase.java | 3 +- 9 files changed, 1113 insertions(+), 748 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java create mode 100644 server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java create mode 100644 server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java deleted file mode 100644 index 3e7ee41b914b7..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.action.bulk; - -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.VersionConflictEngineException; - -/** - * A struct-like holder for a bulk items reponse, result, and the resulting - * replica operation to be executed. - */ -class BulkItemResultHolder { - public final @Nullable DocWriteResponse response; - public final @Nullable Engine.Result operationResult; - public final BulkItemRequest replicaRequest; - - BulkItemResultHolder(@Nullable DocWriteResponse response, - @Nullable Engine.Result operationResult, - BulkItemRequest replicaRequest) { - this.response = response; - this.operationResult = operationResult; - this.replicaRequest = replicaRequest; - } - - public boolean isVersionConflict() { - return operationResult == null ? false : - operationResult.getFailure() instanceof VersionConflictEngineException; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java new file mode 100644 index 0000000000000..85ce28d2d52db --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -0,0 +1,345 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.translog.Translog; + +import java.util.Arrays; + +/** + * This is a utility class that holds the per request state needed to perform bulk operations on the primary. + * More specifically, it maintains an index to the current executing bulk item, which allows execution + * to stop and wait for external events such as mapping updates. + */ +class BulkPrimaryExecutionContext { + + enum ItemProcessingState { + /** Item execution is ready to start, no operations have been performed yet */ + INITIAL, + /** + * The incoming request has been translated to a request that can be executed on the shard. + * This is used to convert update requests to a fully specified index or delete requests. + */ + TRANSLATED, + /** + * the request can not execute with the current mapping and should wait for a new mapping + * to arrive from the master. 
A mapping request for the needed changes has already been + * submitted + */ + WAIT_FOR_MAPPING_UPDATE, + /** + * The request should be executed again, but there is no need to wait for an external event. + * This is needed to support retry on conflicts during updates. + */ + IMMEDIATE_RETRY, + /** The request has been executed on the primary shard (successfully or not) */ + EXECUTED, + /** + * No further handling of current request is needed. The result has been converted to a user response + * and execution can continue to the next item (if available). + */ + COMPLETED + } + + private final BulkShardRequest request; + private final IndexShard primary; + private Translog.Location locationToSync = null; + private int currentIndex = -1; + + private ItemProcessingState currentItemState; + private DocWriteRequest requestToExecute; + private BulkItemResponse executionResult; + private int retryCounter; + + + BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) { + this.request = request; + this.primary = primary; + advance(); + } + + + private int findNextNonAborted(int startIndex) { + final int length = request.items().length; + while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) { + startIndex++; + } + return startIndex; + } + + private static boolean isAborted(BulkItemResponse response) { + return response != null && response.isFailed() && response.getFailure().isAborted(); + } + + /** move to the next item to execute */ + private void advance() { + assert currentItemState == ItemProcessingState.COMPLETED || currentIndex == -1 : + "moving to next but current item wasn't completed (state: " + currentItemState + ")"; + currentItemState = ItemProcessingState.INITIAL; + currentIndex = findNextNonAborted(currentIndex + 1); + retryCounter = 0; + requestToExecute = null; + executionResult = null; + assert assertInvariants(ItemProcessingState.INITIAL); + } + + /** gets the current, untranslated item request */ + 
public DocWriteRequest getCurrent() { + return getCurrentItem().request(); + } + + public BulkShardRequest getBulkShardRequest() { + return request; + } + + /** returns the result of the request that has been executed on the shard */ + public BulkItemResponse getExecutionResult() { + assert assertInvariants(ItemProcessingState.EXECUTED); + return executionResult; + } + + /** returns the number of times the current operation has been retried */ + public int getRetryCounter() { + return retryCounter; + } + + /** returns true if the current request has been executed on the primary */ + public boolean isOperationExecuted() { + return currentItemState == ItemProcessingState.EXECUTED; + } + + /** returns true if the request needs to wait for a mapping update to arrive from the master */ + public boolean requiresWaitingForMappingUpdate() { + return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE; + } + + /** returns true if the current request should be retried without waiting for an external event */ + public boolean requiresImmediateRetry() { + return currentItemState == ItemProcessingState.IMMEDIATE_RETRY; + } + + /** + * returns true if the current request has been completed and it's result translated to a user + * facing response + */ + public boolean isCompleted() { + return currentItemState == ItemProcessingState.COMPLETED; + } + + /** + * returns true if the current request is in INITIAL state + */ + public boolean isInitial() { + return currentItemState == ItemProcessingState.INITIAL; + } + + /** + * returns true if {@link #advance()} has moved the current item beyond the + * end of the {@link BulkShardRequest#items()} array. 
+ */ + public boolean hasMoreOperationsToExecute() { + return currentIndex < request.items().length; + } + + + /** returns the name of the index the current request used */ + public String getConcreteIndex() { + return getCurrentItem().index(); + } + + /** returns any primary response that was set by a previous primary */ + public BulkItemResponse getPreviousPrimaryResponse() { + return getCurrentItem().getPrimaryResponse(); + } + + /** returns a translog location that is needed to be synced in order to persist all operations executed so far */ + public Translog.Location getLocationToSync() { + assert hasMoreOperationsToExecute() == false; + // we always get to the end of the list by using advance, which in turn sets the state to INITIAL + assert assertInvariants(ItemProcessingState.INITIAL); + return locationToSync; + } + + private BulkItemRequest getCurrentItem() { + return request.items()[currentIndex]; + } + + /** returns the primary shard */ + public IndexShard getPrimary() { + return primary; + } + + /** + * sets the request that should actually be executed on the primary. This can be different then the request + * received from the user (specifically, an update request is translated to an indexing or delete request). + */ + public void setRequestToExecute(DocWriteRequest writeRequest) { + assert assertInvariants(ItemProcessingState.INITIAL); + requestToExecute = writeRequest; + currentItemState = ItemProcessingState.TRANSLATED; + assert assertInvariants(ItemProcessingState.TRANSLATED); + } + + /** returns the request that should be executed on the shard. 
*/ + public > T getRequestToExecute() { + assert assertInvariants(ItemProcessingState.TRANSLATED); + return (T) requestToExecute; + } + + /** indicates that the current operation can not be completed and needs to wait for a new mapping from the master */ + public void markAsRequiringMappingUpdate() { + assert assertInvariants(ItemProcessingState.TRANSLATED); + currentItemState = ItemProcessingState.WAIT_FOR_MAPPING_UPDATE; + requestToExecute = null; + assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); + } + + /** resets the current item state, prepare for a new execution */ + public void resetForExecutionForRetry() { + assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED); + currentItemState = ItemProcessingState.INITIAL; + requestToExecute = null; + executionResult = null; + assertInvariants(ItemProcessingState.INITIAL); + } + + /** completes the operation without doing anything on the primary */ + public void markOperationAsNoOp(DocWriteResponse response) { + assertInvariants(ItemProcessingState.INITIAL); + executionResult = new BulkItemResponse(getCurrentItem().id(), getCurrentItem().request().opType(), response); + currentItemState = ItemProcessingState.EXECUTED; + assertInvariants(ItemProcessingState.EXECUTED); + } + + /** indicates that the operation needs to be failed as the required mapping didn't arrive in time */ + public void failOnMappingUpdate(Exception cause) { + assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); + currentItemState = ItemProcessingState.EXECUTED; + final DocWriteRequest docWriteRequest = getCurrentItem().request(); + executionResult = new BulkItemResponse(getCurrentItem().id(), docWriteRequest.opType(), + // Make sure to use getCurrentItem().index() here, if you use docWriteRequest.index() it will use the + // concrete index instead of an alias if used! 
+ new BulkItemResponse.Failure(getCurrentItem().index(), docWriteRequest.type(), docWriteRequest.id(), cause)); + markAsCompleted(executionResult); + } + + /** the current operation has been executed on the primary with the specified result */ + public void markOperationAsExecuted(Engine.Result result) { + assertInvariants(ItemProcessingState.TRANSLATED); + final BulkItemRequest current = getCurrentItem(); + DocWriteRequest docWriteRequest = getRequestToExecute(); + switch (result.getResultType()) { + case SUCCESS: + final DocWriteResponse response; + if (result.getOperationType() == Engine.Operation.TYPE.INDEX) { + Engine.IndexResult indexResult = (Engine.IndexResult) result; + response = new IndexResponse(primary.shardId(), requestToExecute.type(), requestToExecute.id(), + result.getSeqNo(), result.getTerm(), indexResult.getVersion(), indexResult.isCreated()); + } else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) { + Engine.DeleteResult deleteResult = (Engine.DeleteResult) result; + response = new DeleteResponse(primary.shardId(), requestToExecute.type(), requestToExecute.id(), + deleteResult.getSeqNo(), result.getTerm(), deleteResult.getVersion(), deleteResult.isFound()); + + } else { + throw new AssertionError("unknown result type :" + result.getResultType()); + } + executionResult = new BulkItemResponse(current.id(), current.request().opType(), response); + // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though. + executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo()); + locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation()); + break; + case FAILURE: + executionResult = new BulkItemResponse(current.id(), docWriteRequest.opType(), + // Make sure to use request.index() here, if you + // use docWriteRequest.index() it will use the + // concrete index instead of an alias if used! 
+ new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), + result.getFailure(), result.getSeqNo())); + break; + default: + throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType()); + } + currentItemState = ItemProcessingState.EXECUTED; + } + + /** finishes the execution of the current request, with the response that should be returned to the user */ + public void markAsCompleted(BulkItemResponse translatedResponse) { + assertInvariants(ItemProcessingState.EXECUTED); + assert executionResult == null || translatedResponse.getItemId() == executionResult.getItemId(); + assert translatedResponse.getItemId() == getCurrentItem().id(); + + if (translatedResponse.isFailed() == false && requestToExecute != getCurrent()) { + request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute); + } + getCurrentItem().setPrimaryResponse(translatedResponse); + currentItemState = ItemProcessingState.COMPLETED; + advance(); + } + + /** builds the bulk shard response to return to the user */ + public BulkShardResponse buildShardResponse() { + assert hasMoreOperationsToExecute() == false; + return new BulkShardResponse(request.shardId(), + Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new)); + } + + private boolean assertInvariants(ItemProcessingState... 
expectedCurrentState) { + assert Arrays.asList(expectedCurrentState).contains(currentItemState): + "expected current state [" + currentItemState + "] to be one of " + Arrays.toString(expectedCurrentState); + assert currentIndex >= 0 : currentIndex; + assert retryCounter >= 0 : retryCounter; + switch (currentItemState) { + case INITIAL: + assert requestToExecute == null : requestToExecute; + assert executionResult == null : executionResult; + break; + case TRANSLATED: + assert requestToExecute != null; + assert executionResult == null : executionResult; + break; + case WAIT_FOR_MAPPING_UPDATE: + assert requestToExecute == null; + assert executionResult == null : executionResult; + break; + case IMMEDIATE_RETRY: + assert requestToExecute != null; + assert executionResult == null : executionResult; + break; + case EXECUTED: + // requestToExecute can be null if the update ended up as NOOP + assert executionResult != null; + break; + case COMPLETED: + assert requestToExecute != null; + assert executionResult != null; + assert getCurrentItem().getPrimaryResponse() != null; + break; + } + return true; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index ed99c739afb43..9c134ba4012da 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -29,30 +29,34 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; -import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import 
org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -60,12 +64,14 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import 
java.io.IOException; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -108,174 +114,167 @@ protected boolean resolveIndex() { } @Override - public WritePrimaryResult shardOperationOnPrimary( - BulkShardRequest request, IndexShard primary) throws Exception { - return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); + protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) + throws Exception { + ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); + CheckedRunnable waitForMappingUpdate = () -> { + PlainActionFuture waitingFuture = new PlainActionFuture<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + waitingFuture.onResponse(null); + } + + @Override + public void onClusterServiceClose() { + waitingFuture.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + waitingFuture.onFailure( + new MapperException("timed out while waiting for a dynamic mapping update")); + } + }); + waitingFuture.get(); + }; + return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, + new ConcreteMappingUpdatePerformer(), waitForMappingUpdate); } public static WritePrimaryResult performOnPrimary( - BulkShardRequest request, - IndexShard primary, - UpdateHelper updateHelper, - LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater) throws Exception { - final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); - Translog.Location location = null; - for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { - if 
(isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) { - location = executeBulkItemRequest(metaData, primary, request, location, requestIndex, - updateHelper, nowInMillisSupplier, mappingUpdater); - } - } - BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; - BulkItemRequest[] items = request.items(); - for (int i = 0; i < items.length; i++) { - responses[i] = items[i].getPrimaryResponse(); - } - BulkShardResponse response = new BulkShardResponse(request.shardId(), responses); - return new WritePrimaryResult<>(request, response, location, null, primary, logger); + BulkShardRequest request, + IndexShard primary, + UpdateHelper updateHelper, + LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, + CheckedRunnable waitForMappingUpdate) throws Exception { + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); + return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); } - private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest, - final BulkItemRequest bulkItemRequest, - final IndexShard primary, - final MappingUpdatePerformer mappingUpdater) throws Exception { - Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); - switch (indexResult.getResultType()) { - case SUCCESS: - IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), - indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated()); - return new BulkItemResultHolder(response, indexResult, bulkItemRequest); - case FAILURE: - return new BulkItemResultHolder(null, indexResult, bulkItemRequest); - default: - throw new AssertionError("unknown result type for " + indexRequest + ": " + indexResult.getResultType()); + private static WritePrimaryResult performOnPrimary( + BulkPrimaryExecutionContext context, 
UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { + + while (context.hasMoreOperationsToExecute()) { + executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); + assert context.isInitial(); // either completed and moved to next or reset } + return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), + null, context.getPrimary(), logger); } - private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest deleteRequest, - final BulkItemRequest bulkItemRequest, - final IndexShard primary, - final MappingUpdatePerformer mappingUpdater) throws Exception { - Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater); - switch (deleteResult.getResultType()) { - case SUCCESS: - DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(), - deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound()); - return new BulkItemResultHolder(response, deleteResult, bulkItemRequest); - case FAILURE: - return new BulkItemResultHolder(null, deleteResult, bulkItemRequest); - case MAPPING_UPDATE_REQUIRED: - throw new AssertionError("delete operation leaked a mapping update " + deleteRequest); - default: - throw new AssertionError("unknown result type for " + deleteRequest + ": " + deleteResult.getResultType()); + /** Executes bulk item requests and handles request execution exceptions */ + static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) + throws Exception { + final DocWriteRequest.OpType opType = context.getCurrent().opType(); + + final UpdateHelper.Result 
updateResult; + if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + try { + updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier); + } catch (Exception failure) { + // we may fail translating a update to index or delete operation + // we use index result to communicate failure while translating update request + final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO); + context.setRequestToExecute(updateRequest); + context.markOperationAsExecuted(result); + context.markAsCompleted(context.getExecutionResult()); + return; + } + // execute translated update request + switch (updateResult.getResponseResult()) { + case CREATED: + case UPDATED: + IndexRequest indexRequest = updateResult.action(); + IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData(); + MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); + indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex()); + context.setRequestToExecute(indexRequest); + break; + case DELETED: + context.setRequestToExecute(updateResult.action()); + break; + case NOOP: + context.markOperationAsNoOp(updateResult.action()); + context.markAsCompleted(context.getExecutionResult()); + return; + default: + throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult()); + } + } else { + context.setRequestToExecute(context.getCurrent()); + updateResult = null; } - } - static Translog.Location calculateTranslogLocation(final Translog.Location originalLocation, - final BulkItemResultHolder bulkItemResult) { - final Engine.Result operationResult = bulkItemResult.operationResult; - if (operationResult != null && operationResult.getResultType() == Engine.Result.Type.SUCCESS) { - return locationToSync(originalLocation, 
operationResult.getTranslogLocation()); + assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state + + if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { + executeDeleteRequestOnPrimary(context, mappingUpdater); } else { - return originalLocation; + executeIndexRequestOnPrimary(context, mappingUpdater); } + + if (context.requiresWaitingForMappingUpdate()) { + try { + waitForMappingUpdate.run(); + context.resetForExecutionForRetry(); + } catch (Exception e) { + context.failOnMappingUpdate(e); + } + return; + } + + assert context.isOperationExecuted(); + + if (opType == DocWriteRequest.OpType.UPDATE && + context.getExecutionResult().isFailed() && + isConflictException(context.getExecutionResult().getFailure().getCause())) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + if (context.getRetryCounter() < updateRequest.retryOnConflict()) { + context.resetForExecutionForRetry(); + return; + } + } + + finalizePrimaryOperationOnCompletion(context, opType, updateResult); } - // Visible for unit testing - /** - * Creates a BulkItemResponse for the primary operation and returns it. If no bulk response is - * needed (because one already exists and the operation failed), then return null. 
- */ - static BulkItemResponse createPrimaryResponse(BulkItemResultHolder bulkItemResult, - final DocWriteRequest.OpType opType, - BulkShardRequest request) { - final Engine.Result operationResult = bulkItemResult.operationResult; - final DocWriteResponse response = bulkItemResult.response; - final BulkItemRequest replicaRequest = bulkItemResult.replicaRequest; - - if (operationResult == null) { // in case of noop update operation - assert response.getResult() == DocWriteResponse.Result.NOOP : "only noop updates can have a null operation"; - return new BulkItemResponse(replicaRequest.id(), opType, response); - - } else if (operationResult.getResultType() == Engine.Result.Type.SUCCESS) { - BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response); - // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though. - primaryResponse.getResponse().setShardInfo(new ShardInfo()); - return primaryResponse; - - } else if (operationResult.getResultType() == Engine.Result.Type.FAILURE) { - DocWriteRequest docWriteRequest = replicaRequest.request(); - Exception failure = operationResult.getFailure(); - if (isConflictException(failure)) { + private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionContext context, DocWriteRequest.OpType opType, + UpdateHelper.Result updateResult) { + final BulkItemResponse executionResult = context.getExecutionResult(); + if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + context.markAsCompleted( + processUpdateResponse(updateRequest, context.getConcreteIndex(), executionResult, updateResult)); + } else if (executionResult.isFailed()) { + final Exception failure = executionResult.getFailure().getCause(); + final DocWriteRequest docWriteRequest = context.getCurrent(); + if (TransportShardBulkAction.isConflictException(failure)) { logger.trace(() -> new 
ParameterizedMessage("{} failed to execute bulk item ({}) {}", - request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); + context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); } else { logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); + context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); } + final BulkItemResponse primaryResponse; // if it's a conflict failure, and we already executed the request on a primary (and we execute it // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) // then just use the response we got from the failed execution - if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) { - return new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(), - // Make sure to use request.index() here, if you - // use docWriteRequest.index() it will use the - // concrete index instead of an alias if used! 
- new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), - failure, operationResult.getSeqNo())); + if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) { + primaryResponse = context.getPreviousPrimaryResponse(); } else { - assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response"; - return null; + primaryResponse = executionResult; } + context.markAsCompleted(primaryResponse); } else { - throw new AssertionError("unknown result type for " + request + ": " + operationResult.getResultType()); + context.markAsCompleted(executionResult); } - } - - /** Executes bulk item requests and handles request execution exceptions */ - static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary, - BulkShardRequest request, Translog.Location location, - int requestIndex, UpdateHelper updateHelper, - LongSupplier nowInMillisSupplier, - final MappingUpdatePerformer mappingUpdater) throws Exception { - final DocWriteRequest itemRequest = request.items()[requestIndex].request(); - final DocWriteRequest.OpType opType = itemRequest.opType(); - final BulkItemResultHolder responseHolder; - switch (itemRequest.opType()) { - case CREATE: - case INDEX: - responseHolder = executeIndexRequest((IndexRequest) itemRequest, - request.items()[requestIndex], primary, mappingUpdater); - break; - case UPDATE: - responseHolder = executeUpdateRequest((UpdateRequest) itemRequest, primary, metaData, request, - requestIndex, updateHelper, nowInMillisSupplier, mappingUpdater); - break; - case DELETE: - responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary, mappingUpdater); - break; - default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); - } - - final BulkItemRequest replicaRequest = responseHolder.replicaRequest; - - // update the bulk item request 
because update request execution can mutate the bulk item request - request.items()[requestIndex] = replicaRequest; - - // Retrieve the primary response, and update the replica request with the primary's response - BulkItemResponse primaryResponse = createPrimaryResponse(responseHolder, opType, request); - if (primaryResponse != null) { - replicaRequest.setPrimaryResponse(primaryResponse); - } - - // Update the translog with the new location, if needed - return calculateTranslogLocation(location, responseHolder); - } - - private static boolean isAborted(BulkItemResponse response) { - return response != null && response.isFailed() && response.getFailure().isAborted(); + assert context.isInitial(); } private static boolean isConflictException(final Exception e) { @@ -285,150 +284,50 @@ private static boolean isConflictException(final Exception e) { /** * Creates a new bulk item result from the given requests and result of performing the update operation on the shard. */ - static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex, - final Engine.Result result, final UpdateHelper.Result translate, - final IndexShard primary, final int bulkReqId) { - assert result.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "failed result should not have a sequence number"; - - Engine.Operation.TYPE opType = result.getOperationType(); - - final UpdateResponse updateResponse; - final BulkItemRequest replicaRequest; - - // enrich update response and set translated update (index/delete) request for replica execution in bulk items - if (opType == Engine.Operation.TYPE.INDEX) { - assert result instanceof Engine.IndexResult : result.getClass(); - final IndexRequest updateIndexRequest = translate.action(); - final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(), - result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated()); - 
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), - indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(), - indexResponse.getResult()); - - if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) { - final BytesReference indexSourceAsBytes = updateIndexRequest.source(); - final Tuple> sourceAndContent = - XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType()); - updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex, - indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); - } - // set translated request as replica request - replicaRequest = new BulkItemRequest(bulkReqId, updateIndexRequest); - - } else if (opType == Engine.Operation.TYPE.DELETE) { - assert result instanceof Engine.DeleteResult : result.getClass(); - final DeleteRequest updateDeleteRequest = translate.action(); - - final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(), - result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound()); + static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex, + BulkItemResponse operationResponse, + final UpdateHelper.Result translate) { + + final BulkItemResponse response; + DocWriteResponse.Result translatedResult = translate.getResponseResult(); + if (operationResponse.isFailed()) { + response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, operationResponse.getFailure()); + } else { - updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), + final UpdateResponse updateResponse; + if (translatedResult == DocWriteResponse.Result.CREATED || translatedResult == 
DocWriteResponse.Result.UPDATED) { + final IndexRequest updateIndexRequest = translate.action(); + final IndexResponse indexResponse = operationResponse.getResponse(); + updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), + indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), + indexResponse.getVersion(), indexResponse.getResult()); + + if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) { + final BytesReference indexSourceAsBytes = updateIndexRequest.source(); + final Tuple> sourceAndContent = + XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType()); + updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex, + indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + } + } else if (translatedResult == DocWriteResponse.Result.DELETED) { + final DeleteResponse deleteResponse = operationResponse.getResponse(); + updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(), deleteResponse.getVersion(), deleteResponse.getResult()); - final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(), + final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null); - updateResponse.setGetResult(getResult); - // set translated request as replica request - replicaRequest = new BulkItemRequest(bulkReqId, updateDeleteRequest); - - } else { - throw new IllegalArgumentException("unknown operation type: " + opType); - } - - return new BulkItemResultHolder(updateResponse, result, replicaRequest); - } - - /** - * Executes update 
request once, delegating to a index or delete operation after translation. - * NOOP updates are indicated by returning a null operation in {@link BulkItemResultHolder} - */ - static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest, IndexShard primary, - IndexMetaData metaData, String concreteIndex, - UpdateHelper updateHelper, LongSupplier nowInMillis, - BulkItemRequest primaryItemRequest, int bulkReqId, - final MappingUpdatePerformer mappingUpdater) throws Exception { - final UpdateHelper.Result translate; - // translate update request - try { - translate = updateHelper.prepare(updateRequest, primary, nowInMillis); - } catch (Exception failure) { - // we may fail translating a update to index or delete operation - // we use index result to communicate failure while translating update request - final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version()); - return new BulkItemResultHolder(null, result, primaryItemRequest); - } - - final Engine.Result result; - // execute translated update request - switch (translate.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = translate.action(); - MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); - indexRequest.process(metaData.getCreationVersion(), mappingMd, concreteIndex); - result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); - break; - case DELETED: - DeleteRequest deleteRequest = translate.action(); - result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater); - break; - case NOOP: - primary.noopUpdate(updateRequest.type()); - result = null; - break; - default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); - } - - if (result == null) { - // this is a noop operation - final UpdateResponse updateResponse = translate.action(); - return new BulkItemResultHolder(updateResponse, result, primaryItemRequest); - } else if 
(result.getResultType() == Engine.Result.Type.FAILURE) { - // There was a result, and the result was a failure - return new BulkItemResultHolder(null, result, primaryItemRequest); - } else if (result.getResultType() == Engine.Result.Type.SUCCESS) { - // It was successful, we need to construct the response and return it - return processUpdateResponse(updateRequest, concreteIndex, result, translate, primary, bulkReqId); - } else { - throw new AssertionError("unknown result type for " + updateRequest + ": " + result.getResultType()); - } - } - - /** - * Executes update request, delegating to a index or delete operation after translation, - * handles retries on version conflict and constructs update response - * NOOP updates are indicated by returning a null operation - * in {@link BulkItemResultHolder} - */ - private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary, - IndexMetaData metaData, BulkShardRequest request, - int requestIndex, UpdateHelper updateHelper, - LongSupplier nowInMillis, - final MappingUpdatePerformer mappingUpdater) throws Exception { - BulkItemRequest primaryItemRequest = request.items()[requestIndex]; - assert primaryItemRequest.request() == updateRequest - : "expected bulk item request to contain the original update request, got: " + - primaryItemRequest.request() + " and " + updateRequest; - - BulkItemResultHolder holder = null; - // There must be at least one attempt - int maxAttempts = Math.max(1, updateRequest.retryOnConflict()); - for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) { - - holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper, - nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater); - - // It was either a successful request, or it was a non-conflict failure - if (holder.isVersionConflict() == false) { - return holder; + updateResponse.setGetResult(getResult); + } else { + throw new 
IllegalArgumentException("unknown operation type: " + translatedResult); } + response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse); } - // We ran out of tries and haven't returned a valid bulk item response, so return the last one generated - return holder; + return response; } + /** Modes for executing item request on replica depending on corresponding primary execution result */ public enum ReplicaItemExecutionMode { @@ -451,6 +350,7 @@ public enum ReplicaItemExecutionMode { /** * Determines whether a bulk item request should be executed on the replica. + * * @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures * {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation * {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or @@ -461,8 +361,8 @@ static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest r assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; if (primaryResponse.isFailed()) { return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - ? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op - : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication + ? 
ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op + : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication } else { // TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq# // (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether @@ -470,8 +370,8 @@ static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest r // ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for // both failures and indexing operations. return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP - ? ReplicaItemExecutionMode.NORMAL // execution successful on primary - : ReplicaItemExecutionMode.NOOP; // ignore replication + ? ReplicaItemExecutionMode.NORMAL // execution successful on primary + : ReplicaItemExecutionMode.NOOP; // ignore replication } } @@ -527,7 +427,7 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse break; case DELETE: DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest; - result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), + result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), deleteRequest.type(), deleteRequest.id()); break; default: @@ -550,56 +450,62 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse } /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ - static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, - MappingUpdatePerformer mappingUpdater) throws Exception { + private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext context, + MappingUpdatePerformer mappingUpdater) throws Exception { + final 
IndexRequest request = context.getRequestToExecute(); + final IndexShard primary = context.getPrimary(); final SourceToParse sourceToParse = SourceToParse.source(request.index(), request.type(), request.id(), request.source(), request.getContentType()) .routing(request.routing()); - return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(), + executeOnPrimaryWhileHandlingMappingUpdates(context, () -> primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, request.getAutoGeneratedTimestamp(), request.isRetry()), e -> primary.getFailedIndexResult(e, request.version()), - mappingUpdater); + context::markOperationAsExecuted, + mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); } - private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary, - MappingUpdatePerformer mappingUpdater) throws Exception { - return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(), + private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context, + MappingUpdatePerformer mappingUpdater) throws Exception { + final DeleteRequest request = context.getRequestToExecute(); + final IndexShard primary = context.getPrimary(); + executeOnPrimaryWhileHandlingMappingUpdates(context, () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()), e -> primary.getFailedDeleteResult(e, request.version()), - mappingUpdater); + context::markOperationAsExecuted, + mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); } - private static T executeOnPrimaryWhileHandlingMappingUpdates(ShardId shardId, String type, - CheckedSupplier toExecute, - Function onError, - MappingUpdatePerformer mappingUpdater) + private static void executeOnPrimaryWhileHandlingMappingUpdates( + BulkPrimaryExecutionContext context, CheckedSupplier toExecute, + 
Function exceptionToResult, Consumer onComplete, Consumer mappingUpdater) throws IOException { T result = toExecute.get(); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { // try to update the mappings and try again. try { - mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), shardId, type); + mappingUpdater.accept(result.getRequiredMappingUpdate()); } catch (Exception e) { // failure to update the mapping should translate to a failure of specific requests. Other requests // still need to be executed and replicated. - return onError.apply(e); + onComplete.accept(exceptionToResult.apply(e)); + return; } + // TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now result = toExecute.get(); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { // double mapping update. We assume that the successful mapping update wasn't yet processed on the node // and retry the entire request again. - throw new ReplicationOperation.RetryOnPrimaryException(shardId, - "Dynamic mappings are not available on the node that holds the primary yet"); + context.markAsRequiringMappingUpdate(); + } else { + onComplete.accept(result); } + } else { + onComplete.accept(result); } - assert result.getFailure() instanceof ReplicationOperation.RetryOnPrimaryException == false : - "IndexShard shouldn't use RetryOnPrimaryException. 
got " + result.getFailure(); - return result; - } class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index ca91a32a17a3a..ae029ce3f9357 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -83,7 +83,7 @@ protected static Location syncOperationResultOrThrow(final Engine.Result operati return location; } - protected static Location locationToSync(Location current, Location next) { + public static Location locationToSync(Location current, Location next) { /* here we are moving forward in the translog with each operation. Under the hood this might * cross translog files which is ok since from the user perspective the translog is like a * tape where only the highest location needs to be fsynced in order to sync all previous diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java new file mode 100644 index 0000000000000..de7444fac0903 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java @@ -0,0 +1,158 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.TransportShardBulkActionTests.FakeDeleteResult; +import org.elasticsearch.action.bulk.TransportShardBulkActionTests.FakeIndexResult; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BulkPrimaryExecutionContextTests extends ESTestCase { + + public void testAbortedSkipped() { + BulkShardRequest shardRequest = generateRandomRequest(); + + ArrayList> nonAbortedRequests = new ArrayList<>(); + for (BulkItemRequest request : shardRequest.items()) { + if (randomBoolean()) { + request.abort("index", new ElasticsearchException("bla")); + } else { + nonAbortedRequests.add(request.request()); + } + } + + ArrayList> visitedRequests = new ArrayList<>(); + for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); + context.hasMoreOperationsToExecute(); + ) { + 
visitedRequests.add(context.getCurrent()); + context.setRequestToExecute(context.getCurrent()); + // using failures prevents caring about types + context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1)); + context.markAsCompleted(context.getExecutionResult()); + } + + assertThat(visitedRequests, equalTo(nonAbortedRequests)); + } + + private BulkShardRequest generateRandomRequest() { + BulkItemRequest[] items = new BulkItemRequest[randomInt(20)]; + for (int i = 0; i < items.length; i++) { + final DocWriteRequest request; + switch (randomFrom(DocWriteRequest.OpType.values())) { + case INDEX: + request = new IndexRequest("index", "_doc", "id_" + i); + break; + case CREATE: + request = new IndexRequest("index", "_doc", "id_" + i).create(true); + break; + case UPDATE: + request = new UpdateRequest("index", "_doc", "id_" + i); + break; + case DELETE: + request = new DeleteRequest("index", "_doc", "id_" + i); + break; + default: + throw new AssertionError("unknown type"); + } + items[i] = new BulkItemRequest(i, request); + } + return new BulkShardRequest(new ShardId("index", "_na_", 0), + randomFrom(WriteRequest.RefreshPolicy.values()), items); + } + + public void testTranslogLocation() { + + BulkShardRequest shardRequest = generateRandomRequest(); + + Translog.Location expectedLocation = null; + final IndexShard primary = mock(IndexShard.class); + when(primary.shardId()).thenReturn(shardRequest.shardId()); + + long translogGen = 0; + long translogOffset = 0; + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); + while (context.hasMoreOperationsToExecute()) { + final Engine.Result result; + final DocWriteRequest current = context.getCurrent(); + final boolean failure = rarely(); + if (frequently()) { + translogGen += randomIntBetween(1, 4); + translogOffset = 0; + } else { + translogOffset += randomIntBetween(200, 400); + } + + Translog.Location location = new 
Translog.Location(translogGen, translogOffset, randomInt(200)); + switch (current.opType()) { + case INDEX: + case CREATE: + context.setRequestToExecute(current); + if (failure) { + result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1); + } else { + result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location); + } + break; + case UPDATE: + context.setRequestToExecute(new IndexRequest(current.index(), current.type(), current.id())); + if (failure) { + result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1, 1); + } else { + result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location); + } + break; + case DELETE: + context.setRequestToExecute(current); + if (failure) { + result = new Engine.DeleteResult(new ElasticsearchException("bla"), 1, 1); + } else { + result = new FakeDeleteResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location); + } + break; + default: + throw new AssertionError("unknown type:" + current.opType()); + } + if (failure == false) { + expectedLocation = location; + } + context.markOperationAsExecuted(result); + context.markAsCompleted(context.getExecutionResult()); + } + + assertThat(context.getLocationToSync(), equalTo(expectedLocation)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 0cdc9db916c0f..8a7c46ebcf60e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -405,16 +405,20 @@ public void testBulkUpdateLargerVolume() throws Exception { assertThat("expected no failures but got: " + response.buildFailureMessage(), response.hasFailures(), equalTo(false)); assertThat(response.getItems().length, equalTo(numDocs)); for (int i = 0; i < numDocs; i++) { - assertThat(response.getItems()[i].getItemId(), 
equalTo(i)); - assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); - assertThat(response.getItems()[i].getIndex(), equalTo("test")); - assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); + final BulkItemResponse itemResponse = response.getItems()[i]; + assertThat(itemResponse.getFailure(), nullValue()); + assertThat(itemResponse.isFailed(), equalTo(false)); + assertThat(itemResponse.getItemId(), equalTo(i)); + assertThat(itemResponse.getId(), equalTo(Integer.toString(i))); + assertThat(itemResponse.getIndex(), equalTo("test")); + assertThat(itemResponse.getType(), equalTo("type1")); + assertThat(itemResponse.getOpType(), equalTo(OpType.UPDATE)); for (int j = 0; j < 5; j++) { GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).get(); assertThat(getResponse.isExists(), equalTo(false)); } } + assertThat(response.hasFailures(), equalTo(false)); } public void testBulkIndexingWhileInitializing() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index bbe25ea02d60c..e60ee1395a858 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; -import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.TransportWriteAction.WritePrimaryResult; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; @@ -39,12 +38,13 @@ import 
org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; @@ -52,10 +52,9 @@ import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.LongSupplier; import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; import static org.hamcrest.CoreMatchers.equalTo; @@ -63,103 +62,103 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportShardBulkActionTests extends IndexShardTestCase { private final ShardId shardId = new 
ShardId("index", "_na_", 0); private final Settings idxSettings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .put("index.version.created", Version.CURRENT.id) - .build(); + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT.id) + .build(); private IndexMetaData indexMetaData() throws IOException { return IndexMetaData.builder("index") - .putMapping("_doc", - "{\"properties\":{\"foo\":{\"type\":\"text\",\"fields\":" + - "{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}") - .settings(idxSettings) - .primaryTerm(0, 1).build(); + .putMapping("_doc", + "{\"properties\":{\"foo\":{\"type\":\"text\",\"fields\":" + + "{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}") + .settings(idxSettings) + .primaryTerm(0, 1).build(); } public void testShouldExecuteReplicaItem() throws Exception { // Successful index request should be replicated DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NORMAL)); + equalTo(ReplicaItemExecutionMode.NORMAL)); // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( - new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure("index", "type", "id", - new 
IllegalArgumentException("i died")))); + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException("i died")))); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NOOP)); + equalTo(ReplicaItemExecutionMode.NOOP)); // Failed index requests with sequence no should be replicated request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( - new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure("index", "type", "id", - new IllegalArgumentException( - "i died after sequence no was generated"), - 1))); + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException( + "i died after sequence no was generated"), + 1))); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.FAILURE)); + equalTo(ReplicaItemExecutionMode.FAILURE)); // NOOP requests should not be replicated DocWriteRequest updateRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, updateRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, - response)); + response)); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NOOP)); + equalTo(ReplicaItemExecutionMode.NOOP)); } - public void testExecuteBulkIndexRequest() throws Exception { - IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(true); BulkItemRequest[] items = new BulkItemRequest[1]; boolean create = randomBoolean(); DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE) - .create(create); + .create(create); BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); items[0] = primaryRequest; BulkShardRequest 
bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; - - Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, - shard, bulkShardRequest, location, 0, updateHelper, - threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer()); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems - assertThat(newLocation, not(location)); + assertNotNull(context.getLocationToSync()); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), - equalTo(create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX)); + equalTo(create ? 
DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX)); assertFalse(primaryResponse.isFailed()); // Assert that the document actually made it there @@ -170,13 +169,12 @@ public void testExecuteBulkIndexRequest() throws Exception { items[0] = primaryRequest; bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location secondLocation = - TransportShardBulkAction.executeBulkItemRequest( metaData, - shard, bulkShardRequest, newLocation, 0, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail"))); + BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, + threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + assertFalse(secondContext.hasMoreOperationsToExecute()); - // Translog should not change, since the document was not indexed due to a version conflict - assertThat(secondLocation, equalTo(newLocation)); + assertNull(secondContext.getLocationToSync()); BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; @@ -194,7 +192,7 @@ public void testExecuteBulkIndexRequest() throws Exception { assertThat(failure.getId(), equalTo("id")); assertThat(failure.getCause().getClass(), equalTo(VersionConflictEngineException.class)); assertThat(failure.getCause().getMessage(), - containsString("version conflict, document already exists (current version [1])")); + containsString("version conflict, document already exists (current version [1])")); assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); assertThat(replicaRequest, equalTo(primaryRequest)); @@ -224,7 +222,8 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { UpdateHelper updateHelper = null; WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper,
threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer()); + bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + () -> {}); // since at least 1 item passed, the tran log location should exist, assertThat(result.location, notNullValue()); @@ -254,52 +253,85 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { closeShards(shard); } - public void testExecuteBulkIndexRequestWithRejection() throws Exception { - IndexMetaData metaData = indexMetaData(); - IndexShard shard = newStartedShard(true); + public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkItemRequest[] items = new BulkItemRequest[1]; DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location location = new Translog.Location(0, 0, 0); - UpdateHelper updateHelper = null; + Engine.IndexResult mappingUpdate = + new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); + + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(mappingUpdate); + + // Pretend the mappings haven't made it to the node yet + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + AtomicInteger updateCalled = new AtomicInteger(); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + (update, shardId, type) -> { + 
// There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + }, () -> {}); + assertTrue(context.isInitial()); + assertTrue(context.hasMoreOperationsToExecute()); + + assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); + + // Verify that the shard "executed" the operation twice + verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); - // Pretend the mappings haven't made it to the node yet, and throw a rejection - expectThrows(ReplicationOperation.RetryOnPrimaryException.class, - () -> TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest, - location, 0, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer())); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(success); + + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); + + + // Verify that the shard "executed" the operation only once (2 for previous invocations plus + // 1 for this execution) + verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + + + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(writeRequest.opType())); + assertFalse(primaryResponse.isFailed()); closeShards(shard); } public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Exception { - IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(true); BulkItemRequest[] items = new BulkItemRequest[1]; DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - 
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); items[0] = new BulkItemRequest(0, writeRequest); - BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; - // Return an exception when trying to update the mapping + // Return an exception when trying to update the mapping, or when waiting for it to come RuntimeException err = new RuntimeException("some kind of exception"); - Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, - shard, bulkShardRequest, location, 0, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(err)); + boolean errorOnWait = randomBoolean(); + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), + errorOnWait ? 
() -> { throw err; } : () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); - // Translog shouldn't change, as there were conflicting mappings - assertThat(newLocation, equalTo(location)); + // Translog shouldn't be synced, as there were conflicting mappings + assertThat(context.getLocationToSync(), nullValue()); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); @@ -320,24 +352,24 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex } public void testExecuteBulkDeleteRequest() throws Exception { - IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(true); BulkItemRequest[] items = new BulkItemRequest[1]; DocWriteRequest writeRequest = new DeleteRequest("index", "_doc", "id"); items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; - Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, - shard, bulkShardRequest, location, 0, updateHelper, - threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer()); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, even though the document didn't exist - assertThat(newLocation, not(location)); + assertThat(context.getLocationToSync(), not(location)); BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; DocWriteRequest replicaDeleteRequest = replicaRequest.request(); @@ -369,14 +401,15 @@ public void testExecuteBulkDeleteRequest() throws Exception { items[0] = new 
BulkItemRequest(0, writeRequest); bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - location = newLocation; + location = context.getLocationToSync(); - newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, shard, - bulkShardRequest, location, 0, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer()); + context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, because the document was deleted - assertThat(newLocation, not(location)); + assertThat(context.getLocationToSync(), not(location)); replicaRequest = bulkShardRequest.items()[0]; replicaDeleteRequest = replicaRequest.request(); @@ -405,63 +438,79 @@ public void testExecuteBulkDeleteRequest() throws Exception { closeShards(shard); } - public void testNoopUpdateReplicaRequest() throws Exception { - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "field", "value"); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + public void testNoopUpdateRequest() throws Exception { + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); DocWriteResponse noopUpdateResponse = new UpdateResponse(shardId, "_doc", "id", 0, - DocWriteResponse.Result.NOOP); - BulkItemResultHolder noopResults = new BulkItemResultHolder(noopUpdateResponse, null, - replicaRequest); + DocWriteResponse.Result.NOOP); - Translog.Location location = new Translog.Location(0, 0, 0); - BulkItemRequest[] items = new BulkItemRequest[0]; + IndexShard shard = mock(IndexShard.class); + + UpdateHelper updateHelper = 
mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(noopUpdateResponse, DocWriteResponse.Result.NOOP, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = TransportShardBulkAction.createPrimaryResponse( - noopResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(location, noopResults); + assertFalse(context.hasMoreOperationsToExecute()); // Basically nothing changes in the request since it's a noop - assertThat(newLocation, equalTo(location)); + assertThat(context.getLocationToSync(), nullValue()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse)); assertThat(primaryResponse.getResponse().getResult(), - equalTo(DocWriteResponse.Result.NOOP)); + equalTo(DocWriteResponse.Result.NOOP)); + assertThat(primaryResponse.getResponse().getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); } - public void testUpdateReplicaRequestWithFailure() throws Exception { - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest 
= new BulkItemRequest(0, writeRequest); + public void testUpdateRequestWithFailure() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); Exception err = new ElasticsearchException("I'm dead <(x.x)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); - BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, - replicaRequest); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.indexSettings()).thenReturn(indexSettings); - Translog.Location location = new Translog.Location(0, 0, 0); - BulkItemRequest[] items = new BulkItemRequest[0]; + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, randomBoolean() ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = - TransportShardBulkAction.createPrimaryResponse( - failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(location, failedResults); + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Since this was not a conflict failure, the primary response // should be filled out with the failure information - assertThat(newLocation, equalTo(location)); + assertNull(context.getLocationToSync()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertTrue(primaryResponse.isFailed()); assertThat(primaryResponse.getFailureMessage(), containsString("I'm dead <(x.x)>")); BulkItemResponse.Failure failure = primaryResponse.getFailure(); @@ -472,33 +521,42 @@ public void testUpdateReplicaRequestWithFailure() throws Exception { assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); } - public void testUpdateReplicaRequestWithConflictFailure() throws Exception { - DocWriteRequest writeRequest = new 
IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + + public void testUpdateRequestWithConflictFailure() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); Exception err = new VersionConflictEngineException(shardId, "_doc", "id", - "I'm conflicted <(;_;)>"); + "I'm conflicted <(;_;)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); - BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, - replicaRequest); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.indexSettings()).thenReturn(indexSettings); - Translog.Location location = new Translog.Location(0, 0, 0); - BulkItemRequest[] items = new BulkItemRequest[0]; + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, randomBoolean() ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = - TransportShardBulkAction.createPrimaryResponse( - failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(location, failedResults); - // Since this was not a conflict failure, the primary response - // should be filled out with the failure information - assertThat(newLocation, equalTo(location)); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); + + assertNull(context.getLocationToSync()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertTrue(primaryResponse.isFailed()); assertThat(primaryResponse.getFailureMessage(), containsString("I'm conflicted <(;_;)>")); BulkItemResponse.Failure failure = primaryResponse.getFailure(); @@ -509,329 +567,268 @@ public void testUpdateReplicaRequestWithConflictFailure() throws Exception { assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); } - public void testUpdateReplicaRequestWithSuccess() throws Exception { - DocWriteRequest writeRequest = new 
IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + public void testUpdateRequestWithSuccess() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); boolean created = randomBoolean(); Translog.Location resultLocation = new Translog.Location(42, 42, 42); - Engine.IndexResult indexResult = new FakeResult(1, 1, 1, created, resultLocation); - DocWriteResponse indexResponse = new IndexResponse(shardId, "_doc", "id", 1, 17, 1, created); - BulkItemResultHolder goodResults = - new BulkItemResultHolder(indexResponse, indexResult, replicaRequest); + Engine.IndexResult indexResult = new FakeIndexResult(1, 1, 13, created, resultLocation); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.indexSettings()).thenReturn(indexSettings); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, created ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); - Translog.Location originalLocation = new Translog.Location(21, 21, 21); - BulkItemRequest[] items = new BulkItemRequest[0]; + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = - TransportShardBulkAction.createPrimaryResponse( - goodResults, DocWriteRequest.OpType.INDEX, bulkShardRequest); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(originalLocation, goodResults); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced - assertThat(newLocation, equalTo(resultLocation)); + assertThat(context.getLocationToSync(), equalTo(resultLocation)); + assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); // Since this was not a conflict failure, the primary response // should be filled out with the failure information + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); DocWriteResponse response = primaryResponse.getResponse(); assertThat(response.status(), equalTo(created ? 
RestStatus.CREATED : RestStatus.OK)); + assertThat(response.getSeqNo(), equalTo(13L)); } - public void testCalculateTranslogLocation() throws Exception { - final Translog.Location original = new Translog.Location(0, 0, 0); + public void testUpdateWithDelete() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); - BulkItemResultHolder results = new BulkItemResultHolder(null, null, replicaRequest); + DeleteRequest updateResponse = new DeleteRequest("index", "_doc", "id"); - assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results), - equalTo(original)); + boolean found = randomBoolean(); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + final long resultSeqNo = 13; + Engine.DeleteResult deleteResult = new FakeDeleteResult(1, 1, resultSeqNo, found, resultLocation); + IndexShard shard = mock(IndexShard.class); + when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any())).thenReturn(deleteResult); + when(shard.indexSettings()).thenReturn(indexSettings); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, DocWriteResponse.Result.DELETED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - boolean created = randomBoolean(); - DocWriteResponse indexResponse = new 
IndexResponse(shardId, "_doc", "id", 1, 17, 1, created); - Translog.Location newLocation = new Translog.Location(1, 1, 1); - final long version = randomNonNegativeLong(); - final long seqNo = randomNonNegativeLong(); - Engine.IndexResult indexResult = new IndexResultWithLocation(version, 0L, seqNo, created, newLocation); - results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest); - assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results), - equalTo(newLocation)); - } + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); - public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { - final IndexShard shard = spy(newStartedShard(false)); - BulkItemRequest itemRequest = new BulkItemRequest(0, new IndexRequest("index", "_doc").source(Requests.INDEX_CONTENT_TYPE)); - final String failureMessage = "simulated primary failure"; - final IOException exception = new IOException(failureMessage); - itemRequest.setPrimaryResponse(new BulkItemResponse(0, - randomFrom( - DocWriteRequest.OpType.CREATE, - DocWriteRequest.OpType.DELETE, - DocWriteRequest.OpType.INDEX - ), - new BulkItemResponse.Failure("index", "_doc", "1", - exception, 1L) - )); - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - itemRequests[0] = itemRequest; - BulkShardRequest bulkShardRequest = new BulkShardRequest( - shard.shardId(), RefreshPolicy.NONE, itemRequests); - TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); - verify(shard, times(1)).markSeqNoAsNoop(1, exception.toString()); - closeShards(shard); + // Check that the translog is successfully advanced + assertThat(context.getLocationToSync(), equalTo(resultLocation)); + assertThat(bulkShardRequest.items()[0].request(), 
equalTo(updateResponse)); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertThat(response.getSeqNo(), equalTo(resultSeqNo)); } - public void testMappingUpdateParsesCorrectNumberOfTimes() throws Exception { - IndexMetaData metaData = indexMetaData(); - logger.info("--> metadata.getIndex(): {}", metaData.getIndex()); - final IndexShard shard = spy(newStartedShard(true)); - - IndexRequest request = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - - final AtomicInteger updateCalled = new AtomicInteger(0); - expectThrows(ReplicationOperation.RetryOnPrimaryException.class, - () -> TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard, - (update, shardId, type) -> { - // There should indeed be a mapping update - assertNotNull(update); - updateCalled.incrementAndGet(); - })); - - assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); + public void testFailureDuringUpdateProcessing() throws Exception { + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - // Verify that the shard "executed" the operation twice - verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + IndexShard shard = mock(IndexShard.class); - // Update the mapping, so the next mapping updater doesn't do anything - final MapperService mapperService = shard.mapperService(); - logger.info("--> mapperService.index(): {}", mapperService.index()); - 
mapperService.updateMapping(metaData); + UpdateHelper updateHelper = mock(UpdateHelper.class); + final ElasticsearchException err = new ElasticsearchException("oops"); + when(updateHelper.prepare(any(), eq(shard), any())).thenThrow(err); + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard, - (update, shardId, type) -> fail("should not have had to update the mappings")); - // Verify that the shard "executed" the operation only once (2 for previous invocations plus - // 1 for this execution) - verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); - closeShards(shard); + assertNull(context.getLocationToSync()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + assertTrue(primaryResponse.isFailed()); + assertThat(primaryResponse.getFailureMessage(), containsString("oops")); + BulkItemResponse.Failure failure = primaryResponse.getFailure(); + assertThat(failure.getIndex(), equalTo("index")); + assertThat(failure.getType(), equalTo("_doc")); + assertThat(failure.getId(), equalTo("id")); + assertThat(failure.getCause(), equalTo(err)); + assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); } - public class IndexResultWithLocation extends Engine.IndexResult { - private final Translog.Location 
location; - public IndexResultWithLocation(long version, long term, long seqNo, boolean created, Translog.Location newLocation) { - super(version, term, seqNo, created); - this.location = newLocation; + public void testTranslogPositionToSync() throws Exception { + IndexShard shard = newStartedShard(true); + + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(2, 5)]; + for (int i = 0; i < items.length; i++) { + DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id_" + i) + .source(Requests.INDEX_CONTENT_TYPE) + .opType(DocWriteRequest.OpType.INDEX); + items[i] = new BulkItemRequest(i, writeRequest); } + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - @Override - public Translog.Location getTranslogLocation() { - return this.location; + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + while (context.hasMoreOperationsToExecute()) { + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); } - } - public void testProcessUpdateResponse() throws Exception { - IndexShard shard = newStartedShard(false); + assertTrue(shard.isSyncNeeded()); - UpdateRequest updateRequest = new UpdateRequest("index", "_doc", "id"); - BulkItemRequest request = new BulkItemRequest(0, updateRequest); - Exception err = new VersionConflictEngineException(shardId, "_doc", "id", - "I'm conflicted <(;_;)>"); - Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); - Engine.DeleteResult deleteResult = new Engine.DeleteResult(1, 1, 1, true); - DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; - DocWriteResponse.Result deleteWriteResult = DocWriteResponse.Result.DELETED; - IndexRequest indexRequest = new IndexRequest("index", "_doc", "id"); - DeleteRequest deleteRequest = new DeleteRequest("index", "_doc", "id"); - UpdateHelper.Result translate = new 
UpdateHelper.Result(indexRequest, docWriteResult, - new HashMap(), XContentType.JSON); - UpdateHelper.Result translateDelete = new UpdateHelper.Result(deleteRequest, deleteWriteResult, - new HashMap(), XContentType.JSON); + // if we sync the location, nothing else is unsynced + CountDownLatch latch = new CountDownLatch(1); + shard.sync(context.getLocationToSync(), e -> { + if (e != null) { + throw new AssertionError(e); + } + latch.countDown(); + }); - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - itemRequests[0] = request; - - BulkItemResultHolder holder = TransportShardBulkAction.processUpdateResponse(updateRequest, - "index", indexResult, translate, shard, 7); - - assertTrue(holder.isVersionConflict()); - assertThat(holder.response, instanceOf(UpdateResponse.class)); - UpdateResponse updateResp = (UpdateResponse) holder.response; - assertThat(updateResp.getGetResult(), equalTo(null)); - assertThat(holder.operationResult, equalTo(indexResult)); - BulkItemRequest replicaBulkRequest = holder.replicaRequest; - assertThat(replicaBulkRequest.id(), equalTo(7)); - DocWriteRequest replicaRequest = replicaBulkRequest.request(); - assertThat(replicaRequest, instanceOf(IndexRequest.class)); - assertThat(replicaRequest, equalTo(indexRequest)); - - BulkItemResultHolder deleteHolder = TransportShardBulkAction.processUpdateResponse(updateRequest, - "index", deleteResult, translateDelete, shard, 8); - - assertFalse(deleteHolder.isVersionConflict()); - assertThat(deleteHolder.response, instanceOf(UpdateResponse.class)); - UpdateResponse delUpdateResp = (UpdateResponse) deleteHolder.response; - assertThat(delUpdateResp.getGetResult(), equalTo(null)); - assertThat(deleteHolder.operationResult, equalTo(deleteResult)); - BulkItemRequest delReplicaBulkRequest = deleteHolder.replicaRequest; - assertThat(delReplicaBulkRequest.id(), equalTo(8)); - DocWriteRequest delReplicaRequest = delReplicaBulkRequest.request(); - assertThat(delReplicaRequest, 
instanceOf(DeleteRequest.class)); - assertThat(delReplicaRequest, equalTo(deleteRequest)); + latch.await(); + assertFalse(shard.isSyncNeeded()); closeShards(shard); } - public void testExecuteUpdateRequestOnce() throws Exception { - IndexMetaData metaData = indexMetaData(); - IndexShard shard = newStartedShard(true); + public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { + final IndexShard shard = spy(newStartedShard(false)); + BulkItemRequest itemRequest = new BulkItemRequest(0, new IndexRequest("index", "_doc").source(Requests.INDEX_CONTENT_TYPE)); + final String failureMessage = "simulated primary failure"; + final IOException exception = new IOException(failureMessage); + itemRequest.setPrimaryResponse(new BulkItemResponse(0, + randomFrom( + DocWriteRequest.OpType.CREATE, + DocWriteRequest.OpType.DELETE, + DocWriteRequest.OpType.INDEX + ), + new BulkItemResponse.Failure("index", "_doc", "1", + exception, 1L) + )); + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + itemRequests[0] = itemRequest; + BulkShardRequest bulkShardRequest = new BulkShardRequest( + shard.shardId(), RefreshPolicy.NONE, itemRequests); + TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); + verify(shard, times(1)).markSeqNoAsNoop(1, exception.toString()); + closeShards(shard); + } - Map source = new HashMap<>(); - BulkItemRequest[] items = new BulkItemRequest[1]; - boolean create = randomBoolean(); - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE) - .create(create); + public void testRetries() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + UpdateRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + // the beating will continue until success has come. 
+ writeRequest.retryOnConflict(Integer.MAX_VALUE); BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - items[0] = primaryRequest; - IndexRequest indexRequest = new IndexRequest("index", "_doc", "id"); - indexRequest.source(source); - - DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; - UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult, - new HashMap(), XContentType.JSON); - UpdateHelper updateHelper = new MockUpdateHelper(translate); - UpdateRequest updateRequest = new UpdateRequest("index", "_doc", "id"); - updateRequest.upsert(source); - - BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData, - "index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, - new ThrowingMappingUpdatePerformer(new RuntimeException())); - - assertFalse(holder.isVersionConflict()); - assertNotNull(holder.response); - assertNotNull(holder.operationResult); - assertNotNull(holder.replicaRequest); - - assertThat(holder.response, instanceOf(UpdateResponse.class)); - UpdateResponse updateResp = (UpdateResponse) holder.response; - assertThat(updateResp.getGetResult(), equalTo(null)); - BulkItemRequest replicaBulkRequest = holder.replicaRequest; - assertThat(replicaBulkRequest.id(), equalTo(0)); - DocWriteRequest replicaRequest = replicaBulkRequest.request(); - assertThat(replicaRequest, instanceOf(IndexRequest.class)); - assertThat(replicaRequest, equalTo(indexRequest)); + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); - // Assert that the document actually made it there - assertDocCount(shard, 1); - closeShards(shard); - } + Exception err = new VersionConflictEngineException(shardId, "_doc", "id", + "I'm conflicted <(;_;)>"); + Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0, 0); + Engine.IndexResult mappingUpdate = + new 
Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); - public void testExecuteUpdateRequestOnceWithFailure() throws Exception { - IndexMetaData metaData = indexMetaData(); - IndexShard shard = newStartedShard(true); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenAnswer(ir -> { + if (randomBoolean()) { + return conflictedResult; + } + if (randomBoolean()) { + return mappingUpdate; + } else { + return success; + } + }); + when(shard.indexSettings()).thenReturn(indexSettings); - Map source = new HashMap<>(); - source.put("foo", "bar"); - BulkItemRequest[] items = new BulkItemRequest[1]; - boolean create = randomBoolean(); - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") - .create(create); - BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - items[0] = primaryRequest; + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, randomBoolean() ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); - IndexRequest indexRequest = new IndexRequest("index", "_doc", "id"); - indexRequest.source(source); - - Exception prepareFailure = new IllegalArgumentException("I failed to do something!"); - UpdateHelper updateHelper = new FailingUpdateHelper(prepareFailure); - UpdateRequest updateRequest = new UpdateRequest("index", "_doc", "id"); - updateRequest.upsert(source); - - BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData, - "index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, new NoopMappingUpdatePerformer()); - - assertFalse(holder.isVersionConflict()); - assertNull(holder.response); - assertNotNull(holder.operationResult); - assertNotNull(holder.replicaRequest); - - Engine.IndexResult opResult = (Engine.IndexResult) holder.operationResult; - assertThat(opResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); - assertFalse(opResult.isCreated()); - Exception e = opResult.getFailure(); - assertThat(e.getMessage(), containsString("I failed to do something!")); - - BulkItemRequest replicaBulkRequest = holder.replicaRequest; - assertThat(replicaBulkRequest.id(), equalTo(0)); - assertThat(replicaBulkRequest.request(), instanceOf(IndexRequest.class)); - IndexRequest replicaRequest = (IndexRequest) replicaBulkRequest.request(); - assertThat(replicaRequest.index(), equalTo("index")); - assertThat(replicaRequest.type(), equalTo("_doc")); - assertThat(replicaRequest.id(), equalTo("id")); - assertThat(replicaRequest.sourceAsMap(), equalTo(source)); - - // Assert that the document did not make it there, since it should have failed - assertDocCount(shard, 0); - closeShards(shard); - } - - /** - * Fake UpdateHelper that always returns whatever result you give it - */ - private static class MockUpdateHelper extends UpdateHelper { - private final 
UpdateHelper.Result result; + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - MockUpdateHelper(UpdateHelper.Result result) { - super(Settings.EMPTY, null); - this.result = result; - } + WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + () -> {}); - @Override - public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) { - logger.info("--> preparing update for {} - {}", s, u); - return result; - } + assertThat(result.location, equalTo(resultLocation)); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getSeqNo(), equalTo(13L)); } /** - * An update helper that always fails to prepare the update + * Fake IndexResult that has a settable translog location */ - private static class FailingUpdateHelper extends UpdateHelper { - private final Exception e; + static class FakeIndexResult extends Engine.IndexResult { + + private final Translog.Location location; - FailingUpdateHelper(Exception failure) { - super(Settings.EMPTY, null); - this.e = failure; + protected FakeIndexResult(long version, long term, long seqNo, boolean created, Translog.Location location) { + super(version, term, seqNo, created); + this.location = location; } @Override - public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) { - logger.info("--> preparing failing update for {} - {}", s, u); - throw new ElasticsearchException(e); + 
public Translog.Location getTranslogLocation() { + return this.location; } } /** - * Fake IndexResult that has a settable translog location + * Fake DeleteResult that has a settable translog location */ - private static class FakeResult extends Engine.IndexResult { + static class FakeDeleteResult extends Engine.DeleteResult { private final Translog.Location location; - protected FakeResult(long version, long term, long seqNo, boolean created, Translog.Location location) { - super(version, term, seqNo, created); + protected FakeDeleteResult(long version, long term, long seqNo, boolean found, Translog.Location location) { + super(version, term, seqNo, found); this.location = location; } @@ -851,6 +848,7 @@ public void updateMappings(Mapping update, ShardId shardId, String type) { /** Always throw the given exception */ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { private final RuntimeException e; + ThrowingMappingUpdatePerformer(RuntimeException e) { this.e = e; } diff --git a/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java index 5a5f279985f4c..682b1deb14662 100644 --- a/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -91,6 +91,7 @@ public void testBulkWithUpdateItems() throws Exception { BulkResponse bulkResponse = bulkRequestBuilder.get(); for (BulkItemResponse item : bulkResponse) { + assertThat(item.getFailure(), nullValue()); assertThat(item.isFailed(), equalTo(false)); assertShardInfo(item.getResponse()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index a40f950b02e0d..77bc644909ab8 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -629,7 +629,8 @@ private TransportWriteAction.WritePrimaryResult result; try (Releasable ignored = permitAcquiredFuture.actionGet()) { MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { }; - result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater); + result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, + null); } TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); return result;