Skip to content

Commit

Permalink
Fix a downsample persistent task assignment bug (#106247) (#106280)
Browse files Browse the repository at this point in the history
If as part of the persistent task assignment the source downsample index no longer exists, then the persistent task framework will continuously try to find an assignment and fail with IndexNotFoundException (which gets logged as a warning on elected master node).

This fixes a bug in resolving the shard routing, so that if the index no longer exists any node is returned and the persistent task can fail gracefully at a later stage.

The original fix via #98769 didn't get this part right.

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
martijnvg and elasticmachine authored Apr 5, 2024
1 parent 2eb9729 commit 4d62a32
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/changelog/106247.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106247
summary: Fix a downsample persistent task assignment bug
area: Downsampling
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -135,7 +136,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
// If during re-assignment the source index was deleted, then we need to break out.
// Returning NO_NODE_FOUND just keeps the persistent task until the source index appears again (which would never happen)
// So let's return a node and then in the node operation we would just fail and stop this persistent task
var indexShardRouting = clusterState.routingTable().shardRoutingTable(params.shardId().getIndexName(), params.shardId().id());
var indexShardRouting = findShardRoutingTable(shardId, clusterState);
if (indexShardRouting == null) {
var node = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
Expand Down Expand Up @@ -176,6 +177,14 @@ private void delegate(final AllocatedPersistentTask task, final DownsampleShardT
);
}

private static IndexShardRoutingTable findShardRoutingTable(ShardId shardId, ClusterState clusterState) {
var indexRoutingTable = clusterState.routingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
return indexRoutingTable.shard(shardId.getId());
}
return null;
}

static void realNodeOperation(
Client client,
IndicesService indicesService,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.downsample.DownsampleShardTask;
import org.junit.Before;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {

private ClusterState initialClusterState;
private DownsampleShardPersistentTaskExecutor executor;

@Before
public void setup() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant start = now.minus(2, ChronoUnit.HOURS);
Instant end = now.plus(40, ChronoUnit.MINUTES);
initialClusterState = DataStreamTestHelper.getClusterStateWithDataStream("metrics-app1", List.of(new Tuple<>(start, end)));
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
}

public void testGetAssignment() {
var backingIndex = initialClusterState.metadata().dataStreams().get("metrics-app1").getWriteIndex();
var node = newNode();
var shardId = new ShardId(backingIndex, 0);
var clusterState = ClusterState.builder(initialClusterState)
.nodes(new DiscoveryNodes.Builder().add(node).build())
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(backingIndex)
.addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build())
)
)
.build();

var params = new DownsampleShardTaskParams(
new DownsampleConfig(new DateHistogramInterval("1h")),
shardId.getIndexName(),
1,
1,
shardId,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
}

public void testGetAssignmentMissingIndex() {
var backingIndex = initialClusterState.metadata().dataStreams().get("metrics-app1").getWriteIndex();
var node = newNode();
var shardId = new ShardId(backingIndex, 0);
var clusterState = ClusterState.builder(initialClusterState)
.nodes(new DiscoveryNodes.Builder().add(node).build())
.routingTable(
RoutingTable.builder()
.add(
IndexRoutingTable.builder(backingIndex)
.addShard(shardRoutingBuilder(shardId, node.getId(), true, STARTED).withRecoverySource(null).build())
)
)
.build();

var missingShardId = new ShardId(new Index("another_index", "uid"), 0);
var params = new DownsampleShardTaskParams(
new DownsampleConfig(new DateHistogramInterval("1h")),
missingShardId.getIndexName(),
1,
1,
missingShardId,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
}

private static DiscoveryNode newNode() {
return DiscoveryNodeUtils.create(
"node_" + UUIDs.randomBase64UUID(random()),
buildNewFakeTransportAddress(),
Map.of(),
DiscoveryNodeRole.roles()
);
}

}

0 comments on commit 4d62a32

Please sign in to comment.