Skip to content

Commit

Permalink
Merge pull request #4455 from inception-project/feature/4452-Keep-rec…
Browse files Browse the repository at this point in the history
…ommender-process-running-when-user-logs-out

#4452 - Keep recommender process running when user logs out
  • Loading branch information
reckart authored Jan 25, 2024
2 parents 92371e3 + 49ef7e3 commit 8bd6619
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public BulkProcessingPanel(String aId, IModel<Project> aModel)
queue(new LambdaAjaxButton<>("startProcessing", this::actionStartProcessing));

queue(new TaskMonitorPanel("runningProcesses").setPopupMode(false)
.setKeepRemovedTasks(true));
.setShowFinishedTasks(true));
}

private void actionStartProcessing(AjaxRequestTarget aTarget, Form<FormData> aForm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static de.tudarmstadt.ukp.clarin.webanno.model.AnnotationDocumentStateChangeFlag.EXPLICIT_ANNOTATOR_USER_ACTION;
import static de.tudarmstadt.ukp.inception.recommendation.api.model.LearningRecordChangeLocation.AUTO_ACCEPT;
import static de.tudarmstadt.ukp.inception.recommendation.api.model.LearningRecordUserAction.ACCEPTED;
import static de.tudarmstadt.ukp.inception.scheduling.TaskScope.PROJECT;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class BulkPredictionTask

public BulkPredictionTask(Builder<? extends Builder<?>> aBuilder)
{
super(aBuilder.withType(TYPE).withCancellable(true));
super(aBuilder.withType(TYPE).withCancellable(true).withScope(PROJECT));

recommender = aBuilder.recommender;
dataOwner = aBuilder.dataOwner;
Expand Down Expand Up @@ -231,7 +232,12 @@ public T withDataOwner(String aDataOwner)

public BulkPredictionTask build()
{
Validate.notNull(sessionOwner, "BulkPredictionTask requires a user");
withProject(recommender.getProject());

Validate.notNull(sessionOwner, "BulkPredictionTask requires a session owner");
Validate.notNull(dataOwner, "BulkPredictionTask requires a data owner");
Validate.notNull(recommender, "BulkPredictionTask requires a recommender");
Validate.notNull(project, "BulkPredictionTask requires a project");

return new BulkPredictionTask(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface SchedulingService
/**
* @return tasks which are no longer running (completed, failed).
*/
List<Task> getEndedTasks();
List<Task> getTasksPendingAcknowledgment();

List<Task> getScheduledAndRunningTasks();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class SchedulingServiceImpl

private final List<Task> runningTasks;
private final List<Task> enqueuedTasks;
private final List<Task> endedTasks;
private final List<Task> pendingAcknowledgement;
private final Set<Project> deletionPending;

@Autowired
Expand All @@ -87,7 +87,7 @@ public SchedulingServiceImpl(ApplicationContext aApplicationContext,
aConfig.getQueueSize(), this::beforeExecute, this::afterExecute);
runningTasks = Collections.synchronizedList(new ArrayList<>());
enqueuedTasks = Collections.synchronizedList(new ArrayList<>());
endedTasks = Collections.synchronizedList(new ArrayList<>());
pendingAcknowledgement = Collections.synchronizedList(new ArrayList<>());
deletionPending = Collections.synchronizedSet(new LinkedHashSet<>());
watchdog = Executors.newScheduledThreadPool(1);
watchdog.scheduleAtFixedRate(this::scheduleEligibleTasks, 5, 5, SECONDS);
Expand All @@ -106,14 +106,24 @@ private void afterExecute(Runnable aRunnable, Throwable aThrowable)
Validate.notNull(aRunnable, "Task cannot be null");

var task = (Task) aRunnable;
runningTasks.remove(aRunnable);

LOG.debug("Ended task [{}]: {}", task, task.getMonitor().getState());
if (!task.getMonitor().isCancelled() && task.getScope().isKeepAfterEnded()) {
endedTasks.add(task);
}
handleTaskEnded(task);

scheduleEligibleTasks();
}

private void handleTaskEnded(Task aTask)
{
runningTasks.remove(aTask);
if (aTask.getMonitor().isCancelled() || !aTask.getScope().isDestroyOnEnd()) {
pendingAcknowledgement.add(aTask);
}
else {
aTask.destroy();
}
}

/**
* @return tasks which have not been handed to the executor yet.
*/
Expand Down Expand Up @@ -148,20 +158,21 @@ public List<Task> getRunningTasks()
}

/**
* @return tasks which are no longer running (completed, failed).
* @return tasks which are no longer running (completed, failed) and which require the user to
* acknowledge the result.
*/
@Override
public List<Task> getEndedTasks()
public List<Task> getTasksPendingAcknowledgment()
{
// We return copy here, as else the list the receiver sees might be updated
// when new tasks are running or existing ones stopped.
return new ArrayList<>(endedTasks);
return new ArrayList<>(pendingAcknowledgement);
}

@Override
public List<Task> getScheduledAndRunningTasks()
{
List<Task> result = new ArrayList<>();
var result = new ArrayList<Task>();
result.addAll(getScheduledTasks());
result.addAll(getRunningTasks());
return result;
Expand All @@ -170,10 +181,11 @@ public List<Task> getScheduledAndRunningTasks()
@Override
public List<Task> getAllTasks()
{
List<Task> result = new ArrayList<>();
var result = new ArrayList<Task>();
result.addAll(getEnqueuedTasks());
result.addAll(getScheduledTasks());
result.addAll(getRunningTasks());
result.addAll(getTasksPendingAcknowledgment());
return result;
}

Expand Down Expand Up @@ -376,7 +388,7 @@ public synchronized Optional<Task> findTask(Predicate<Task> aPredicate)
.or(() -> executor.getQueue().stream().map(Task.class::cast).filter(aPredicate)
.findFirst())
.or(() -> runningTasks.stream().filter(aPredicate).findFirst())
.or(() -> endedTasks.stream().filter(aPredicate).findFirst());
.or(() -> pendingAcknowledgement.stream().filter(aPredicate).findFirst());
}

@Override
Expand All @@ -402,12 +414,14 @@ public synchronized void stopAllTasksMatching(Predicate<Task> aPredicate)
runningTasks.forEach(task -> {
if (aPredicate.test(task)) {
task.getMonitor().cancel();
// The task will be destroyed if necessary by the afterExecute callback
}
});

endedTasks.removeIf(runnable -> {
pendingAcknowledgement.removeIf(runnable -> {
var task = (Task) runnable;
if (aPredicate.test(task)) {
task.destroy();
return true;
}
return false;
Expand Down Expand Up @@ -495,15 +509,15 @@ public void destroy()
executor.getQueue().clear();
watchdog.shutdownNow();
executor.shutdownNow();
endedTasks.clear();
pendingAcknowledgement.clear();
}

private void logState()
{
getEnqueuedTasks().forEach(t -> LOG.debug("Queued : {}", t));
getScheduledTasks().forEach(t -> LOG.debug("Scheduled: {}", t));
getRunningTasks().forEach(t -> LOG.debug("Running : {}", t));
getEndedTasks().forEach(t -> LOG.debug("Ended : {}", t));
getEnqueuedTasks().forEach(t -> LOG.debug("Queued : {}", t));
getScheduledTasks().forEach(t -> LOG.debug("Scheduled : {}", t));
getRunningTasks().forEach(t -> LOG.debug("Running : {}", t));
getTasksPendingAcknowledgment().forEach(t -> LOG.debug("Pending ack : {}", t));
}

@Override
Expand All @@ -519,12 +533,8 @@ public void executeSync(Task aTask)
aTask.runSync();
}
finally {
runningTasks.remove(aTask);
LOG.debug("Ended task (sync) [{}]: {}", aTask, aTask.getMonitor().getState());
if (!aTask.getMonitor().isCancelled() && aTask.getScope().isKeepAfterEnded()) {
endedTasks.add(aTask);
}
aTask.destroy();
handleTaskEnded(aTask);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ public final void run()
runSync();
}
finally {
destroy();
MDC.remove(KEY_REPOSITORY_PATH);
MDC.remove(KEY_USERNAME);
MDC.remove(KEY_PROJECT_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public enum TaskScope
*/
PROJECT;

boolean isKeepAfterEnded()
boolean isDestroyOnEnd()
{
return this != EPHEMERAL;
return this == EPHEMERAL;
}

boolean isRemoveWhenUserSessionEnds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public interface SchedulerController
{
String BASE_URL = BASE_API_URL + "/scheduler";
String CANCEL = "cancel";
String ACKNOWLEDGE = "acknowledge";
String TASKS = "/tasks";
String PARAM_TASK_ID = "taskId";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public class TaskMonitorFooterItem
@Override
public Component create(String aId)
{
return new TaskMonitorPanel(aId);
return new TaskMonitorPanel(aId).setShowFinishedTasks(true).setPopupMode(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import java.util.Map;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.wicket.authorization.Action;
import org.apache.wicket.authroles.authorization.strategies.role.annotations.AuthorizeAction;
import org.apache.wicket.markup.html.WebMarkupContainer;
import org.apache.wicket.model.Model;
import org.apache.wicket.protocol.http.servlet.ServletWebRequest;
import org.apache.wicket.request.Url;
import org.apache.wicket.request.cycle.RequestCycle;
import org.apache.wicket.spring.injection.annot.SpringBean;
Expand All @@ -48,7 +49,7 @@ public class TaskMonitorPanel
private @SpringBean ServletContext servletContext;

private boolean popupMode = true;
private boolean keepRemovedTasks = false;
private boolean showFinishedTasks = true;
private String typePattern = "";

public TaskMonitorPanel(String aId)
Expand All @@ -63,9 +64,9 @@ public TaskMonitorPanel setPopupMode(boolean aPopupMode)
return this;
}

public TaskMonitorPanel setKeepRemovedTasks(boolean aKeepRemovedTasks)
public TaskMonitorPanel setShowFinishedTasks(boolean aKeepRemovedTasks)
{
keepRemovedTasks = aKeepRemovedTasks;
showFinishedTasks = aKeepRemovedTasks;
return this;
}

Expand All @@ -89,7 +90,7 @@ protected void onConfigure()
setDefaultModel(Model.ofMap(Map.of( //
"csrfToken", getCsrfTokenFromSession(), //
"popupMode", popupMode, //
"keepRemovedTasks", keepRemovedTasks, //
"showFinishedTasks", showFinishedTasks, //
"typePattern", typePattern, //
"endpointUrl", constructEndpointUrl(), //
"wsEndpointUrl", constructWsEndpointUrl(), //
Expand All @@ -114,12 +115,16 @@ private String constructWsEndpointUrl()

public String getCsrfTokenFromSession()
{
var httpRequest = (HttpServletRequest) RequestCycle.get().getRequest()
.getContainerRequest();
var httpResponse = (HttpServletResponse) RequestCycle.get().getResponse()
.getContainerResponse();

var csrfTokenRepository = new HttpSessionCsrfTokenRepository();
var csrfToken = csrfTokenRepository.loadToken(
((ServletWebRequest) RequestCycle.get().getRequest()).getContainerRequest());
var csrfToken = csrfTokenRepository.loadDeferredToken(httpRequest, httpResponse);

if (csrfToken != null) {
return csrfToken.getToken();
return csrfToken.get().getToken();
}
else {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,16 @@ public void cancelTask(@PathVariable(PARAM_TASK_ID) int aTaskId)
schedulingService.stopAllTasksMatching(
t -> t.getId() == aTaskId && t.getUser().filter(user::equals).isPresent());
}

@PostMapping(//
value = TASKS + "/{" + PARAM_TASK_ID + "}/" + ACKNOWLEDGE, //
consumes = { ALL_VALUE }, //
produces = APPLICATION_JSON_VALUE)
public void acknowledgeResult(@PathVariable(PARAM_TASK_ID) int aTaskId)
{
var user = userService.getCurrentUser();

schedulingService.stopAllTasksMatching(
t -> t.getId() == aTaskId && t.getUser().filter(user::equals).isPresent());
}
}
Loading

0 comments on commit 8bd6619

Please sign in to comment.