Skip to content

Commit

Permalink
dkpro#71 and dkpro#73 - MultiThreadBatchTask - revised and functional.
Browse files Browse the repository at this point in the history
  • Loading branch information
reckart authored and mwunderlich committed Aug 20, 2015
1 parent 8e553ea commit 154724e
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 129 deletions.
11 changes: 11 additions & 0 deletions dkpro-lab-core/src/main/java/de/tudarmstadt/ukp/dkpro/lab/Lab.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
Expand Down Expand Up @@ -63,6 +64,16 @@ public static Lab newInstance(String aContext)
return lab;
}

public void setProperty(String aKey, String aValue)
{
context.getBean("Properties", Properties.class).setProperty(aKey, aValue);
}

public String getProperty(String aKey)
{
return context.getBean("Properties", Properties.class).getProperty(aKey);
}

public TaskExecutionService getTaskExecutionService()
{
return (TaskExecutionService) context.getBean("TaskExecutionService");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Map;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.core.io.Resource;

import de.tudarmstadt.ukp.dkpro.lab.engine.TaskContextFactory;
Expand All @@ -32,6 +34,9 @@
public class DefaultTaskExecutionService
implements TaskExecutionService
{
@Autowired
private AutowireCapableBeanFactory beanFactory;

private TaskContextFactory contextFactory;

private final Map<Class<? extends Task>, Class<? extends TaskExecutionEngine>> map;
Expand All @@ -56,6 +61,7 @@ public TaskExecutionEngine createEngine(Task aConfiguration)
if (taskClass.isAssignableFrom(aConfiguration.getClass())) {
TaskExecutionEngine engine = map.get(taskClass).newInstance();
engine.setContextFactory(contextFactory);
beanFactory.autowireBean(engine);
return engine;
}
}
Expand Down Expand Up @@ -87,6 +93,22 @@ public void setMappingDescriptors(Resource[] aResources)
}
}
}

public void registerEngine(Class<? extends Task> aTaskClazz,
Class<? extends TaskExecutionEngine> aEngineClazz)
{
map.put(aTaskClazz, aEngineClazz);
}

public void unregisterEngine(Class<? extends Task> aTaskClazz)
{
map.remove(aTaskClazz);
}

public Class<? extends TaskExecutionEngine> getEngine(Class<? extends Task> aTaskClazz)
{
return map.get(aTaskClazz);
}

public void setContextFactory(TaskContextFactory aContextFactory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;

import de.tudarmstadt.ukp.dkpro.lab.engine.ExecutionException;
import de.tudarmstadt.ukp.dkpro.lab.engine.LifeCycleException;
Expand All @@ -50,6 +51,35 @@ public class MultiThreadBatchTaskEngine
extends BatchTaskEngine
{
private final Log log = LogFactory.getLog(getClass());

public static final String PROP_THREADS = "engine.batch.maxThreads";

@Value("#{ @Properties['" + PROP_THREADS + "'] }")
private int maxThreads = Runtime.getRuntime().availableProcessors() - 1;

/**
* Explicit no-args constructor
*/
public MultiThreadBatchTaskEngine()
{
// Nothing to do.
}

/**
* Constructor with number of threads.
*
* @param aNThreads
* The number of threads to use for the MultiThreadBatchTask.
*/
public MultiThreadBatchTaskEngine(int aNThreads)
{
setMaxThreads(aNThreads);
}

public void setMaxThreads(int aNThreads)
{
maxThreads = aNThreads;
}

@Override
protected void executeConfiguration(BatchTask aConfiguration, TaskContext aContext,
Expand Down Expand Up @@ -80,25 +110,29 @@ protected void executeConfiguration(BatchTask aConfiguration, TaskContext aConte
}

Queue<Task> queue = new LinkedList<>(aConfiguration.getTasks());
// Set<Task> loopDetection = new HashSet<>();
// List<UnresolvedImportException> deferralReasons = new ArrayList<>();

ConcurrentMap<Task, Throwable> exceptionsFromLastLoop;
// keeps track of the execution threads;
// TODO MW: do we really need this or can we work with the futures list only?
Map<Task, ExecutionThread> threads = new HashMap<>();
// keeps track of submitted Futures and their associated tasks
Map<Future<?>, Task> futures = new HashMap<Future<?>, Task>();
// will be instantiated with all exceptions from current loop
ConcurrentMap<Task, Throwable> exceptionsFromLastLoop = null;
ConcurrentMap<Task, Throwable> exceptionsFromCurrentLoop = new ConcurrentHashMap<>();

// ThreadPoolExecutorFactoryBean factory = new ThreadPoolExecutorFactoryBean();
// factory.setCorePoolSize(4);
int outerLoopCounter = 0;

// main loop
do {
Map<Task, ExecutionThread> threads = new HashMap<>();
outerLoopCounter++;

ExecutorService executor = Executors.newFixedThreadPool(2);
threads.clear();
futures.clear();
ExecutorService executor = Executors.newFixedThreadPool(maxThreads);

// set the exceptions from the last loop
exceptionsFromLastLoop = new ConcurrentHashMap<>(exceptionsFromCurrentLoop);

// Fix MW: Clear exceptionFromCurrentLoop; otherwise the loop with run at most twice.
// Fix MW: Clear exceptionsFromCurrentLoop; otherwise the loop with run at most twice.
exceptionsFromCurrentLoop.clear();

// process all tasks from the queue
Expand All @@ -115,127 +149,63 @@ protected void executeConfiguration(BatchTask aConfiguration, TaskContext aConte
log.info("Executing task [" + task.getType() + "]");

// set scope here so that the inherited scopes are considered
// set scope here so that tasks added to scope in this loop are considered
if (task instanceof BatchTask) {
((BatchTask) task).setScope(scope);
}

// try {
// execution = runNewExecution(aContext, task, aConfig, aExecutedSubtasks);

// ExecutionThread thread = (ExecutionThread) factory.newThread(new ExecutionThread(aContext, task, aConfig,
// aExecutedSubtasks));

ExecutionThread thread = new ExecutionThread(aContext, task, aConfig,
aExecutedSubtasks);

// TaskUncaughtExceptionHandler exceptionHandler = new TaskUncaughtExceptionHandler(
// exceptionsFromCurrentLoop, task);
// thread.setUncaughtExceptionHandler(exceptionHandler);

threads.put(task, thread);

// TODO xxx
// thread.start();
// executor.execute(thread);
// factory.createThread(thread)
Future<?> future = executor.submit(thread);

// and run it
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (java.util.concurrent.ExecutionException e) {
exceptionsFromCurrentLoop.putIfAbsent(task, e.getCause());
}

// // Record new/existing execution
// aExecutedSubtasks.add(execution.getId());
// scope.add(execution.getId());
// loopDetection.clear();
// deferralReasons.clear();
// }
// catch (UnresolvedImportException e) {
// Add task back to queue
// log.debug("Deferring execution of task [" + task.getType() + "]: "
// + e.getMessage());
// queue.add(task);
//
// Detect endless loop
// if (loopDetection.contains(task)) {
// StringBuilder details = new StringBuilder();
// for (UnresolvedImportException r : deferralReasons) {
// details.append("\n -");
// details.append(r.getMessage());
// }
//
// throw an UnresolvedImportException in case there is an outer BatchTask which needs to be executed first
// throw new UnresolvedImportException(e, details.toString());
// }
// Record failed execution
// loopDetection.add(task);
// deferralReasons.add(e);
// }
futures.put(executor.submit(thread), task);
}
else {
log.debug("Using existing execution [" + execution.getId() + "]");

// Record new/existing execution
aExecutedSubtasks.add(execution.getId());
scope.add(execution.getId());
// loopDetection.clear();
// deferralReasons.clear();
}
}

// TODO xxx

// executor.shutdown();
// while (!executor.isTerminated()) {
// empty
// }

// wait for completing all threads
// for (ExecutionThread thread : threads.values()) {
// try {
// thread.join();
// }
// catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// }
// try {
System.out.println("Calling shutdown");
executor.shutdown();
// }
// catch (InterruptedException e) {
// throw new RuntimeException(e);
// }

// empty the queue -- it's already empty!!
// queue.clear();

System.out.println("All threads finished");
// try and get results from all futures to check for failed executions
for(Map.Entry<Future<?>, Task> entry : futures.entrySet()){
try {
entry.getKey().get();
}
catch(java.util.concurrent.ExecutionException ex) {
Task task = entry.getValue();
// TODO MW: add a retry-counter here to prevent endless loops?
log.info("Task exec failed for [" + task.getType() + "]");
// record the failed task, so that it can be re-added to the queue
exceptionsFromCurrentLoop.put(task, ex);
}
catch(InterruptedException ex){
// thread interrupted, exit
throw new RuntimeException(ex);
}
}

log.debug("Calling shutdown");
executor.shutdown();
log.debug("All threads finished");

// collect the results
for (Map.Entry<Task, ExecutionThread> entry : threads.entrySet()) {
ExecutionThread thread = entry.getValue();
Task task = entry.getKey();
ExecutionThread thread = entry.getValue();
TaskContextMetadata execution = thread.getTaskContextMetadata();

// probably failed
if (execution == null) {
Throwable exception = exceptionsFromCurrentLoop.get(task);
if (!(exception instanceof UnresolvedImportException)) {
if (!(exception instanceof UnresolvedImportException)
&& !(exception instanceof java.util.concurrent.ExecutionException)) {
throw new RuntimeException(exception);
}
exceptionsFromCurrentLoop.put(task, exception);

// put it to the queue
// re-add to the queue
queue.add(task);
}
else {
Expand All @@ -248,15 +218,15 @@ protected void executeConfiguration(BatchTask aConfiguration, TaskContext aConte

}
// finish if the same tasks failed again
while (!exceptionsFromCurrentLoop.keySet().equals(exceptionsFromLastLoop.keySet()));
while (!exceptionsFromCurrentLoop.keySet().equals(exceptionsFromLastLoop.keySet()));
// END OF DO; finish if the same tasks failed again

if (!exceptionsFromCurrentLoop.isEmpty()) {
// collect all details
StringBuilder details = new StringBuilder();
for (Throwable throwable : exceptionsFromCurrentLoop.values()) {
details.append("\n -");
details.append(throwable.getMessage());

}

// we re-throw the first exception
Expand All @@ -268,8 +238,14 @@ protected void executeConfiguration(BatchTask aConfiguration, TaskContext aConte
// otherwise wrap it
throw new RuntimeException(details.toString(), next);
}
log.info("MultiThreadBatchTask completed successfully. Total number of outer loop runs: "
+ outerLoopCounter);
}

/**
* Represents a task's execution thread,
* together with the associated context, config and scope.
*/
protected class ExecutionThread
extends Thread
{
Expand Down Expand Up @@ -315,29 +291,4 @@ public TaskContextMetadata getTaskContextMetadata()
return taskContextMetadata;
}
}

@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
protected class TaskUncaughtExceptionHandler
implements Thread.UncaughtExceptionHandler

{
private final ConcurrentMap<Task, Throwable> thrownExceptions;
private final Task task;

public TaskUncaughtExceptionHandler(ConcurrentMap<Task, Throwable> resultingExceptionMap,
Task task)
{
this.thrownExceptions = resultingExceptionMap;
this.task = task;
}

@Override
public void uncaughtException(Thread t, Throwable e)
{
System.err.println(
"Task: " + task.getClass().getSimpleName() + " in thread " + t.getId()
+ " threw an exception: " + e.toString());
thrownExceptions.put(task, e);
}
}
}
13 changes: 12 additions & 1 deletion dkpro-lab-core/src/main/resources/META-INF/spring/context.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">

<context:annotation-config />

<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties" ref="Properties"/>
</bean>

<bean id="Properties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
</bean>

<bean id="TaskExecutionService"
class="de.tudarmstadt.ukp.dkpro.lab.engine.impl.DefaultTaskExecutionService">
<property name="mappingDescriptors" value="classpath*:META-INF/lab/engines.properties" />
Expand Down
Loading

0 comments on commit 154724e

Please sign in to comment.