From fbf335dcf147ccc65843cc83e9ca96804269a4ca Mon Sep 17 00:00:00 2001 From: Nikola Grcevski <6207777+grcevski@users.noreply.github.com> Date: Wed, 25 May 2022 09:39:51 -0400 Subject: [PATCH] Extract transport cluster settings/ilm execute logic (#86941) Extract execute logic from the transport actions for cluster update settings and ILM put/delete to support future reuse for operator file based updates. Relates to #86224 --- .../TransportClusterUpdateSettingsAction.java | 50 ++++--- .../TransportDeleteLifecycleAction.java | 63 +++++---- .../action/TransportPutLifecycleAction.java | 124 +++++++++++------- 3 files changed, 143 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 3dfcce005e190..4f19aad41bc5e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; @@ -137,11 +138,7 @@ protected void masterOperation( final ClusterState state, final ActionListener listener ) { - final SettingsUpdater updater = new SettingsUpdater(clusterSettings); - submitUnbatchedTask(UPDATE_TASK_SOURCE, new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { - - private volatile boolean changed = false; - + submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) { @Override protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate()); @@ -225,21 +222,40 @@ public void onFailure(Exception e) { logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e); super.onFailure(e); } - - @Override - public ClusterState execute(final ClusterState currentState) { - final ClusterState clusterState = updater.updateSettings( - currentState, - clusterSettings.upgradeSettings(request.transientSettings()), - clusterSettings.upgradeSettings(request.persistentSettings()), - logger - ); - changed = clusterState != currentState; - return clusterState; - } }); } + public static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask { + protected volatile boolean changed = false; + protected final SettingsUpdater updater; + protected final ClusterUpdateSettingsRequest request; + private final ClusterSettings clusterSettings; + + public ClusterUpdateSettingsTask( + final ClusterSettings clusterSettings, + Priority priority, + ClusterUpdateSettingsRequest request, + ActionListener listener + ) { + super(priority, request, listener); + this.clusterSettings = clusterSettings; + this.updater = new SettingsUpdater(clusterSettings); + this.request = request; + } + + @Override + public ClusterState execute(final ClusterState currentState) { + final ClusterState clusterState = updater.updateSettings( + currentState, + clusterSettings.upgradeSettings(request.transientSettings()), + clusterSettings.upgradeSettings(request.persistentSettings()), + logger + ); + changed = clusterState != currentState; + return clusterState; + } + } + @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { clusterService.submitUnbatchedStateUpdateTask(source, task); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java index 458cd3e183927..1387093782e48 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java @@ -59,34 +59,43 @@ public TransportDeleteLifecycleAction( @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { - submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - String policyToDelete = request.getPolicyName(); - List indicesUsingPolicy = currentState.metadata() - .indices() - .values() - .stream() - .filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName())) - .map(idxMeta -> idxMeta.getIndex().getName()) - .toList(); - if (indicesUsingPolicy.isEmpty() == false) { - throw new IllegalArgumentException( - "Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy - ); - } - ClusterState.Builder newState = ClusterState.builder(currentState); - IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE); - if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) { - throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName()); - } - SortedMap newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas()); - newPolicies.remove(request.getPolicyName()); - IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); - newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); - return newState.build(); + submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new DeleteLifecyclePolicyTask(request, listener)); + } + + public static class DeleteLifecyclePolicyTask extends AckedClusterStateUpdateTask { + private final Request request; + + public DeleteLifecyclePolicyTask(Request request, ActionListener listener) { + super(request, listener); + this.request = request; + } + + @Override + public ClusterState execute(ClusterState currentState) { + String policyToDelete = request.getPolicyName(); + List indicesUsingPolicy = currentState.metadata() + .indices() + .values() + .stream() + .filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName())) + .map(idxMeta -> idxMeta.getIndex().getName()) + .toList(); + if (indicesUsingPolicy.isEmpty() == false) { + throw new IllegalArgumentException( + "Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy + ); + } + ClusterState.Builder newState = ClusterState.builder(currentState); + IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE); + if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) { + throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName()); } - }); + SortedMap newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas()); + newPolicies.remove(request.getPolicyName()); + IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); + newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); + return newState.build(); + } } @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java index 9e136fc15238e..7baecc4754672 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java @@ -109,61 +109,85 @@ protected void masterOperation(Task task, Request request, ClusterState state, A } } - submitUnbatchedTask("put-lifecycle-" + request.getPolicy().getName(), new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - final IndexLifecycleMetadata currentMetadata = currentState.metadata() - .custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); - final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas() - .get(request.getPolicy().getName()); + submitUnbatchedTask( + "put-lifecycle-" + request.getPolicy().getName(), + new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client) + ); + } - // Double-check for no-op in the state update task, in case it was changed/reset in the meantime - if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) { - return currentState; - } + public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask { + private final Request request; + private final XPackLicenseState licenseState; + private final Map filteredHeaders; + private final NamedXContentRegistry xContentRegistry; + private final Client client; - validatePrerequisites(request.getPolicy(), currentState); + public UpdateLifecyclePolicyTask( + Request request, + ActionListener listener, + XPackLicenseState licenseState, + Map filteredHeaders, + NamedXContentRegistry xContentRegistry, + Client client + ) { + super(request, listener); + this.request = request; + this.licenseState = licenseState; + this.filteredHeaders = filteredHeaders; + this.xContentRegistry = xContentRegistry; + this.client = client; + } - ClusterState.Builder stateBuilder = ClusterState.builder(currentState); - long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L; - SortedMap newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas()); - LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata( - request.getPolicy(), - filteredHeaders, - nextVersion, - Instant.now().toEpochMilli() - ); - LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); - if (oldPolicy == null) { - logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName()); - } else { - logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); - } - IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); - stateBuilder.metadata( - Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build() - ); - ClusterState nonRefreshedState = stateBuilder.build(); - if (oldPolicy == null) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + final IndexLifecycleMetadata currentMetadata = currentState.metadata() + .custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); + final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas().get(request.getPolicy().getName()); + + // Double-check for no-op in the state update task, in case it was changed/reset in the meantime + if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) { + return currentState; + } + + validatePrerequisites(request.getPolicy(), currentState, licenseState); + + ClusterState.Builder stateBuilder = ClusterState.builder(currentState); + long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L; + SortedMap newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas()); + LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata( + request.getPolicy(), + filteredHeaders, + nextVersion, + Instant.now().toEpochMilli() + ); + LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); + if (oldPolicy == null) { + logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName()); + } else { + logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); + } + IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); + stateBuilder.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); + ClusterState nonRefreshedState = stateBuilder.build(); + if (oldPolicy == null) { + return nonRefreshedState; + } else { + try { + return updateIndicesForPolicy( + nonRefreshedState, + xContentRegistry, + client, + oldPolicy.getPolicy(), + lifecyclePolicyMetadata, + licenseState + ); + } catch (Exception e) { + logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e); + // Revert to the non-refreshed state return nonRefreshedState; - } else { - try { - return updateIndicesForPolicy( - nonRefreshedState, - xContentRegistry, - client, - oldPolicy.getPolicy(), - lifecyclePolicyMetadata, - licenseState - ); - } catch (Exception e) { - logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e); - // Revert to the non-refreshed state - return nonRefreshedState; - } } } - }); + } } @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here @@ -193,7 +217,7 @@ static boolean isNoopUpdate( * @param policy The lifecycle policy * @param state The cluster state */ - private void validatePrerequisites(LifecyclePolicy policy, ClusterState state) { + private static void validatePrerequisites(LifecyclePolicy policy, ClusterState state, XPackLicenseState licenseState) { List phasesWithSearchableSnapshotActions = policy.getPhases() .values() .stream()