forked from rcmccartney/mapreduce
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Job.java
169 lines (150 loc) · 4.9 KB
/
Job.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package mapreduce;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * One map-reduce job executing on a single {@link Worker} node.
 *
 * <p>Lifecycle as visible in this class: {@link #begin()} runs the map phase over the
 * input files and reports the local keys to the Master; {@link #receiveKeyAssignments()}
 * forwards key/value-list pairs to their assigned peers; {@link #reduce()} reduces every
 * key left in {@code mapOutput} and ships the results to the Master via
 * {@link #sendResults()}.
 *
 * <p>Thread-safety: {@code reduce()} spawns one thread per key; writes into
 * {@code finalOut} are synchronized because {@code HashMap} is not thread-safe.
 *
 * @param <K> key type produced by the map phase (must be Serializable for transfer)
 * @param <V> value type produced by the map phase
 */
public class Job<K extends Serializable, V> {

    /** Worker that owns this job; used for all Master/peer communication. */
    protected Worker worker;
    /** User-supplied map/reduce implementation. */
    protected Mapper<K, V> mr;
    /** Map-phase output: key -> every value emitted for that key on this node. */
    protected HashMap<K, List<V>> mapOutput;
    /** Reduce-phase output: key -> reduced result. */
    protected HashMap<K, V> finalOut;
    /** Input file names processed by the map phase. */
    protected String[] files;

    /**
     * Creates a job bound to a worker and a map/reduce implementation.
     *
     * @param worker  owning worker node
     * @param mr      user map/reduce implementation; its job reference is set here
     * @param strings input file names to map over
     */
    public Job(Worker worker, Mapper<K, V> mr, String... strings) {
        this.worker = worker;
        this.mr = mr;
        this.mr.setJob(this);
        files = strings;
        mapOutput = new HashMap<>();
        finalOut = new HashMap<>();
    }

    /**
     * Runs the map phase over every input file, then reports the local keys
     * (and their value counts) to the Master.
     */
    public void begin() {
        // Files are read sequentially on purpose: disk throughput is the
        // bottleneck, so parallel readers would not help.
        for (final String filename : files) {
            // Convenience path: the Mapper may return its output instead of calling emit().
            emit(mr.map(new File(filename)));
        }
        // Map output is populated; tell the Master which keys live on this node
        // so it can compute the shuffle assignments.
        sendAllKeysToMaster();
    }

    /**
     * Reduces every key currently in {@code mapOutput} (one thread per key),
     * waits for completion, then sends the results to the Master.
     */
    public void reduce() {
        ArrayList<Thread> thrs = new ArrayList<>();
        for (final K key : mapOutput.keySet()) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    V reduced = mr.reduce(key, mapOutput.get(key));
                    // HashMap is not thread-safe; serialize writes from the
                    // per-key reducer threads.
                    synchronized (finalOut) {
                        finalOut.put(key, reduced);
                    }
                }
            });
            thrs.add(t);
            t.start();
        }
        // Wait for all reducer threads to finish before reporting results.
        for (Thread t : thrs) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
            }
        }
        // finalOut now holds this node's reduce results; ship them to the Master.
        sendResults();
    }

    /**
     * Records a single map-phase key/value pair, appending to the key's value
     * list (created on first use).
     */
    public void emit(K key, V value) {
        List<V> vals = mapOutput.get(key);
        if (vals == null) {
            vals = new ArrayList<>();
            mapOutput.put(key, vals);
        }
        vals.add(value);
    }

    /**
     * Bulk variant of {@link #emit(Object, Object)} for Mappers that return
     * their output instead of emitting incrementally. A {@code null} map is a no-op.
     */
    public void emit(HashMap<K, V> tmp) {
        if (tmp == null) {
            return;
        }
        for (Map.Entry<K, V> e : tmp.entrySet()) {
            emit(e.getKey(), e.getValue());
        }
    }

    /**
     * Merges a key and its value list received from a peer worker during the
     * shuffle into this node's map output.
     *
     * @param k the key, as transferred over the wire (cast to K)
     * @param v the value list for that key (cast to List&lt;V&gt;)
     */
    // Safe by protocol: peers send Object[]{K, List<V>} produced from the same
    // generic Job type; the wire format erases the type parameters.
    @SuppressWarnings("unchecked")
    public void receiveKVAndAggregate(Object k, Object v) {
        K key = (K) k;
        List<V> valList = (List<V>) v;
        List<V> existing = mapOutput.get(key);
        if (existing != null) {
            existing.addAll(valList);
        } else {
            mapOutput.put(key, valList);
        }
    }

    /**
     * Reads the Master's key-assignment message and forwards every key that is
     * assigned to another worker (with its value list) to that peer, removing
     * it from the local map output. Notifies the Master once shuffling is done.
     */
    // Cast is safe by protocol: the Master sends List<Object[]{key, host, port}>.
    @SuppressWarnings("unchecked")
    public void receiveKeyAssignments() {
        try {
            // NOTE(security): native Java deserialization of socket data is unsafe
            // against untrusted peers; acceptable only inside a trusted cluster.
            ObjectInputStream objInStream = new ObjectInputStream(worker.in);
            List<Object[]> keyTransferMsgs = (List<Object[]>) objInStream.readObject();
            System.out.println("Received TMsgs: " + keyTransferMsgs);
            for (Object[] msg : keyTransferMsgs) {
                K k = (K) msg[0];
                String peerAddress = (String) msg[1];
                Integer peerPort = (Integer) msg[2]; // port per-peer so same-machine testing works
                List<V> v = mapOutput.remove(k); // only keys assigned here remain in mapOutput
                if (v == null) {
                    // Defensive: never forward null if the Master names a key we
                    // don't hold; an empty list keeps the receiver's merge simple.
                    v = new ArrayList<>();
                }
                // Sends the key and its value list as Object[] to the assigned peer.
                worker.wP2P.send(k, v, peerAddress, peerPort);
            }
            // Tell the Master this worker has finished shuffling and is ready to reduce.
            worker.writeMaster(Utils.W2M_KEYSHUFFLED);
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reports every local key (and the size of its value list) to the Master,
     * one W2M_KEY message per key, terminated by W2M_KEY_COMPLETE.
     */
    public void sendAllKeysToMaster() {
        for (Map.Entry<K, List<V>> e : mapOutput.entrySet()) {
            worker.writeMaster(Utils.W2M_KEY);
            byte[] data = Utils.concat(mr.getBytes(e.getKey()),
                    Utils.intToByteArray(e.getValue().size()));
            worker.writeMaster(data);
            try {
                // Pacing between key messages; presumably keeps the Master's
                // reader from coalescing frames -- TODO confirm and replace with
                // proper framing.
                Thread.sleep(100);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop pacing; remaining
                // messages are still sent.
                Thread.currentThread().interrupt();
            }
        }
        worker.writeMaster(Utils.W2M_KEY_COMPLETE);
        System.out.println("Keys transferred to Master");
    }

    /**
     * Sends every reduced key/value pair to the Master (one W2M_RESULTS message
     * each), terminated by W2M_JOBDONE.
     */
    public void sendResults() {
        for (Map.Entry<K, V> e : finalOut.entrySet()) {
            worker.writeMaster(Utils.W2M_RESULTS);
            worker.writeObjToMaster(new Object[] { e.getKey(), e.getValue() });
        }
        worker.writeMaster(Utils.W2M_JOBDONE);
    }

    /** Placeholder for cancelling a running job; currently a no-op. */
    public void stopExecution() {
    }
}