forked from rcmccartney/mapreduce
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MasterJob.java
110 lines (93 loc) · 3.75 KB
/
MasterJob.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package mapreduce;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Master-side bookkeeping for one job: collects the keys (and per-key counts)
 * reported by workers, then tells every worker where each of its keys must be
 * sent.
 *
 * <p>All methods that touch the shared maps or counters are {@code synchronized}
 * on this instance, so they may be called concurrently (presumably from
 * per-worker connection threads -- confirm against the callers).
 *
 * @param <K> key type produced by the job; must be serializable
 * @param <V> value type of the job
 */
public class MasterJob<K extends Serializable, V> {

    /** Number of trailing bytes in a worker key message that hold the int count. */
    private static final int COUNT_BYTES = 4;

    protected Mapper<K, V> currentJob;
    /** Running total of the counts reported for each key. */
    protected HashMap<K, Integer> keyCounts;
    /** Last key reported by each worker ID (later reports overwrite earlier ones). */
    protected HashMap<Integer, K> keyWorker;
    protected Master master;
    protected Map<K, List<Integer>> key_workers_map = new HashMap<>(); //Map b/w key and list of Worker Ids it came from
    protected int wCount = 0; //to keep track of number of workers who have sent keys
    protected int wShuffleCount = 0; //keeps track of # of workers who finished mapping
    //Map b/w WorkerId (Integer) and List of Transfer Messages for this workerId i.e List<<Key, AddressOfWorkerPeer>>
    protected Map<Integer, List<Object[]>> worker_messages_map = new HashMap<>();

    /**
     * @param mr     the job whose {@code readBytes} is used to decode keys
     * @param master the owning master; its worker queue and port table are read
     *               during coordination
     */
    public MasterJob(Mapper<K, V> mr, Master master) {
        this.master = master;
        currentJob = mr;
        keyCounts = new HashMap<>();
        keyWorker = new HashMap<>();
    }

    /**
     * Records one key reported by a worker.
     *
     * <p>The message layout is {@code [key bytes][4-byte count]}: everything
     * except the last four bytes is decoded by the job into a key, and the last
     * four bytes are converted to an int by {@code Utils.byteArrayToInt}.
     *
     * @param barr raw message; must contain at least the 4-byte count
     * @param id   ID of the worker that sent the key
     * @throws IllegalArgumentException if {@code barr} is null or shorter than
     *                                  the 4-byte count suffix
     */
    public synchronized void receiveWorkerKey(byte[] barr, int id) {
        if (barr == null || barr.length < COUNT_BYTES) {
            throw new IllegalArgumentException(
                    "Key message from worker " + id + " must hold at least " + COUNT_BYTES
                    + " bytes, got " + (barr == null ? "null" : String.valueOf(barr.length)));
        }
        int keyLength = barr.length - COUNT_BYTES;
        byte[] keyArr = new byte[keyLength];
        byte[] bInt = new byte[COUNT_BYTES];
        System.arraycopy(barr, 0, keyArr, 0, keyLength);
        System.arraycopy(barr, keyLength, bInt, 0, COUNT_BYTES);

        K key = currentJob.readBytes(keyArr);
        aggregate(key, Utils.byteArrayToInt(bInt));
        keyWorker.put(id, key);

        // Remember that this worker holds data for the key.
        List<Integer> holders = key_workers_map.get(key);
        if (holders == null) {
            holders = new ArrayList<>();
            key_workers_map.put(key, holders);
        }
        holders.add(id);
    }

    /**
     * Adds {@code count} to the running total for {@code key}, starting the
     * total at {@code count} if the key has not been seen before.
     *
     * @param key   the key being counted
     * @param count amount to add
     */
    public synchronized void aggregate(K key, int count) {
        Integer current = keyCounts.get(key);
        keyCounts.put(key, current == null ? count : current + count);
    }

    /**
     * Notes that one more worker has finished sending its keys, and starts key
     * coordination once the number of finished workers equals the current size
     * of the master's worker queue.
     *
     * <p>Synchronized because it iterates {@code keyCounts} and increments
     * {@code wCount}, both of which are shared with the other synchronized
     * methods of this class.
     *
     * @param id ID of the worker that finished (currently unused)
     */
    public synchronized void setKeyTransferComplete(int id) {
        for (Map.Entry<K, Integer> entry : keyCounts.entrySet()) {
            System.out.println("Key: " + entry.getKey() + " Value: " + entry.getValue());
        }
        //TODO: change wCount to compare with current actual # of workers, since keys can be less total # of workers in the system
        ++wCount;
        if (wCount == master.workerQueue.size()) { // master now has all the keys from the workers
            coordinateKeysOnWorkers();
        }
    }

    /**
     * Placeholder for receiving worker results; intentionally does nothing yet.
     *
     * @param barr raw result bytes (ignored)
     */
    public void receiveWorkerResults(byte[] barr) {
        // TODO Auto-generated method stub
    }

    /**
     * Assigns every known key to a worker and tells each worker that holds data
     * for a key where to send it.
     *
     * <p>Keys are handed out in iteration order of {@code key_workers_map},
     * {@code ceil(keys / workers)} consecutive keys per worker, walking the
     * master's worker queue from index 0. For each key a transfer message
     * {@code {key, host address, port entry}} is built for the assigned worker
     * and queued for every worker that reported the key; the queued lists are
     * then written to the workers, each preceded by
     * {@code Utils.M2W_COORD_KEYS}.
     *
     * <p>Does nothing when there are no keys, and logs and returns when there
     * are keys but no workers to receive them.
     */
    public synchronized void coordinateKeysOnWorkers() {
        System.out.println("Coodinate Keys called");
        // Rebuilt from scratch each round so a repeated call cannot resend
        // messages left over from an earlier one.
        worker_messages_map.clear();

        int numOfKs = key_workers_map.size();
        int numOfWs = master.workerQueue.size();
        if (numOfKs == 0) {
            return;
        }
        if (numOfWs == 0) {
            // Avoids the division by zero below; nothing can be assigned anyway.
            System.err.println("Cannot coordinate " + numOfKs + " keys: no workers available");
            return;
        }

        // Keys per worker, rounded up; evaluates to 1 whenever keys <= workers.
        int incr = (numOfKs + numOfWs - 1) / numOfWs;

        int kIdx = 0;
        int wQIdx = 0;
        for (Map.Entry<K, List<Integer>> keyEntry : key_workers_map.entrySet()) {
            // NOTE(review): if WorkerConnection extends Thread, getId() is the
            // long thread id rather than the worker id -- confirm the lookup key.
            Object[] transferMessage = new Object[]{keyEntry.getKey(), //contains key, ipaddress and port to send
                    master.workerQueue.get(wQIdx).clientSocket.getInetAddress().getHostAddress(),
                    master.workerIDAndPorts.get(master.workerQueue.get(wQIdx).getId())};

            // The assigned worker itself is not skipped: it receives the
            // message as well if it reported the key.
            for (Integer wId : keyEntry.getValue()) {
                List<Object[]> messages = worker_messages_map.get(wId);
                if (messages == null) {
                    messages = new ArrayList<Object[]>();
                    worker_messages_map.put(wId, messages);
                }
                messages.add(transferMessage);
            }

            // Move on to the next worker after every incr keys.
            if ((++kIdx) % incr == 0) {
                ++wQIdx;
            }
        } //worker_messages_map populated

        for (Map.Entry<Integer, List<Object[]>> entry : worker_messages_map.entrySet()) {
            WorkerConnection wc = master.getWCwithId(entry.getKey());
            if (wc == null) {
                // Presumably the worker is gone; skip it rather than abort the
                // remaining sends -- confirm getWCwithId's contract.
                System.err.println("No connection for worker " + entry.getKey()
                        + "; skipping its key transfer messages");
                continue;
            }
            wc.writeWorker(Utils.M2W_COORD_KEYS);
            wc.writeObjToWorker(entry.getValue());
        }
    }
}