[HWORKS-680] In case of parallelism constraints, if a recipe has failed, do not proceed to the next node in the pipeline but wait until the recipe has run successfully (karamelchef#10)

* [HWORKS-680] Blocked status

[HWORKS-680] In case of parallelism constraints, if a recipe has failed, do not proceed to the next node in the pipeline but wait until the recipe has run successfully

* [HWORKS-680] Remove type cast
kouzant authored and SirOibaf committed Sep 11, 2023
1 parent 21502f9 commit e778710
Showing 5 changed files with 103 additions and 5 deletions.
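
The change coordinates machines that run the same recipe through a shared RecipeSerialization object: the machine that finishes a recipe records whether it failed and calls notifyAll(), while machines about to run that recipe check hasFailed() and wait() until some node has run it successfully. Below is a minimal sketch of that pattern only, with hypothetical names (RecipeGate, recordResult, awaitSuccessElsewhere); it is not the Karamel API, which is spread across the files in this diff.

    // Minimal sketch of the wait/notify coordination this commit introduces.
    // Hypothetical class; the real logic lives in RecipeSerialization, DagNode and SshMachine.
    class RecipeGate {
      private boolean failed = false;

      // Called by the machine that just finished the recipe (cf. DagNode.releaseSerializedTask).
      synchronized void recordResult(boolean recipeFailed) {
        failed = recipeFailed;
        notifyAll();                 // wake machines waiting to run the same recipe
      }

      // Called by a machine about to run the recipe (cf. the checks added to SshMachine.run).
      synchronized void awaitSuccessElsewhere() throws InterruptedException {
        while (failed) {             // the recipe failed on another node: do not proceed
          wait();                    // woken by recordResult(); re-check the flag
        }
      }
    }
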
12 changes: 12 additions & 0 deletions karamel-core/src/main/java/se/kth/karamel/backend/dag/DagNode.java
@@ -12,6 +12,7 @@

import org.apache.log4j.Logger;
import se.kth.karamel.backend.running.model.tasks.RunRecipeTask;
import se.kth.karamel.backend.running.model.tasks.Task;
import se.kth.karamel.common.clusterdef.json.JsonCluster;
import se.kth.karamel.common.clusterdef.yaml.RuntimeConfiguration;
import se.kth.karamel.common.exception.DagConstructionException;
@@ -249,6 +250,17 @@ private void releaseSerializedTask() {
RunRecipeTask recipeTask = (RunRecipeTask) task;
RecipeSerialization serialization = dag.getSerializableRecipeCounter(recipeTask.getRecipeCanonicalName());
if (serialization != null) {
synchronized (serialization) {
serialization.setFailedStatus(recipeTask.getStatus().equals(Task.Status.FAILED));
logger.info(String.format("%s: Recipe %s RecipeSerializationID: %s Has recipe failed: %s",
((RunRecipeTask) task).getMachine().getId(),
((RunRecipeTask) task).getName(),
serialization.hashCode(),
serialization.hasFailed()));
logger.debug(String.format("%s: Recipe %s NotifyingAll", ((RunRecipeTask) task).getMachine().getId(),
((RunRecipeTask) task).getName()));
serialization.notifyAll();
}
serialization.release(recipeTask);
}
}
@@ -16,6 +16,7 @@ public class RecipeSerialization {
private final Integer maxParallelism;
private final Set<RecipeSerializationClaim> claims;
private final ReentrantReadWriteLock claimsLock;
private boolean failed = false;

public RecipeSerialization(Integer parallelism) {
this.parallelism = new Semaphore(parallelism, true);
@@ -48,6 +49,7 @@ public void prepareToExecute(RunRecipeTask task) throws InterruptedException {
+ " at the moment because parallelism is limited. Available parallelism permits: "
+ parallelism.availablePermits() + "/" + maxParallelism + " - we wait until a permit becomes available."
+ " Current claims: " + printableClaims());
task.blocked();
parallelism.acquire();
}
claimsLock.writeLock().lock();
@@ -75,4 +77,12 @@ private String printableClaims() {
claimsLock.readLock().unlock();
}
}

public synchronized void setFailedStatus(boolean failed) {
this.failed = failed;
}

public synchronized boolean hasFailed() {
return failed;
}
}
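
prepareToExecute() above limits how many machines may run the same recipe at once via a fair Semaphore, and the new task.blocked() call marks the task BLOCKED while it waits for a permit. A rough sketch of that gating under simplified, hypothetical names (ParallelismGate, markBlocked), not the actual RecipeSerialization signatures:

    import java.util.concurrent.Semaphore;

    // Sketch of permit-gated execution with a visible "blocked" state (hypothetical names).
    class ParallelismGate {
      private final Semaphore permits;

      ParallelismGate(int maxParallelism) {
        this.permits = new Semaphore(maxParallelism, true);  // fair, like RecipeSerialization
      }

      void run(Runnable recipe, Runnable markBlocked) throws InterruptedException {
        if (!permits.tryAcquire()) {
          markBlocked.run();         // surface the wait, as task.blocked() now does
          permits.acquire();         // block until another machine releases its permit
        }
        try {
          recipe.run();
        } finally {
          permits.release();         // cf. RecipeSerialization.release(...)
        }
      }
    }
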
@@ -9,8 +9,12 @@ public class RecipeSerializationClaim {
private final String id;
private final Instant claimedAt;

public static String serializationClaimId(RunRecipeTask task) {
return String.format("%s@%s", task.getRecipeCanonicalName(), task.getMachineId());
}

public RecipeSerializationClaim(RunRecipeTask task) {
-    id = String.format("%s@%s", task.getRecipeCanonicalName(), task.getMachineId());
+    id = serializationClaimId(task);
claimedAt = Instant.now();
}

@@ -10,6 +10,7 @@
import java.io.SequenceInputStream;
import java.nio.file.Files;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -133,41 +134,108 @@ private boolean anyFailure() {
}

private void prepareSerializedTask(Task task) throws InterruptedException {
Optional<RecipeSerialization> maybeRS = getRecipeSerialization(task);
if (maybeRS.isPresent()) {
RecipeSerialization recipeSerialization = maybeRS.get();
recipeSerialization.prepareToExecute((RunRecipeTask) task);
}
}

private Optional<RecipeSerialization> getRecipeSerialization(Task task) {
if (task instanceof RunRecipeTask) {
RunRecipeTask recipeTask = (RunRecipeTask) task;
RecipeSerialization serialization = task.getDagCallback().getDag()
.getSerializableRecipeCounter(recipeTask.getRecipeCanonicalName());
-      if (serialization != null) {
-        serialization.prepareToExecute(recipeTask);
-      }
+      return Optional.ofNullable(serialization);
}
return Optional.empty();
}

@Override
public void run() {
logger.debug(String.format("%s: Started SSH_Machine d'-'", machineEntity.getId()));
try {
boolean hasAbortedDueToFailure = false;
while (!stopping.get()) {
try {
if (machineEntity.getLifeStatus() == MachineRuntime.LifeStatus.CONNECTED
&& (machineEntity.getTasksStatus() == MachineRuntime.TasksStatus.ONGOING
|| machineEntity.getTasksStatus() == MachineRuntime.TasksStatus.EMPTY)) {
try {
boolean retry = false;
if (activeTask == null) {
if (taskQueue.isEmpty()) {
machineEntity.setTasksStatus(MachineRuntime.TasksStatus.EMPTY, null, null);
}
activeTask = taskQueue.take();
logger.debug(String.format("%s: Taking a new task from the queue.", machineEntity.getId()));
machineEntity.setTasksStatus(MachineRuntime.TasksStatus.ONGOING, null, null);
hasAbortedDueToFailure = false;
} else {
retry = true;
logger.debug(
String.format("%s: Retrying a task that didn't complete on last execution attempt.",
machineEntity.getId()));
}
logger.debug(String.format("%s: Task for execution.. '%s'", machineEntity.getId(), activeTask.getName()));

Optional<RecipeSerialization> recipeSerialization = getRecipeSerialization(activeTask);
if (recipeSerialization.isPresent()) {
RecipeSerialization rs = recipeSerialization.get();
// On retries we do not check whether the recipe has failed: we already know it has (that is why
// we are retrying), and checking would lead to a deadlock.
//
// Once the task has gotten the green light to run, i.e. the parallelism claim has been satisfied,
// we check again whether the recipe has failed: while we were waiting for the claim, it might have
// failed on another node. In that case we continue the loop without executing the task, and end up
// back here as a retry that aborted due to failure, where we wait until the failure is resolved.
if (!retry || hasAbortedDueToFailure) {
logger.debug(String.format("%s: retry: %s hasAborted: %s", machineEntity.getId(),
activeTask.getName(), retry, hasAbortedDueToFailure));
synchronized (rs) {
logger.debug(String.format("%s: Recipe %s RecipeSerializationID: %s Has failed: %s",
activeTask.getMachine().getId(),
activeTask.getName(),
rs.hashCode(),
rs.hasFailed()));
if (rs.hasFailed()) {
logger.info(String.format("%s: Recipe %s has failed on another node. Wait until it succeeds",
machineEntity.getId(), activeTask.getName()));
rs.wait();
} else {
logger.debug(String.format("%s: Recipe %s has NOT failed on another node. Executing",
machineEntity.getId(), activeTask.getName()));
}
}
} else {
logger.debug(String.format("%s: %s it is a retry", machineEntity.getId(), activeTask.getName()));
}
}

prepareSerializedTask(activeTask);

if (recipeSerialization.isPresent()) {
RecipeSerialization rs = recipeSerialization.get();
// For explanation read above
if (!retry || hasAbortedDueToFailure) {
synchronized (rs) {
logger.debug(String.format("%s: %s Checking again after getting hold of the lock if recipe has " +
"failed", machineEntity.getId(), activeTask.getName()));
if (rs.hasFailed()) {
logger.info(String.format("%s: %s Recipe has failed on another node, releasing the lock and " +
"continue", machineEntity.getId(), activeTask.getName()));
rs.release((RunRecipeTask) activeTask);
logger.debug(String.format("%s: %s Released and continue", machineEntity.getId(),
activeTask.getName()));
hasAbortedDueToFailure = true;
continue;
}
}
}
}
runTask(activeTask);
hasAbortedDueToFailure = false;
} catch (InterruptedException ex) {
if (stopping.get()) {
logger.debug(String.format("%s: Stopping SSH_Machine", machineEntity.getId()));
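
The block above is the heart of the change: before running a serialized recipe the machine waits while hasFailed() is true, and after the parallelism claim is granted it re-checks under the same lock, because the recipe may have failed elsewhere while this machine was waiting for a permit; in that case it releases the claim, marks hasAbortedDueToFailure, and loops instead of executing. A condensed, self-contained sketch of that flow with hypothetical names (SerializedRecipeRunner, RecipeAttempt), not the real SshMachine code:

    import java.util.concurrent.Semaphore;

    // Condensed sketch of the failure handling in the run() loop above (hypothetical names;
    // the real code is spread across SshMachine, RecipeSerialization and DagNode).
    class SerializedRecipeRunner {
      interface RecipeAttempt { boolean attempt(); }   // true = the recipe run succeeded

      private final Object lock = new Object();
      private final Semaphore permits;                 // parallelism constraint
      private boolean failed = false;

      SerializedRecipeRunner(int maxParallelism) {
        permits = new Semaphore(maxParallelism, true);
      }

      void runWithRetries(RecipeAttempt recipe) throws InterruptedException {
        boolean retry = false;                 // retries skip the failure checks, or they would deadlock
        boolean abortedDueToFailure = false;
        while (true) {
          if (!retry || abortedDueToFailure) {
            synchronized (lock) {
              while (failed) {
                lock.wait();                   // recipe failed on another machine: wait until it succeeds
              }
            }
          }
          permits.acquire();                   // parallelism claim (cf. prepareToExecute)
          if (!retry || abortedDueToFailure) {
            synchronized (lock) {
              if (failed) {                    // failed elsewhere while we waited for the permit
                permits.release();
                abortedDueToFailure = true;
                continue;                      // loop again and wait, without running the recipe
              }
            }
          }
          abortedDueToFailure = false;
          boolean ok = recipe.attempt();
          synchronized (lock) {
            failed = !ok;                      // record the outcome ...
            lock.notifyAll();                  // ... and wake waiters so they re-check
          }
          permits.release();
          if (ok) {
            return;
          }
          retry = true;                        // run the recipe again on this machine
        }
      }
    }
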
@@ -32,7 +32,7 @@ public abstract class Task implements DagTask, TaskCallback {

public static enum Status {

-    WAITING, READY, EXIST, ONGOING, DONE, FAILED, SKIPPED;
+    WAITING, BLOCKED, READY, EXIST, ONGOING, DONE, FAILED, SKIPPED;
}
private Status status = Status.WAITING;
private final String name;
@@ -192,6 +192,10 @@ public void started() {
dagCallback.started();
}

public void blocked() {
status = Status.BLOCKED;
}

@Override
public void succeed() {
status = Status.DONE;
