Skip to content

Commit

Permalink
[PLAT-14036] Enable auto retry of aborted shutdown tasks on internal …
Browse files Browse the repository at this point in the history
…portals

Summary:
The changes are
1. Remove the duplicate method getTaskUUID() of TaskInfo and update all the references. (The number of files is large due to this).
2. Save YBA version information in the task to know which YBA is the creator.
3. Add runtime config to retry tasks with the same version within certain time window.
4. Ignore any retry submission error.
5. Set correlation ID correctly.

Test Plan:
1. UT passed.
2. CreateU and EditU ran on two universes.
3. Aborted the tasks and retried them to see if retries still work with the cleanup changes.
4. Shut down YBA.
5. Restarted YBA and verified.

Reviewers: amalyshev, yshchetinin, sanketh, vkumar

Reviewed By: amalyshev

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D38693
  • Loading branch information
nkhogen committed Oct 7, 2024
1 parent 730c883 commit ecfc1f9
Show file tree
Hide file tree
Showing 33 changed files with 453 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ public CompletableFuture<?> executeJob(RuntimeParams runtime) {
String errMsg =
String.format(
"Auto master failover task %s (%s) failed for universe %s",
tf.getTaskType(),
tf.getTaskUUID(),
universe.getUniverseUUID());
tf.getTaskType(), tf.getUuid(), universe.getUniverseUUID());
throw new RuntimeException(errMsg);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import com.google.inject.Singleton;
import com.yugabyte.yw.commissioner.TaskExecutor.RunnableTask;
import com.yugabyte.yw.commissioner.TaskExecutor.TaskExecutionListener;
import com.yugabyte.yw.common.*;
import com.yugabyte.yw.common.PlatformExecutorFactory;
import com.yugabyte.yw.common.PlatformServiceException;
import com.yugabyte.yw.common.ProviderEditRestrictionManager;
import com.yugabyte.yw.common.RedactingService;
import com.yugabyte.yw.common.RedactingService.RedactionTarget;
import com.yugabyte.yw.common.backuprestore.BackupUtil;
import com.yugabyte.yw.common.config.GlobalConfKeys;
Expand All @@ -26,9 +29,20 @@
import com.yugabyte.yw.models.TaskInfo;
import com.yugabyte.yw.models.Universe;
import com.yugabyte.yw.models.helpers.TaskType;
import java.util.*;
import java.util.concurrent.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import play.inject.ApplicationLifecycle;
Expand Down Expand Up @@ -77,7 +91,7 @@ public Commissioner(
* @param taskType the task type.
* @return true if abortable.
*/
public boolean isTaskAbortable(TaskType taskType) {
public static boolean isTaskTypeAbortable(TaskType taskType) {
return TaskExecutor.isTaskAbortable(taskType.getTaskClass());
}

Expand All @@ -87,7 +101,7 @@ public boolean isTaskAbortable(TaskType taskType) {
* @param taskType the task type.
* @return true if retryable.
*/
public boolean isTaskRetryable(TaskType taskType) {
public static boolean isTaskTypeRetryable(TaskType taskType) {
return TaskExecutor.isTaskRetryable(taskType.getTaskClass());
}

Expand Down Expand Up @@ -159,7 +173,7 @@ private void onTaskCreated(RunnableTask taskRunnable, ITaskParams taskParams) {
*/
public boolean abortTask(UUID taskUUID, boolean force) {
TaskInfo taskInfo = TaskInfo.getOrBadRequest(taskUUID);
if (!force && !isTaskAbortable(taskInfo.getTaskType())) {
if (!force && !isTaskTypeAbortable(taskInfo.getTaskType())) {
throw new PlatformServiceException(
BAD_REQUEST, String.format("Invalid task type: Task %s cannot be aborted", taskUUID));
}
Expand Down Expand Up @@ -251,7 +265,7 @@ public Optional<ObjectNode> buildTaskStatus(

// Get subtask groups and add other details to it if applicable.
UserTaskDetails userTaskDetails;
Optional<RunnableTask> optional = taskExecutor.maybeGetRunnableTask(taskInfo.getTaskUUID());
Optional<RunnableTask> optional = taskExecutor.maybeGetRunnableTask(taskInfo.getUuid());
if (optional.isPresent()) {
userTaskDetails = taskInfo.getUserTaskDetails(optional.get().getTaskCache());
} else {
Expand All @@ -273,34 +287,42 @@ public Optional<ObjectNode> buildTaskStatus(
responseJson.set("details", details);

// Set abortable if eligible.
responseJson.put("abortable", false);
if (taskExecutor.isTaskRunning(task.getTaskUUID())) {
// Task is abortable only when it is running.
responseJson.put("abortable", isTaskAbortable(taskInfo.getTaskType()));
}

boolean retryable = false;
responseJson.put("abortable", isTaskAbortable(taskInfo));
// Set retryable if eligible.
if (isTaskRetryable(taskInfo.getTaskType())
&& TaskInfo.ERROR_STATES.contains(taskInfo.getTaskState())) {
if (task.getTargetType() == CustomerTask.TargetType.Provider) {
CustomerTask lastTask = lastTaskByTarget.get(task.getTargetUUID());
retryable = lastTask != null && lastTask.getTaskUUID().equals(task.getTaskUUID());
} else {
Set<String> taskUuidsToAllowRetry =
updatingTasks.getOrDefault(task.getTargetUUID(), Collections.emptySet());
retryable = taskUuidsToAllowRetry.contains(taskInfo.getTaskUUID().toString());
}
}
boolean retryable =
isTaskRetryable(
taskInfo,
tf -> {
if (task.getTargetType() == CustomerTask.TargetType.Provider) {
CustomerTask lastTask = lastTaskByTarget.get(task.getTargetUUID());
return lastTask != null && lastTask.getTaskUUID().equals(task.getTaskUUID());
}
Set<String> taskUuidsToAllowRetry =
updatingTasks.getOrDefault(task.getTargetUUID(), Collections.emptySet());
return taskUuidsToAllowRetry.contains(taskInfo.getUuid().toString());
});
responseJson.put("retryable", retryable);
if (isTaskPaused(taskInfo.getTaskUUID())) {
if (isTaskPaused(taskInfo.getUuid())) {
// Set this only if it is true. The thread is just parking. From the task state
// perspective, it is still running.
responseJson.put("paused", true);
}
return Optional.of(responseJson);
}

/**
 * Checks whether the given task can be aborted right now.
 *
 * <p>A task is abortable only when its task type is abortable and the task executor reports it
 * as currently running.
 *
 * @param taskInfo the task info of the task to check.
 * @return true if the task type is abortable and the task is running, false otherwise.
 */
public boolean isTaskAbortable(TaskInfo taskInfo) {
  // The type check comes first so the executor is not consulted for non-abortable task types.
  if (!isTaskTypeAbortable(taskInfo.getTaskType())) {
    return false;
  }
  return taskExecutor.isTaskRunning(taskInfo.getUuid());
}

/**
 * Checks whether the given task can be retried.
 *
 * <p>The task is retryable only when its task type is retryable, its current state is one of
 * {@code TaskInfo.ERROR_STATES}, and the caller-supplied condition accepts it. The extra
 * condition is evaluated only after the first two checks pass.
 *
 * @param taskInfo the task info of the task to check.
 * @param moreCondition additional caller-specific condition applied to the task info.
 * @return true if all the conditions hold, false otherwise.
 */
public boolean isTaskRetryable(TaskInfo taskInfo, Predicate<TaskInfo> moreCondition) {
  // Short-circuit evaluation keeps the original order: type, then state, then the predicate.
  return isTaskTypeRetryable(taskInfo.getTaskType())
      && TaskInfo.ERROR_STATES.contains(taskInfo.getTaskState())
      && moreCondition.test(taskInfo);
}

public ObjectNode getVersionInfo(CustomerTask task, TaskInfo taskInfo) {
ObjectNode versionNumbers = Json.newObject();
JsonNode taskParams = taskInfo.getTaskParams();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public DefaultTaskExecutionListener(

@Override
public void beforeTask(TaskInfo taskInfo) {
MDC.put(Commissioner.TASK_ID, taskInfo.getTaskUUID().toString());
MDC.put(Commissioner.TASK_ID, taskInfo.getUuid().toString());
log.info("About to execute task {}", taskInfo);
if (beforeTaskConsumer != null) {
beforeTaskConsumer.accept(taskInfo);
Expand All @@ -34,6 +34,6 @@ public void beforeTask(TaskInfo taskInfo) {
public void afterTask(TaskInfo taskInfo, Throwable t) {
MDC.remove(Commissioner.TASK_ID);
log.info("Task {} is completed", taskInfo);
providerEditRestrictionManager.onTaskFinished(taskInfo.getTaskUUID());
providerEditRestrictionManager.onTaskFinished(taskInfo.getUuid());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ TaskInfo createTaskInfo(ITask task, UUID taskUUID) {
RedactingService.filterSecretFields(task.getTaskParams(), RedactionTarget.APIS));
// Set the owner info.
taskInfo.setOwner(taskOwner);
taskInfo.setVersion(Util.getYbaVersion());
return taskInfo;
}

Expand Down Expand Up @@ -978,7 +979,7 @@ public String toString() {
}

public UUID getTaskUUID() {
return taskInfo.getTaskUUID();
return taskInfo.getUuid();
}

// This is invoked from tasks to save the updated task details generally in transaction with
Expand Down Expand Up @@ -1075,7 +1076,7 @@ synchronized void updateTaskDetailsOnError(TaskInfo.State state, Throwable t) {
log.error(
"Failed to execute task type {} UUID {} details {}, hit error.",
taskInfo.getTaskType(),
taskInfo.getTaskUUID(),
taskInfo.getUuid(),
redactedTaskParams,
t);

Expand Down Expand Up @@ -1143,7 +1144,7 @@ public TaskCache getTaskCache() {
/** Invoked by the ExecutorService. Do not invoke this directly. */
@Override
public void run() {
UUID taskUUID = getTaskInfo().getTaskUUID();
UUID taskUUID = getTaskInfo().getUuid();
try {
getTask().setUserTaskUUID(taskUUID);
super.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Slf4j
public class CheckNodeReachable extends NodeTaskBase {

private NodeUniverseManager nodeUniverseManager;
private final NodeUniverseManager nodeUniverseManager;

@Inject
protected CheckNodeReachable(
Expand Down
6 changes: 2 additions & 4 deletions managed/src/main/java/com/yugabyte/yw/common/AppInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public AppInit(
throws ReflectiveOperationException {
try {
log.info("Yugaware Application has started");

setYbaVersion(ConfigHelper.getCurrentVersion(environment));
if (environment.isTest()) {
String dbDriverKey = "db.default.driver";
if (config.hasPath(dbDriverKey)) {
Expand Down Expand Up @@ -205,7 +205,6 @@ public AppInit(
for (ExtraMigration m : ExtraMigration.getAll()) {
m.run(extraMigrationManager);
}

// Import new local releases into release metadata
releaseManager.importLocalReleases();
releaseManager.updateCurrentReleases();
Expand Down Expand Up @@ -248,6 +247,7 @@ public AppInit(
taskManager.handleAllPendingTasks();
taskManager.updateUniverseSoftwareUpgradeStateSet();
taskManager.handlePendingConsistencyTasks();
taskManager.handleAutoRetryAbortedTasks();

// Fail all incomplete support bundle creations.
supportBundleCleanup.markAllRunningSupportBundlesFailed();
Expand Down Expand Up @@ -319,8 +319,6 @@ public AppInit(
log.info("Completed initialization in " + elapsedStr + " seconds.");
}

setYbaVersion(ConfigHelper.getCurrentVersion(environment));

// Fix up DB paths again to handle any new files (ybc) that moved during AppInit.
if (config.getBoolean("yb.fixPaths")) {
log.debug("Fixing up file paths a second time.");
Expand Down
Loading

0 comments on commit ecfc1f9

Please sign in to comment.