-
Notifications
You must be signed in to change notification settings - Fork 1
/
munge.py
132 lines (110 loc) · 3.85 KB
/
munge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from math import log1p
from datetime import datetime
from pyspark.sql.functions import when
from myLib import *
def run(root, smaster, train_file_name, test_file_name, ask):
    """Munge the training and test files and save both results.

    All arguments are forwarded to ``configure`` (together with this
    script's absolute path), which hands back the root to save under, the
    two input paths and the Spark session.  ``makeDirs``, ``configure`` and
    ``os`` are not defined in this file; presumably they come from
    ``from myLib import *`` -- confirm against that module.

    The training file goes through ``munge`` and the test file through
    ``mungeNoLabel``; each result is shown on stdout and then written with
    ``save_munged``.
    """
    makeDirs()
    script_path = os.path.abspath(__file__)
    root, train_path, test_path, spark = configure(
        root, smaster, train_file_name, test_file_name, script_path, ask)

    # (munging function, input path, base name for the saved output)
    jobs = (
        (munge, train_path, train_file_name),
        (mungeNoLabel, test_path, test_file_name),
    )
    for munger, in_path, out_name in jobs:
        munged = munger(in_path, spark)
        munged.show()
        save_munged(munged, out_name, root)
def munge(path, spark):
    """Turn the labelled CSV at *path* into balanced per-row aggregates.

    The file is read with a header and an inferred schema, every row is
    reduced by ``rowaggs``, each of the three average-derived columns gets
    a 0/1 outlier flag from ``nameOuts`` (IQR multiplier 1.5), the flags
    are summed into one ``O`` column, and ``balance_classes`` is applied.

    Returns a DataFrame whose columns are
    ``cntX, avgX, avg2X, 1plogavgX, O, <last column of the input>``.
    """
    raw = spark.read.csv(path, header=True, inferSchema=True)

    print('aggregating columns'.upper())
    agg_names = ['cntX', 'avgX', 'avg2X', '1plogavgX', raw.columns[-1]]
    df = raw.rdd.map(rowaggs).toDF(agg_names)

    print('labeling outliers'.upper())
    # The column list is taken once, before df is rebound inside the loop,
    # so the freshly added *_O columns are never themselves flagged.
    for col_name in df.columns[1:-1]:
        flag_name = '%s_O' % col_name
        df = df.withColumn(flag_name, nameOuts(df, col_name, 1.5))

    def collapse_flags(row):
        # Positions 0-3 are the aggregates, 4 is the label and everything
        # after it is a flag; the flag total goes in front of the label.
        return tuple(row[:4]) + (sum(row[5:]), row[4])

    final_names = agg_names[:4] + ['O'] + agg_names[4:]
    df = df.rdd.map(collapse_flags).toDF(final_names)

    print('balancing classes'.upper())
    return balance_classes(df)
def rowaggs(x):
    """Summarise one labelled row.

    The first field of *x* is skipped and the last one is passed through
    unchanged as the label.  Of the fields in between, falsy entries
    (``None`` and also ``0``) are dropped before counting and averaging.

    Returns ``(count, mean, mean ** 2, log1p(abs(mean)), label)``; the mean
    is ``0.0`` when no usable value is left.
    """
    # Build a real list: on Python 3 filter() returns an iterator, and
    # calling len() on it raises TypeError.
    values = [v for v in x[1:-1] if v]
    # Start from a float so the aggregates have one numeric type whether
    # or not the row contained any usable value.
    avrg = 0.0
    if values:
        # float() keeps this a true division on Python 2 as well.
        avrg = sum(values) / float(len(values))
    return len(values), avrg, avrg ** 2, log1p(abs(avrg)), x[-1]
def mungeNoLabel(path, spark):
    """Turn the unlabelled CSV at *path* into per-row aggregates.

    The file is read with a header and an inferred schema, every row is
    reduced by ``rowaggsNoLabel`` (which keeps the first field), each of
    the three average-derived columns gets a 0/1 outlier flag from
    ``nameOuts`` (IQR multiplier 1.5), and the flags are summed into a
    trailing ``O`` column.

    Returns a DataFrame whose columns are
    ``<first column of the input>, cntX, avgX, avg2X, 1plogavgX, O``.
    """
    raw = spark.read.csv(path, header=True, inferSchema=True)

    print('aggregating row'.upper())
    agg_names = [raw.columns[0], 'cntX', 'avgX', 'avg2X', '1plogavgX']
    df = raw.rdd.map(rowaggsNoLabel).toDF(agg_names)

    print('labeling outliers'.upper())
    # The column list is taken once, before df is rebound inside the loop.
    for col_name in df.columns[2:]:
        flag_name = '%s_O' % col_name
        df = df.withColumn(flag_name, nameOuts(df, col_name, 1.5))

    def collapse_flags(row):
        # Positions 0-4 are kept as they are; 5-7 are the three flags.
        return tuple(row[:5]) + (sum(row[5:8]),)

    return df.rdd.map(collapse_flags).toDF(agg_names + ['O'])
def rowaggsNoLabel(x):
    """Summarise one unlabelled row.

    The first field of *x* is passed through unchanged.  Of the remaining
    fields, falsy entries (``None`` and also ``0``) are dropped before
    counting and averaging.

    Returns ``(first_field, count, mean, mean ** 2, log1p(abs(mean)))``;
    the mean is ``0.0`` when no usable value is left.
    """
    # Build a real list: on Python 3 filter() returns an iterator, and
    # calling len() on it raises TypeError.
    values = [v for v in x[1:] if v]
    # Start from a float so the aggregates have one numeric type whether
    # or not the row contained any usable value.
    avrg = 0.0
    if values:
        # float() keeps this a true division on Python 2 as well.
        avrg = sum(values) / float(len(values))
    return x[0], len(values), avrg, avrg ** 2, log1p(abs(avrg))
def nameOuts(df, col_name, iqrx):
    """Build a 0/1 column expression marking IQR outliers of *col_name*.

    The 25% and 75% quantiles of the column are requested from
    ``df.approxQuantile`` with a relative error of 0.  A value is marked 1
    when it lies more than ``iqrx`` interquartile ranges below the first
    quartile or above the third, and 0 otherwise.
    """
    col_quantiles = df.approxQuantile([col_name], [.25, .75], 0)[0]
    q1 = col_quantiles[0]
    q3 = col_quantiles[1]

    margin = iqrx * (q3 - q1)
    lower_bound = q1 - margin
    upper_bound = q3 + margin

    column = df[col_name]
    is_outlier = (column < lower_bound) | (column > upper_bound)
    return when(is_outlier, 1).otherwise(0)
def multi_nameOuts(df, iqrx, label=True):
    """Return a function that adds outlier-flag columns to a DataFrame.

    The columns to flag are taken from *df*: with ``label=True`` the first
    and last columns are skipped, with ``label=False`` the first two are
    skipped and the rest are used.  The returned function takes a
    DataFrame and, for each of those columns, appends a ``<name>_O``
    column built by ``nameOuts`` with the multiplier *iqrx*.
    """
    # USE approxQuantile() TO CALCULATE THE IQR PER COLUMN AND LABEL OUTS
    start, end = (1, -1) if label else (2, None)
    col_names = df.columns[start:end]

    def inner(dataframe):
        for col_name in col_names:
            # nameOuts takes (df, col_name, iqrx); the frame being extended
            # is passed so the quantiles and the column reference both come
            # from it.  Leaving it out raised TypeError on every call.
            dataframe = dataframe.withColumn(
                '%s_O' % col_name, nameOuts(dataframe, col_name, iqrx))
        return dataframe

    return inner
def balance_classes(df):
    """Oversample the smaller ``Response`` class of *df*.

    OVERSAMPLING SPECIFICALLY TO ADDRESS CLASS IMBALANCE OF BOSCHE DATA

    Rows with ``Response == 0`` and ``Response == 1`` are counted.  *df* is
    returned unchanged when either class is empty (there is nothing to
    sample from) or when the size gap is under 25% of the smaller class.
    Otherwise rows of the smaller class are sampled with replacement
    (seed 42) and appended to *df*.

    The ``fraction`` argument of ``.sample()`` does not reliably yield the
    requested number of rows, so it is grown by half on each pass until
    the smaller class plus the sample reaches 90% of the larger class.
    """
    c0 = df.filter(df.Response == 0).count()
    c1 = df.filter(df.Response == 1).count()
    larger = max(c0, c1)
    smaller = min(c0, c1)

    # An empty class can never produce sample rows, so the growth loop
    # below would spin forever; leave the data as it is.
    if smaller == 0:
        return df

    fraction = float(larger - smaller) / smaller
    if fraction < .25:
        return df

    minority_label = 1 if c0 > c1 else 0
    # The filter does not change between passes, so build it once.
    minority = df.filter(df.Response == minority_label)
    while smaller + minority.sample(True, fraction, 42).count() < .9 * larger:
        fraction += fraction / 2
    # Same fraction and seed as the last pass that satisfied the check.
    return df.union(minority.sample(True, fraction, 42))
def save_munged(X, file_name, root):
    """Write *X* as a CSV with a header under a time-stamped name.

    The output name is the current time of day (colons replaced by
    underscores), an underscore, then *file_name*.  That name is
    substituted into *root* with ``%``, so *root* must contain one ``%s``
    placeholder.  The resulting path is printed before writing.
    """
    time_stamp = str(datetime.now().time()).replace(':', '_')
    munged_path = root % '_'.join((time_stamp, file_name))
    print('saving data >>> '.upper() + munged_path)
    X.write.csv(munged_path, header=True)
if __name__ == '__main__':
    # Hard-coded settings for one specific cluster: the Spark master URL,
    # an HDFS path template whose %s is filled with each file name, and
    # the two input file names.
    smaster = 'spark://ryans-macbook:7077'
    root = 'hdfs://ryans-macbook:9000/user/ryan/%s'
    train_fname = 'train_numeric.csv'
    test_fname = 'test_numeric.csv'
    ask = True
    run(
        root=root,
        smaster=smaster,
        train_file_name=train_fname,
        test_file_name=test_fname,
        ask=ask,
    )