Fix enrich policy runner exception handling on empty segments response (#111290) (#111371)

* Fix enrich segment action listener exception logic

* Update docs/changelog/111290.yaml

Co-authored-by: Elastic Machine <[email protected]>
jbaiera and elasticmachine authored Jul 29, 2024
1 parent d43021e commit 3f30e38
Showing 4 changed files with 325 additions and 37 deletions.
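Note on the listener change in EnrichPolicyRunner below: the fix replaces a DelegatingActionListener whose overridden onResponse could throw without the failure reaching the caller, with listener.delegateFailureAndWrap(...), which runs the response handler inside a wrapper so that anything it throws (for example, when an empty segments response yields no entry for the new index) is delivered to the delegate's onFailure. The sketch below is a minimal, self-contained model of that wrap-and-delegate pattern; the ToyListener interface and DelegateWrapDemo class are illustrative stand-ins, not the actual Elasticsearch ActionListener API or its exact signature.

import java.util.function.BiConsumer;

// Illustrative stand-in for a response listener; not the Elasticsearch ActionListener API.
interface ToyListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    // Wrap-and-delegate sketch: run the handler, and if it throws, route the exception
    // to this listener's onFailure instead of letting it escape the response path.
    default <R> ToyListener<R> delegateFailureAndWrap(BiConsumer<ToyListener<T>, R> handler) {
        ToyListener<T> delegate = this;
        return new ToyListener<R>() {
            @Override
            public void onResponse(R response) {
                try {
                    handler.accept(delegate, response);
                } catch (Exception e) {
                    delegate.onFailure(e); // thrown exceptions become failure notifications
                }
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e); // failures are forwarded unchanged
            }
        };
    }
}

public class DelegateWrapDemo {
    public static void main(String[] args) {
        ToyListener<String> caller = new ToyListener<String>() {
            @Override
            public void onResponse(String r) {
                System.out.println("completed: " + r);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e.getMessage());
            }
        };

        // Simulate the "empty segments response" case: the handler throws, and the
        // wrapper converts that into a failure on the original listener.
        ToyListener<Integer> wrapped = caller.delegateFailureAndWrap((delegate, segmentCount) -> {
            if (segmentCount == 0) {
                throw new IllegalStateException("could not locate segment information");
            }
            delegate.onResponse("segments: " + segmentCount);
        });

        wrapped.onResponse(0); // prints: failed: could not locate segment information
    }
}

In the actual change, this wrapping is what lets the "Could not locate segment information" and "Could not obtain segment information" exceptions in the diff below surface through the policy runner's listener instead of escaping the response path.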
5 changes: 5 additions & 0 deletions docs/changelog/111290.yaml
@@ -0,0 +1,5 @@
pr: 111290
summary: Fix enrich policy runner exception handling on empty segments response
area: Ingest Node
type: bug
issues: []
server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java
@@ -36,7 +36,7 @@ public class IndicesSegmentResponse extends ChunkedBroadcastResponse {

private volatile Map<String, IndexSegments> indicesSegments;

-IndicesSegmentResponse(
+public IndicesSegmentResponse(
ShardSegments[] shards,
int totalShards,
int successfulShards,
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java
@@ -25,11 +25,11 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
-import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.FilterClient;
import org.elasticsearch.client.internal.OriginSettingClient;
@@ -572,48 +572,82 @@ private void refreshEnrichIndex(final String destinationIndexName, final int att
protected void ensureSingleSegment(final String destinationIndexName, final int attempt) {
    enrichOriginClient().admin()
        .indices()
-        .segments(new IndicesSegmentsRequest(destinationIndexName), new DelegatingActionListener<>(listener) {
-            @Override
-            public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {
-                IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName);
-                if (indexSegments == null) {
-                    throw new ElasticsearchException(
-                        "Could not locate segment information for newly created index [{}]",
-                        destinationIndexName
-                    );
-                }
-                Map<Integer, IndexShardSegments> indexShards = indexSegments.getShards();
-                assert indexShards.size() == 1 : "Expected enrich index to contain only one shard";
-                ShardSegments[] shardSegments = indexShards.get(0).shards();
-                assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point";
-                ShardSegments primarySegments = shardSegments[0];
-                if (primarySegments.getSegments().size() > 1) {
-                    int nextAttempt = attempt + 1;
-                    if (nextAttempt > maxForceMergeAttempts) {
-                        delegate.onFailure(
-                            new ElasticsearchException(
-                                "Force merging index [{}] attempted [{}] times but did not result in one segment.",
-                                destinationIndexName,
-                                attempt,
-                                maxForceMergeAttempts
-                            )
-                        );
-                    } else {
-                        logger.debug(
-                            "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
-                            policyName,
-                            primarySegments.getSegments().size(),
-                            nextAttempt,
-                            maxForceMergeAttempts
-                        );
-                        forceMergeEnrichIndex(destinationIndexName, nextAttempt);
-                    }
-                } else {
-                    // Force merge down to one segment successful
-                    setIndexReadOnly(destinationIndexName);
-                }
-            }
-        });
+        .segments(new IndicesSegmentsRequest(destinationIndexName), listener.delegateFailureAndWrap((l, indicesSegmentResponse) -> {
+            int failedShards = indicesSegmentResponse.getFailedShards();
+            if (failedShards > 0) {
+                // Encountered a problem while querying the segments for the enrich index. Try and surface the problem in the log.
+                logger.warn(
+                    "Policy [{}]: Encountered [{}] shard level failures while querying the segments for enrich index [{}]. "
+                        + "Turn on DEBUG logging for details.",
+                    policyName,
+                    failedShards,
+                    enrichIndexName
+                );
+                if (logger.isDebugEnabled()) {
+                    DefaultShardOperationFailedException[] shardFailures = indicesSegmentResponse.getShardFailures();
+                    int failureNumber = 1;
+                    String logPrefix = "Policy [" + policyName + "]: Encountered shard failure [";
+                    String logSuffix = " of "
+                        + shardFailures.length
+                        + "] while querying segments for enrich index ["
+                        + enrichIndexName
+                        + "]. Shard [";
+                    for (DefaultShardOperationFailedException shardFailure : shardFailures) {
+                        logger.debug(
+                            logPrefix + failureNumber + logSuffix + shardFailure.index() + "][" + shardFailure.shardId() + "]",
+                            shardFailure.getCause()
+                        );
+                        failureNumber++;
+                    }
+                }
+            }
+            IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName);
+            if (indexSegments == null) {
+                if (indicesSegmentResponse.getShardFailures().length == 0) {
+                    throw new ElasticsearchException(
+                        "Could not locate segment information for newly created index [{}]",
+                        destinationIndexName
+                    );
+                } else {
+                    DefaultShardOperationFailedException shardFailure = indicesSegmentResponse.getShardFailures()[0];
+                    throw new ElasticsearchException(
+                        "Could not obtain segment information for newly created index [{}]; shard info [{}][{}]",
+                        shardFailure.getCause(),
+                        destinationIndexName,
+                        shardFailure.index(),
+                        shardFailure.shardId()
+                    );
+                }
+            }
+            Map<Integer, IndexShardSegments> indexShards = indexSegments.getShards();
+            assert indexShards.size() == 1 : "Expected enrich index to contain only one shard";
+            ShardSegments[] shardSegments = indexShards.get(0).shards();
+            assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point";
+            ShardSegments primarySegments = shardSegments[0];
+            if (primarySegments.getSegments().size() > 1) {
+                int nextAttempt = attempt + 1;
+                if (nextAttempt > maxForceMergeAttempts) {
+                    throw new ElasticsearchException(
+                        "Force merging index [{}] attempted [{}] times but did not result in one segment.",
+                        destinationIndexName,
+                        attempt,
+                        maxForceMergeAttempts
+                    );
+                } else {
+                    logger.debug(
+                        "Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
+                        policyName,
+                        primarySegments.getSegments().size(),
+                        nextAttempt,
+                        maxForceMergeAttempts
+                    );
+                    forceMergeEnrichIndex(destinationIndexName, nextAttempt);
+                }
+            } else {
+                // Force merge down to one segment successful
+                setIndexReadOnly(destinationIndexName);
+            }
+        }));
}

private void setIndexReadOnly(final String destinationIndexName) {
