Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 71: MultiThreadBatchTask - revised and functional. #73

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,30 @@
/**
* @author Ivan Habernal
*/
public class MultiThreadBatchTask
extends BatchTask
{
public class MultiThreadBatchTask extends BatchTask {
private final Log log = LogFactory.getLog(getClass());

private int nThreads = 1;

/**
* Explicit no-args constructor
*/
public MultiThreadBatchTask() { }

/**
* Constructor with number of threads.
*
* @param n The number of threads to use for the MultiThreadBatchTask.
*/
public MultiThreadBatchTask(int n) {
this.nThreads = n;
}

public void setNumberOfThreadsToUse(int n) {
this.nThreads = n;
}

@Override
protected void executeConfiguration(TaskContext aContext, Map<String, Object> aConfig,
Set<String> aExecutedSubtasks)
throws ExecutionException, LifeCycleException
{
protected void executeConfiguration(TaskContext aContext, Map<String, Object> aConfig, Set<String> aExecutedSubtasks) throws ExecutionException, LifeCycleException {
if (log.isTraceEnabled()) {
// Show all subtasks executed so far
for (String est : aExecutedSubtasks) {
Expand All @@ -79,183 +93,109 @@ protected void executeConfiguration(TaskContext aContext, Map<String, Object> aC
}

Queue<Task> queue = new LinkedList<>(tasks);
// Set<Task> loopDetection = new HashSet<>();
// List<UnresolvedImportException> deferralReasons = new ArrayList<>();

ConcurrentMap<Task, Throwable> exceptionsFromLastLoop;
Map<Task, ExecutionThread> threads = new HashMap<>(); // keeps track of the execution threads; TODO MW: do we really need this or can we work with the futures list only?
Map<Future<?>, Task> futures = new HashMap<Future<?>, Task>(); // keeps track of submitted Futures and their associated tasks
ConcurrentMap<Task, Throwable> exceptionsFromLastLoop = null; // will be instantiated with all exceptions from current loop
ConcurrentMap<Task, Throwable> exceptionsFromCurrentLoop = new ConcurrentHashMap<>();

// ThreadPoolExecutorFactoryBean factory = new ThreadPoolExecutorFactoryBean();
// factory.setCorePoolSize(4);

int outerLoopCounter = 0;

// main loop
do {
Map<Task, ExecutionThread> threads = new HashMap<>();

ExecutorService executor = Executors.newFixedThreadPool(2);
++outerLoopCounter;

threads.clear();
futures.clear();
ExecutorService executor = Executors.newFixedThreadPool(nThreads);

// set the exceptions from the last loop
exceptionsFromLastLoop = new ConcurrentHashMap<>(exceptionsFromCurrentLoop);

// Fix MW: Clear exceptionFromCurrentLoop; otherwise the loop with run at most twice.
// Fix MW: Clear exceptionsFromCurrentLoop; otherwise the loop with run at most twice.
exceptionsFromCurrentLoop.clear();

// process all tasks from the queue
while (!queue.isEmpty()) {
Task task = queue.poll();

TaskContextMetadata execution = getExistingExecution(aContext, task, aConfig,
aExecutedSubtasks);
TaskContextMetadata execution = getExistingExecution(aContext, task, aConfig, aExecutedSubtasks);

// Check if a subtask execution compatible with the present configuration has
// does already exist ...
// Check if a subtask execution compatible with the present configuration already exists.
if (execution == null) {
// ... otherwise execute it with the present configuration
log.info("Executing task [" + task.getType() + "]");

// set scope here so that the inherited scopes are considered
// set scope here so that tasks added to scope in this loop are considered
if (task instanceof BatchTask) {
((BatchTask) task).setScope(scope);
}

// try {
// execution = runNewExecution(aContext, task, aConfig, aExecutedSubtasks);

// ExecutionThread thread = (ExecutionThread) factory.newThread(new ExecutionThread(aContext, task, aConfig,
// aExecutedSubtasks));

ExecutionThread thread = new ExecutionThread(aContext, task, aConfig,
aExecutedSubtasks);

// TaskUncaughtExceptionHandler exceptionHandler = new TaskUncaughtExceptionHandler(
// exceptionsFromCurrentLoop, task);
// thread.setUncaughtExceptionHandler(exceptionHandler);

ExecutionThread thread = new ExecutionThread(aContext, task, aConfig, aExecutedSubtasks);
threads.put(task, thread);

// TODO xxx
// thread.start();
// executor.execute(thread);
// factory.createThread(thread)
Future<?> future = executor.submit(thread);

// and run it
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (java.util.concurrent.ExecutionException e) {
exceptionsFromCurrentLoop.putIfAbsent(task, e.getCause());
}

// // Record new/existing execution
// aExecutedSubtasks.add(execution.getId());
// scope.add(execution.getId());
// loopDetection.clear();
// deferralReasons.clear();
// }
// catch (UnresolvedImportException e) {
// Add task back to queue
// log.debug("Deferring execution of task [" + task.getType() + "]: "
// + e.getMessage());
// queue.add(task);
//
// Detect endless loop
// if (loopDetection.contains(task)) {
// StringBuilder details = new StringBuilder();
// for (UnresolvedImportException r : deferralReasons) {
// details.append("\n -");
// details.append(r.getMessage());
// }
//
// throw an UnresolvedImportException in case there is an outer BatchTask which needs to be executed first
// throw new UnresolvedImportException(e, details.toString());
// }
// Record failed execution
// loopDetection.add(task);
// deferralReasons.add(e);
// }
futures.put(executor.submit(thread), task);
}
else {
log.debug("Using existing execution [" + execution.getId() + "]");

// Record new/existing execution
aExecutedSubtasks.add(execution.getId());
scope.add(execution.getId());
// loopDetection.clear();
// deferralReasons.clear();
}
}

// try and get results from all futures to check for failed executions
for(Map.Entry<Future<?>, Task> entry : futures.entrySet()){
try {
entry.getKey().get();
}
catch(java.util.concurrent.ExecutionException ex) {
Task task = entry.getValue();
// TODO MW: add a retry-counter here to prevent endless loops?
log.info("Task exec failed for [" + task.getType() + "]");
// record the failed task, so that it can be re-added to the queue
exceptionsFromCurrentLoop.put(task, ex);
}
catch(InterruptedException ex){
// thread interrupted, exit
throw new RuntimeException(ex);
}
}

// TODO xxx

// executor.shutdown();
// while (!executor.isTerminated()) {
// empty
// }

// wait for completing all threads
// for (ExecutionThread thread : threads.values()) {
// try {
// thread.join();
// }
// catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// }
// try {
System.out.println("Calling shutdown");
executor.shutdown();
// }
// catch (InterruptedException e) {
// throw new RuntimeException(e);
// }

// empty the queue -- it's already empty!!
// queue.clear();

System.out.println("All threads finished");
log.debug("Calling shutdown");
executor.shutdown();
log.debug("All threads finished");

// collect the results
for (Map.Entry<Task, ExecutionThread> entry : threads.entrySet()) {
ExecutionThread thread = entry.getValue();
Task task = entry.getKey();
Task task = entry.getKey();
ExecutionThread thread = entry.getValue();
TaskContextMetadata execution = thread.getTaskContextMetadata();

// probably failed
if (execution == null) {
Throwable exception = exceptionsFromCurrentLoop.get(task);
if (!(exception instanceof UnresolvedImportException)) {
if (!(exception instanceof UnresolvedImportException) && !(exception instanceof java.util.concurrent.ExecutionException)) {
throw new RuntimeException(exception);
}
exceptionsFromCurrentLoop.put(task, exception);

// put it to the queue
// re-add to the queue
queue.add(task);
}
else {

// Record new/existing execution
aExecutedSubtasks.add(execution.getId());
scope.add(execution.getId());
}
}
} while (!exceptionsFromCurrentLoop.keySet().equals(exceptionsFromLastLoop.keySet())); // END OF DO; finish if the same tasks failed again

}
// finish if the same tasks failed again
while (!exceptionsFromCurrentLoop.keySet().equals(exceptionsFromLastLoop.keySet()));


if (!exceptionsFromCurrentLoop.isEmpty()) {
// collect all details
StringBuilder details = new StringBuilder();
for (Throwable throwable : exceptionsFromCurrentLoop.values()) {
details.append("\n -");
details.append(throwable.getMessage());

}

// we re-throw the first exception
Expand All @@ -267,34 +207,35 @@ protected void executeConfiguration(TaskContext aContext, Map<String, Object> aC
// otherwise wrap it
throw new RuntimeException(details.toString(), next);
}

log.info("MultiThreadBatchTask completed successfully. Total number of outer loop runs: " + outerLoopCounter);
}

protected class ExecutionThread
extends Thread
{

/**
* Represents a task's execution thread,
* together with the associated context, config and scope.
*
*/
protected class ExecutionThread extends Thread {
private final TaskContext aContext;
private final Task task;
private final Map<String, Object> aConfig;
private final Set<String> scope;

private TaskContextMetadata taskContextMetadata;

public ExecutionThread(TaskContext aContext, Task aTask, Map<String, Object> aConfig,
Set<String> aScope)
{
public ExecutionThread(TaskContext aContext, Task aTask, Map<String, Object> aConfig, Set<String> aScope){
this.aContext = aContext;
this.task = aTask;
this.aConfig = aConfig;
this.scope = aScope;
}

@Override public void run()
{
@Override public void run() {
TaskExecutionService execService = aContext.getExecutionService();
TaskExecutionEngine engine = execService.createEngine(task);
engine.setContextFactory(new ScopedTaskContextFactory(execService
.getContextFactory(), aConfig, scope));
engine.setContextFactory(new ScopedTaskContextFactory(execService .getContextFactory(), aConfig, scope));

String uuid;
try {
uuid = engine.run(task);
Expand All @@ -309,34 +250,8 @@ public ExecutionThread(TaskContext aContext, Task aTask, Map<String, Object> aCo
/**
* Returns the result of the run.
*/
public TaskContextMetadata getTaskContextMetadata()
{
public TaskContextMetadata getTaskContextMetadata() {
return taskContextMetadata;
}
}

@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
protected class TaskUncaughtExceptionHandler
implements Thread.UncaughtExceptionHandler

{
private final ConcurrentMap<Task, Throwable> thrownExceptions;
private final Task task;

public TaskUncaughtExceptionHandler(ConcurrentMap<Task, Throwable> resultingExceptionMap,
Task task)
{
this.thrownExceptions = resultingExceptionMap;
this.task = task;
}

@Override
public void uncaughtException(Thread t, Throwable e)
{
System.err.println(
"Task: " + task.getClass().getSimpleName() + " in thread " + t.getId()
+ " threw an exception: " + e.toString());
thrownExceptions.put(task, e);
}
}
}
Loading