Skip to content

Commit

Permalink
Fix concurrency issues between cancelling a relocation and marking sh…
Browse files Browse the repository at this point in the history
…ard as relocated (#20443)

Once a primary is marked as relocated, we can not safely move it back to started (as we have no way of waiting on inflight operations that are performed on the target primary). If the master cancels the relocation in that state, we fail the primary. Sadly, there is a racing condition between the `updateRoutingEntry` method (which is called when the relocation is cancelled by the master) and the `relocated` method. That racing condition can leave the shard as marked "relocated" but have the routing entry not reflect the target relocation. This in turns causes NPEs in TransportReplicationAction:

```
java.util.Objects requireNonNull Objects.java 203
org.elasticsearch.action.support.replication.TransportReplicationAction$ConcreteShardRequest <init> TransportReplicationAction.java 982
```

Sadly, once we end up in this state, we will never recover.

This commit fixes that race condition by making sure `updateRoutingEntry` acquires the mutex when checking for the relocated status.  While at it, I also tightened up the code and added lots of assertions/hard checks.
  • Loading branch information
bleskes committed Sep 13, 2016
1 parent 8751acb commit 1c6e3f5
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 65 deletions.
84 changes: 39 additions & 45 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -116,9 +115,9 @@
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
Expand All @@ -135,7 +134,6 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -368,60 +366,46 @@ public QueryCachingPolicy getQueryCachingPolicy() {
* @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(final ShardRouting newRouting) throws IOException {
final ShardRouting currentRouting = this.shardRouting;
if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
if (currentRouting != null) {
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
final ShardRouting currentRouting;
synchronized (mutex) {
currentRouting = this.shardRouting;

if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
}
// if its the same routing, return
if (currentRouting.equals(newRouting)) {
return;
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
+ currentRouting + ", new " + newRouting);
}
}

if (state == IndexShardState.POST_RECOVERY) {
// if the state is started or relocating (cause it might move right away from started to relocating)
// then move to STARTED
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Exception e) {
logger.debug("failed to refresh due to move to cluster wide started", e);
}

boolean movedToStarted = false;
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
}
}
if (movedToStarted) {
indexEventListener.afterIndexShardStarted(this);
}
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
} else if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
this.shardRouting = newRouting;
persistMetadata(newRouting, currentRouting);
}

if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
indexEventListener.afterIndexShardStarted(this);
}
if (newRouting.equals(currentRouting) == false) {
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
}
this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
persistMetadata(newRouting, currentRouting);
}

/**
Expand Down Expand Up @@ -451,6 +435,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
}

public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation locks are being held here, move state from started to relocated
Expand All @@ -460,6 +445,15 @@ public void relocated(String reason) throws IllegalIndexShardStateException, Int
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
// if the master cancelled the recovery, the target will be removed
// and the recovery will stopped.
// However, it is still possible that we concurrently end up here
// and therefore have to protect we don't mark the shard as relocated when
// its shard routing says otherwise.
if (shardRouting.relocating() == false) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
changeState(IndexShardState.RELOCATED, reason);
}
});
Expand Down
111 changes: 91 additions & 20 deletions core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -109,6 +110,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static java.util.Collections.emptyMap;
Expand All @@ -121,12 +123,27 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;

/**
* Simple unit-test IndexShard related operations.
*/
public class IndexShardTests extends IndexShardTestCase {

public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws IOException {
return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths);
}

public static void write(ShardStateMetaData shardStateMetaData,
Path... shardPaths) throws IOException {
ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
}

public static Engine getEngineFromShard(IndexShard shard) {
return shard.getEngineOrNull();
}

public void testWriteShardState() throws Exception {
try (NodeEnvironment env = newNodeEnvironment()) {
ShardId id = new ShardId("foo", "fooUUID", 1);
Expand Down Expand Up @@ -323,10 +340,10 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
}
case 2: {
// relocation source
indexShard = newStartedShard(false);
indexShard = newStartedShard(true);
ShardRouting routing = indexShard.routingEntry();
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
indexShard.updateRoutingEntry(routing);
indexShard.relocated("test");
break;
Expand Down Expand Up @@ -371,15 +388,6 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
closeShards(indexShard);
}

public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws IOException {
return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths);
}

public static void write(ShardStateMetaData shardStateMetaData,
Path... shardPaths) throws IOException {
ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths);
}

public void testAcquireIndexCommit() throws IOException {
final IndexShard shard = newStartedShard();
int numDocs = randomInt(20);
Expand Down Expand Up @@ -443,7 +451,6 @@ public void testSnapshotStore() throws IOException {
closeShards(newShard);
}


public void testAsyncFsync() throws InterruptedException, IOException {
IndexShard shard = newStartedShard();
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
Expand Down Expand Up @@ -500,7 +507,6 @@ public void testMinimumCompatVersion() throws IOException {
closeShards(test);
}


public void testShardStats() throws IOException {
IndexShard shard = newStartedShard();
ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(),
Expand Down Expand Up @@ -662,6 +668,7 @@ public void postDelete(Engine.Delete delete, Exception ex) {

public void testLockingBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
CountDownLatch latch = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> {
latch.countDown();
Expand Down Expand Up @@ -692,6 +699,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception {

public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
Thread recoveryThread = new Thread(() -> {
try {
shard.relocated("simulated recovery");
Expand Down Expand Up @@ -725,6 +733,7 @@ public void onResponse(Releasable releasable) {

public void testStressRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
final int numThreads = randomIntBetween(2, 4);
Thread[] indexThreads = new Thread[numThreads];
CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads);
Expand Down Expand Up @@ -776,6 +785,75 @@ public void run() {
closeShards(shard);
}

public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.relocated("test");
expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting));
closeShards(shard);
}

public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.updateRoutingEntry(originalRouting);
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test"));
closeShards(shard);
}

public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
AtomicReference<Exception> relocationException = new AtomicReference<>();
Thread relocationThread = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
relocationException.set(e);
}

@Override
protected void doRun() throws Exception {
cyclicBarrier.await();
shard.relocated("test");
}
});
relocationThread.start();
AtomicReference<Exception> cancellingException = new AtomicReference<>();
Thread cancellingThread = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
cancellingException.set(e);
}

@Override
protected void doRun() throws Exception {
cyclicBarrier.await();
shard.updateRoutingEntry(originalRouting);
}
});
cancellingThread.start();
cyclicBarrier.await();
relocationThread.join();
cancellingThread.join();
if (shard.state() == IndexShardState.RELOCATED) {
logger.debug("shard was relocated successfully");
assertThat(cancellingException.get(), instanceOf(IllegalIndexShardStateException.class));
assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(true));
assertThat(relocationException.get(), nullValue());
} else {
logger.debug("shard relocation was cancelled");
assertThat(relocationException.get(), instanceOf(IllegalIndexShardStateException.class));
assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(false));
assertThat(cancellingException.get(), nullValue());

}
closeShards(shard);
}

public void testRecoverFromStore() throws IOException {
final IndexShard shard = newStartedShard(true);
int translogOps = 1;
Expand Down Expand Up @@ -1033,7 +1111,6 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
closeShards(shard);
}


public void testIndexingOperationListenersIsInvokedOnRecovery() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
Expand Down Expand Up @@ -1086,7 +1163,6 @@ public void postDelete(Engine.Delete delete) {
closeShards(newShard);
}


public void testSearchIsReleaseIfWrapperFails() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
Expand Down Expand Up @@ -1117,7 +1193,6 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
closeShards(newShard);
}


public void testTranslogRecoverySyncsTranslog() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
Expand Down Expand Up @@ -1362,8 +1437,4 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve
public void verify(String verificationToken, DiscoveryNode localNode) {
}
}

public static Engine getEngineFromShard(IndexShard shard) {
return shard.getEngineOrNull();
}
}

0 comments on commit 1c6e3f5

Please sign in to comment.