Skip to content

Commit

Permalink
Make ILM force merging best effort (#43246)
Browse files Browse the repository at this point in the history
It's possible for force merges kicked off by ILM to silently stop (due
to a node relocating for example). In which case, the segment count may
not reach what the user configured. In the subsequent `SegmentCountStep`
waiting for the expected segment count may wait indefinitely. Because of
this, this commit makes force merges "best effort" and then changes the
`SegmentCountStep` to simply report (at INFO level) if the merge was not
successful.

Relates to #42824
Resolves #43245
  • Loading branch information
dakrone committed Jun 17, 2019
1 parent 2704239 commit b09ba08
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
Expand All @@ -17,13 +22,17 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.StreamSupport;
import java.util.stream.Collectors;

/**
* This {@link Step} evaluates whether force_merge was successful by checking the segment count.
*/
public class SegmentCountStep extends AsyncWaitStep {

private static final Logger logger = LogManager.getLogger(SegmentCountStep.class);
public static final String NAME = "segment-count";

private final int maxNumSegments;
Expand All @@ -41,10 +50,19 @@ public int getMaxNumSegments() {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
ActionListener.wrap(response -> {
long numberShardsLeftToMerge =
StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false)
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
IndexSegments segments = response.getIndices().get(indexMetaData.getIndex().getName());
List<ShardSegments> unmergedShards = segments.getShards().values().stream()
.flatMap(iss -> Arrays.stream(iss.getShards()))
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
.collect(Collectors.toList());
if (unmergedShards.size() > 0) {
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
indexMetaData.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
}
// Force merging is best effort, so always return true that the condition has been met.
listener.onResponse(true, new Info(unmergedShards.size()));
}, listener::onFailure));
}

Expand Down Expand Up @@ -90,8 +108,12 @@ public long getNumberShardsLeftToMerge() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(),
"Waiting for [" + numberShardsLeftToMerge + "] shards " + "to forcemerge");
if (numberShardsLeftToMerge == 0) {
builder.field(MESSAGE.getPreferredName(), "all shards force merged successfully");
} else {
builder.field(MESSAGE.getPreferredName(),
"[" + numberShardsLeftToMerge + "] shards did not successfully force merge");
}
builder.field(SHARDS_TO_MERGE.getPreferredName(), numberShardsLeftToMerge);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void onFailure(Exception e) {
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
}

public void testIsConditionFails() {
public void testIsConditionIsTrueEvenWhenMoreSegments() {
int maxNumSegments = randomIntBetween(3, 10);
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Client client = Mockito.mock(Client.class);
Expand Down Expand Up @@ -191,8 +191,8 @@ public void onFailure(Exception e) {
}
});

assertFalse(conditionMetResult.get());
assertEquals(new SegmentCountStep.Info(1L), conditionInfo.get());
assertTrue(conditionMetResult.get());
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
}

public void testThrowsException() {
Expand Down

0 comments on commit b09ba08

Please sign in to comment.