Fix interruption of markAllocationIdAsInSync (#100610)
`IndexShard#markAllocationIdAsInSync` is interruptible because it may block the calling thread on a monitor while waiting for the local checkpoint to advance, but we lost the ability to interrupt it on a recovery cancellation in #95270, so a cancelled recovery could remain blocked on a checkpoint that never advances.

Closes #96578
Closes #100589
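
For context, the sketch below models the pattern involved; it is a minimal, self-contained illustration only, and `CheckpointTracker`, `SimpleCancellableThreads`, and the `main` driver are hypothetical stand-ins rather than the real `IndexShard` or `CancellableThreads` APIs. The point is just that a finalization step which blocks on a monitor until a checkpoint advances must have its thread interrupted when the recovery is cancelled, otherwise it can hang indefinitely; the change to `RecoverySourceHandler#finalizeRecovery` below restores exactly that by routing the blocking call through `cancellableThreads.execute(...)`.

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the checkpoint state a recovery waits on.
class CheckpointTracker {
    private long checkpoint = -1;

    synchronized void advanceTo(long value) {
        checkpoint = Math.max(checkpoint, value);
        notifyAll();
    }

    // Blocks on the monitor until the checkpoint reaches the target; this is the kind of
    // wait that must stay interruptible for recovery cancellation to work.
    synchronized void waitForCheckpoint(long target) throws InterruptedException {
        while (checkpoint < target) {
            wait();
        }
    }
}

// Hypothetical stand-in for a cancellation helper: it registers the executing thread so
// that cancel() can interrupt a blocking wait.
class SimpleCancellableThreads {
    interface Interruptible {
        void run() throws InterruptedException;
    }

    private final AtomicReference<Thread> current = new AtomicReference<>();
    private volatile boolean cancelled;

    void execute(Interruptible action) {
        current.set(Thread.currentThread());
        try {
            action.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(cancelled ? "operation cancelled" : "interrupted without cancellation", e);
        } finally {
            current.set(null);
        }
    }

    void cancel() {
        cancelled = true;
        final Thread thread = current.get();
        if (thread != null) {
            thread.interrupt();
        }
    }
}

public class InterruptibleFinalizationSketch {
    public static void main(String[] args) throws Exception {
        final CheckpointTracker tracker = new CheckpointTracker();
        final SimpleCancellableThreads cancellableThreads = new SimpleCancellableThreads();

        final Thread finalization = new Thread(() -> {
            try {
                // Without the execute(...) wrapper there is no registered thread for cancel()
                // to interrupt, and this wait could block forever if the checkpoint never advances.
                cancellableThreads.execute(() -> tracker.waitForCheckpoint(100));
            } catch (RuntimeException e) {
                System.out.println("finalization stopped: " + e.getMessage());
            }
        });
        finalization.start();

        Thread.sleep(200); // let the finalization thread block on the monitor
        cancellableThreads.cancel(); // e.g. because the index was deleted mid-recovery
        finalization.join();
    }
}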
DaveCTurner authored Oct 11, 2023
1 parent 29e3d28 commit c4e55ab
Showing 4 changed files with 129 additions and 4 deletions.
docs/changelog/100610.yaml (7 additions & 0 deletions)
@@ -0,0 +1,7 @@
pr: 100610
summary: Fix interruption of `markAllocationIdAsInSync`
area: Recovery
type: bug
issues:
- 96578
- 100589
IndexRecoveryIT.java
@@ -35,11 +35,15 @@
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@@ -70,6 +74,8 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.ReplicaShardAllocatorIT;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
@@ -85,6 +91,7 @@
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.GlobalCheckpointListeners;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@@ -122,7 +129,9 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -132,6 +141,7 @@
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
@@ -1688,6 +1698,104 @@ public void testWaitForClusterStateToBeAppliedOnSourceNode() throws Exception {
}
}

public void testDeleteIndexDuringFinalization() throws Exception {
internalCluster().startMasterOnlyNode();
final var primaryNode = internalCluster().startDataOnlyNode();
String indexName = "test-index";
createIndex(indexName, indexSettings(1, 0).build());
ensureGreen(indexName);
final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
.mapToObj(n -> client().prepareIndex(indexName).setSource("foo", "bar"))
.toList();
indexRandom(randomBoolean(), true, true, indexRequests);
assertThat(indicesAdmin().prepareFlush(indexName).get().getFailedShards(), equalTo(0));

final var replicaNode = internalCluster().startDataOnlyNode();

final SubscribableListener<Void> recoveryCompleteListener = new SubscribableListener<>();
final PlainActionFuture<AcknowledgedResponse> deleteListener = new PlainActionFuture<>();

final var threadPool = internalCluster().clusterService().threadPool();

final var indexId = internalCluster().clusterService().state().routingTable().index(indexName).getIndex();
final var primaryIndexShard = internalCluster().getInstance(IndicesService.class, primaryNode)
.indexServiceSafe(indexId)
.getShard(0);
final var globalCheckpointBeforeRecovery = primaryIndexShard.getLastSyncedGlobalCheckpoint();

final var replicaNodeTransportService = asInstanceOf(
MockTransportService.class,
internalCluster().getInstance(TransportService.class, replicaNode)
);
replicaNodeTransportService.addRequestHandlingBehavior(
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
(handler, request, channel, task) -> handler.messageReceived(
request,
new TestTransportChannel(ActionTestUtils.assertNoFailureListener(response -> {
// Process the TRANSLOG_OPS response on the replica (avoiding failing it due to a concurrent delete), but
// before sending the response back, index another document on the primary, advancing the GCP to prevent the
// replica from being marked as in-sync (NB below we delay the replica write until after the index is deleted)
client().prepareIndex(indexName).setSource("foo", "baz").execute(ActionListener.noop());

primaryIndexShard.addGlobalCheckpointListener(
globalCheckpointBeforeRecovery + 1,
new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
}

@Override
public void accept(long globalCheckpoint, Exception e) {
assertNull(e);

// Now that the GCP has advanced, the replica won't be marked in-sync, so respond to the TRANSLOG_OPS request
// to start recovery finalization
try {
channel.sendResponse(response);
} catch (IOException ex) {
fail(ex);
}

// Wait a short while for finalization to block on advancing the replica's GCP and then delete the index
threadPool.schedule(
() -> client().admin().indices().prepareDelete(indexName).execute(deleteListener),
TimeValue.timeValueMillis(100),
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
},
TimeValue.timeValueSeconds(10)
);
})),
task
)
);

// Delay the delivery of the replica write until the end of the test so the replica never becomes in-sync
replicaNodeTransportService.addRequestHandlingBehavior(
BulkAction.NAME + "[s][r]",
(handler, request, channel, task) -> recoveryCompleteListener.addListener(
assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
)
);

// Create the replica to trigger the whole process
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// Wait for the index to be deleted
assertTrue(deleteListener.get(20, TimeUnit.SECONDS).isAcknowledged());

final var peerRecoverySourceService = internalCluster().getInstance(PeerRecoverySourceService.class, primaryNode);
assertBusy(() -> assertEquals(0, peerRecoverySourceService.numberOfOngoingRecoveries()));
recoveryCompleteListener.onResponse(null);
}

private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
assertThat(nodes, is(not(empty())));

RecoverySourceHandler.java
@@ -33,7 +33,6 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@@ -426,7 +425,7 @@ public void onFailure(Exception e) {
}

static void runUnderPrimaryPermit(
- CheckedRunnable<Exception> action,
+ Runnable action,
IndexShard primary,
CancellableThreads cancellableThreads,
ActionListener<Void> listener
@@ -1260,7 +1259,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
*/
final SubscribableListener<Void> markInSyncStep = new SubscribableListener<>();
runUnderPrimaryPermit(
- () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
+ () -> cancellableThreads.execute(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)),
shard,
cancellableThreads,
markInSyncStep
ESTestCase.java
@@ -114,6 +114,7 @@
import org.elasticsearch.xcontent.XContentParser.Token;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -2036,7 +2037,17 @@ protected static boolean isTurkishLocale() {
|| Locale.getDefault().getLanguage().equals(new Locale("az").getLanguage());
}

- public static void fail(Throwable t, String msg, Object... args) {
+ public static <T> T fail(Throwable t, String msg, Object... args) {
throw new AssertionError(org.elasticsearch.common.Strings.format(msg, args), t);
}

public static <T> T fail(Throwable t) {
return fail(t, "unexpected");
}

@SuppressWarnings("unchecked")
public static <T> T asInstanceOf(Class<T> clazz, Object o) {
assertThat(o, Matchers.instanceOf(clazz));
return (T) o;
}
}
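
The test-framework changes above are small helpers: declaring `fail(Throwable, ...)` to return `T` lets it be used in expression position (for example inside a lambda that must yield a value), and `asInstanceOf` combines the `instanceOf` assertion with the cast, which is how the new test obtains the `MockTransportService` view of the replica's `TransportService`. A brief self-contained sketch follows; the local declarations are simplified stand-ins for the `ESTestCase` versions (no Hamcrest or `Strings.format`), and the `Optional` fallback is hypothetical usage, not code from this commit.

import java.util.Optional;

public class TestHelperSketch {
    // Simplified stand-ins mirroring the ESTestCase additions above.
    static <T> T fail(Throwable t, String msg) {
        throw new AssertionError(msg, t);
    }

    static <T> T fail(Throwable t) {
        return fail(t, "unexpected");
    }

    @SuppressWarnings("unchecked")
    static <T> T asInstanceOf(Class<T> clazz, Object o) {
        if (clazz.isInstance(o) == false) {
            throw new AssertionError(o + " is not an instance of " + clazz.getName());
        }
        return (T) o;
    }

    public static void main(String[] args) {
        // Returning T lets fail(...) appear where a value is required, e.g. as a fallback:
        String value = Optional.of("present").orElseGet(() -> fail(new IllegalStateException("missing")));

        // asInstanceOf asserts the runtime type and hands back the cast value in one step:
        CharSequence sequence = value;
        String asString = asInstanceOf(String.class, sequence);
        System.out.println(asString);
    }
}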
