Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove versionType from translog #31945

Merged
merged 14 commits into from
Jul 18, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,12 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
.routing(indexRequest.routing());
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(), sourceToParse);
indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
break;
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.type(), deleteRequest.id());
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down
15 changes: 0 additions & 15 deletions server/src/main/java/org/elasticsearch/index/VersionType.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ public boolean validateVersionForReads(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
}

@Override
public VersionType versionTypeForReplicationAndRecovery() {
// replicas get the version from the primary after increment. The same version is stored in
// the transaction log. -> the should use the external semantics.
return EXTERNAL;
}
},
EXTERNAL((byte) 1) {
@Override
Expand Down Expand Up @@ -333,14 +326,6 @@ public byte getValue() {
*/
public abstract boolean validateVersionForReads(long version);

/**
* Some version types require different semantics for primary and replicas. This version allows
* the type to override the default behavior.
*/
public VersionType versionTypeForReplicationAndRecovery() {
return this;
}

public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) {
return INTERNAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,7 @@ public static class Index extends Operation {
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
Expand Down Expand Up @@ -1245,6 +1246,7 @@ public static class Delete extends Operation {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ private boolean canOptimizeAddDocument(Index index) {
return true;
case PEER_RECOVERY:
case REPLICA:
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
assert index.version() == 1 && index.versionType() == null
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
Expand All @@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) {
return false;
}

private boolean assertVersionType(final Engine.Operation operation) {
if (operation.origin() == Operation.Origin.REPLICA ||
operation.origin() == Operation.Origin.PEER_RECOVERY ||
operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
// ensure that replica operation has expected version type for replication
// ensure that versionTypeForReplicationAndRecovery is idempotent
assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
: "unexpected version type in request from [" + operation.origin().name() + "] " +
"found [" + operation.versionType().name() + "] " +
"expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
}
return true;
}

private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assert assertOriginPrimarySequenceNumber(seqNo);
Expand Down Expand Up @@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException {
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
assert assertVersionType(index);
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
Expand Down Expand Up @@ -860,9 +845,6 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
Expand Down Expand Up @@ -1096,7 +1078,6 @@ private void updateDocs(final Term uid, final List<ParseContext.Document> docs,
public DeleteResult delete(Delete delete) throws IOException {
versionMap.enforceSafeAccess();
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
assert assertVersionType(delete);
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
final DeleteResult deleteResult;
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
Expand Down Expand Up @@ -1149,10 +1130,6 @@ public DeleteResult delete(Delete delete) throws IOException {

private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// drop out of order operations
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
Expand Down
23 changes: 11 additions & 12 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -645,22 +645,22 @@ private IndexShardState changeState(IndexShardState newState, String reason) {

public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
long autoGeneratedTimestamp, boolean isRetry) throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}

public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
throws IOException {
return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA, sourceToParse);
}

private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
ensureWriteAllowed(origin);
Engine.Index operation;
try {
Expand Down Expand Up @@ -736,19 +736,18 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {

public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
throws IOException {
assert versionType.validateVersionForWrites(version);
return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType,
Engine.Operation.Origin.PRIMARY);
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
VersionType versionType) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA);
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
}

private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
VersionType versionType, Engine.Operation.Origin origin) throws IOException {
@Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
ensureWriteAllowed(origin);
// When there is a single type, the unique identifier is only composed of the _id,
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
Expand Down Expand Up @@ -1211,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
null, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
delete.versionType().versionTypeForReplicationAndRecovery(), origin);
null, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
Expand Down
Loading