# ch24_mapreduce.py

import datetime
from collections import defaultdict, Counter
from functools import partial

from naive_bayes import tokenize
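# tokenize comes from the book's naive_bayes chapter; assuming that chapter's
# regex-based tokenizer, it lowercases its input and returns the set of word
# tokens it contains, e.g. tokenize("Big Data, big data!") -> {"big", "data"}.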

def word_count_old(documents):
    """word count not using MapReduce"""
    return Counter(word
                   for document in documents
                   for word in tokenize(document))
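# For example, word_count_old(["data science", "big data"]) counts "data"
# twice and "big" and "science" once each (assuming the tokenizer above).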

def wc_mapper(document):
    """for each word in the document, emit (word, 1)"""
    for word in tokenize(document):
        yield (word, 1)

def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))

def word_count(documents):
    """count the words in the input documents using MapReduce"""
    # place to store grouped values
    collector = defaultdict(list)

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]
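# On the documents ["data science", "big data", "science fiction"] used in the
# demo below, this produces the pairs data: 2, science: 2, big: 1, fiction: 1
# (the order of the pairs may vary).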

def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]
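# word_count above is now just map_reduce(documents, wc_mapper, wc_reducer).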

def reduce_with(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn to the values"""
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    """turns a function (values -> output) into a reducer"""
    return partial(reduce_with, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
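
# These reducers plug straight into map_reduce. For instance (a quick sketch),
# with a mapper that simply re-emits each (key, value) pair:
#
#     map_reduce([("temp", 3), ("temp", 9), ("temp", 5)],
#                lambda pair: [pair],
#                max_reducer)
#
# returns [("temp", 9)], and swapping in count_distinct_reducer returns
# [("temp", 3)], since there are three distinct values.
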
#
# Analyzing Status Updates
#
status_updates = [
    {"id": 1,
     "username": "joelgrus",
     "text": "Is anyone interested in a data science book?",
     "created_at": datetime.datetime(2013, 12, 21, 11, 47, 0),
     "liked_by": ["data_guy", "data_gal", "bill"]},
    # add your own
]

def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)

data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)
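# data_science_days pairs weekday() numbers (Monday == 0) with counts; for the
# single sample update above (Saturday, December 21, 2013) it is [(5, 1)].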

def words_per_user_mapper(status_update):
    """for each word in the status update's text, emit (username, (word, 1))"""
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)
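# Note: every word in the sample update appears exactly once, so the "most
# popular" word for joelgrus is an arbitrary tie-break among them.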

def liker_mapper(status_update):
    """for each person who liked the status update, emit (username, liker)"""
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)

distinct_likers_per_user = map_reduce(status_updates,
                                      liker_mapper,
                                      count_distinct_reducer)
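# For the sample data this is [("joelgrus", 3)], since the update was liked by
# three distinct users.
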
#
# matrix multiplication
#

def matrix_multiply_mapper(m, element):
    """m is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)"""
    matrix, i, j, value = element

    if matrix == "A":
        for column in range(m):
            # A_ij is the jth entry in the sum for each C_i_column
            yield ((i, column), (j, value))
    else:
        for row in range(m):
            # B_ij is the ith entry in the sum for each C_row_j
            yield ((row, j), (i, value))
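# For example, with m = 3 the entry ("A", 0, 1, 2) is sent to the candidate
# cells (0, 0), (0, 1) and (0, 2) of C as (1, 2), i.e. "position 1 of this
# cell's dot product contributes the value 2". Looping over range(m) covers
# every cell needed in the demo below, where A is 1 x 3 and B is 3 x 2.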

def matrix_multiply_reducer(m, key, indexed_values):
    """key is a cell (row, column) of the output matrix C;
    indexed_values are (position, value) factors for that cell's dot product"""
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)

    # sum up all the products of the positions with two results
    # (a position with only one result means the other factor was zero)
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0.0:
        yield (key, sum_product)

if __name__ == "__main__":
    documents = ["data science", "big data", "science fiction"]

    wc_mapper_results = [result
                         for document in documents
                         for result in wc_mapper(document)]

    print("wc_mapper results")
    print(wc_mapper_results)
    print()

    print("word count results")
    print(word_count(documents))
    print()

    print("word count using map_reduce function")
    print(map_reduce(documents, wc_mapper, wc_reducer))
    print()

    print("data science days")
    print(data_science_days)
    print()

    print("user words")
    print(user_words)
    print()

    print("distinct likers")
    print(distinct_likers_per_user)
    print()

    # matrix multiplication
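    # The entries below describe A = [[3, 2, 0]] (1 x 3) and B = [[4, -1],
    # [10, 0], [0, 0]] (3 x 2), with absent entries treated as zero; the
    # expected product A @ B is [[32, -3]].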
    entries = [("A", 0, 0, 3), ("A", 0, 1, 2),
               ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]

    mapper = partial(matrix_multiply_mapper, 3)
    reducer = partial(matrix_multiply_reducer, 3)

    print("map-reduce matrix multiplication")
    print("entries:", entries)
    print("result:", map_reduce(entries, mapper, reducer))