Skip to content

Commit

Permalink
Changes made to reproduce retry_on_conflict error in TransportShardBu…
Browse files Browse the repository at this point in the history
…lkAction

Signed-off-by: Raghuvansh Raj <[email protected]>
  • Loading branch information
raghuvanshraj committed Nov 9, 2023
1 parent 0a9dfec commit 9fd0cdb
Showing 1 changed file with 59 additions and 48 deletions.
107 changes: 59 additions & 48 deletions server/src/main/java/org/opensearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1052,54 +1052,65 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
final VersionValue versionValue = resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
final long currentVersion;
final boolean currentNotFoundOrDeleted;
if (versionValue == null) {
currentVersion = Versions.NOT_FOUND;
currentNotFoundOrDeleted = true;
} else {
currentVersion = versionValue.version;
currentNotFoundOrDeleted = versionValue.isDelete();
}
if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentNotFoundOrDeleted) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion);
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index,
currentVersion,
currentNotFoundOrDeleted
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError);
} else {
plan = IndexingStrategy.processNormally(
currentNotFoundOrDeleted,
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()),
reservingDocs
);
}
}

// always throw a VersionConflictEngineException
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);
plan = IndexingStrategy.skipDueToVersionConflict(e, true, Versions.NOT_FOUND);
// if (versionValue == null) {
// currentVersion = Versions.NOT_FOUND;
// currentNotFoundOrDeleted = true;
// } else {
// currentVersion = versionValue.version;
// currentNotFoundOrDeleted = versionValue.isDelete();
// }
// if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentNotFoundOrDeleted) {
// final VersionConflictEngineException e = new VersionConflictEngineException(
// shardId,
// index.id(),
// index.getIfSeqNo(),
// index.getIfPrimaryTerm(),
// SequenceNumbers.UNASSIGNED_SEQ_NO,
// SequenceNumbers.UNASSIGNED_PRIMARY_TERM
// );
// plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion);
// } else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
// && (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) {
// final VersionConflictEngineException e = new VersionConflictEngineException(
// shardId,
// index.id(),
// index.getIfSeqNo(),
// index.getIfPrimaryTerm(),
// versionValue.seqNo,
// versionValue.term
// );
// plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
// } else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) {
// final VersionConflictEngineException e = new VersionConflictEngineException(
// shardId,
// index,
// currentVersion,
// currentNotFoundOrDeleted
// );
// plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
// } else {
// final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
// if (reserveError != null) {
// plan = IndexingStrategy.failAsTooManyDocs(reserveError);
// } else {
// plan = IndexingStrategy.processNormally(
// currentNotFoundOrDeleted,
// canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()),
// reservingDocs
// );
// }
// }
}
return plan;
}
Expand Down

0 comments on commit 9fd0cdb

Please sign in to comment.