Wait for prewarm when relocating searchable snapshot shards #65531

Merged

Changes from 22 commits (of 24 commits total)
74eae5c  Wait for Prewarm when Relocating Searchable Snapshot Shards (original-brownbear, Nov 26, 2020)
e688e74  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm (original-brownbear, Nov 29, 2020)
e57db07  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm (original-brownbear, Nov 30, 2020)
f489058  add test (original-brownbear, Nov 30, 2020)
95dadb2  better test (original-brownbear, Nov 30, 2020)
d5ee3f3  way better test (original-brownbear, Nov 30, 2020)
fa8dea6  reformat nicer (original-brownbear, Nov 30, 2020)
eccb1b4  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm (original-brownbear, Nov 30, 2020)
a428a8b  start (original-brownbear, Nov 30, 2020)
472fa1e  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm-o… (original-brownbear, Nov 30, 2020)
fc01ad4  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm-o… (original-brownbear, Dec 1, 2020)
48c55f5  bck (original-brownbear, Dec 1, 2020)
40e18cd  works nicely (original-brownbear, Dec 1, 2020)
9427179  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm-o… (original-brownbear, Dec 2, 2020)
ed4e8c9  fixes (original-brownbear, Dec 2, 2020)
59ab566  fix liveness check disabling (original-brownbear, Dec 2, 2020)
566884e  fix comment (original-brownbear, Dec 2, 2020)
3b41f93  much simpler (original-brownbear, Dec 2, 2020)
43816f5  cs (original-brownbear, Dec 2, 2020)
714e6c1  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm (original-brownbear, Dec 2, 2020)
86e9ebb  adjust broken test (original-brownbear, Dec 2, 2020)
d44b120  shorter (original-brownbear, Dec 2, 2020)
ef5032e  Merge remote-tracking branch 'elastic/master' into wait-for-prewarm (original-brownbear, Dec 9, 2020)
b9063b5  adjust tests (original-brownbear, Dec 9, 2020)
@@ -974,11 +974,17 @@ public void testDisconnectsWhileRecovering() throws Exception {
final CountDownLatch requestFailed = new CountDownLatch(1);

if (randomBoolean()) {
final StubbableTransport.SendRequestBehavior sendRequestBehavior = (connection, requestId, action, request, options) -> {
if (recoveryActionToBlock.equals(action) || requestFailed.getCount() == 0) {
requestFailed.countDown();
logger.info("--> preventing {} request by throwing ConnectTransportException", action);
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
connection.sendRequest(requestId, action, request, options);
};
// Fail on the sending side
blueMockTransportService.addSendBehavior(redTransportService,
new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestFailed));
redMockTransportService.addSendBehavior(blueTransportService,
new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestFailed));
blueMockTransportService.addSendBehavior(redTransportService, sendRequestBehavior);
redMockTransportService.addSendBehavior(blueTransportService, sendRequestBehavior);
} else {
// Fail on the receiving side.
blueMockTransportService.addRequestHandlingBehavior(recoveryActionToBlock, (handler, request, channel, task) -> {
@@ -1013,34 +1019,6 @@ public void testDisconnectsWhileRecovering() throws Exception {
assertHitCount(searchResponse, numDocs);
}

private class RecoveryActionBlocker implements StubbableTransport.SendRequestBehavior {
private final boolean dropRequests;
private final String recoveryActionToBlock;
private final CountDownLatch requestBlocked;

RecoveryActionBlocker(boolean dropRequests, String recoveryActionToBlock, CountDownLatch requestBlocked) {
this.dropRequests = dropRequests;
this.recoveryActionToBlock = recoveryActionToBlock;
this.requestBlocked = requestBlocked;
}

@Override
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) {
requestBlocked.countDown();
if (dropRequests) {
logger.info("--> preventing {} request by dropping request", action);
return;
} else {
logger.info("--> preventing {} request by throwing ConnectTransportException", action);
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
}
connection.sendRequest(requestId, action, request, options);
}
}

/**
* Tests scenario where recovery target successfully sends recovery request to source but then the channel gets closed while
* the source is working on the recovery process.
@@ -164,8 +164,10 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -174,9 +176,11 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -462,7 +466,6 @@ public QueryCachingPolicy getQueryCachingPolicy() {
return cachingPolicy;
}


@Override
public void updateShardState(final ShardRouting newRouting,
final long newPrimaryTerm,
@@ -2452,6 +2455,52 @@ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.ST
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason);
}

private final AtomicInteger outstandingCleanFilesConditions = new AtomicInteger(0);

private final Deque<Runnable> afterCleanFilesActions = new LinkedList<>();

/**
* Creates a {@link Runnable} that must be executed before the clean files step in peer recovery can complete.
*
* @return runnable that must be executed during the clean files step in peer recovery
*/
public Runnable addCleanFilesDependency() {
logger.trace("adding clean files dependency for [{}]", shardRouting);
outstandingCleanFilesConditions.incrementAndGet();
return () -> {
if (outstandingCleanFilesConditions.decrementAndGet() == 0) {
runAfterCleanFilesActions();
}
};
}

/**
* Execute a {@link Runnable} on the generic pool once all dependencies added via {@link #addCleanFilesDependency()} have finished.
* If there are no dependencies to wait for then the {@code Runnable} will be executed on the calling thread.
*/
public void afterCleanFiles(Runnable runnable) {
if (outstandingCleanFilesConditions.get() == 0) {
runnable.run();
} else {
synchronized (afterCleanFilesActions) {
afterCleanFilesActions.add(runnable);
}
if (outstandingCleanFilesConditions.get() == 0) {
runAfterCleanFilesActions();
}
}
}

private void runAfterCleanFilesActions() {
synchronized (afterCleanFilesActions) {
final Executor executor = threadPool.generic();
Runnable afterCleanFilesAction;
while ((afterCleanFilesAction = afterCleanFilesActions.poll()) != null) {
executor.execute(afterCleanFilesAction);
}
}
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
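To make the intended call pattern concrete, here is a minimal standalone sketch of the gating logic added to IndexShard above. This is plain Java with illustrative names only (CleanFilesGateSketch and its main() are not part of the PR), and unlike the real implementation it runs queued actions inline instead of handing them to the generic thread pool.

// Standalone sketch of the clean-files gating pattern: prewarming registers a dependency,
// and the clean files step defers its completion callback until that dependency is released.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

public class CleanFilesGateSketch {
    private final AtomicInteger outstanding = new AtomicInteger();
    private final Deque<Runnable> pending = new ArrayDeque<>();

    // analogue of IndexShard#addCleanFilesDependency()
    Runnable addDependency() {
        outstanding.incrementAndGet();
        return () -> {
            if (outstanding.decrementAndGet() == 0) {
                drain();
            }
        };
    }

    // analogue of IndexShard#afterCleanFiles(Runnable); runs inline when nothing is outstanding
    void afterCleanFiles(Runnable action) {
        if (outstanding.get() == 0) {
            action.run();
            return;
        }
        synchronized (pending) {
            pending.add(action);
        }
        if (outstanding.get() == 0) {
            drain();
        }
    }

    private void drain() {
        synchronized (pending) {
            Runnable action;
            while ((action = pending.poll()) != null) {
                action.run(); // the real IndexShard executes these on the generic thread pool
            }
        }
    }

    public static void main(String[] args) {
        CleanFilesGateSketch gate = new CleanFilesGateSketch();
        Runnable prewarmFinished = gate.addDependency();      // registered when prewarming starts
        gate.afterCleanFiles(() -> System.out.println("clean files step may now complete"));
        System.out.println("clean files response is still held back");
        prewarmFinished.run();                                 // prewarming done -> queued action runs
    }
}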
@@ -38,6 +38,7 @@
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -432,7 +433,13 @@ public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel
}

recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
listener);
ActionListener.delegateFailure(listener, (r, l) -> {
Releasable reenableMonitor = recoveryRef.target().disableRecoveryMonitor();
recoveryRef.target().indexShard().afterCleanFiles(() -> {
reenableMonitor.close();
listener.onResponse(null);
});
}));
}
}
}
@@ -32,6 +32,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CancellableThreads;
@@ -84,6 +85,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();

private volatile boolean recoveryMonitorEnabled = true;

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

@@ -153,14 +156,33 @@ public CancellableThreads cancellableThreads() {

/** return the last time this RecoveryStatus was used (based on System.nanoTime() */
public long lastAccessTime() {
return lastAccessTime;
if (recoveryMonitorEnabled) {
return lastAccessTime;
}
return System.nanoTime();
}

/** sets the lasAccessTime flag to now */
public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}

/**
* Set flag to signal to {@link org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryMonitor} that it must not cancel this
* recovery temporarily. This is used by the recovery clean files step to avoid recovery failure in case a long running condition was
* added to the shard via {@link IndexShard#addCleanFilesDependency()}.
*
* @return releasable that once closed will re-enable liveness checks by the recovery monitor
*/
public Releasable disableRecoveryMonitor() {
assert recoveryMonitorEnabled : "recovery monitor already disabled";
recoveryMonitorEnabled = false;
return () -> {
setLastAccessTime();
recoveryMonitorEnabled = true;
[Inline review comment from original-brownbear (Member, Author):]
This is a little low-tech relative to the tricky ref-counting in the IndexShard. I figured this was ok here since the hand-off request only comes in once (at least judging by the assertions we have in IndexShard), while the other API has a more "feel" to it and there are no hard guarantees on the index shard state listener only being invoked once (though the "loaded" flag on the directory effectively guarantees we only add one condition for now), and it wasn't that much extra effort since the API was supposed to be non-blocking anyway.

};
}

public Store store() {
ensureRefCount();
return store;
@@ -175,7 +175,8 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
final RecoveryCleanFilesRequest request =
new RecoveryCleanFilesRequest(recoveryId, requestSeqNo, shardId, sourceMetadata, totalTranslogOps, globalCheckpoint);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader);
final ActionListener<TransportResponse.Empty> responseListener = listener.map(r -> null);
executeRetryableAction(action, request, TransportRequestOptions.EMPTY, responseListener, reader);
}

@Override
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchableSnapshotsRelocationIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(LocalStateSearchableSnapshots.class, MockRepository.Plugin.class);
}

public void testRelocationWaitsForPreWarm() throws Exception {
internalCluster().startMasterOnlyNode();
final String firstDataNode = internalCluster().startDataOnlyNode();
final String index = "test-idx";
createIndexWithContent(index, indexSettingsNoReplicas(1).build());
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String snapshotName = "test-snapshot";
createSnapshot(repoName, snapshotName, List.of(index));
assertAcked(client().admin().indices().prepareDelete(index));
final String restoredIndex = mountSnapshot(repoName, snapshotName, index, Settings.EMPTY);
ensureGreen(restoredIndex);
final String secondDataNode = internalCluster().startDataOnlyNode();

final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, secondDataNode);
final int preWarmThreads = threadPool.info(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME).getMax();
final Executor executor = threadPool.executor(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME);
final CyclicBarrier barrier = new CyclicBarrier(preWarmThreads + 1);
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < preWarmThreads; i++) {
executor.execute(() -> {
try {
barrier.await();
latch.await();
} catch (Exception e) {
throw new AssertionError(e);
}
});
}
logger.info("--> waiting for prewarm threads to all become blocked");
barrier.await();

logger.info("--> force index [{}] to relocate to [{}]", index, secondDataNode);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(restoredIndex)
.setSettings(
Settings.builder()
.put(
IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(),
secondDataNode
)
)
);
assertBusy(() -> {
final List<RecoveryState> recoveryStates = getActiveRestores(restoredIndex);
assertThat(recoveryStates, Matchers.hasSize(1));
final RecoveryState shardRecoveryState = recoveryStates.get(0);
assertEquals(firstDataNode, shardRecoveryState.getSourceNode().getName());
assertEquals(secondDataNode, shardRecoveryState.getTargetNode().getName());
});

logger.info("--> sleep for 5s to ensure we are actually stuck at the FINALIZE stage and that the primary has not yet relocated");
TimeUnit.SECONDS.sleep(5L);
[Inline review comment from a reviewer (Contributor):]
Could we instead find the shard using internalCluster().getInstance(IndicesService.class, node) and assertBusy that it has a pending after cleanup action?

[Reply from original-brownbear (Member, Author):]
++ thanks, you actually prevented a likely test failure here as well :) I moved the check for the translog stage into a busy assert and then added the check for one clean files condition after it. Otherwise we'd only have had 5s to arrive at TRANSLOG; now we have at least 10s, which should be a little safer.
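
For orientation, a rough Java sketch of the kind of busy assertion suggested above, presumably close to what landed in a later commit not included in this 22-commit view. internalCluster().getInstance, indexServiceSafe and resolveIndex are existing test utilities; hasPendingCleanFilesConditions() is an assumed accessor used purely for illustration and is not part of the shown diff.

// Sketch only: replaces the fixed 5s sleep with a busy assertion on the target shard.
// Requires org.elasticsearch.indices.IndicesService and org.elasticsearch.index.shard.IndexShard imports.
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, secondDataNode);
assertBusy(() -> {
    assertSame(RecoveryState.Stage.TRANSLOG, getActiveRestores(restoredIndex).get(0).getStage());
    final IndexShard shard = indicesService.indexServiceSafe(resolveIndex(restoredIndex)).getShard(0);
    assertTrue(shard.hasPendingCleanFilesConditions()); // hypothetical accessor
});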

final RecoveryState recoveryState = getActiveRestores(restoredIndex).get(0);
assertSame(RecoveryState.Stage.TRANSLOG, recoveryState.getStage());
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final String primaryNodeId = state.routingTable().index(restoredIndex).shard(0).primaryShard().currentNodeId();
final DiscoveryNode primaryNode = state.nodes().resolveNode(primaryNodeId);
assertEquals(firstDataNode, primaryNode.getName());

logger.info("--> unblocking prewarm threads");
latch.countDown();

assertFalse(
client().admin()
.cluster()
.prepareHealth(restoredIndex)
.setWaitForNoRelocatingShards(true)
.setWaitForEvents(Priority.LANGUID)
.get()
.isTimedOut()
);
assertBusy(() -> assertThat(getActiveRestores(restoredIndex), Matchers.empty()));
}

private static List<RecoveryState> getActiveRestores(String restoredIndex) {
return client().admin()
.indices()
.prepareRecoveries(restoredIndex)
.setDetailed(true)
.setActiveOnly(true)
.get()
.shardRecoveryStates()
.get(restoredIndex);
}
}