From a14ad5fbb6768ee3ce3bb6083b91d86b85a2676a Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 9 Feb 2024 12:24:16 +0000 Subject: [PATCH] Finalize all snapshots completed by shard snapshot updates (#105245) Today when processing a batch of `ShardSnapshotUpdate` tasks each update's listener considers whether the corresponding snapshot has completed and, if so, it enqueues it for finalization. This is somewhat inefficient since we may be processing many shard snapshot updates for the same few snapshots, but there's no need to check each snapshot for completion more than once. It's also insufficient since the completion of a shard snapshot may cause the completion of subsequent snapshots too (e.g. they can go from state `QUEUED` straight to `MISSING`). This commit detaches the completion handling from the individual shard snapshot updates and instead makes sure that any snapshot that reaches a completed state is enqueued for finalization. Closes #104939 --- docs/changelog/105245.yaml | 6 + .../snapshots/SnapshotsService.java | 99 ++++++--- .../snapshots/SnapshotResiliencyTests.java | 204 ++++++++++++++++-- .../snapshots/SnapshotsServiceTests.java | 2 +- 4 files changed, 267 insertions(+), 44 deletions(-) create mode 100644 docs/changelog/105245.yaml diff --git a/docs/changelog/105245.yaml b/docs/changelog/105245.yaml new file mode 100644 index 0000000000000..f6093f2c7435e --- /dev/null +++ b/docs/changelog/105245.yaml @@ -0,0 +1,6 @@ +pr: 105245 +summary: Finalize all snapshots completed by shard snapshot updates +area: Snapshot/Restore +type: bug +issues: + - 104939 diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1785a0d7a8415..a9bbd35de8257 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -70,7 +71,6 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; -import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; @@ -188,6 +188,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement private final MasterServiceTaskQueue masterServiceTaskQueue; + private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler; + /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster state. The number of concurrent operations in a cluster state is defined as the sum of @@ -238,6 +240,7 @@ public SnapshotsService( this.systemIndices = systemIndices; this.masterServiceTaskQueue = clusterService.createTaskQueue("snapshots-service", Priority.NORMAL, new SnapshotTaskExecutor()); + this.shardSnapshotUpdateCompletionHandler = this::handleShardSnapshotUpdateCompletion; } /** @@ -3084,16 +3087,19 @@ static final class SnapshotShardsUpdateContext { // updates that were used to update an existing in-progress shard snapshot private final Set executedUpdates = new HashSet<>(); - // enqueues a reroute because some shard snapshots finished - private final Runnable rerouteRunnable; + // handles the completion of some shard-snapshot updates, performing the next possible actions + private final ShardSnapshotUpdateCompletionHandler completionHandler; + + // entries that became complete due to this batch of updates + private final List newlyCompletedEntries = new ArrayList<>(); SnapshotShardsUpdateContext( ClusterStateTaskExecutor.BatchExecutionContext batchExecutionContext, - Runnable rerouteRunnable + ShardSnapshotUpdateCompletionHandler completionHandler ) { this.batchExecutionContext = batchExecutionContext; this.initialState = batchExecutionContext.initialState(); - this.rerouteRunnable = new RunOnce(rerouteRunnable); // RunOnce to avoid enqueueing O(#shards) listeners + this.completionHandler = completionHandler; this.updatesByRepo = new HashMap<>(); for (final var taskContext : batchExecutionContext.taskContexts()) { if (taskContext.getTask() instanceof ShardSnapshotUpdate task) { @@ -3113,7 +3119,11 @@ SnapshotsInProgress computeUpdatedState() { } final List newEntries = new ArrayList<>(oldEntries.size()); for (SnapshotsInProgress.Entry entry : oldEntries) { - newEntries.add(applyToEntry(entry, updates.getValue())); + final var newEntry = applyToEntry(entry, updates.getValue()); + newEntries.add(newEntry); + if (newEntry != entry && newEntry.state().completed()) { + newlyCompletedEntries.add(newEntry); + } } updated = updated.withUpdatedEntriesForRepo(repoName, newEntries); } @@ -3132,12 +3142,20 @@ SnapshotsInProgress computeUpdatedState() { void completeWithUpdatedState(SnapshotsInProgress snapshotsInProgress) { if (updatesByRepo.isEmpty() == false) { final var result = new ShardSnapshotUpdateResult(initialState.metadata(), snapshotsInProgress); - for (final var taskContext : batchExecutionContext.taskContexts()) { - if (taskContext.getTask() instanceof ShardSnapshotUpdate task) { - taskContext.success(() -> { - rerouteRunnable.run(); - task.listener.onResponse(result); - }); + try ( + var onCompletionRefs = new RefCountingRunnable( + () -> completionHandler.handleCompletion(result, newlyCompletedEntries, updatesByRepo.keySet()) + ) + ) { + for (final var taskContext : batchExecutionContext.taskContexts()) { + if (taskContext.getTask() instanceof ShardSnapshotUpdate task) { + final var ref = onCompletionRefs.acquire(); + taskContext.success(() -> { + try (ref) { + task.listener.onResponse(result); + } + }); + } } } } @@ -3376,6 +3394,37 @@ private ImmutableOpenMap.Builder shardsBuilder() { */ record ShardSnapshotUpdateResult(Metadata metadata, SnapshotsInProgress snapshotsInProgress) {} + interface ShardSnapshotUpdateCompletionHandler { + void handleCompletion( + ShardSnapshotUpdateResult shardSnapshotUpdateResult, + List newlyCompletedEntries, + Set updatedRepositories + ); + } + + private void handleShardSnapshotUpdateCompletion( + ShardSnapshotUpdateResult shardSnapshotUpdateResult, + List newlyCompletedEntries, + Set updatedRepositories + ) { + // Maybe this state update completed one or more snapshots. If we are not already ending them because of some earlier update, end + // them now. + final var snapshotsInProgress = shardSnapshotUpdateResult.snapshotsInProgress(); + for (final var newlyCompletedEntry : newlyCompletedEntries) { + if (endingSnapshots.contains(newlyCompletedEntry.snapshot()) == false) { + endSnapshot(newlyCompletedEntry, shardSnapshotUpdateResult.metadata, null); + } + } + // Likewise this state update may enable some new shard clones on any affected repository, so check them all. + for (final var updatedRepository : updatedRepositories) { + startExecutableClones(snapshotsInProgress, updatedRepository); + } + // Also shard snapshot completions may free up some shards to move to other nodes, so we must trigger a reroute. + if (updatedRepositories.isEmpty() == false) { + rerouteService.reroute("after shards snapshot update", Priority.NORMAL, ActionListener.noop()); + } + } + /** * An update to the snapshot state of a shard. * @@ -3455,23 +3504,13 @@ private void innerUpdateSnapshotState( ShardSnapshotStatus updatedState, ActionListener listener ) { - var update = new ShardSnapshotUpdate(snapshot, shardId, repoShardId, updatedState, listener.delegateFailure((delegate, result) -> { - try { - delegate.onResponse(null); - } finally { - // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent - // state update we check if its state is completed and end it if it is. - final SnapshotsInProgress snapshotsInProgress = result.snapshotsInProgress(); - if (endingSnapshots.contains(snapshot) == false) { - final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(snapshot); - // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo - if (updatedEntry != null && updatedEntry.state().completed()) { - endSnapshot(updatedEntry, result.metadata(), null); - } - } - startExecutableClones(snapshotsInProgress, snapshot.getRepository()); - } - })); + var update = new ShardSnapshotUpdate( + snapshot, + shardId, + repoShardId, + updatedState, + listener.delegateFailure((delegate, result) -> delegate.onResponse(null)) + ); logger.trace("received updated snapshot restore state [{}]", update); masterServiceTaskQueue.submitTask("update snapshot state", update, null); } @@ -3751,7 +3790,7 @@ public ClusterState execute(BatchExecutionContext batchExecutionCo final ClusterState state = batchExecutionContext.initialState(); final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext( batchExecutionContext, - () -> rerouteService.reroute("after shards snapshot update", Priority.NORMAL, ActionListener.noop()) + shardSnapshotUpdateCompletionHandler ); final SnapshotsInProgress initialSnapshots = SnapshotsInProgress.get(state); SnapshotsInProgress snapshotsInProgress = shardsUpdateContext.computeUpdatedState(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index c5d5ecc1f90e8..75fe169d45a50 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RequestValidators; import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; @@ -24,6 +25,8 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction; @@ -67,9 +70,11 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.WriteRequest; @@ -119,6 +124,7 @@ import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.network.NetworkModule; @@ -130,6 +136,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -179,10 +186,15 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.transport.DisruptableMockTransport; +import org.elasticsearch.transport.TestTransportChannel; +import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.junit.After; @@ -201,6 +213,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1231,6 +1244,142 @@ public void testRunConcurrentSnapshots() { } } + public void testSnapshotCompletedByNodeLeft() { + + // A transport interceptor that throttles the shard snapshot status updates to run one at a time, for more interesting interleavings + final TransportInterceptor throttlingInterceptor = new TransportInterceptor() { + private final ThrottledTaskRunner runner = new ThrottledTaskRunner( + SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME + "-throttle", + 1, + SnapshotResiliencyTests.this::scheduleNow + ); + + @Override + public TransportRequestHandler interceptHandler( + String action, + Executor executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + if (action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) { + return (request, channel, task) -> ActionListener.run( + new ChannelActionListener<>(channel), + l -> runner.enqueueTask( + l.delegateFailureAndWrap( + (ll, ref) -> actualHandler.messageReceived( + request, + new TestTransportChannel(ActionListener.releaseAfter(ll, ref)), + task + ) + ) + ) + ); + } else { + return actualHandler; + } + } + }; + + setupTestCluster(1, 1, node -> node.isMasterNode() ? throttlingInterceptor : TransportService.NOOP_TRANSPORT_INTERCEPTOR); + + final var masterNode = testClusterNodes.randomMasterNodeSafe(); + final var client = masterNode.client; + final var masterClusterService = masterNode.clusterService; + + final var indices = IntStream.range(0, between(1, 4)).mapToObj(i -> "index-" + i).toList(); + final var repoName = "repo"; + final var originalSnapshotName = "original-snapshot"; + + var testListener = SubscribableListener + + // Create the repo and indices + .newForked(stepListener -> { + try (var listeners = new RefCountingListener(stepListener)) { + client().admin() + .cluster() + .preparePutRepository(repoName) + .setType(FsRepository.TYPE) + .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) + .execute(listeners.acquire(createRepoResponse -> {})); + + for (final var index : indices) { + client.admin() + .indices() + .create( + new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(1)), + listeners.acquire(createIndexResponse -> {}) + ); + } + } + }) + + // Take a full snapshot for use as the source for future clones + .andThen( + (l, ignored) -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, originalSnapshotName) + .setWaitForCompletion(true) + .execute(l.map(v -> null)) + ); + + final var snapshotCount = between(1, 10); + for (int i = 0; i < snapshotCount; i++) { + // Launch a random set of snapshots and clones, one at a time for more interesting interleavings + if (randomBoolean()) { + final var snapshotName = "snapshot-" + i; + testListener = testListener.andThen( + (stepListener, v) -> scheduleNow( + ActionRunnable.wrap( + stepListener, + l -> client.admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setIndices(randomNonEmptySubsetOf(indices).toArray(String[]::new)) + .setPartial(true) + .execute(l.map(v1 -> null)) + ) + ) + ); + } else { + final var cloneName = "clone-" + i; + testListener = testListener.andThen((stepListener, v) -> scheduleNow(ActionRunnable.wrap(stepListener, l -> { + // The clone API only responds when the clone is complete, but we only want to wait until the clone starts so we watch + // the cluster state instead. + ClusterServiceUtils.addTemporaryStateListener( + masterClusterService, + cs -> SnapshotsInProgress.get(cs) + .forRepo(repoName) + .stream() + .anyMatch( + e -> e.snapshot().getSnapshotId().getName().equals(cloneName) + && e.isClone() + && e.shardsByRepoShardId().isEmpty() == false + ) + ).addListener(l); + client.admin() + .cluster() + .prepareCloneSnapshot(repoName, originalSnapshotName, cloneName) + .setIndices(randomNonEmptySubsetOf(indices).toArray(String[]::new)) + .execute(ActionTestUtils.assertNoFailureListener(r -> {})); + }))); + } + } + + testListener = testListener.andThen((l, ignored) -> scheduleNow(() -> { + // Once all snapshots & clones have started, drop the data node and wait for all snapshot activity to complete + testClusterNodes.disconnectNode(testClusterNodes.randomDataNodeSafe()); + ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty()).addListener(l); + })); + + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue( + "executed all runnable tasks but test steps are still incomplete: " + + Strings.toString(SnapshotsInProgress.get(masterClusterService.state()), true, true), + testListener.isDone() + ); + safeAwait(testListener); // shouldn't throw + } + private RepositoryData getRepositoryData(Repository repository) { final PlainActionFuture res = new PlainActionFuture<>(); repository.getRepositoryData(deterministicTaskQueue::scheduleNow, res); @@ -1358,7 +1507,15 @@ private void runUntil(Supplier fulfilled, long timeout) { } private void setupTestCluster(int masterNodes, int dataNodes) { - testClusterNodes = new TestClusterNodes(masterNodes, dataNodes); + setupTestCluster(masterNodes, dataNodes, ignored -> TransportService.NOOP_TRANSPORT_INTERCEPTOR); + } + + private void setupTestCluster( + int masterNodes, + int dataNodes, + TestClusterNodes.TransportInterceptorFactory transportInterceptorFactory + ) { + testClusterNodes = new TestClusterNodes(masterNodes, dataNodes, transportInterceptorFactory); startCluster(); } @@ -1432,11 +1589,11 @@ private final class TestClusterNodes { */ private final Set disconnectedNodes = new HashSet<>(); - TestClusterNodes(int masterNodes, int dataNodes) { + TestClusterNodes(int masterNodes, int dataNodes, TransportInterceptorFactory transportInterceptorFactory) { for (int i = 0; i < masterNodes; ++i) { nodes.computeIfAbsent("node" + i, nodeName -> { try { - return newMasterNode(nodeName); + return newMasterNode(nodeName, transportInterceptorFactory); } catch (IOException e) { throw new AssertionError(e); } @@ -1445,7 +1602,7 @@ private final class TestClusterNodes { for (int i = 0; i < dataNodes; ++i) { nodes.computeIfAbsent("data-node" + i, nodeName -> { try { - return newDataNode(nodeName); + return newDataNode(nodeName, transportInterceptorFactory); } catch (IOException e) { throw new AssertionError(e); } @@ -1461,17 +1618,19 @@ public TestClusterNode nodeById(final String nodeId) { .orElseThrow(() -> new AssertionError("Could not find node by id [" + nodeId + ']')); } - private TestClusterNode newMasterNode(String nodeName) throws IOException { - return newNode(nodeName, DiscoveryNodeRole.MASTER_ROLE); + private TestClusterNode newMasterNode(String nodeName, TransportInterceptorFactory transportInterceptorFactory) throws IOException { + return newNode(nodeName, DiscoveryNodeRole.MASTER_ROLE, transportInterceptorFactory); } - private TestClusterNode newDataNode(String nodeName) throws IOException { - return newNode(nodeName, DiscoveryNodeRole.DATA_ROLE); + private TestClusterNode newDataNode(String nodeName, TransportInterceptorFactory transportInterceptorFactory) throws IOException { + return newNode(nodeName, DiscoveryNodeRole.DATA_ROLE, transportInterceptorFactory); } - private TestClusterNode newNode(String nodeName, DiscoveryNodeRole role) throws IOException { + private TestClusterNode newNode(String nodeName, DiscoveryNodeRole role, TransportInterceptorFactory transportInterceptorFactory) + throws IOException { return new TestClusterNode( - DiscoveryNodeUtils.builder(randomAlphaOfLength(10)).name(nodeName).roles(Collections.singleton(role)).build() + DiscoveryNodeUtils.builder(randomAlphaOfLength(10)).name(nodeName).roles(Collections.singleton(role)).build(), + transportInterceptorFactory ); } @@ -1565,12 +1724,18 @@ public TestClusterNode currentMaster(ClusterState state) { return master; } + interface TransportInterceptorFactory { + TransportInterceptor createTransportInterceptor(DiscoveryNode node); + } + private final class TestClusterNode { private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( Stream.concat(ClusterModule.getNamedWriteables().stream(), NetworkModule.getNamedWriteables().stream()).toList() ); + private final TransportInterceptorFactory transportInterceptorFactory; + private final TransportService transportService; private final ClusterService clusterService; @@ -1611,8 +1776,9 @@ private final class TestClusterNode { private Coordinator coordinator; - TestClusterNode(DiscoveryNode node) throws IOException { + TestClusterNode(DiscoveryNode node, TransportInterceptorFactory transportInterceptorFactory) throws IOException { this.node = node; + this.transportInterceptorFactory = transportInterceptorFactory; final Environment environment = createEnvironment(node.getName()); threadPool = deterministicTaskQueue.getThreadPool(runnable -> DeterministicTaskQueue.onNodeLog(node, runnable)); masterService = new FakeThreadPoolMasterService(node.getName(), threadPool, deterministicTaskQueue::scheduleNow); @@ -1696,7 +1862,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { transportService = mockTransport.createTransportService( settings, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, + transportInterceptorFactory.createTransportInterceptor(node), a -> node, null, emptySet() @@ -2080,6 +2246,17 @@ protected void assertSnapshotOrGenericThread() { indexNameExpressionResolver ) ); + actions.put( + CloneSnapshotAction.INSTANCE, + new TransportCloneSnapshotAction( + transportService, + clusterService, + threadPool, + snapshotsService, + actionFilters, + indexNameExpressionResolver + ) + ); actions.put( ClusterRerouteAction.INSTANCE, new TransportClusterRerouteAction( @@ -2154,7 +2331,8 @@ public void restart() { scheduleSoon(() -> { try { final TestClusterNode restartedNode = new TestClusterNode( - DiscoveryNodeUtils.create(node.getName(), node.getId(), node.getAddress(), emptyMap(), node.getRoles()) + DiscoveryNodeUtils.create(node.getName(), node.getId(), node.getAddress(), emptyMap(), node.getRoles()), + transportInterceptorFactory ); nodes.put(node.getName(), restartedNode); restartedNode.start(oldState); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index eb95e82120570..56a28b11edfe7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -504,7 +504,7 @@ private static void assertIsNoop(ClusterState state, SnapshotsService.SnapshotTa private static ClusterState applyUpdates(ClusterState state, SnapshotsService.SnapshotTask... updates) throws Exception { return ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(state, batchExecutionContext -> { final SnapshotsInProgress existing = SnapshotsInProgress.get(batchExecutionContext.initialState()); - final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, () -> {}); + final var context = new SnapshotsService.SnapshotShardsUpdateContext(batchExecutionContext, (a, b, c) -> {}); final SnapshotsInProgress updated = context.computeUpdatedState(); context.completeWithUpdatedState(updated); if (existing == updated) {