Skip to content

Commit

Permalink
Make EnrichPolicyRunner more properly async (#111321)
Browse files Browse the repository at this point in the history
Today `EnrichPolicyRunner` carries its listener in a field, with various
morally-async methods masquerading as synchronous ones because they
don't accept the listener from the caller as one might expect.

This commit removes the `listener` field in favour of passing a listener
explicitly between the methods that require it, making it easier to spot
listener leaks.
  • Loading branch information
DaveCTurner authored Jul 29, 2024
1 parent f9007c5 commit fb19b4b
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 420 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -126,8 +127,9 @@ public void runPolicyLocally(
}

task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
Runnable runnable = createPolicyRunner(policyName, policy, enrichIndexName, task, listener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
var policyRunner = createPolicyRunner(policyName, policy, enrichIndexName, task);
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(ActionRunnable.wrap(ActionListener.assertOnce(listener), policyRunner::run));
} catch (Exception e) {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
throw e;
Expand Down Expand Up @@ -206,18 +208,16 @@ public void onFailure(Exception exception) {
});
}

private Runnable createPolicyRunner(
private EnrichPolicyRunner createPolicyRunner(
String policyName,
EnrichPolicy policy,
String enrichIndexName,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener
ExecuteEnrichPolicyTask task
) {
return new EnrichPolicyRunner(
policyName,
policy,
task,
listener,
clusterService,
indicesService,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

public class EnrichPolicyRunner implements Runnable {
public class EnrichPolicyRunner {

private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);

Expand All @@ -92,7 +92,6 @@ public class EnrichPolicyRunner implements Runnable {
private final String policyName;
private final EnrichPolicy policy;
private final ExecuteEnrichPolicyTask task;
private final ActionListener<ExecuteEnrichPolicyStatus> listener;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final Client client;
Expand All @@ -105,7 +104,6 @@ public class EnrichPolicyRunner implements Runnable {
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener,
ClusterService clusterService,
IndicesService indicesService,
Client client,
Expand All @@ -117,7 +115,6 @@ public class EnrichPolicyRunner implements Runnable {
this.policyName = Objects.requireNonNull(policyName);
this.policy = Objects.requireNonNull(policy);
this.task = Objects.requireNonNull(task);
this.listener = Objects.requireNonNull(listener);
this.clusterService = Objects.requireNonNull(clusterService);
this.indicesService = indicesService;
this.client = wrapClient(client, policyName, task, clusterService);
Expand All @@ -127,8 +124,7 @@ public class EnrichPolicyRunner implements Runnable {
this.maxForceMergeAttempts = maxForceMergeAttempts;
}

@Override
public void run() {
public void run(ActionListener<ExecuteEnrichPolicyStatus> listener) {
try {
logger.info("Policy [{}]: Running enrich policy", policyName);
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING));
Expand All @@ -139,7 +135,7 @@ public void run() {
// This call does not set the origin to ensure that the user executing the policy has permission to access the source index
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex(toMappings(getIndexResponse), clusterService.getSettings());
prepareAndCreateEnrichIndex(toMappings(getIndexResponse), clusterService.getSettings(), l);
}));
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -204,9 +200,7 @@ static void validateMappings(
}
}

private record MappingTypeAndFormat(String type, String format) {

}
private record MappingTypeAndFormat(String type, String format) {}

private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat(
String fieldName,
Expand Down Expand Up @@ -436,7 +430,11 @@ static boolean isIndexableField(MapperService mapperService, String field, Strin
}
}

private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings, Settings settings) {
private void prepareAndCreateEnrichIndex(
List<Map<String, Object>> mappings,
Settings settings,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
int numberOfReplicas = settings.getAsInt(ENRICH_MIN_NUMBER_OF_REPLICAS_NAME, 0);
Settings enrichIndexSettings = Settings.builder()
.put("index.number_of_shards", 1)
Expand All @@ -453,23 +451,23 @@ private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings, Set
.indices()
.create(
createEnrichIndexRequest,
listener.delegateFailure((l, createIndexResponse) -> prepareReindexOperation(enrichIndexName))
listener.delegateFailure((l, createIndexResponse) -> prepareReindexOperation(enrichIndexName, l))
);
}

private void prepareReindexOperation(final String destinationIndexName) {
private void prepareReindexOperation(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
// Check to make sure that the enrich pipeline exists, and create it if it is missing.
if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) {
EnrichPolicyReindexPipeline.create(
enrichOriginClient(),
listener.delegateFailure((l, r) -> transferDataToEnrichIndex(destinationIndexName))
listener.delegateFailure((l, r) -> transferDataToEnrichIndex(destinationIndexName, l))
);
} else {
transferDataToEnrichIndex(destinationIndexName);
transferDataToEnrichIndex(destinationIndexName, listener);
}
}

private void transferDataToEnrichIndex(final String destinationIndexName) {
private void transferDataToEnrichIndex(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
// Filter down the source fields to just the ones required by the policy
final Set<String> retainFields = new HashSet<>();
Expand Down Expand Up @@ -540,13 +538,17 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
bulkByScrollResponse.getCreated(),
destinationIndexName
);
forceMergeEnrichIndex(destinationIndexName, 1);
forceMergeEnrichIndex(destinationIndexName, 1, delegate);
}
}
});
}

private void forceMergeEnrichIndex(final String destinationIndexName, final int attempt) {
private void forceMergeEnrichIndex(
final String destinationIndexName,
final int attempt,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
logger.debug(
"Policy [{}]: Force merging newly created enrich index [{}] (Attempt {}/{})",
policyName,
Expand All @@ -558,21 +560,29 @@ private void forceMergeEnrichIndex(final String destinationIndexName, final int
.indices()
.forceMerge(
new ForceMergeRequest(destinationIndexName).maxNumSegments(1),
listener.delegateFailure((l, r) -> refreshEnrichIndex(destinationIndexName, attempt))
listener.delegateFailure((l, r) -> refreshEnrichIndex(destinationIndexName, attempt, l))
);
}

private void refreshEnrichIndex(final String destinationIndexName, final int attempt) {
private void refreshEnrichIndex(
final String destinationIndexName,
final int attempt,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
logger.debug("Policy [{}]: Refreshing enrich index [{}]", policyName, destinationIndexName);
enrichOriginClient().admin()
.indices()
.refresh(
new RefreshRequest(destinationIndexName),
listener.delegateFailure((l, r) -> ensureSingleSegment(destinationIndexName, attempt))
listener.delegateFailure((l, r) -> ensureSingleSegment(destinationIndexName, attempt, l))
);
}

protected void ensureSingleSegment(final String destinationIndexName, final int attempt) {
protected void ensureSingleSegment(
final String destinationIndexName,
final int attempt,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
enrichOriginClient().admin()
.indices()
.segments(new IndicesSegmentsRequest(destinationIndexName), listener.delegateFailureAndWrap((l, indicesSegmentResponse) -> {
Expand Down Expand Up @@ -644,29 +654,29 @@ protected void ensureSingleSegment(final String destinationIndexName, final int
nextAttempt,
maxForceMergeAttempts
);
forceMergeEnrichIndex(destinationIndexName, nextAttempt);
forceMergeEnrichIndex(destinationIndexName, nextAttempt, listener);
}
} else {
// Force merge down to one segment successful
setIndexReadOnly(destinationIndexName);
setIndexReadOnly(destinationIndexName, listener);
}
}));
}

private void setIndexReadOnly(final String destinationIndexName) {
private void setIndexReadOnly(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
logger.debug("Policy [{}]: Setting new enrich index [{}] to be read only", policyName, destinationIndexName);
UpdateSettingsRequest request = new UpdateSettingsRequest(destinationIndexName).setPreserveExisting(true)
.settings(Settings.builder().put("index.auto_expand_replicas", "0-all").put("index.blocks.write", "true"));
enrichOriginClient().admin()
.indices()
.updateSettings(request, listener.delegateFailure((l, r) -> waitForIndexGreen(destinationIndexName)));
.updateSettings(request, listener.delegateFailure((l, r) -> waitForIndexGreen(destinationIndexName, l)));
}

private void waitForIndexGreen(final String destinationIndexName) {
private void waitForIndexGreen(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
enrichOriginClient().admin()
.cluster()
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName, l)));
}

/**
Expand Down Expand Up @@ -720,7 +730,7 @@ private void validateIndexBeforePromotion(String destinationIndexName, ClusterSt
}
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
private void updateEnrichPolicyAlias(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
Expand Down
Loading

0 comments on commit fb19b4b

Please sign in to comment.