Skip to content

Commit

Permalink
#1066 Recommender status info
Browse files Browse the repository at this point in the history
- started on websocket messaging
  • Loading branch information
UWinch committed Jul 18, 2019
1 parent 8a71d86 commit c05b036
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 5 deletions.
4 changes: 4 additions & 0 deletions inception-recommendation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@
<artifactId>dkpro-core-api-ner-asl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.wicket</groupId>
<artifactId>wicket-native-websocket-core</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ public Predictions computePredictions(User aUser, Project aProject,

// Perform the actual prediction
recommendationEngine.predict(ctx, predictionCas);

// Extract the suggestions from the data which the recommender has written
// into the CAS
List<AnnotationSuggestion> suggestions = extractSuggestions(aUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.wicket.markup.html.panel.Panel;
import org.apache.wicket.model.IModel;
import org.apache.wicket.model.LoadableDetachableModel;
import org.apache.wicket.protocol.ws.api.WebSocketBehavior;
import org.apache.wicket.protocol.ws.api.WebSocketRequestHandler;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
import org.apache.wicket.spring.injection.annot.SpringBean;
import org.wicketstuff.event.annotation.OnEvent;

Expand Down Expand Up @@ -111,6 +114,19 @@ protected void populateItem(ListItem<Recommender> item)
searchResultGroups.setModel(LoadableDetachableModel.of(() -> recommendationService
.listEnabledRecommenders(aModel.getObject().getProject())));
add(searchResultGroups);

add(new WebSocketBehavior() {

private static final long serialVersionUID = 1L;

@Override
protected void onPush(WebSocketRequestHandler aHandler, IWebSocketPushMessage aMessage)
{
// TODO: message specific stuff
aHandler.add(searchResultGroups);
}

});
}

public AnnotatorState getModelObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import de.tudarmstadt.ukp.inception.recommendation.event.RecommenderEvaluationResultEvent;
import de.tudarmstadt.ukp.inception.scheduling.SchedulingService;
import de.tudarmstadt.ukp.inception.scheduling.Task;
import de.tudarmstadt.ukp.inception.scheduling.TaskState;
import de.tudarmstadt.ukp.inception.scheduling.TaskUpdateEvent;

/**
* This task evaluates all available classification tools for all annotation layers of the current
Expand Down Expand Up @@ -178,9 +180,8 @@ protected List<CAS> initialize()
threshold);
}

appEventPublisher.publishEvent(new RecommenderEvaluationResultEvent(this,
recommender, user.getUsername(), result,
System.currentTimeMillis() - start, activated));
publishFinishedEvents(user, recommender, start, result,
activated);
}
catch (Throwable e) {
log.error("[{}][{}]: Failed", user.getUsername(), recommenderName, e);
Expand All @@ -194,6 +195,17 @@ protected List<CAS> initialize()
"SelectionTask after activating recommenders"));
}

private void publishFinishedEvents(User user, Recommender recommender, long start,
EvaluationResult result, boolean activated)
{
String username = user.getUsername();
appEventPublisher.publishEvent(new RecommenderEvaluationResultEvent(this,
recommender, username, result,
System.currentTimeMillis() - start, activated));
getSchedulerCallback().accept(new TaskUpdateEvent(username, TaskState.SELECTION_FINISHED,
result.getTrainDataRatio(), recommender.getId(), activated));
}

private List<CAS> readCasses(Project aProject, String aUserName)
{
List<CAS> casses = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import de.tudarmstadt.ukp.inception.recommendation.api.recommender.RecommenderContext;
import de.tudarmstadt.ukp.inception.scheduling.SchedulingService;
import de.tudarmstadt.ukp.inception.scheduling.Task;
import de.tudarmstadt.ukp.inception.scheduling.TaskState;
import de.tudarmstadt.ukp.inception.scheduling.TaskUpdateEvent;

/**
* This consumer trains a new classifier model, if a classification tool was selected before.
Expand Down Expand Up @@ -184,6 +186,9 @@ protected List<TrainingDocument> initialize()

ctx.close();
recommendationService.putContext(user, recommender, ctx);

getSchedulerCallback().accept(new TaskUpdateEvent(user.getUsername(),
TaskState.TRAINING_FINISHED, recommender.getId()));
}
catch (Throwable e) {
log.info("[{}][{}]: Training failed ({} ms)", user.getUsername(),
Expand Down
4 changes: 4 additions & 0 deletions inception-scheduling/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,9 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.wicket</groupId>
<artifactId>wicket-native-websocket-core</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.wicket.Application;
import org.apache.wicket.protocol.ws.WebSocketSettings;
import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.security.core.session.SessionInformation;
import org.springframework.security.core.session.SessionRegistry;
import org.springframework.stereotype.Component;

import de.tudarmstadt.ukp.inception.scheduling.config.SchedulingProperties;
Expand All @@ -40,6 +46,7 @@ public class SchedulingService

private final ApplicationContext applicationContext;
private final ThreadPoolExecutor executor;
private @Autowired SessionRegistry sessionRegistry;

private final List<Task> runningTasks;

Expand Down Expand Up @@ -94,6 +101,9 @@ public synchronized void enqueue(Task aTask)

log.debug("Enqueuing task [{}]", aTask);

// set callback to websocket distribution
aTask.setSchedulerCallback(this::distributeWebSocketMessage);

// This autowires the task fields manually.
AutowireCapableBeanFactory factory = applicationContext.getAutowireCapableBeanFactory();
factory.autowireBean(aTask);
Expand Down Expand Up @@ -122,5 +132,25 @@ public void destroy()
executor.shutdownNow();
}


public void distributeWebSocketMessage(TaskUpdateEvent aTaskUpdate)
{
Application application = Application.get();
WebSocketSettings webSocketSettings = WebSocketSettings.Holder.get(application);
IWebSocketConnectionRegistry webSocketConnectionRegistry = webSocketSettings
.getConnectionRegistry();

// get all connections for the user
List<IWebSocketConnection> userConnections = new ArrayList<>();
for (SessionInformation sessionInfo : sessionRegistry
.getAllSessions(aTaskUpdate.getUsername(), false)) {
userConnections.addAll(webSocketConnectionRegistry.getConnections(application,
sessionInfo.getSessionId()));
}

// send message to all connections
for (IWebSocketConnection connection : userConnections) {
connection.sendMessage(new TaskWebSocketPushMessage(aTaskUpdate.getProgress(),
aTaskUpdate.getState(), aTaskUpdate.getRecommenderId(), aTaskUpdate.isActive()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.commons.lang3.Validate.notNull;

import java.util.Objects;
import java.util.function.Consumer;

import de.tudarmstadt.ukp.clarin.webanno.model.Project;
import de.tudarmstadt.ukp.clarin.webanno.security.model.User;
Expand All @@ -30,6 +31,7 @@ public abstract class Task
private final User user;
private final Project project;
private final String trigger;
private Consumer<TaskUpdateEvent> schedulerCallback;

public Task(User aUser, Project aProject, String aTrigger)
{
Expand Down Expand Up @@ -85,4 +87,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(user, project);
}

protected void setSchedulerCallback(Consumer<TaskUpdateEvent> aCallback)
{
schedulerCallback = aCallback;
}

protected Consumer<TaskUpdateEvent> getSchedulerCallback()
{
return schedulerCallback;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package de.tudarmstadt.ukp.inception.scheduling;

public enum TaskState
{
TRAINING_STARTED, TRAINING_FINISHED, SELECTION_STARTED, SELECTION_FINISHED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package de.tudarmstadt.ukp.inception.scheduling;


public class TaskUpdateEvent
{
private final String username;
private final double progress;
private final TaskState state;
private final long recommenderId;
private final boolean active;

//TODO: add evaluationResult?

public TaskUpdateEvent(String aName, TaskState aState, double aProgress, long aRecommenderId,
boolean aActive)
{
username = aName;
state = aState;
progress = aProgress;
recommenderId = aRecommenderId;
active = aActive;
}

public TaskUpdateEvent(String aUsername, TaskState aState, long aRecommenderId)
{
this(aUsername, aState, 1, aRecommenderId, true);
}

public String getUsername()
{
return username;
}

public double getProgress()
{
return progress;
}

public TaskState getState()
{
return state;
}

public long getRecommenderId()
{
return recommenderId;
}

public boolean isActive()
{
return active;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package de.tudarmstadt.ukp.inception.scheduling;

import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;

public class TaskWebSocketPushMessage
implements IWebSocketPushMessage
{
private static final long serialVersionUID = 1L;

public TaskWebSocketPushMessage(double aProgress, TaskState aState, long aRecommenderId,
boolean aActive)
{
// TODO Auto-generated constructor stub
}

}

0 comments on commit c05b036

Please sign in to comment.