forked from lucafon/hbase-object-mapper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
CitizenReducer.java
26 lines (22 loc) · 1.12 KB
/
CitizenReducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.flipkart.hbaseobjectmapper.testcases.mr.samples;
import com.flipkart.hbaseobjectmapper.HBObjectMapper;
import com.flipkart.hbaseobjectmapper.testcases.entities.CitizenSummary;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import java.io.IOException;
public class CitizenReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
private final HBObjectMapper hbObjectMapper = new HBObjectMapper();
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0, count = 0;
for (IntWritable value : values) {
sum += value.get();
count++;
}
float averageAge = (float) sum / (float) count;
CitizenSummary citizenSummary = new CitizenSummary();
citizenSummary.setAverageAge(averageAge);
context.write(hbObjectMapper.getRowKey(citizenSummary), hbObjectMapper.writeValueAsPut(citizenSummary));
}
}