Skip to content

Commit

Permalink
Remove LegacyClusterTaskResultActionListener (#89459)
Browse files Browse the repository at this point in the history
Also removes the now-unused legacy method
`ClusterStateTaskListener#onClusterStateProcessed`.

Closes #83784
  • Loading branch information
DaveCTurner authored Aug 18, 2022
1 parent c238aa1 commit 18328b0
Show file tree
Hide file tree
Showing 24 changed files with 26 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}

private ClusterStateAckListener getAckListener(String indexName) {
return new ClusterStateAckListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,10 @@ record RolloverTask(
RolloverResponse trialRolloverResponse,
ActionListener<RolloverResponse> listener
) implements ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

record RolloverExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,6 @@ default String describeTasks(List<T> tasks) {
return output.toString();
}

/**
* A {@link Consumer} for passing to {@link ClusterStateTaskExecutor.TaskContext#success} which preserves the
* legacy behaviour of calling {@link ClusterStateTaskListener#clusterStateProcessed} or {@link ClusterStateTaskListener#onFailure}.
* <p>
* New implementations should use a dedicated listener rather than relying on this legacy behaviour.
*/
// TODO remove all remaining usages of this listener
@Deprecated
record LegacyClusterTaskResultActionListener(ClusterStateTaskListener task, ClusterState originalState)
implements
Consumer<ClusterState> {
@Override
public void accept(ClusterState publishedState) {
task.clusterStateProcessed(originalState, publishedState);
}
}

/**
* A task to be executed, along with callbacks for the executor to record the outcome of this task's execution. The executor must
* call exactly one of these methods for every task in its batch.
Expand All @@ -108,10 +91,7 @@ interface TaskContext<T extends ClusterStateTaskListener> {
* method and must instead call {@link #success(Runnable, ClusterStateAckListener)}, passing the task itself as the {@code
* clusterStateAckListener} argument.
*
* @param onPublicationSuccess An action executed when (if?) the cluster state update succeeds. The task's {@link
* ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master
* service once the task execution has succeeded, but legacy implementations may supply a listener
* which calls this methods.
* @param onPublicationSuccess An action executed when (if?) the cluster state update succeeds.
*/
void success(Runnable onPublicationSuccess);

Expand All @@ -122,10 +102,7 @@ interface TaskContext<T extends ClusterStateTaskListener> {
* method and must instead call {@link #success(Consumer, ClusterStateAckListener)}, passing the task itself as the {@code
* clusterStateAckListener} argument.
*
* @param publishedStateConsumer A consumer of the cluster state that was ultimately published. The task's {@link
* ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master
* service once the task execution has succeeded, but legacy implementations may supply a listener
* which calls this methods.
* @param publishedStateConsumer A consumer of the cluster state that was ultimately published.
* <p>
* The consumer should prefer not to use the published state for things like determining the result
* of a task. The task may have been executed as part of a batch, and later tasks in the batch may
Expand All @@ -143,10 +120,7 @@ interface TaskContext<T extends ClusterStateTaskListener> {
* Note that some tasks implement {@link ClusterStateAckListener} and can listen for acks themselves. If so, you must pass the task
* itself as the {@code clusterStateAckListener} argument.
*
* @param onPublicationSuccess An action executed when (if?) the cluster state update succeeds. The task's {@link
* ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master
* service once the task execution has succeeded, but legacy implementations may supply a listener
* which calls this methods.
* @param onPublicationSuccess An action executed when (if?) the cluster state update succeeds.
*
* @param clusterStateAckListener A listener for acknowledgements from nodes. If the publication succeeds then this listener is
* completed as nodes ack the state update. If the publication fails then the failure
Expand All @@ -160,10 +134,7 @@ interface TaskContext<T extends ClusterStateTaskListener> {
* Note that some tasks implement {@link ClusterStateAckListener} and can listen for acks themselves. If so, you must pass the task
* itself as the {@code clusterStateAckListener} argument.
*
* @param publishedStateConsumer A consumer of the cluster state that was ultimately published. The task's {@link
* ClusterStateTaskListener#clusterStateProcessed} method is not called directly by the master
* service once the task execution has succeeded, but legacy implementations may supply a listener
* which calls this methods.
* @param publishedStateConsumer A consumer of the cluster state that was ultimately published.
* <p>
* The consumer should prefer not to use the published state for things like determining the result
* of a task. The task may have been executed as part of a batch, and later tasks in the batch may
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.cluster.service.MasterService;

public interface ClusterStateTaskListener {

/**
* A callback for when task execution fails. May receive a {@link NotMasterException} if this node stopped being the master before this
* task was executed or a {@link ProcessClusterEventTimeoutException} if the task timed out before it was executed. If the task fails
Expand All @@ -28,19 +27,4 @@ public interface ClusterStateTaskListener {
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
void onFailure(Exception e);

/**
* Called when the result of the {@link ClusterStateTaskExecutor#execute} method has been processed properly by all listeners.
*
* The {@param newState} parameter is the state that was ultimately published. This can lead to surprising behaviour if tasks are
* batched together: a later task in the batch may undo or overwrite the changes made by an earlier task. In general you should prefer
* to ignore the published state and instead handle the success of a publication via the listener that the executor passes to
* {@link ClusterStateTaskExecutor.TaskContext#success}.
*
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
// TODO: replace all remaining usages of this method with dedicated listeners and then remove it.
default void clusterStateProcessed(ClusterState oldState, ClusterState newState) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -50,20 +47,15 @@ public ClusterStateUpdateTask(Priority priority, TimeValue timeout) {
public abstract ClusterState execute(ClusterState currentState) throws Exception;

/**
* A callback for when task execution fails. May receive a {@link NotMasterException} if this node stopped being the master before this
* task was executed or a {@link ProcessClusterEventTimeoutException} if the task timed out before it was executed. If the task fails
* during execution then this method receives the corresponding exception. If the task executes successfully but the resulting cluster
* state publication fails then this method receives a {@link FailedToCommitClusterStateException}. If publication fails then a new
* master is elected and the update might or might not take effect, depending on whether or not the newly-elected master accepted the
* published state that failed to be committed.
* <p>
* Use {@link MasterService#isPublishFailureException} to detect the "expected" master failure cases if needed.
* <p>
* Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
* ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
* using a more specific logger and at a less dramatic log level.
* Called when the result of the {@link #execute} method has been processed properly by all listeners.
*
* The {@param newState} parameter is the state that was ultimately published.
*
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
public abstract void onFailure(Exception e);
public void clusterStateProcessed(ClusterState initialState, ClusterState newState) {}

/**
* If the cluster state update task wasn't processed by the provided timeout, call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ public LocalMasterServiceTask(Priority priority) {

protected void execute(ClusterState currentState) {}

@Override
public final void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}

protected void onPublicationComplete() {}

public void submit(MasterService masterService, String source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ public int hashCode() {
public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener<TransportResponse.Empty> listener)
implements
ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
logger.log(
Expand All @@ -548,11 +547,6 @@ public void onFailure(Exception e) {
);
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}
}

public void shardStarted(
Expand Down Expand Up @@ -859,11 +853,6 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}

@Override
public String toString() {
return "StartedShardUpdateTask{entry=" + entry + ", listener=" + listener + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.node.DiscoveryNode;

Expand Down Expand Up @@ -38,11 +37,6 @@ public void onFailure(Exception e) {
}
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}

@Override
public String toString() {
final StringBuilder stringBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ public void onFailure(final Exception e) {
logger.log(MasterService.isPublishFailureException(e) ? Level.DEBUG : Level.ERROR, "unexpected failure during [node-left]", e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}

@Override
public String toString() {
final StringBuilder stringBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,10 @@ public ClusterState execute(BatchExecutionContext<AddBlocksToCloseTask> batchExe
private record AddBlocksToCloseTask(CloseIndexClusterStateUpdateRequest request, ActionListener<CloseIndexResponse> listener)
implements
ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

private class CloseIndicesExecutor implements ClusterStateTaskExecutor<CloseIndicesTask> {
Expand Down Expand Up @@ -292,16 +286,10 @@ private record CloseIndicesTask(
Map<Index, CloseIndexResponse.IndexResult> verifyResults,
ActionListener<CloseIndexResponse> listener
) implements ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

/**
Expand Down Expand Up @@ -547,11 +535,6 @@ private record AddBlocksTask(AddIndexBlockClusterStateUpdateRequest request, Act
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

private static class FinalizeBlocksExecutor implements ClusterStateTaskExecutor<FinalizeBlocksTask> {
Expand Down Expand Up @@ -592,16 +575,10 @@ private record FinalizeBlocksTask(
Map<Index, AddBlockResult> verifyResults,
ActionListener<AddIndexBlockResponse> listener
) implements ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

/**
Expand Down Expand Up @@ -1243,10 +1220,5 @@ public void onAckTimeout() {
public TimeValue ackTimeout() {
return request.ackTimeout();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ private abstract static class TemplateClusterStateUpdateTask implements ClusterS
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}

ClusterState execute(ClusterState currentState) {
final Settings normalizedSettings = Settings.builder()
.put(request.settings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,6 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
*/
abstract static class UpsertHealthMetadataTask implements ClusterStateTaskListener {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "never called";
}

@Override
public void onFailure(@Nullable Exception e) {
logger.error("failure during health metadata update", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ abstract static class PipelineClusterStateUpdateTask implements ClusterStateTask
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}
}

public IngestService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3381,11 +3381,6 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "never called";
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
Loading

0 comments on commit 18328b0

Please sign in to comment.