Skip to content

Commit

Permalink
Extract transport cluster settings/ilm execute logic (elastic#86941)
Browse files Browse the repository at this point in the history
Extract execute logic from the transport actions for cluster
update settings and ILM put/delete to support future reuse for
operator file based updates.

Relates to elastic#86224
  • Loading branch information
grcevski authored and salvatore-campagna committed May 26, 2022
1 parent 909bdd3 commit 8e24f5c
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -137,11 +138,7 @@ protected void masterOperation(
final ClusterState state,
final ActionListener<ClusterUpdateSettingsResponse> listener
) {
final SettingsUpdater updater = new SettingsUpdater(clusterSettings);
submitUnbatchedTask(UPDATE_TASK_SOURCE, new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) {

private volatile boolean changed = false;

submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
@Override
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
Expand Down Expand Up @@ -225,21 +222,40 @@ public void onFailure(Exception e) {
logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
super.onFailure(e);
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
currentState,
clusterSettings.upgradeSettings(request.transientSettings()),
clusterSettings.upgradeSettings(request.persistentSettings()),
logger
);
changed = clusterState != currentState;
return clusterState;
}
});
}

public static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
protected volatile boolean changed = false;
protected final SettingsUpdater updater;
protected final ClusterUpdateSettingsRequest request;
private final ClusterSettings clusterSettings;

public ClusterUpdateSettingsTask(
final ClusterSettings clusterSettings,
Priority priority,
ClusterUpdateSettingsRequest request,
ActionListener<? extends AcknowledgedResponse> listener
) {
super(priority, request, listener);
this.clusterSettings = clusterSettings;
this.updater = new SettingsUpdater(clusterSettings);
this.request = request;
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
currentState,
clusterSettings.upgradeSettings(request.transientSettings()),
clusterSettings.upgradeSettings(request.persistentSettings()),
logger
);
changed = clusterState != currentState;
return clusterState;
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,34 +59,43 @@ public TransportDeleteLifecycleAction(

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
String policyToDelete = request.getPolicyName();
List<String> indicesUsingPolicy = currentState.metadata()
.indices()
.values()
.stream()
.filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
.map(idxMeta -> idxMeta.getIndex().getName())
.toList();
if (indicesUsingPolicy.isEmpty() == false) {
throw new IllegalArgumentException(
"Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
);
}
ClusterState.Builder newState = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
}
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
newPolicies.remove(request.getPolicyName());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
return newState.build();
submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new DeleteLifecyclePolicyTask(request, listener));
}

public static class DeleteLifecyclePolicyTask extends AckedClusterStateUpdateTask {
private final Request request;

public DeleteLifecyclePolicyTask(Request request, ActionListener<AcknowledgedResponse> listener) {
super(request, listener);
this.request = request;
}

@Override
public ClusterState execute(ClusterState currentState) {
String policyToDelete = request.getPolicyName();
List<String> indicesUsingPolicy = currentState.metadata()
.indices()
.values()
.stream()
.filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
.map(idxMeta -> idxMeta.getIndex().getName())
.toList();
if (indicesUsingPolicy.isEmpty() == false) {
throw new IllegalArgumentException(
"Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
);
}
ClusterState.Builder newState = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
}
});
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
newPolicies.remove(request.getPolicyName());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
return newState.build();
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,61 +109,85 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
}
}

submitUnbatchedTask("put-lifecycle-" + request.getPolicy().getName(), new AckedClusterStateUpdateTask(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final IndexLifecycleMetadata currentMetadata = currentState.metadata()
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas()
.get(request.getPolicy().getName());
submitUnbatchedTask(
"put-lifecycle-" + request.getPolicy().getName(),
new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client)
);
}

// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
return currentState;
}
public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
private final Request request;
private final XPackLicenseState licenseState;
private final Map<String, String> filteredHeaders;
private final NamedXContentRegistry xContentRegistry;
private final Client client;

validatePrerequisites(request.getPolicy(), currentState);
public UpdateLifecyclePolicyTask(
Request request,
ActionListener<AcknowledgedResponse> listener,
XPackLicenseState licenseState,
Map<String, String> filteredHeaders,
NamedXContentRegistry xContentRegistry,
Client client
) {
super(request, listener);
this.request = request;
this.licenseState = licenseState;
this.filteredHeaders = filteredHeaders;
this.xContentRegistry = xContentRegistry;
this.client = client;
}

ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
request.getPolicy(),
filteredHeaders,
nextVersion,
Instant.now().toEpochMilli()
);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
stateBuilder.metadata(
Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()
);
ClusterState nonRefreshedState = stateBuilder.build();
if (oldPolicy == null) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final IndexLifecycleMetadata currentMetadata = currentState.metadata()
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas().get(request.getPolicy().getName());

// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
return currentState;
}

validatePrerequisites(request.getPolicy(), currentState, licenseState);

ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
request.getPolicy(),
filteredHeaders,
nextVersion,
Instant.now().toEpochMilli()
);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
stateBuilder.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
ClusterState nonRefreshedState = stateBuilder.build();
if (oldPolicy == null) {
return nonRefreshedState;
} else {
try {
return updateIndicesForPolicy(
nonRefreshedState,
xContentRegistry,
client,
oldPolicy.getPolicy(),
lifecyclePolicyMetadata,
licenseState
);
} catch (Exception e) {
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
// Revert to the non-refreshed state
return nonRefreshedState;
} else {
try {
return updateIndicesForPolicy(
nonRefreshedState,
xContentRegistry,
client,
oldPolicy.getPolicy(),
lifecyclePolicyMetadata,
licenseState
);
} catch (Exception e) {
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
// Revert to the non-refreshed state
return nonRefreshedState;
}
}
}
});
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
Expand Down Expand Up @@ -193,7 +217,7 @@ static boolean isNoopUpdate(
* @param policy The lifecycle policy
* @param state The cluster state
*/
private void validatePrerequisites(LifecyclePolicy policy, ClusterState state) {
private static void validatePrerequisites(LifecyclePolicy policy, ClusterState state, XPackLicenseState licenseState) {
List<Phase> phasesWithSearchableSnapshotActions = policy.getPhases()
.values()
.stream()
Expand Down

0 comments on commit 8e24f5c

Please sign in to comment.