From 4d62a328103cbbc138b188d133b87bb1ef72f4e9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 5 Apr 2024 09:57:52 +0200 Subject: [PATCH] Fix a downsample persistent task assignment bug (#106247) (#106280) If as part of the persistent task assignment the source downsample index no longer exists, then the persistent task framework will continuously try to find an assignment and fail with IndexNotFoundException (which gets logged as a warning on elected master node). This fixes a bug in resolving the shard routing, so that if the index no longer exists any node is returned and the persistent task can fail gracefully at a later stage. The original fix via #98769 didn't get this part right. Co-authored-by: Elastic Machine --- docs/changelog/106247.yaml | 5 + ...DownsampleShardPersistentTaskExecutor.java | 11 +- ...ampleShardPersistentTaskExecutorTests.java | 125 ++++++++++++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 docs/changelog/106247.yaml create mode 100644 x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java diff --git a/docs/changelog/106247.yaml b/docs/changelog/106247.yaml new file mode 100644 index 0000000000000..5895dffd685a4 --- /dev/null +++ b/docs/changelog/106247.yaml @@ -0,0 +1,5 @@ +pr: 106247 +summary: Fix a downsample persistent task assignment bug +area: Downsampling +type: bug +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 2392b35b4f411..b4116d42d25ca 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -23,6 +23,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamOutput; @@ -135,7 +136,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( // If during re-assignment the source index was deleted, then we need to break out. // Returning NO_NODE_FOUND just keeps the persistent task until the source index appears again (which would never happen) // So let's return a node and then in the node operation we would just fail and stop this persistent task - var indexShardRouting = clusterState.routingTable().shardRoutingTable(params.shardId().getIndexName(), params.shardId().id()); + var indexShardRouting = findShardRoutingTable(shardId, clusterState); if (indexShardRouting == null) { var node = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData); return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task"); @@ -176,6 +177,14 @@ private void delegate(final AllocatedPersistentTask task, final DownsampleShardT ); } + private static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, ClusterState clusterState) { + var indexRoutingTable = clusterState.routingTable().index(shardId.getIndexName()); + if (indexRoutingTable != null) { + return indexRoutingTable.shard(shardId.getId()); + } + return null; + } + static void realNodeOperation( Client client, IndicesService indicesService, diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java new file mode 100644 index 0000000000000..06f6be27e9f3d --- /dev/null +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; +import org.junit.Before; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase { + + private ClusterState initialClusterState; + private DownsampleShardPersistentTaskExecutor executor; + + @Before + public void setup() { + Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); + Instant start = now.minus(2, ChronoUnit.HOURS); + Instant end = now.plus(40, ChronoUnit.MINUTES); + initialClusterState = DataStreamTestHelper.getClusterStateWithDataStream("metrics-app1", List.of(new Tuple<>(start, end))); + executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class)); + } + + public void testGetAssignment() { + var backingIndex = initialClusterState.metadata().dataStreams().get("metrics-app1").getWriteIndex(); + var node = newNode(); + var shardId = new ShardId(backingIndex, 0); + var clusterState = ClusterState.builder(initialClusterState) + .nodes(new DiscoveryNodes.Builder().add(node).build()) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(backingIndex) + .addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build()) + ) + ) + .build(); + + var params = new DownsampleShardTaskParams( + new DownsampleConfig(new DateHistogramInterval("1h")), + shardId.getIndexName(), + 1, + 1, + shardId, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY + ); + var result = executor.getAssignment(params, Set.of(node), clusterState); + assertThat(result.getExecutorNode(), equalTo(node.getId())); + } + + public void testGetAssignmentMissingIndex() { + var backingIndex = initialClusterState.metadata().dataStreams().get("metrics-app1").getWriteIndex(); + var node = newNode(); + var shardId = new ShardId(backingIndex, 0); + var clusterState = ClusterState.builder(initialClusterState) + .nodes(new DiscoveryNodes.Builder().add(node).build()) + .routingTable( + RoutingTable.builder() + .add( + IndexRoutingTable.builder(backingIndex) + .addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build()) + ) + ) + .build(); + + var missingShardId = new ShardId(new Index("another_index", "uid"), 0); + var params = new DownsampleShardTaskParams( + new DownsampleConfig(new DateHistogramInterval("1h")), + missingShardId.getIndexName(), + 1, + 1, + missingShardId, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY, + Strings.EMPTY_ARRAY + ); + var result = executor.getAssignment(params, Set.of(node), clusterState); + assertThat(result.getExecutorNode(), equalTo(node.getId())); + assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task")); + } + + private static DiscoveryNode newNode() { + return DiscoveryNodeUtils.create( + "node_" + UUIDs.randomBase64UUID(random()), + buildNewFakeTransportAddress(), + Map.of(), + DiscoveryNodeRole.roles() + ); + } + +}