Skip to content

Commit

Permalink
Don't swallow exceptions on replication (#31179)
Browse files Browse the repository at this point in the history
Swallowing these exceptions is dangerous as they can result in replicas going out-of-sync with the primary.

Follow-up to #28571
  • Loading branch information
ywelsch authored Jun 11, 2018
1 parent f825a53 commit 4e9b554
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,32 +486,24 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
DocWriteRequest docWriteRequest = item.request();
try {
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
case NOOP:
break;
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
default:
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!TransportActions.isShardNotAvailableException(e)) {
throw e;
}
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
case NOOP:
break;
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
default:
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
}
}
return location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (Translog.Operation operation : request.getOperations()) {
try {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate());
}
location = syncOperationResultOrThrow(operationResult, location);
} catch (Exception e) {
// if its not a failure to be ignored, let it bubble up
if (!TransportActions.isShardNotAvailableException(e)) {
throw e;
}
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate());
}
location = syncOperationResultOrThrow(operationResult, location);
}
if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,7 @@ protected static Location syncOperationResultOrThrow(final Engine.Result operati
// check if any transient write operation failures should be bubbled up
Exception failure = operationResult.getFailure();
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
} else {
location = currentLocation;
}
throw failure;
} else {
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
}
Expand Down

0 comments on commit 4e9b554

Please sign in to comment.