Skip to content

Commit

Permalink
Remove AllocatedPersistentTask.getState() (#30858)
Browse files Browse the repository at this point in the history
This commit removes the method AllocatedPersistentTask.getState() that
exposes the internal state of an AllocatedPersistentTask and replaces
it with a new isCompleted() method. Related to #29608.
  • Loading branch information
tlrx committed May 29, 2018
1 parent 857f3d3 commit ac1b5c7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;

Expand All @@ -38,18 +37,16 @@
* Represents a executor node operation that corresponds to a persistent task
*/
public class AllocatedPersistentTask extends CancellableTask {
private volatile String persistentTaskId;
private volatile long allocationId;

private final AtomicReference<State> state;
@Nullable
private volatile Exception failure;

private volatile String persistentTaskId;
private volatile long allocationId;
private volatile @Nullable Exception failure;
private volatile PersistentTasksService persistentTasksService;
private volatile Logger logger;
private volatile TaskManager taskManager;


public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
Expand Down Expand Up @@ -101,24 +98,10 @@ public Exception getFailure() {
return failure;
}

boolean markAsCancelled() {
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
}

public State getState() {
return state.get();
}

public long getAllocationId() {
return allocationId;
}

public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}

/**
* Waits for this persistent task to have the desired state.
*/
Expand All @@ -128,6 +111,14 @@ public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
}

final boolean isCompleted() {
return state.get() == State.COMPLETED;
}

boolean markAsCancelled() {
return state.compareAndSet(State.STARTED, State.PENDING_CANCEL);
}

public void markAsCompleted() {
completeAndNotifyIfNeeded(null);
}
Expand All @@ -138,11 +129,10 @@ public void markAsFailed(Exception e) {
} else {
completeAndNotifyIfNeeded(e);
}

}

private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
final State prevState = state.getAndSet(State.COMPLETED);
if (prevState == State.COMPLETED) {
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
} else {
Expand Down Expand Up @@ -173,4 +163,10 @@ public void onFailure(Exception e) {
}
}
}

public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void clusterChanged(ClusterChangedEvent event) {

for (Long id : notVisitedTasks) {
AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
if (task.isCompleted()) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
Expand All @@ -73,7 +74,6 @@ public void setUp() throws Exception {
threadPool = new TestThreadPool(getClass().getName());
}


@Override
@After
public void tearDown() throws Exception {
Expand All @@ -95,7 +95,7 @@ private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings
return state.build();
}

public void testStartTask() throws Exception {
public void testStartTask() {
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
Expand Down Expand Up @@ -131,8 +131,8 @@ public void testStartTask() throws Exception {

if (added == false) {
logger.info("No local node action was added");

}

MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
Expand All @@ -149,6 +149,7 @@ public void testStartTask() throws Exception {

// Make sure action wasn't called again
assertThat(executor.executions.size(), equalTo(1));
assertThat(executor.get(0).task.isCompleted(), is(false));

// Start another task on this node
state = newClusterState;
Expand All @@ -157,10 +158,15 @@ public void testStartTask() throws Exception {

// Make sure action was called this time
assertThat(executor.size(), equalTo(2));
assertThat(executor.get(1).task.isCompleted(), is(false));

// Finish both tasks
executor.get(0).task.markAsFailed(new RuntimeException());
executor.get(1).task.markAsCompleted();

assertThat(executor.get(0).task.isCompleted(), is(true));
assertThat(executor.get(1).task.isCompleted(), is(true));

String failedTaskId = executor.get(0).task.getPersistentTaskId();
String finishedTaskId = executor.get(1).task.getPersistentTaskId();
executor.clear();
Expand All @@ -186,7 +192,6 @@ public void testStartTask() throws Exception {
// Make sure action was only allocated on this node once
assertThat(executor.size(), equalTo(1));
}

}

public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
Expand Down Expand Up @@ -300,7 +305,6 @@ public void sendCompletionNotification(String taskId, long allocationId, Excepti

// Check the the task is now removed from task manager
assertThat(taskManager.getTasks().values(), empty());

}

private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,
Expand Down

0 comments on commit ac1b5c7

Please sign in to comment.