From fe574f89f856a58df9467a83d9171e32407cfbb8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 20 Jul 2018 08:39:32 -0400 Subject: [PATCH] CCR: Translog op on primary should have versionType Normally translog operations will not be replayed on the primary. Following engine is an exception where we replay translog on both primary and replica as a non-primary strategy. Even though we won't use the version_type in the following engine, we still need to pass a valid value for the primary operation in order not to trip assertions in an engine. This commit passes version_type EXTERNAL for translog operation if its origin is primary. Relates #31945 --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 ++++-- .../xpack/ccr/index/engine/FollowingEngine.java | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d69072e0cb0da..21982d429752b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1214,6 +1214,8 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { } public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { + // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. + final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; final Engine.Result result; switch (operation.opType()) { case INDEX: @@ -1221,14 +1223,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(), - null, index.getAutoGeneratedIdTimestamp(), true, origin, + versionType, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), - null, origin); + versionType, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 1e41fad940c5c..24ada3755cb2a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -44,6 +44,8 @@ private void preFlight(final Operation operation) { if (operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO) { throw new IllegalStateException("a following engine does not accept operations without an assigned sequence number"); } + assert (operation.origin() == Operation.Origin.PRIMARY) == (operation.versionType() == VersionType.EXTERNAL) : + "invalid version_type in a following engine; version_type=" + operation.versionType() + "origin=" + operation.origin(); } @Override