Skip to content

Commit

Permalink
Wait for Prewarm when Relocating Searchable Snapshot Shards (#65531)
Browse files Browse the repository at this point in the history
Add hooks to enable waiting for a condition before completing the clean files step for relocating searchable snapshot shards and use them to wait for pre-warm before responding to the clean files request.
  • Loading branch information
original-brownbear authored Dec 9, 2020
1 parent bc989e4 commit e189a20
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -974,11 +974,17 @@ public void testDisconnectsWhileRecovering() throws Exception {
final CountDownLatch requestFailed = new CountDownLatch(1);

if (randomBoolean()) {
final StubbableTransport.SendRequestBehavior sendRequestBehavior = (connection, requestId, action, request, options) -> {
if (recoveryActionToBlock.equals(action) || requestFailed.getCount() == 0) {
requestFailed.countDown();
logger.info("--> preventing {} request by throwing ConnectTransportException", action);
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
connection.sendRequest(requestId, action, request, options);
};
// Fail on the sending side
blueMockTransportService.addSendBehavior(redTransportService,
new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestFailed));
redMockTransportService.addSendBehavior(blueTransportService,
new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestFailed));
blueMockTransportService.addSendBehavior(redTransportService, sendRequestBehavior);
redMockTransportService.addSendBehavior(blueTransportService, sendRequestBehavior);
} else {
// Fail on the receiving side.
blueMockTransportService.addRequestHandlingBehavior(recoveryActionToBlock, (handler, request, channel, task) -> {
Expand Down Expand Up @@ -1013,34 +1019,6 @@ public void testDisconnectsWhileRecovering() throws Exception {
assertHitCount(searchResponse, numDocs);
}

private class RecoveryActionBlocker implements StubbableTransport.SendRequestBehavior {
private final boolean dropRequests;
private final String recoveryActionToBlock;
private final CountDownLatch requestBlocked;

RecoveryActionBlocker(boolean dropRequests, String recoveryActionToBlock, CountDownLatch requestBlocked) {
this.dropRequests = dropRequests;
this.recoveryActionToBlock = recoveryActionToBlock;
this.requestBlocked = requestBlocked;
}

@Override
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) {
requestBlocked.countDown();
if (dropRequests) {
logger.info("--> preventing {} request by dropping request", action);
return;
} else {
logger.info("--> preventing {} request by throwing ConnectTransportException", action);
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
}
connection.sendRequest(requestId, action, request, options);
}
}

/**
* Tests scenario where recovery target successfully sends recovery request to source but then the channel gets closed while
* the source is working on the recovery process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,10 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -174,9 +176,11 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -462,7 +466,6 @@ public QueryCachingPolicy getQueryCachingPolicy() {
return cachingPolicy;
}


@Override
public void updateShardState(final ShardRouting newRouting,
final long newPrimaryTerm,
Expand Down Expand Up @@ -2449,6 +2452,57 @@ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.ST
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason);
}

private final AtomicInteger outstandingCleanFilesConditions = new AtomicInteger(0);

private final Deque<Runnable> afterCleanFilesActions = new LinkedList<>();

/**
* Creates a {@link Runnable} that must be executed before the clean files step in peer recovery can complete.
*
* @return runnable that must be executed during the clean files step in peer recovery
*/
public Runnable addCleanFilesDependency() {
logger.trace("adding clean files dependency for [{}]", shardRouting);
outstandingCleanFilesConditions.incrementAndGet();
return () -> {
if (outstandingCleanFilesConditions.decrementAndGet() == 0) {
runAfterCleanFilesActions();
}
};
}

/**
* Execute a {@link Runnable} on the generic pool once all dependencies added via {@link #addCleanFilesDependency()} have finished.
* If there are no dependencies to wait for then the {@code Runnable} will be executed on the calling thread.
*/
public void afterCleanFiles(Runnable runnable) {
if (outstandingCleanFilesConditions.get() == 0) {
runnable.run();
} else {
synchronized (afterCleanFilesActions) {
afterCleanFilesActions.add(runnable);
}
if (outstandingCleanFilesConditions.get() == 0) {
runAfterCleanFilesActions();
}
}
}

// for tests
public int outstandingCleanFilesConditions() {
return outstandingCleanFilesConditions.get();
}

private void runAfterCleanFilesActions() {
synchronized (afterCleanFilesActions) {
final Executor executor = threadPool.generic();
Runnable afterCleanFilesAction;
while ((afterCleanFilesAction = afterCleanFilesActions.poll()) != null) {
executor.execute(afterCleanFilesAction);
}
}
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -432,7 +433,13 @@ public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel
}

recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
listener);
ActionListener.delegateFailure(listener, (r, l) -> {
Releasable reenableMonitor = recoveryRef.target().disableRecoveryMonitor();
recoveryRef.target().indexShard().afterCleanFiles(() -> {
reenableMonitor.close();
listener.onResponse(null);
});
}));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CancellableThreads;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();

private volatile boolean recoveryMonitorEnabled = true;

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -153,14 +156,33 @@ public CancellableThreads cancellableThreads() {

/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
return lastAccessTime;
if (recoveryMonitorEnabled) {
return lastAccessTime;
}
return System.nanoTime();
}

/** sets the lasAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}

/**
* Set flag to signal to {@link org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryMonitor} that it must not cancel this
* recovery temporarily. This is used by the recovery clean files step to avoid recovery failure in case a long running condition was
* added to the shard via {@link IndexShard#addCleanFilesDependency()}.
*
* @return releasable that once closed will re-enable liveness checks by the recovery monitor
*/
public Releasable disableRecoveryMonitor() {
assert recoveryMonitorEnabled : "recovery monitor already disabled";
recoveryMonitorEnabled = false;
return () -> {
setLastAccessTime();
recoveryMonitorEnabled = true;
};
}

public Store store() {
ensureRefCount();
return store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
final RecoveryCleanFilesRequest request =
new RecoveryCleanFilesRequest(recoveryId, requestSeqNo, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader);
final ActionListener<TransportResponse.Empty> responseListener = listener.map(r -> null);
executeRetryableAction(action, request, TransportRequestOptions.EMPTY, responseListener, reader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchableSnapshotsRelocationIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(LocalStateSearchableSnapshots.class, MockRepository.Plugin.class);
}

public void testRelocationWaitsForPreWarm() throws Exception {
internalCluster().startMasterOnlyNode();
final String firstDataNode = internalCluster().startDataOnlyNode();
final String index = "test-idx";
createIndexWithContent(index, indexSettingsNoReplicas(1).build());
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String snapshotName = "test-snapshot";
createSnapshot(repoName, snapshotName, List.of(index));
assertAcked(client().admin().indices().prepareDelete(index));
final String restoredIndex = mountSnapshot(repoName, snapshotName, index, Settings.EMPTY);
ensureGreen(restoredIndex);
final String secondDataNode = internalCluster().startDataOnlyNode();

final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, secondDataNode);
final int preWarmThreads = threadPool.info(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME).getMax();
final Executor executor = threadPool.executor(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME);
final CyclicBarrier barrier = new CyclicBarrier(preWarmThreads + 1);
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < preWarmThreads; i++) {
executor.execute(() -> {
try {
barrier.await();
latch.await();
} catch (Exception e) {
throw new AssertionError(e);
}
});
}
logger.info("--> waiting for prewarm threads to all become blocked");
barrier.await();

logger.info("--> force index [{}] to relocate to [{}]", index, secondDataNode);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(restoredIndex)
.setSettings(
Settings.builder()
.put(
IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(),
secondDataNode
)
)
);
assertBusy(() -> {
final List<RecoveryState> recoveryStates = getActiveRestores(restoredIndex);
assertThat(recoveryStates, Matchers.hasSize(1));
final RecoveryState shardRecoveryState = recoveryStates.get(0);
assertEquals(firstDataNode, shardRecoveryState.getSourceNode().getName());
assertEquals(secondDataNode, shardRecoveryState.getTargetNode().getName());
});

assertBusy(() -> assertSame(RecoveryState.Stage.TRANSLOG, getActiveRestores(restoredIndex).get(0).getStage()));
final Index restoredIdx = clusterAdmin().prepareState().get().getState().metadata().index(restoredIndex).getIndex();
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, secondDataNode);
assertEquals(1, indicesService.indexService(restoredIdx).getShard(0).outstandingCleanFilesConditions());
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final String primaryNodeId = state.routingTable().index(restoredIndex).shard(0).primaryShard().currentNodeId();
final DiscoveryNode primaryNode = state.nodes().resolveNode(primaryNodeId);
assertEquals(firstDataNode, primaryNode.getName());

logger.info("--> unblocking prewarm threads");
latch.countDown();

assertFalse(
client().admin()
.cluster()
.prepareHealth(restoredIndex)
.setWaitForNoRelocatingShards(true)
.setWaitForEvents(Priority.LANGUID)
.get()
.isTimedOut()
);
assertBusy(() -> assertThat(getActiveRestores(restoredIndex), Matchers.empty()));
}

private static List<RecoveryState> getActiveRestores(String restoredIndex) {
return client().admin()
.indices()
.prepareRecoveries(restoredIndex)
.setDetailed(true)
.setActiveOnly(true)
.get()
.shardRecoveryStates()
.get(restoredIndex);
}
}
Loading

0 comments on commit e189a20

Please sign in to comment.