diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index aff85f10e4bf8..8071d0e4b7d79 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -1334,11 +1334,8 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContex
         // note that if there was no cluster state update between start of the engine of this shard and the call to
         // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
         runAfter.run();
-
-        if (indexSettings.isSoftDeleteEnabled()) {
-            addPeerRecoveryRetentionLeaseForSolePrimary();
-        }
-
+        // invoked unconditionally, i.e. also when soft-deletes are disabled for the index
+        addPeerRecoveryRetentionLeaseForSolePrimary();
         assert invariant();
     }
 
diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java
index 2d0856c95ec40..c22d1bc0dd28f 100644
--- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java
+++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java
@@ -20,12 +20,14 @@
 package org.elasticsearch.recovery;
 
 import com.carrotsearch.hppc.IntHashSet;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.procedures.IntProcedure;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.util.English;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
@@ -45,6 +47,9 @@
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -77,9 +82,12 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -88,6 +96,8 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
 
@@ -103,6 +113,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
     @Override
     protected void beforeIndexDeletion() throws Exception {
         super.beforeIndexDeletion();
+        assertActiveCopiesEstablishedPeerRecoveryRetentionLeases();
         internalCluster().assertSeqNos();
         internalCluster().assertSameDocIdsOnShards();
     }
@@ -603,6 +614,58 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
         assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
     }
 
+    /**
+     * Starts {@code halfNodes} nodes tagged {@code node.attr.color=blue} followed by the same number tagged {@code red}, creates an
+     * index (randomized soft-deletes setting and replica count) restricted to the blue nodes, then re-targets the allocation filter
+     * to the red nodes and waits for all shards to move. Retention leases are checked before and after the relocation.
+     */
+    public void testRelocationEstablishedPeerRecoveryRetentionLeases() throws Exception {
+        int halfNodes = randomIntBetween(1, 3);
+        String indexName = "test";
+        Settings[] nodeSettings = Stream.concat(
+            Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes),
+            Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes)).toArray(Settings[]::new);
+        List<String> nodes = internalCluster().startNodes(nodeSettings);
+        String[] blueNodes = nodes.subList(0, halfNodes).toArray(String[]::new);
+        String[] redNodes = nodes.subList(halfNodes, nodes.size()).toArray(String[]::new);
+        ensureStableCluster(halfNodes * 2);
+        assertAcked(
+            client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, halfNodes - 1))
+                .put("index.routing.allocation.include.color", "blue")));
+        ensureGreen(indexName);
+        assertBusy(() -> assertAllShardsOnNodes(indexName, blueNodes));
+        assertActiveCopiesEstablishedPeerRecoveryRetentionLeases();
+        client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("index.routing.allocation.include.color", "red")).get();
+        assertBusy(() -> assertAllShardsOnNodes(indexName, redNodes));
+        ensureGreen(indexName);
+        assertActiveCopiesEstablishedPeerRecoveryRetentionLeases();
+    }
+
+    /**
+     * Retries via {@code assertBusy} until, for every index in the cluster state, each shard copy reported by the indices stats API
+     * holds a retention lease whose id equals the peer recovery retention lease id of every reported copy of the same shard.
+     */
+    private void assertActiveCopiesEstablishedPeerRecoveryRetentionLeases() throws Exception {
+        assertBusy(() -> {
+            for (ObjectCursor<String> it : client().admin().cluster().prepareState().get().getState().metaData().indices().keys()) {
+                Map<ShardId, List<ShardStats>> byShardId = Stream.of(client().admin().indices().prepareStats(it.value).get().getShards())
+                    .collect(Collectors.groupingBy(l -> l.getShardRouting().shardId()));
+                for (List<ShardStats> shardStats : byShardId.values()) {
+                    Set<String> expectedLeaseIds = shardStats.stream()
+                        .map(s -> ReplicationTracker.getPeerRecoveryRetentionLeaseId(s.getShardRouting())).collect(Collectors.toSet());
+                    for (ShardStats shardStat : shardStats) {
+                        Set<String> actualLeaseIds = shardStat.getRetentionLeaseStats().retentionLeases().leases().stream()
+                            .map(RetentionLease::id).collect(Collectors.toSet());
+                        assertThat(expectedLeaseIds, everyItem(in(actualLeaseIds)));
+                    }
+                }
+            }
+        });
+    }
+
     class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {
 
         private final CountDownLatch corruptionCount;