Skip to content

Commit

Permalink
Follow engine should not fill gaps upon promotion and recovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Jul 3, 2018
1 parent 05b4517 commit 8597ac0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del
return planDeletionAsNonPrimary(delete);
}

@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
// a noop implementation, because follow shard does not own the history but the leader shard does.
return 0;
}

@Override
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
// sequence number should be set when operation origin is primary
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.equalTo;

public class FollowEngineIndexShardTests extends IndexShardTestCase {

public void testDoNotFillGaps() throws Exception {
Settings settings = Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.build();
final IndexShard indexShard = newStartedShard(false, settings, new FollowingEngineFactory());

long seqNo = -1;
for (int i = 0; i < 8; i++) {
final String id = Long.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(++seqNo,
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
}
long seqNoBeforeGap = seqNo;
seqNo += 8;
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", "9",
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(seqNo,
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);

// promote the replica to primary:
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());

final CountDownLatch latch = new CountDownLatch(1);
ActionListener<Releasable> actionListener = ActionListener.wrap(releasable -> {
releasable.close();
latch.countDown();
}, e -> {throw new RuntimeException(e);});
indexShard.acquirePrimaryOperationPermit(actionListener, ThreadPool.Names.GENERIC, "");
latch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo(seqNoBeforeGap));
indexShard.refresh("test");
assertThat(indexShard.docStats().getCount(), equalTo(9L));
closeShards(indexShard);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,50 +150,7 @@ public void runIndexTest(
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
final String id = "id";
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
final String type = "type";
final Field versionField = new NumericDocValuesField("_version", 0);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
final ParseContext.Document document = new ParseContext.Document();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[]{1});
final ParsedDocument parsedDocument = new ParsedDocument(
versionField,
seqID,
id,
type,
"routing",
Collections.singletonList(document),
source,
XContentType.JSON,
null);

final long version;
final long autoGeneratedIdTimestamp;
if (randomBoolean()) {
version = 1;
autoGeneratedIdTimestamp = System.currentTimeMillis();
} else {
version = randomNonNegativeLong();
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
final Engine.Index index = new Engine.Index(
new Term("_id", parsedDocument.id()),
parsedDocument,
seqNo,
primaryTerm.get(),
version,
VersionType.EXTERNAL,
origin,
System.currentTimeMillis(),
autoGeneratedIdTimestamp,
randomBoolean());

final Engine.Index index = createIndexOp("id", seqNo, origin);
consumer.accept(followingEngine, index);
}
}
Expand Down Expand Up @@ -243,6 +200,26 @@ public void runDeleteTest(
}
}

public void testDoNotFillSeqNoGaps() throws Exception {
final Settings settings =
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
followingEngine.index(createIndexOp("id", 128, Engine.Operation.Origin.PRIMARY));
int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get());
assertThat(addedNoops, equalTo(0));
}
}
}

private EngineConfig engineConfig(
final ShardId shardId,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -307,4 +284,49 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO
return followingEngine;
}

private Engine.Index createIndexOp(String id, long seqNo, Engine.Operation.Origin origin) {
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
final String type = "type";
final Field versionField = new NumericDocValuesField("_version", 0);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
final ParseContext.Document document = new ParseContext.Document();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[]{1});
final ParsedDocument parsedDocument = new ParsedDocument(
versionField,
seqID,
id,
type,
"routing",
Collections.singletonList(document),
source,
XContentType.JSON,
null);

final long version;
final long autoGeneratedIdTimestamp;
if (randomBoolean()) {
version = 1;
autoGeneratedIdTimestamp = System.currentTimeMillis();
} else {
version = randomNonNegativeLong();
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
return new Engine.Index(
new Term("_id", parsedDocument.id()),
parsedDocument,
seqNo,
primaryTerm.get(),
version,
VersionType.EXTERNAL,
origin,
System.currentTimeMillis(),
autoGeneratedIdTimestamp,
randomBoolean());
}

}

0 comments on commit 8597ac0

Please sign in to comment.