Use retention lease in peer recovery of closed indices #48430

Merged · 16 commits · Nov 21, 2019
Changes from 7 commits
@@ -610,7 +610,7 @@ public void onFailure(Exception e) {
}
}

private IndexShard getIndexShard(final ShardId shardId) {
protected IndexShard getIndexShard(final ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
@@ -827,10 +827,7 @@ private boolean invariant() {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}

if (primaryMode
&& indexSettings.isSoftDeleteEnabled()
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& hasAllPeerRecoveryRetentionLeases) {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -898,7 +895,9 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@@ -1011,34 +1010,32 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() {
assert primaryMode;
assert Thread.holdsLock(this);

if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
hasAllPeerRecoveryRetentionLeases = true;
}
}

@@ -1356,10 +1353,7 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (indexSettings().isSoftDeleteEnabled()
&& indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& hasAllPeerRecoveryRetentionLeases == false) {

if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
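
To make the constructor hunk above easier to follow, here is a minimal standalone restatement of the rule it encodes, under hypothetical names (PeerRecoveryLeaseAssumption and hasAllLeasesOnInit are illustrative, not Elasticsearch types): indices created on 8.0.0 or later always carry peer-recovery retention leases for every shard copy, while indices created on 7.4.0 or later only acquired them while open, so a closed index created before 8.0.0 may still be missing some and has to create them lazily via createMissingPeerRecoveryRetentionLeases above.

// Hypothetical sketch, not production code: restates the initialization rule for
// hasAllPeerRecoveryRetentionLeases from the ReplicationTracker constructor hunk above.
final class PeerRecoveryLeaseAssumption {
    static boolean hasAllLeasesOnInit(int createdMajor, int createdMinor, boolean indexIsOpen) {
        if (createdMajor >= 8) {
            return true;                           // 8.0.0+ creates leases for closed indices too
        }
        final boolean createdOnOrAfter7_4 = createdMajor == 7 && createdMinor >= 4;
        return createdOnOrAfter7_4 && indexIsOpen; // 7.4+ created leases only while the index was open
    }
}
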
@@ -32,6 +32,7 @@
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -40,8 +41,13 @@
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -88,6 +94,46 @@ public RetentionLeaseSyncAction(
ThreadPool.Names.MANAGEMENT, false);
}

@Override
protected void doExecute(Task parentTask, Request request, ActionListener<Response> listener) {
// Skip reroute phase as we are on the primary shard already.
final IndexShard indexShard = getIndexShard(request.shardId());
final ShardRouting shardRouting = indexShard.routingEntry();
if (shardRouting.primary() == false) {
throw new ShardNotInPrimaryModeException(indexShard.shardId(), indexShard.state());
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != request.retentionLeases.primaryTerm()) {
throw new ShardNotFoundException(
request.shardId(), "expected primary term [{}] but found [{}]", request.retentionLeases.primaryTerm(), actualTerm);
}
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
new ConcreteShardRequest<>(request, shardRouting.allocationId().getId(), actualTerm),
parentTask,
transportOptions,
new TransportResponseHandler<Response>() {
@Override
public Response read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
}

@Override
protected void shardOperationOnPrimary(Request request, IndexShard primary,
ActionListener<PrimaryResult<Request, Response>> listener) {
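
The doExecute override above skips the reroute phase because the retention-lease sync is always initiated on the node that already holds the primary; it fails fast if the local shard copy is not the primary or the request carries a stale primary term, and otherwise forwards the request directly to the primary action on the local node. The sketch below only restates those two guards under hypothetical names and with plain exceptions; the production code throws ShardNotInPrimaryModeException and ShardNotFoundException as shown in the diff.

// Hypothetical sketch, not the production action: the fail-fast checks performed before
// dispatching the sync request straight to the primary action on the local node.
final class LocalPrimaryDispatchGuard {
    static void check(boolean routingEntryIsPrimary, long shardPendingPrimaryTerm, long requestPrimaryTerm) {
        if (routingEntryIsPrimary == false) {
            // production code throws ShardNotInPrimaryModeException here
            throw new IllegalStateException("retention lease sync must start on the primary copy");
        }
        if (shardPendingPrimaryTerm != requestPrimaryTerm) {
            // production code throws ShardNotFoundException with both terms in the message
            throw new IllegalStateException("expected primary term [" + requestPrimaryTerm
                + "] but found [" + shardPendingPrimaryTerm + "]");
        }
    }
}
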
@@ -38,7 +38,6 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
@@ -154,8 +153,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
};

final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE;
final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

runUnderPrimaryPermit(() -> {
Expand All @@ -167,7 +165,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get(
retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get(
ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
@@ -178,15 +176,15 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
= request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& isTargetSameHistory()
&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
&& (useRetentionLeases == false
&& (softDeletesEnabled == false
|| (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
// NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
// because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
// possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
// without having a complete history.

if (isSequenceNumberBasedRecovery && useRetentionLeases) {
if (isSequenceNumberBasedRecovery && softDeletesEnabled) {
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
retentionLock.close();
logger.trace("history is retained by {}", retentionLeaseRef.get());
@@ -225,7 +223,7 @@ && isTargetSameHistory()
// advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
// down.
startingSeqNo = useRetentionLeases
startingSeqNo = softDeletesEnabled
? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
: 0;
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
@@ -243,7 +241,7 @@ && isTargetSameHistory()
});

final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
if (useRetentionLeases) {
if (softDeletesEnabled) {
runUnderPrimaryPermit(() -> {
try {
// If the target previously had a copy of this shard then a file-based recovery might move its global
@@ -266,7 +264,7 @@ && isTargetSameHistory()
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");

final Consumer<ActionListener<RetentionLease>> createRetentionLeaseAsync;
if (useRetentionLeases) {
if (softDeletesEnabled) {
createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l);
} else {
createRetentionLeaseAsync = l -> l.onResponse(null);
@@ -304,7 +302,7 @@ && isTargetSameHistory()
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);

if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) {
if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
retentionLock.close();
}
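
The RecoverySourceHandler hunks above all follow from one change: whether peer recovery consults a peer-recovery retention lease now depends only on soft deletes being enabled, no longer on whether the index is open or closed. As a reading aid, the sketch below restates the resulting decision for an operations-based (sequence-number) recovery using hypothetical, self-contained names; it is not the production method.

// Hypothetical sketch of the decision in the hunks above: a sequence-number-based recovery
// is possible only if a starting sequence number was requested, the target shares history,
// the source still has complete history from that point, and, when soft deletes are enabled,
// the target's peer-recovery retention lease already retains everything the target is missing.
final class RecoveryModeRule {
    static boolean useSequenceNumberBasedRecovery(boolean startingSeqNoAssigned,
                                                  long startingSeqNo,
                                                  boolean targetSharesHistory,
                                                  boolean sourceHasCompleteHistory,
                                                  boolean softDeletesEnabled,
                                                  Long leaseRetainingSeqNo) { // null when no lease exists
        if (startingSeqNoAssigned == false || targetSharesHistory == false || sourceHasCompleteHistory == false) {
            return false;
        }
        // Without soft deletes there is no lease to consult; with soft deletes the lease must
        // retain at least everything from startingSeqNo onwards.
        return softDeletesEnabled == false
            || (leaseRetainingSeqNo != null && leaseRetainingSeqNo <= startingSeqNo);
    }
}
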
@@ -323,6 +323,40 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception {
transportService.clearAllRules();
}

public void testPeerRecoveryForClosedIndices() throws Exception {
String indexName = "peer_recovery_closed_indices";
internalCluster().ensureAtLeastNumDataNodes(1);
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 100))
.mapToObj(n -> client().prepareIndex(indexName).setSource("num", n)).collect(Collectors.toList()));
ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName);
assertAcked(client().admin().indices().prepareClose(indexName));
int numberOfReplicas = randomIntBetween(1, 2);
internalCluster().ensureAtLeastNumDataNodes(2 + numberOfReplicas);
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)));
ensureGreen(indexName);
ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build()));
internalCluster().fullRestart();
ensureYellow(indexName);
if (randomBoolean()) {
assertAcked(client().admin().indices().prepareOpen(indexName));
client().admin().indices().prepareForceMerge(indexName).get();
}
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build()));
ensureGreen(indexName);
assertNoOpRecoveries(indexName);
}

private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception {
assertBusy(() -> {
Index index = resolveIndex(indexName);