Skip to content

Commit

Permalink
CCR: Translog op on primary should have versionType
Browse files Browse the repository at this point in the history
Normally translog operations will not be replayed on the primary.
Following engine is an exception where we replay translog on both
primary and replica as a non-primary strategy.  Even though we won't use
the version_type in the following engine, we still need to pass a valid
value for the primary operation in order not to trip assertions in an
engine.

This commit passes version_type EXTERNAL for translog operation if its
origin is primary.

Relates #31945
  • Loading branch information
dnhatn committed Jul 20, 2018
1 parent a6b7497 commit fe574f8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1214,21 +1214,23 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
}

public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
final Engine.Result result;
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
null, index.getAutoGeneratedIdTimestamp(), true, origin,
versionType, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
null, origin);
versionType, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ private void preFlight(final Operation operation) {
if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number");
}
assert (operation.origin() == Operation.Origin.PRIMARY) == (operation.versionType() == VersionType.EXTERNAL) :
"invalid version_type in a following engine; version_type=" + operation.versionType() + "origin=" + operation.origin();
}

@Override
Expand Down

0 comments on commit fe574f8

Please sign in to comment.