Skip to content

Commit

Permalink
Fix assignments issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdupl committed Jan 15, 2015
1 parent b893dc3 commit d16372c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
9 changes: 5 additions & 4 deletions src/main/java/org/lancoder/master/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void dispatch(ClientTask task, Node node) {
private synchronized boolean assign(ClientTask task, Node node) {
boolean assigned = false;
if (!assignments.containsKey(task)) {
System.out.println("Task " + task.getTaskId() + " is assigned to " + node.getName());
this.assignments.put(task, node);
node.addTask(task);
assigned = true;
Expand All @@ -211,6 +212,7 @@ private synchronized boolean unassign(ClientTask task) {
boolean unassigned = false;
Node previousAssignee = this.assignments.remove(task);
if (previousAssignee != null) {
System.out.println("Node " + previousAssignee.getName() + " was unassigned from task " + task.getTaskId());
unassigned = true;
previousAssignee.getCurrentTasks().remove(task);
}
Expand All @@ -227,7 +229,6 @@ public boolean taskUpdated(ClientTask task, Node n) {
listener.handle(new Event(EventEnum.JOB_ENCODING_COMPLETED, job));
}
break;
case TASK_TODO:
case TASK_CANCELED:
unassign(task);
task.getProgress().reset();
Expand All @@ -244,8 +245,9 @@ public boolean taskUpdated(ClientTask task, Node n) {
task.getProgress().reset();
n.failure(); // Add a failure count to the node
break;
case TASK_TODO:
break;
}
updateNodesWork();
return false;
}

Expand Down Expand Up @@ -294,8 +296,7 @@ public void handle(Event event) {
public void unassingAll(Node n) {
ArrayList<ClientTask> tasks = n.getCurrentTasks();
for (ClientTask clientTask : tasks) {
assignments.remove(clientTask);
clientTask.getProgress().reset();
unassign(clientTask);
}
n.getCurrentTasks().clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private void dispatch(DispatchItem item) {
ClusterProtocol handled = null;
try (Socket socket = new Socket()) {
InetSocketAddress addr = new InetSocketAddress(node.getNodeAddress(), node.getNodePort());
socket.setSoTimeout(2000);
socket.connect(addr, 2000);
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
out.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.lancoder.common.pool.Pool;
import org.lancoder.common.pool.PoolWorker;
import org.lancoder.common.task.ClientTask;
import org.lancoder.master.NodeManager;

public class DispatcherPool extends Pool<DispatchItem> implements DispatcherListener {

Expand Down
27 changes: 17 additions & 10 deletions src/main/java/org/lancoder/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,28 @@ private ArrayList<ClientTask> getCurrentTasks() {
return this.node.getCurrentTasks();
}

public synchronized boolean startWork(ClientTask t) {
System.out.println("Got task " + t.getTaskId());
getCurrentTasks().add(t);
if (t instanceof ClientVideoTask && videoPool.hasFreeConverters()) {
ClientVideoTask vTask = (ClientVideoTask) t;
public boolean startWork(ClientTask task) {
System.out.println("Received task " + task.getTaskId() + " from master...");
boolean accepted = true;
getCurrentTasks().add(task);
if (task instanceof ClientVideoTask && videoPool.hasFreeConverters()) {
ClientVideoTask vTask = (ClientVideoTask) task;
videoPool.handle(vTask);
} else if (t instanceof ClientAudioTask && this.audioPool.hasFreeConverters() && videoPool.hasFreeConverters()) {
} else if (task instanceof ClientAudioTask && this.audioPool.hasFreeConverters()
&& videoPool.hasFreeConverters()) {
// video pool must also be free
ClientAudioTask aTask = (ClientAudioTask) t;
ClientAudioTask aTask = (ClientAudioTask) task;
audioPool.handle(aTask);
} else {
return false;
getCurrentTasks().remove(task);
accepted = false;
System.out.println("Refused task " + task.getTaskId());
}
if (accepted) {
System.out.println("Accepted task " + task.getTaskId());
task.getProgress().start();
updateStatus(NodeState.WORKING);
}
t.getProgress().start();
updateStatus(NodeState.WORKING);
return true;
}

Expand Down

0 comments on commit d16372c

Please sign in to comment.