ILM: Add total_shards_per_node setting to searchable snapshot (elastic#112972)

Allows configuring the index-level `total_shards_per_node` setting in the SearchableSnapshot action of ILM, to remediate hot spots in shard allocation for searchable snapshot indices.

Closes elastic#112261
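
For orientation, here is a minimal sketch of the policy fragment this change enables. The structure and field names follow the documentation and parser changes below; the repository name and the limit of 1 are illustrative values, not defaults.

// Illustrative only: a frozen-phase ILM policy fragment using the new option.
String policyFragment = """
    {
      "phases": {
        "frozen": {
          "actions": {
            "searchable_snapshot": {
              "snapshot_repository": "my-repository",
              "total_shards_per_node": 1
            }
          }
        }
      }
    }
    """;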
samxbr authored Sep 23, 2024
1 parent 58021c3 commit 80dd563
Showing 9 changed files with 253 additions and 29 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/112972.yaml
@@ -0,0 +1,6 @@
pr: 112972
summary: "ILM: Add `total_shards_per_node` setting to searchable snapshot"
area: ILM+SLM
type: enhancement
issues:
- 112261
5 changes: 4 additions & 1 deletion docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc
@@ -19,7 +19,7 @@ index>> prefixed with `partial-` to the frozen tier. In other phases, the action

In the frozen tier, the action will ignore the setting
<<total-shards-per-node,`index.routing.allocation.total_shards_per_node`>>, if it was present in the original index,
to account for the difference in the number of nodes between the frozen and the other tiers.
to account for the difference in the number of nodes between the frozen and the other tiers. To set <<total-shards-per-node,`index.routing.allocation.total_shards_per_node`>> for searchable snapshots, set the `total_shards_per_node` option in the frozen phase's `searchable_snapshot` action within the ILM policy.


WARNING: Don't include the `searchable_snapshot` action in both the hot and cold
@@ -74,6 +74,9 @@ will be performed on the hot nodes. If using a `searchable_snapshot` action in t
force merge will be performed on whatever tier the index is *prior* to the `cold` phase (either
`hot` or `warm`).

`total_shards_per_node`::
The maximum number of shards (replicas and primaries) that will be allocated to a single node for the searchable snapshot index. Defaults to unbounded.

[[ilm-searchable-snapshot-ex]]
==== Examples
////
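
Under the hood, when the option is set, the mount request carries the corresponding index setting (see MountSnapshotStep below). A minimal sketch of what ends up on the mounted index, where the value 1 is illustrative:

// INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey() resolves to "index.routing.allocation.total_shards_per_node".
Settings mountSettings = Settings.builder()
    .put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
    .build();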
server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -222,6 +222,7 @@ static TransportVersion def(int id) {
public static final TransportVersion FAILURE_STORE_STATUS_IN_INDEX_RESPONSE = def(8_746_00_0);
public static final TransportVersion ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS = def(8_747_00_0);
public static final TransportVersion ML_TELEMETRY_MEMORY_ADDED = def(8_748_00_0);
public static final TransportVersion ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE = def(8_749_00_0);

/*
* STOP! READ THIS FIRST! No, really,
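
The new transport version exists so that the optional integer only crosses the wire between nodes that understand it. A sketch of the read/write gating pattern it enables, where `in` and `out` stand for a StreamInput and StreamOutput (the actual call sites are in SearchableSnapshotAction further down):

// Read side: older senders never wrote the field, so fall back to null.
Integer totalShardsPerNode = in.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE)
    ? in.readOptionalInt()
    : null;

// Write side: only emit the field to receivers on a new enough version.
if (out.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE)) {
    out.writeOptionalInt(totalShardsPerNode);
}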
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java
@@ -18,11 +18,13 @@
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;

@@ -37,17 +39,34 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep {

private final String restoredIndexPrefix;
private final MountSearchableSnapshotRequest.Storage storageType;
@Nullable
private final Integer totalShardsPerNode;

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType
MountSearchableSnapshotRequest.Storage storageType,
@Nullable Integer totalShardsPerNode
) {
super(key, nextStepKey, client);
this.restoredIndexPrefix = restoredIndexPrefix;
this.storageType = Objects.requireNonNull(storageType, "a storage type must be specified");
if (totalShardsPerNode != null && totalShardsPerNode < 1) {
throw new IllegalArgumentException("[" + SearchableSnapshotAction.TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1");
}
this.totalShardsPerNode = totalShardsPerNode;
}

public MountSnapshotStep(
StepKey key,
StepKey nextStepKey,
Client client,
String restoredIndexPrefix,
MountSearchableSnapshotRequest.Storage storageType
) {
this(key, nextStepKey, client, restoredIndexPrefix, storageType, null);
}

@Override
@@ -63,6 +82,11 @@ public MountSearchableSnapshotRequest.Storage getStorage() {
return storageType;
}

@Nullable
public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
String indexName = indexMetadata.getIndex().getName();
@@ -140,6 +164,9 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
final Settings.Builder settingsBuilder = Settings.builder();

overrideTierPreference(this.getKey().phase()).ifPresent(override -> settingsBuilder.put(DataTier.TIER_PREFERENCE, override));
if (totalShardsPerNode != null) {
settingsBuilder.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), totalShardsPerNode);
}

final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest(
TimeValue.MAX_VALUE,
@@ -148,9 +175,9 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl
snapshotName,
indexName,
settingsBuilder.build(),
ignoredIndexSettings(this.getKey().phase()),
ignoredIndexSettings(),
// we'll not wait for the snapshot to complete in this step as the async steps are executed from threads that shouldn't
// perform expensive operations (ie. clusterStateProcessed)
// perform expensive operations (i.e. clusterStateProcessed)
false,
storageType
);
@@ -198,23 +225,27 @@ static Optional<String> overrideTierPreference(String phase) {
* setting, the restored index would be captured by the ILM runner and, depending on what ILM execution state was captured at snapshot
time, make its way forward from _that_ step forward in the ILM policy. We'll re-set this setting on the restored index at a later
* step once we restored a deterministic execution state
* - index.routing.allocation.total_shards_per_node: It is likely that frozen tier has fewer nodes than the hot tier.
* Keeping this setting runs the risk that we will not have enough nodes to allocate all the shards in the
* frozen tier and the user does not have any way of fixing this. For this reason, we ignore this setting when moving to frozen.
* - index.routing.allocation.total_shards_per_node: It is likely that frozen tier has fewer nodes than the hot tier. If this setting
* is not specifically set in the frozen tier, keeping this setting runs the risk that we will not have enough nodes to
* allocate all the shards in the frozen tier and the user does not have any way of fixing this. For this reason, we ignore this
* setting when moving to frozen. We do not ignore this setting if it is specifically set in the mount searchable snapshot step
* of frozen tier.
*/
static String[] ignoredIndexSettings(String phase) {
String[] ignoredIndexSettings() {
ArrayList<String> ignoredSettings = new ArrayList<>();
ignoredSettings.add(LifecycleSettings.LIFECYCLE_NAME);
// if we are mounting a searchable snapshot in the hot phase, then we should not change the total_shards_per_node setting
if (TimeseriesLifecycleType.FROZEN_PHASE.equals(phase)) {
return new String[] {
LifecycleSettings.LIFECYCLE_NAME,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey() };
// if total_shards_per_node setting is specifically set for the frozen phase and not propagated from previous phase,
// then it should not be ignored
if (TimeseriesLifecycleType.FROZEN_PHASE.equals(this.getKey().phase()) && this.totalShardsPerNode == null) {
ignoredSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey());
}
return new String[] { LifecycleSettings.LIFECYCLE_NAME };
return ignoredSettings.toArray(new String[0]);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType);
return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode);
}

@Override
@@ -228,6 +259,7 @@ public boolean equals(Object obj) {
MountSnapshotStep other = (MountSnapshotStep) obj;
return super.equals(obj)
&& Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix)
&& Objects.equals(storageType, other.storageType);
&& Objects.equals(storageType, other.storageType)
&& Objects.equals(totalShardsPerNode, other.totalShardsPerNode);
}
}
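
A short usage sketch of the extended step. It assumes `stepKey`, `nextStepKey`, and `client` are already in scope (in practice the ILM machinery supplies them); the "partial-" prefix and shared-cache storage are what partially mounted frozen-tier indices use, and the limit of 3 is illustrative.

MountSnapshotStep step = new MountSnapshotStep(
    stepKey,                                              // a StepKey in the frozen phase
    nextStepKey,
    client,
    "partial-",                                           // restored index prefix used for partial mounts
    MountSearchableSnapshotRequest.Storage.SHARED_CACHE,
    3                                                     // total_shards_per_node applied to the mounted index
);
// With an explicit limit, ignoredIndexSettings() keeps only index.lifecycle.name in the ignore list
// and the mount settings gain index.routing.allocation.total_shards_per_node=3; with null in the
// frozen phase, the original setting is stripped as before.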
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY;
@@ -49,6 +50,7 @@ public class SearchableSnapshotAction implements LifecycleAction {

public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField("total_shards_per_node");
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep.NAME + "-check-prerequisites";
public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep.NAME + "-check-existing-snapshot";
@@ -58,12 +60,13 @@ public class SearchableSnapshotAction implements LifecycleAction {

private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(
NAME,
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1])
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1], (Integer) a[2])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_REPOSITORY);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), TOTAL_SHARDS_PER_NODE);
}

public static SearchableSnapshotAction parse(XContentParser parser) {
@@ -72,22 +75,36 @@ public static SearchableSnapshotAction parse(XContentParser parser) {

private final String snapshotRepository;
private final boolean forceMergeIndex;
@Nullable
private final Integer totalShardsPerNode;

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex, @Nullable Integer totalShardsPerNode) {
if (Strings.hasText(snapshotRepository) == false) {
throw new IllegalArgumentException("the snapshot repository must be specified");
}
this.snapshotRepository = snapshotRepository;
this.forceMergeIndex = forceMergeIndex;

if (totalShardsPerNode != null && totalShardsPerNode < 1) {
throw new IllegalArgumentException("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1");
}
this.totalShardsPerNode = totalShardsPerNode;
}

public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
this(snapshotRepository, forceMergeIndex, null);
}

public SearchableSnapshotAction(String snapshotRepository) {
this(snapshotRepository, true);
this(snapshotRepository, true, null);
}

public SearchableSnapshotAction(StreamInput in) throws IOException {
this.snapshotRepository = in.readString();
this.forceMergeIndex = in.readBoolean();
this.totalShardsPerNode = in.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE)
? in.readOptionalInt()
: null;
}

boolean isForceMergeIndex() {
@@ -98,6 +115,10 @@ public String getSnapshotRepository() {
return snapshotRepository;
}

public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
assert false;
@@ -298,7 +319,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
waitForGreenRestoredIndexKey,
client,
getRestoredIndexPrefix(mountSnapshotKey),
storageType
storageType,
totalShardsPerNode
);
WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep(
waitForGreenRestoredIndexKey,
@@ -402,13 +424,19 @@ public String getWriteableName() {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(snapshotRepository);
out.writeBoolean(forceMergeIndex);
if (out.getTransportVersion().onOrAfter(ILM_ADD_SEARCHABLE_SNAPSHOT_TOTAL_SHARDS_PER_NODE)) {
out.writeOptionalInt(totalShardsPerNode);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SNAPSHOT_REPOSITORY.getPreferredName(), snapshotRepository);
builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
if (totalShardsPerNode != null) {
builder.field(TOTAL_SHARDS_PER_NODE.getPreferredName(), totalShardsPerNode);
}
builder.endObject();
return builder;
}
@@ -422,12 +450,14 @@ public boolean equals(Object o) {
return false;
}
SearchableSnapshotAction that = (SearchableSnapshotAction) o;
return Objects.equals(snapshotRepository, that.snapshotRepository) && Objects.equals(forceMergeIndex, that.forceMergeIndex);
return Objects.equals(snapshotRepository, that.snapshotRepository)
&& Objects.equals(forceMergeIndex, that.forceMergeIndex)
&& Objects.equals(totalShardsPerNode, that.totalShardsPerNode);
}

@Override
public int hashCode() {
return Objects.hash(snapshotRepository, forceMergeIndex);
return Objects.hash(snapshotRepository, forceMergeIndex, totalShardsPerNode);
}

@Nullable
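
A sketch of the new validation and accessor behaviour; the repository name and values are illustrative.

// A limit of at least 1 is accepted and exposed through the getter.
SearchableSnapshotAction action = new SearchableSnapshotAction("my-repository", true, 1);
assert action.getTotalShardsPerNode() == 1;

// The two-argument constructor keeps the previous behaviour: no limit is applied.
assert new SearchableSnapshotAction("my-repository", true).getTotalShardsPerNode() == null;

// Zero or negative values are rejected:
// new SearchableSnapshotAction("my-repository", true, 0);   // IllegalArgumentException: [total_shards_per_node] must be >= 1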
@@ -224,7 +224,11 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
frozenTime,
Collections.singletonMap(
SearchableSnapshotAction.NAME,
new SearchableSnapshotAction(randomAlphaOfLength(10), randomBoolean())
new SearchableSnapshotAction(
randomAlphaOfLength(10),
randomBoolean(),
(randomBoolean() ? null : randomIntBetween(1, 100))
)
)
)
);
(The remaining 3 changed files are not shown.)
