diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index ae15f76c4aea5..1e41fad940c5c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -58,6 +58,12 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del return planDeletionAsNonPrimary(delete); } + @Override + public int fillSeqNoGaps(long primaryTerm) throws IOException { + // a noop implementation, because follow shard does not own the history but the leader shard does. + return 0; + } + @Override protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) { // sequence number should be set when operation origin is primary diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java new file mode 100644 index 0000000000000..54a56c4c3913d --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.index.engine; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.hamcrest.Matchers.equalTo; + +public class FollowEngineIndexShardTests extends IndexShardTestCase { + + public void testDoNotFillGaps() throws Exception { + Settings settings = Settings.builder() + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .build(); + final IndexShard indexShard = newStartedShard(false, settings, new FollowingEngineFactory()); + + long seqNo = -1; + for (int i = 0; i < 8; i++) { + final String id = Long.toString(i); + SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id, + new BytesArray("{}"), XContentType.JSON); + indexShard.applyIndexOperationOnReplica(++seqNo, + 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + } + long seqNoBeforeGap = seqNo; + seqNo += 8; + SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", "9", + new BytesArray("{}"), XContentType.JSON); + indexShard.applyIndexOperationOnReplica(seqNo, + 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + + // promote the replica to primary: + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, + 0L, Collections.singleton(primaryRouting.allocationId().getId()), + new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet()); + + final CountDownLatch latch = new CountDownLatch(1); + ActionListener actionListener = ActionListener.wrap(releasable -> { + releasable.close(); + latch.countDown(); + }, e -> {assert false : "expected no exception, but got [" + e.getMessage() + "]";}); + indexShard.acquirePrimaryOperationPermit(actionListener, ThreadPool.Names.GENERIC, ""); + latch.await(); + assertThat(indexShard.getLocalCheckpoint(), equalTo(seqNoBeforeGap)); + indexShard.refresh("test"); + assertThat(indexShard.docStats().getCount(), equalTo(9L)); + closeShards(indexShard); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 3569fbc420510..ff877724ce4b5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -150,50 +150,7 @@ public void runIndexTest( try (Store store = createStore(shardId, indexSettings, newDirectory())) { final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { - final String id = "id"; - final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); - final String type = "type"; - final Field versionField = new NumericDocValuesField("_version", 0); - final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - final ParseContext.Document document = new ParseContext.Document(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - final BytesReference source = new BytesArray(new byte[]{1}); - final ParsedDocument parsedDocument = new ParsedDocument( - versionField, - seqID, - id, - type, - "routing", - Collections.singletonList(document), - source, - XContentType.JSON, - null); - - final long version; - final long autoGeneratedIdTimestamp; - if (randomBoolean()) { - version = 1; - autoGeneratedIdTimestamp = System.currentTimeMillis(); - } else { - version = randomNonNegativeLong(); - autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; - } - final Engine.Index index = new Engine.Index( - new Term("_id", parsedDocument.id()), - parsedDocument, - seqNo, - primaryTerm.get(), - version, - VersionType.EXTERNAL, - origin, - System.currentTimeMillis(), - autoGeneratedIdTimestamp, - randomBoolean()); - + final Engine.Index index = createIndexOp("id", seqNo, origin); consumer.accept(followingEngine, index); } } @@ -243,6 +200,26 @@ public void runDeleteTest( } } + public void testDoNotFillSeqNoGaps() throws Exception { + final Settings settings = + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { + followingEngine.index(createIndexOp("id", 128, Engine.Operation.Origin.PRIMARY)); + int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get()); + assertThat(addedNoops, equalTo(0)); + } + } + } + private EngineConfig engineConfig( final ShardId shardId, final IndexSettings indexSettings, @@ -307,4 +284,49 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO return followingEngine; } + private Engine.Index createIndexOp(String id, long seqNo, Engine.Operation.Origin origin) { + final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); + final String type = "type"; + final Field versionField = new NumericDocValuesField("_version", 0); + final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + final ParseContext.Document document = new ParseContext.Document(); + document.add(uidField); + document.add(versionField); + document.add(seqID.seqNo); + document.add(seqID.seqNoDocValue); + document.add(seqID.primaryTerm); + final BytesReference source = new BytesArray(new byte[]{1}); + final ParsedDocument parsedDocument = new ParsedDocument( + versionField, + seqID, + id, + type, + "routing", + Collections.singletonList(document), + source, + XContentType.JSON, + null); + + final long version; + final long autoGeneratedIdTimestamp; + if (randomBoolean()) { + version = 1; + autoGeneratedIdTimestamp = System.currentTimeMillis(); + } else { + version = randomNonNegativeLong(); + autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } + return new Engine.Index( + new Term("_id", parsedDocument.id()), + parsedDocument, + seqNo, + primaryTerm.get(), + version, + VersionType.EXTERNAL, + origin, + System.currentTimeMillis(), + autoGeneratedIdTimestamp, + randomBoolean()); + } + }