Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialize pre-closing checks for engine implementations (#38702) #38723

Merged
merged 1 commit into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,7 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}

final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
}

indexShard.verifyShardBeforeIndexClosing();
indexShard.flush(new FlushRequest().force(true));
logger.trace("{} shard is ready for closing", shardId);
}
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,20 @@ protected final DocsStats docsStats(IndexReader indexReader) {
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}

/**
* Performs the pre-closing checks on the {@link Engine}.
*
* @throws IllegalStateException if the sanity checks failed
*/
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
final long maxSeqNo = getSeqNoStats(globalCheckpoint).getMaxSeqNo();
if (globalCheckpoint != maxSeqNo) {
throw new IllegalStateException("Global checkpoint [" + globalCheckpoint
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
}
}

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final
}
}

@Override
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
// the value of the global checkpoint is verified when the read-only engine is opened,
// and it is not expected to change during the lifecycle of the engine. We could also
// check this value before closing the read-only engine but if something went wrong
// and the global checkpoint is not in-sync with the max. sequence number anymore,
// checking the value here again would prevent the read-only engine to be closed and
// reopened as an internal engine, which would be the path to fix the issue.
}

protected final DirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3092,4 +3092,13 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
}

/**
* Performs the pre-closing checks on the {@link IndexShard}.
*
* @throws IllegalStateException if the sanity checks failed
*/
public void verifyShardBeforeIndexClosing() throws IllegalStateException {
getEngine().verifyEngineBeforeIndexClosing();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -73,6 +71,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -100,8 +99,6 @@ public void setUp() throws Exception {

indexShard = mock(IndexShard.class);
when(indexShard.getActiveOperationsCount()).thenReturn(0);
when(indexShard.getGlobalCheckpoint()).thenReturn(0L);
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L));

final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
when(indexShard.shardId()).thenReturn(shardId);
Expand Down Expand Up @@ -174,17 +171,16 @@ public void testOperationFailsWithNoBlock() {
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}

public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE);
final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo);
final long globalCheckpoint = randomValueOtherThan(maxSeqNo,
() -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint));
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint));
when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint);
public void testVerifyShardBeforeIndexClosing() throws Exception {
executeOnPrimaryOrReplica();
verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
verify(indexShard, times(1)).flush(any(FlushRequest.class));
}

IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number ["
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
public void testVerifyShardBeforeIndexClosingFailed() {
doThrow(new IllegalStateException("test")).when(indexShard).verifyShardBeforeIndexClosing();
expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,25 @@ public void testReadOnly() throws IOException {
}
}
}

/**
* Test that {@link ReadOnlyEngine#verifyEngineBeforeIndexClosing()} never fails
* whatever the value of the global checkpoint to check is.
*/
public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
globalCheckpoint.set(randomNonNegativeLong());
try {
readOnlyEngine.verifyEngineBeforeIndexClosing();
} catch (final IllegalStateException e) {
fail("Read-only engine pre-closing verifications failed");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,12 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException {
public long getNumberOfOptimizedIndexing() {
return numOfOptimizedIndexing.count();
}

@Override
public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
// the value of the global checkpoint is not verified when the following engine is closed,
// allowing it to be closed even in the case where all operations have not been fetched and
// processed from the leader and the operations history has gaps. This way the following
// engine can be closed and reopened in order to bootstrap the follower index again.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;

import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class CloseFollowerIndexIT extends CcrIntegTestCase {

public void testCloseAndReopenFollowerIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");

PutFollowAction.Request followRequest = new PutFollowAction.Request();
followRequest.setRemoteCluster("leader_cluster");
followRequest.setLeaderIndex("index1");
followRequest.setFollowerIndex("index2");
followRequest.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1));
followRequest.getParameters().setMaxOutstandingReadRequests(128);
followRequest.waitForActiveShards(ActiveShardCount.DEFAULT);

followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen("index2");

AtomicBoolean isRunning = new AtomicBoolean(true);
int numThreads = 4;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
while (isRunning.get()) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}
});
threads[i].start();
}

atLeastDocsIndexed(followerClient(), "index2", 32);
AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get();
assertThat(response.isAcknowledged(), is(true));

ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
List<ClusterBlock> blocks = new ArrayList<>(clusterState.getBlocks().indices().get("index2"));
assertThat(blocks.size(), equalTo(1));
assertThat(blocks.get(0).id(), equalTo(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID));

isRunning.set(false);
for (Thread thread : threads) {
thread.join();
}
assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get());

refresh(leaderClient(), "index1");
SearchRequest leaderSearchRequest = new SearchRequest("index1");
leaderSearchRequest.source().trackTotalHits(true);
long leaderIndexDocs = leaderClient().search(leaderSearchRequest).actionGet().getHits().getTotalHits().value;
assertBusy(() -> {
refresh(followerClient(), "index2");
SearchRequest followerSearchRequest = new SearchRequest("index2");
followerSearchRequest.source().trackTotalHits(true);
long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value;
assertThat(followerIndexDocs, equalTo(leaderIndexDocs));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -640,4 +640,23 @@ public void testProcessOnceOnPrimary() throws Exception {
}
}
}

/**
* Test that {@link FollowingEngine#verifyEngineBeforeIndexClosing()} never fails
* whatever the value of the global checkpoint to check is.
*/
public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
final long seqNo = randomIntBetween(0, Integer.MAX_VALUE);
runIndexTest(
seqNo,
Engine.Operation.Origin.PRIMARY,
(followingEngine, index) -> {
globalCheckpoint.set(randomNonNegativeLong());
try {
followingEngine.verifyEngineBeforeIndexClosing();
} catch (final IllegalStateException e) {
fail("Following engine pre-closing verifications failed");
}
});
}
}