-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathproc.py
203 lines (141 loc) · 5.52 KB
/
proc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import numpy
import pandas
def pca(data, cov_corr=True, eig_var=0.90, eig_num=None):
"""
Calculates the Principal Component Analysis of a data matrix
Args:
data -- Assumes a pandas DataFrame. Can either be raw data or
a covariance / correlation matrix
cov_corr -- A bool set to True if the matrix data is a covariance
or correlation matrix and False if data is a normal
matrix of data. Defaults to True
eig_var -- A maximum value of the cumulative variance to keep.
Defaults to 90% or 0.90
eig_num -- Instead of setting the cumulative variance maximum, can
set eig_num to an integer value and explicitly return
eig_num eigenvalues and eigenvectors. Defaults to None
Returns:
eigenvalues -- A eig_num X 1 numpy vector of eigenvalues
eigenvectors -- A n X eig_num numpy array of eigenvectors
"""
u,s,vh = numpy.linalg.svd(data)
if cov_corr:
eigenvalues = s
else:
eigenvalues = s*s
if eig_num is None:
cumulative_variance = (eigenvalues / eigenvalues.sum()).cumsum()
for i,val in enumerate(cumulative_variance):
if val > eig_var:
eig_num = i
break
eigenvalues = eigenvalues[:eig_num].reshape(eig_num, 1)
eigenvectors = vh.T[:,:eig_num]
return eigenvalues, eigenvectors
def pca_window(data, window_length=60, eig_num=4, corr=True):
"""
Runs PCA on a rolling window basis on the supplied data matrix
Args:
data -- Assumes a pandas dataframe of raw data
window_length -- The size of each rolling window.
eig_num -- Integer that determines the number of eigenvectors to keep in
all windows
corr - A bool for determining whether to calculate PCA over the
covariance matrix or the correlation matrix
Returns:
eig_list -- A list that contains the eigenvalues, eigenvectors and
timestamps of the beginning and ending observation in the
window. Each entry in eig_list is a dictionary with the
following keys:
values - The eig_num x 1 vector of eigenvalues
vectors - the n x eig_num matrix of eigenvectors
begin - The timestamp for the first observation in the window
end - The timestamp for the last observation in the window
"""
t, n = data.shape
eig_list = list()
if corr:
for i in xrange(window_length, t - window_length):
temp_u, temp_s, temp_vh = numpy.linalg.svd(data[i:window_length+i].corr())
eig_list.append({'values': temp_s[:eig_num].reshape(eig_num,1),
'vectors': temp_vh.T[:,:eig_num],
'begin': data.ix[i].name,
'end': data.ix[window_length+i].name})
else:
for i in xrange(window_length, t - window_length):
temp_u, temp_s, temp_vh = numpy.linalg.svd(data[i:window_length+i].cov())
eig_list.append({'values': (temp_s*temp_s)[:eig_num].reshape(eig_num,1),
'vectors': temp_vh.T[:,:eig_num],
'begin': data.ix[i].name,
'end': data.ix[window_length+i].name})
return eig_list
def procrustes(target, source):
"""
Performs a procrustes rotation of source into target
Args:
target -- n x k numpy array of vectors
source -- n x k numpy array of vectors
Returns:
R -- k x k numpy array that is a rotation matrix
"""
if source.shape != target.shape:
raise RuntimeError('source, target arrays have different dimensions')
A = source
B = target
# TODO(alex): See if we need to flip signs manually for accuracy
M = numpy.dot(A.T, B)
u,s,vh = numpy.linalg.svd(M)
R = numpy.dot(u, vh)
return R
def prin_angles(p, q, num_angles=None):
"""
Calculates the principal angles between the vector subspaces p and q
Args:
p -- n x k numpy array of n-dimensional vectors
q -- n x k numpy array of n-dimensional vectors
num_angles -- An integer denoting the number of principal angles to
calculate. Defaults to k if p and q are n x k
Returns:
angles -- A k x 1 numpy array of the principal angles
"""
if p.shape != q.shape:
raise RuntimeError('p and q have different dimensions')
if num_angles is None:
num_angles = p.shape[1]
angles = numpy.zeros(num_angles)
for j in xrange(num_angles):
angles[j] = numpy.arccos(numpy.dot(p[:,j], q[:,j]) / (numpy.dot(p[:,j],p[:,j]) * numpy.dot(q[:,j],q[:,j])))
return angles.reshape(num_angles,1)
def print_vecs(vecs):
"""
Prints out the vectors in vecs in a pretty format
Args:
vecs -- A numpy array of vectors
Returns:
Nothing
"""
n,k = vecs.shape
for i in range(n):
print(' '.join(map(lambda x: '{:7.4f}'.format(x), vecs[i,:])))
return None
def frob_norm(A, B):
"""
Returns the Frobenius norm of A - B
Args:
A -- A numpy array
B -- A numpy array
Returns:
frob -- The Frobenius norm of the difference between A and B
"""
frob = numpy.linalg.norm(A - B)
return frob
def angle_measure(angles):
"""
Calculates the angular distance
Args:
angles -- k x 1 numpy vector of angles
Returns:
dist -- A float that represents the total angular distance
"""
dist = numpy.sqrt(reduce(lambda x,y: x + y, map(lambda x: x*x, angles)))
return dist