Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for prewarm when relocating searchable snapshot shards #65531

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
74eae5c
Wait for Prewarm when Relocating Searchable Snapshot Shards
original-brownbear Nov 26, 2020
e688e74
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm
original-brownbear Nov 29, 2020
e57db07
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm
original-brownbear Nov 30, 2020
f489058
add test
original-brownbear Nov 30, 2020
95dadb2
better test
original-brownbear Nov 30, 2020
d5ee3f3
way better test
original-brownbear Nov 30, 2020
fa8dea6
reformat nicer
original-brownbear Nov 30, 2020
eccb1b4
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm
original-brownbear Nov 30, 2020
a428a8b
start
original-brownbear Nov 30, 2020
472fa1e
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm-o…
original-brownbear Nov 30, 2020
fc01ad4
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm-o…
original-brownbear Dec 1, 2020
48c55f5
bck
original-brownbear Dec 1, 2020
40e18cd
works nicely
original-brownbear Dec 1, 2020
9427179
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm-o…
original-brownbear Dec 2, 2020
ed4e8c9
fixes
original-brownbear Dec 2, 2020
59ab566
fix liveness check disabling
original-brownbear Dec 2, 2020
566884e
fix comment
original-brownbear Dec 2, 2020
3b41f93
much simpler
original-brownbear Dec 2, 2020
43816f5
cs
original-brownbear Dec 2, 2020
714e6c1
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm
original-brownbear Dec 2, 2020
86e9ebb
adjust broken test
original-brownbear Dec 2, 2020
d44b120
shorter
original-brownbear Dec 2, 2020
ef5032e
Merge remote-tracking branch 'elastic/master' into wait-for-prewarm
original-brownbear Dec 9, 2020
b9063b5
adjust tests
original-brownbear Dec 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 88 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.util.concurrent.RunOnce;
Expand Down Expand Up @@ -376,6 +377,9 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
if (shardRouting.isRelocationTarget()) {
relocationCondition = new RelocationCondition(shardRouting);
}
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -457,6 +461,38 @@ public QueryCachingPolicy getQueryCachingPolicy() {
return cachingPolicy;
}

/**
* A ref counter that can be used to delay primary relocation handoff via {@link #createRelocationDependency()}.
*/
private final class RelocationCondition extends AbstractRefCounted {

private Runnable asyncActivation;

RelocationCondition(ShardRouting routing) {
super("relocation condition for [" + routing.shardId() + "][" + routing.allocationId() + "]");
}

@Override
protected void closeInternal() {
synchronized (this) {
if (asyncActivation != null) {
threadPool.generic().execute(asyncActivation);
}
}
}

// Set the relocation context when receiving it and execute the handoff right away if no more conditions are waiting or create a
// Runnable to execute once all conditions have finished
void receivePrimaryContext(ReplicationTracker.PrimaryContext primaryContext, ActionListener<Void> listener) {
synchronized (this) {
if (decRef()) {
doActivateWithPrimaryContext(primaryContext, listener);
original-brownbear marked this conversation as resolved.
Show resolved Hide resolved
} else {
asyncActivation = () -> doActivateWithPrimaryContext(primaryContext, listener);
}
}
}
}

@Override
public void updateShardState(final ShardRouting newRouting,
Expand Down Expand Up @@ -604,6 +640,14 @@ public void onFailure(Exception e) {
}, null);
}
}
if (newRouting.isRelocationTarget()) {
if (currentRouting.isRelocationTarget() == false) {
assert relocationCondition == null : "Found relocation condition even though there shouldn't be one";
relocationCondition = new RelocationCondition(newRouting);
}
} else {
relocationCondition = null;
}
// set this last, once we finished updating all internal state.
this.shardRouting = newRouting;

Expand Down Expand Up @@ -2409,23 +2453,62 @@ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.ST
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason);
}

private RelocationCondition relocationCondition;

/**
 * Registers a condition that must be released before an incoming primary relocation to this shard can complete.
 * The hand-off from the relocation source, delivered via
 * {@code activateWithPrimaryContext}, is deferred until the returned {@link Runnable} has been executed.
 *
 * NOTE(review): the previous javadoc linked {@code activateThrottling()} here, which looks like a copy-paste
 * mistake — the hand-off is gated through {@code activateWithPrimaryContext}, not throttling. Please confirm.
 *
 * @return listener that must be resolved before primary relocation to this shard can complete
 */
public Runnable createRelocationDependency() {
    assert assertRelocationTarget();
    logger.trace("adding relocation condition for [{}]", shardRouting);
    // read the current condition under the mutex; the ref-count operations then happen outside the lock
    final RelocationCondition condition;
    synchronized (mutex) {
        condition = this.relocationCondition;
    }
    condition.incRef();
    return condition::decRef;
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
* @param primaryContext the sequence number context
*/
public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
assert shardRouting.primary() && shardRouting.isRelocationTarget() :
"only primary relocation target can update allocation IDs from primary context: " + shardRouting;
public void activateWithPrimaryContext(ReplicationTracker.PrimaryContext primaryContext, ActionListener<Void> listener) {
assert assertRelocationTarget();
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) :
"primary context [" + primaryContext + "] does not contain relocation target [" + routingEntry() + "]";
assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId())
.getLocalCheckpoint() || indexSettings().getTranslogDurability() == Translog.Durability.ASYNC :
"local checkpoint [" + getLocalCheckpoint() + "] does not match checkpoint from primary context [" + primaryContext + "]";
final RelocationCondition condition;
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
condition = relocationCondition;
}
ensurePeerRecoveryRetentionLeasesExist();
condition.receivePrimaryContext(primaryContext, listener);
}

// Performs the actual hand-off: activates the replication tracker with the primary context received from the
// relocation source and ensures peer recovery retention leases exist, notifying the listener on completion or failure.
private void doActivateWithPrimaryContext(ReplicationTracker.PrimaryContext primaryContext, ActionListener<Void> listener) {
    try {
        synchronized (mutex) {
            // make changes to primaryMode flag only under mutex
            replicationTracker.activateWithPrimaryContext(primaryContext);
        }
        ensurePeerRecoveryRetentionLeasesExist();
    } catch (Exception e) {
        listener.onFailure(e);
        return;
    }
    listener.onResponse(null);
}

// Asserts that this shard is currently the target of a primary relocation; always returns true so the helper can be
// invoked from within an assert statement.
private boolean assertRelocationTarget() {
    final boolean primaryRelocationTarget = shardRouting.primary() && shardRouting.isRelocationTarget();
    assert primaryRelocationTarget :
        "only primary relocation target can update allocation IDs from primary context: " + shardRouting;
    return true;
}

private void ensurePeerRecoveryRetentionLeasesExist() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -68,6 +70,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

Expand Down Expand Up @@ -312,16 +316,23 @@ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<Rec
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel,
Task task) throws Exception {
final RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId());
final List<Releasable> toRelease = new ArrayList<>(2);
toRelease.add(recoveryRef::close);
boolean success = false;
try {
// Due to relocation conditions on the shard it could take a while for the hand-off to complete so we disable the recovery
// monitor since we don't expect any transport messages from master for the duration of the handoff and activate it again
// after the handoff.
final Releasable disabledMonitor = recoveryRef.target().disableRecoveryMonitor();
Copy link
Member Author

@original-brownbear original-brownbear Nov 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a BwC issue I guess. If the hand-off request comes from 7.10 and doesn't wait indefinitely yet then it could timeout on the primary I guess but maybe we can just ignore it since it's so fringe?

toRelease.add(disabledMonitor);
recoveryRef.target().handoffPrimaryContext(request.primaryContext(),
ActionListener.runBefore(ActionListener.map(
new ChannelActionListener<>(channel, Actions.HANDOFF_PRIMARY_CONTEXT, request),
v -> TransportResponse.Empty.INSTANCE), recoveryRef::close));
v -> TransportResponse.Empty.INSTANCE), () -> Releasables.close(toRelease)));
success = true;
} finally {
if (success == false) {
recoveryRef.close();
Releasables.close(toRelease);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,20 @@ protected void doRun() throws Exception {
logger.trace("[monitor] no status found for [{}], shutting down", recoveryId);
return;
}
long accessTime = status.lastAccessTime();
if (accessTime == lastSeenAccessTime) {
String message = "no activity after [" + checkInterval + "]";
failRecovery(recoveryId,
new RecoveryFailedException(status.state(), message, new ElasticsearchTimeoutException(message)),
true // to be safe, we don't know what go stuck
);
return;
if (status.isRecoveryMonitorEnabled()) {
long accessTime = status.lastAccessTime();
if (accessTime == lastSeenAccessTime) {
String message = "no activity after [" + checkInterval + "]";
failRecovery(recoveryId,
new RecoveryFailedException(status.state(), message, new ElasticsearchTimeoutException(message)),
true // to be safe, we don't know what got stuck
);
return;
}
lastSeenAccessTime = accessTime;
} else {
lastSeenAccessTime = System.nanoTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be simpler to just fake the progress inside RecoveryTarget by returning System.nanoTime()?

}
lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime);
threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CancellableThreads;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();

private volatile boolean recoveryMonitorEnabled = true;

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -161,6 +164,26 @@ public void setLastAccessTime() {
lastAccessTime = System.nanoTime();
}

/**
* Set flag to signal to {@link org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryMonitor} that it must not cancel this
* recovery temporarily. This is used by the primary relocation mechanism to avoid recovery failure in case a long running relocation
* condition was added to the shard via {@link IndexShard#createRelocationDependency()}.
*
* @return releasable that once closed will re-enable liveness checks by the recovery monitor
*/
public Releasable disableRecoveryMonitor() {
assert recoveryMonitorEnabled : "recovery monitor already disabled";
recoveryMonitorEnabled = false;
return () -> {
setLastAccessTime();
recoveryMonitorEnabled = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little low-tech relative to the tricky ref-counting in the IndexShard. I figured this was ok here since the hand-off request only comes in once (at least judging by the assertions we have in IndexShard) while the other API has a more general, multi-caller feel to it and there are no hard guarantees on the index shard state listener only being invoked once (though the "loaded" flag on the directory effectively guarantees we only add one condition for now) and it wasn't that much extra effort since the API was supposed to be non-blocking anyway.

};
}

// Whether the RecoveryMonitor may currently fail this recovery for inactivity; see #disableRecoveryMonitor().
public boolean isRecoveryMonitorEnabled() {
    return recoveryMonitorEnabled;
}

public Store store() {
ensureRefCount();
return store;
Expand Down Expand Up @@ -332,10 +355,7 @@ private boolean hasUncommittedOperations() throws IOException {

@Override
public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
indexShard.activateWithPrimaryContext(primaryContext);
return null;
});
indexShard.activateWithPrimaryContext(primaryContext, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected final boolean assertCurrentThreadMayLoadSnapshot() {
*
* @return true if the snapshot was loaded by executing this method, false otherwise
*/
public boolean loadSnapshot(RecoveryState recoveryState) {
public boolean loadSnapshot(RecoveryState recoveryState, ActionListener<Void> preWarmListener) {
assert recoveryState != null;
assert recoveryState instanceof SearchableSnapshotRecoveryState;
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
Expand All @@ -214,7 +214,7 @@ public boolean loadSnapshot(RecoveryState recoveryState) {
this.loaded = true;
cleanExistingRegularShardFiles();
this.recoveryState = (SearchableSnapshotRecoveryState) recoveryState;
prewarmCache();
prewarmCache(preWarmListener);
}
}
}
Expand Down Expand Up @@ -414,19 +414,20 @@ private void cleanExistingRegularShardFiles() {
}
}

private void prewarmCache() {
private void prewarmCache(ActionListener<Void> listener) {
if (prewarmCache == false) {
recoveryState.setPreWarmComplete();
listener.onResponse(null);
return;
}

final BlockingQueue<Tuple<ActionListener<Void>, CheckedRunnable<Exception>>> queue = new LinkedBlockingQueue<>();
final Executor executor = prewarmExecutor();

final GroupedActionListener<Void> completionListener = new GroupedActionListener<>(
ActionListener.wrap(voids -> recoveryState.setPreWarmComplete(), e -> {}), // Ignore pre-warm errors
snapshot().totalFileCount()
);
final GroupedActionListener<Void> completionListener = new GroupedActionListener<>(ActionListener.wrap(voids -> {
recoveryState.setPreWarmComplete();
listener.onResponse(null);
}, listener::onFailure), snapshot().totalFileCount());

for (BlobStoreIndexShardSnapshot.FileInfo file : snapshot().indexFiles()) {
if (file.metadata().hashEqualsContents() || isExcludedFromCache(file.physicalName())) {
Expand All @@ -448,11 +449,11 @@ private void prewarmCache() {
fileCompletionListener.whenComplete(voids -> input.close(), e -> IOUtils.closeWhileHandlingException(input));
fileCompletionListener.whenComplete(voids -> completionListener.onResponse(null), completionListener::onFailure);

final GroupedActionListener<Void> listener = new GroupedActionListener<>(fileCompletionListener, numberOfParts);
final GroupedActionListener<Void> partsListener = new GroupedActionListener<>(fileCompletionListener, numberOfParts);

for (int p = 0; p < numberOfParts; p++) {
final int part = p;
queue.add(Tuple.tuple(listener, () -> {
queue.add(Tuple.tuple(partsListener, () -> {
ensureOpen();

logger.trace("{} warming cache for [{}] part [{}/{}]", shardId, file.physicalName(), part + 1, numberOfParts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
*/
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
Expand All @@ -23,6 +28,8 @@

public class SearchableSnapshotIndexEventListener implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class);

@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
Expand All @@ -33,8 +40,23 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS
private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory());
assert directory != null;

final boolean success = directory.loadSnapshot(indexShard.recoveryState());
final StepListener<Void> preWarmListener = new StepListener<>();
final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener);
final ShardRouting shardRouting = indexShard.routingEntry();
if (success && shardRouting.isRelocationTarget()) {
final Runnable preWarmCondition = indexShard.createRelocationDependency();
preWarmListener.whenComplete(v -> preWarmCondition.run(), e -> {
logger.warn(
new ParameterizedMessage(
"pre-warm operation failed for [{}] while it was the target of primary relocation [{}]",
shardRouting.shardId(),
shardRouting
),
e
);
preWarmCondition.run();
});
}
assert directory.listAll().length > 0 : "expecting directory listing to be non-empty";
assert success
|| indexShard.routingEntry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -654,7 +655,9 @@ protected IndexInputStats createIndexInputStats(long fileLength) {
);
DiscoveryNode targetNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
RecoveryState recoveryState = new SearchableSnapshotRecoveryState(shardRouting, targetNode, null);
final boolean loaded = directory.loadSnapshot(recoveryState);
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final boolean loaded = directory.loadSnapshot(recoveryState, future);
future.get();
assertThat("Failed to load snapshot", loaded, is(true));
assertThat("Snapshot should be loaded", directory.snapshot(), notNullValue());
assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue());
Expand Down
Loading