Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make EnrichPolicyRunner more properly async #111321

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -126,8 +127,9 @@ public void runPolicyLocally(
}

task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.SCHEDULED));
Runnable runnable = createPolicyRunner(policyName, policy, enrichIndexName, task, listener);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
var policyRunner = createPolicyRunner(policyName, policy, enrichIndexName, task);
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(ActionRunnable.wrap(ActionListener.assertOnce(listener), policyRunner::run));
} catch (Exception e) {
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.FAILED));
throw e;
Expand Down Expand Up @@ -206,18 +208,16 @@ public void onFailure(Exception exception) {
});
}

private Runnable createPolicyRunner(
private EnrichPolicyRunner createPolicyRunner(
String policyName,
EnrichPolicy policy,
String enrichIndexName,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener
ExecuteEnrichPolicyTask task
) {
return new EnrichPolicyRunner(
policyName,
policy,
task,
listener,
clusterService,
indicesService,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

public class EnrichPolicyRunner implements Runnable {
public class EnrichPolicyRunner {

private static final Logger logger = LogManager.getLogger(EnrichPolicyRunner.class);

Expand All @@ -92,7 +92,6 @@ public class EnrichPolicyRunner implements Runnable {
private final String policyName;
private final EnrichPolicy policy;
private final ExecuteEnrichPolicyTask task;
private final ActionListener<ExecuteEnrichPolicyStatus> listener;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final Client client;
Expand All @@ -105,7 +104,6 @@ public class EnrichPolicyRunner implements Runnable {
String policyName,
EnrichPolicy policy,
ExecuteEnrichPolicyTask task,
ActionListener<ExecuteEnrichPolicyStatus> listener,
ClusterService clusterService,
IndicesService indicesService,
Client client,
Expand All @@ -117,7 +115,6 @@ public class EnrichPolicyRunner implements Runnable {
this.policyName = Objects.requireNonNull(policyName);
this.policy = Objects.requireNonNull(policy);
this.task = Objects.requireNonNull(task);
this.listener = Objects.requireNonNull(listener);
this.clusterService = Objects.requireNonNull(clusterService);
this.indicesService = indicesService;
this.client = wrapClient(client, policyName, task, clusterService);
Expand All @@ -127,8 +124,7 @@ public class EnrichPolicyRunner implements Runnable {
this.maxForceMergeAttempts = maxForceMergeAttempts;
}

@Override
public void run() {
public void run(ActionListener<ExecuteEnrichPolicyStatus> listener) {
try {
logger.info("Policy [{}]: Running enrich policy", policyName);
task.setStatus(new ExecuteEnrichPolicyStatus(ExecuteEnrichPolicyStatus.PolicyPhases.RUNNING));
Expand All @@ -139,7 +135,7 @@ public void run() {
// This call does not set the origin to ensure that the user executing the policy has permission to access the source index
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex(toMappings(getIndexResponse), clusterService.getSettings());
prepareAndCreateEnrichIndex(toMappings(getIndexResponse), clusterService.getSettings(), l);
}));
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -204,9 +200,7 @@ static void validateMappings(
}
}

private record MappingTypeAndFormat(String type, String format) {

}
private record MappingTypeAndFormat(String type, String format) {}

private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat(
String fieldName,
Expand Down Expand Up @@ -436,7 +430,11 @@ static boolean isIndexableField(MapperService mapperService, String field, Strin
}
}

private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings, Settings settings) {
private void prepareAndCreateEnrichIndex(
List<Map<String, Object>> mappings,
Settings settings,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
int numberOfReplicas = settings.getAsInt(ENRICH_MIN_NUMBER_OF_REPLICAS_NAME, 0);
Settings enrichIndexSettings = Settings.builder()
.put("index.number_of_shards", 1)
Expand All @@ -453,23 +451,23 @@ private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings, Set
.indices()
.create(
createEnrichIndexRequest,
listener.delegateFailure((l, createIndexResponse) -> prepareReindexOperation(enrichIndexName))
listener.delegateFailure((l, createIndexResponse) -> prepareReindexOperation(enrichIndexName, l))
);
}

private void prepareReindexOperation(final String destinationIndexName) {
private void prepareReindexOperation(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
// Check to make sure that the enrich pipeline exists, and create it if it is missing.
if (EnrichPolicyReindexPipeline.exists(clusterService.state()) == false) {
EnrichPolicyReindexPipeline.create(
enrichOriginClient(),
listener.delegateFailure((l, r) -> transferDataToEnrichIndex(destinationIndexName))
listener.delegateFailure((l, r) -> transferDataToEnrichIndex(destinationIndexName, l))
);
} else {
transferDataToEnrichIndex(destinationIndexName);
transferDataToEnrichIndex(destinationIndexName, listener);
}
}

private void transferDataToEnrichIndex(final String destinationIndexName) {
private void transferDataToEnrichIndex(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
logger.debug("Policy [{}]: Transferring source data to new enrich index [{}]", policyName, destinationIndexName);
// Filter down the source fields to just the ones required by the policy
final Set<String> retainFields = new HashSet<>();
Expand Down Expand Up @@ -540,13 +538,17 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
bulkByScrollResponse.getCreated(),
destinationIndexName
);
forceMergeEnrichIndex(destinationIndexName, 1);
forceMergeEnrichIndex(destinationIndexName, 1, delegate);
}
}
});
}

private void forceMergeEnrichIndex(final String destinationIndexName, final int attempt) {
private void forceMergeEnrichIndex(
final String destinationIndexName,
final int attempt,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
logger.debug(
"Policy [{}]: Force merging newly created enrich index [{}] (Attempt {}/{})",
policyName,
Expand All @@ -558,21 +560,29 @@ private void forceMergeEnrichIndex(final String destinationIndexName, final int
.indices()
.forceMerge(
new ForceMergeRequest(destinationIndexName).maxNumSegments(1),
listener.delegateFailure((l, r) -> refreshEnrichIndex(destinationIndexName, attempt))
listener.delegateFailure((l, r) -> refreshEnrichIndex(destinationIndexName, attempt, l))
);
}

private void refreshEnrichIndex(final String destinationIndexName, final int attempt) {
private void refreshEnrichIndex(
final String destinationIndexName,
final int attempt,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
logger.debug("Policy [{}]: Refreshing enrich index [{}]", policyName, destinationIndexName);
enrichOriginClient().admin()
.indices()
.refresh(
new RefreshRequest(destinationIndexName),
listener.delegateFailure((l, r) -> ensureSingleSegment(destinationIndexName, attempt))
listener.delegateFailure((l, r) -> ensureSingleSegment(destinationIndexName, attempt, l))
);
}

protected void ensureSingleSegment(final String destinationIndexName, final int attempt) {
protected void ensureSingleSegment(
final String destinationIndexName,
final int attempt,
ActionListener<ExecuteEnrichPolicyStatus> listener
) {
enrichOriginClient().admin()
.indices()
.segments(new IndicesSegmentsRequest(destinationIndexName), listener.delegateFailureAndWrap((l, indicesSegmentResponse) -> {
Expand Down Expand Up @@ -644,29 +654,29 @@ protected void ensureSingleSegment(final String destinationIndexName, final int
nextAttempt,
maxForceMergeAttempts
);
forceMergeEnrichIndex(destinationIndexName, nextAttempt);
forceMergeEnrichIndex(destinationIndexName, nextAttempt, listener);
}
} else {
// Force merge down to one segment successful
setIndexReadOnly(destinationIndexName);
setIndexReadOnly(destinationIndexName, listener);
}
}));
}

private void setIndexReadOnly(final String destinationIndexName) {
private void setIndexReadOnly(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
logger.debug("Policy [{}]: Setting new enrich index [{}] to be read only", policyName, destinationIndexName);
UpdateSettingsRequest request = new UpdateSettingsRequest(destinationIndexName).setPreserveExisting(true)
.settings(Settings.builder().put("index.auto_expand_replicas", "0-all").put("index.blocks.write", "true"));
enrichOriginClient().admin()
.indices()
.updateSettings(request, listener.delegateFailure((l, r) -> waitForIndexGreen(destinationIndexName)));
.updateSettings(request, listener.delegateFailure((l, r) -> waitForIndexGreen(destinationIndexName, l)));
}

private void waitForIndexGreen(final String destinationIndexName) {
private void waitForIndexGreen(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
enrichOriginClient().admin()
.cluster()
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName, l)));
}

/**
Expand Down Expand Up @@ -720,7 +730,7 @@ private void validateIndexBeforePromotion(String destinationIndexName, ClusterSt
}
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
private void updateEnrichPolicyAlias(final String destinationIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
Expand Down
Loading