-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpcmci_linear_para.py
84 lines (70 loc) · 2.55 KB
/
pcmci_linear_para.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import numpy as np
from tigramite import data_processing as pp
from tigramite.independence_tests import RCOT
from tigramite.independence_tests import ParCorr
from tigramite.pcmci import PCMCI
def _significant_edges(p_matrix, headers, index, alpha):
    """Collect ``[effect, cause, index]`` rows for every significant link.

    The first axis of ``p_matrix`` is read as the cause variable and the
    second as the effect variable; the innermost sequence holds the
    p-values of that link (presumably one per lag, per tigramite's
    ``p_matrix`` layout -- confirm against the tigramite docs).

    A link is reported once when any of its p-values is below ``alpha``.
    Links whose cause and effect carry the same header name are skipped.
    The progress prints are kept so the worker's stdout is unchanged.
    """
    edges = []
    for index_cause, item in enumerate(p_matrix):
        print("index is")
        print(index)
        print("item is")
        print(item)
        print("cause is")
        cause = headers[index_cause]
        print(cause)
        for index_effect, arr in enumerate(item):
            print("effect arr is ")
            print(arr)
            print("effect name is")
            effect = headers[index_effect]
            print(effect)
            # The self-link test does not depend on the lag, so decide it
            # once per (cause, effect) pair instead of once per p-value.
            if cause == effect:
                continue
            if any(p_value < alpha for p_value in arr):
                edges.append([effect, cause, index])
                print("{} caused by {}".format(effect, cause))
    return edges


def pcmci_causality(data, dt, index, headers, T_data, N_data, maxlag,
                    alpha=0.05):
    """Run PCMCI (ParCorr test) on one partition and report significant links.

    Args:
        data: Iterable of records for this partition; it is materialised
            into a NumPy array before being handed to tigramite.
        dt: Time axis passed to ``pp.DataFrame`` as ``datatime``.
        index: Partition index; stored in every result row and used in the
            output file name.
        headers: Variable names, indexed in the same order as the columns.
        T_data: Unused; kept so existing callers keep working.
        N_data: Unused; kept so existing callers keep working.
        maxlag: Maximum time lag, passed to ``run_pcmci`` as ``tau_max``.
        alpha: Significance threshold applied to the p-values
            (default 0.05, the previously hard-coded value).

    Returns:
        A list of ``[effect, cause, index]`` rows, one per significant link.

    Side effects:
        Prints progress to stdout and writes the rows to
        ``pcmci_linear_para_out<index>.csv`` in the working directory.
    """
    import csv

    print("======")
    records = list(data)
    if records:
        dataframe = pp.DataFrame(np.array(records), datatime=dt,
                                 var_names=headers)
        print(dataframe.var_names)
        parcorr = ParCorr(significance='analytic')
        pcmci = PCMCI(dataframe=dataframe, cond_ind_test=parcorr, verbosity=1)
        results = pcmci.run_pcmci(tau_max=maxlag, pc_alpha=None)
        print("p-values")
        print(results['p_matrix'].round(3))
        print("MCI partial correlations")
        print(results['val_matrix'].round(2))
        result_arr = _significant_edges(results['p_matrix'], headers, index,
                                        alpha)
    else:
        # An empty partition gives tigramite nothing to build a DataFrame
        # from; report "no links" instead of failing the whole job.
        result_arr = []

    # Always (re)write the per-partition file so a stale file from an earlier
    # run is never mistaken for this run's output. csv.writer quotes names
    # containing commas or quotes; "\n" keeps the previous line ending.
    with open("pcmci_linear_para_out{}.csv".format(index), "w", newline='') as f:
        csv.writer(f, lineterminator="\n").writerows(result_arr)
    return result_arr
def run_pcmci(maxlag, rdd, header, dt, t, n):
    """Apply ``pcmci_causality`` to every partition of ``rdd`` and collect.

    Args:
        maxlag: Maximum time lag forwarded to ``pcmci_causality``.
        rdd: Object exposing ``mapPartitionsWithIndex(...).collect()``
            (presumably a Spark RDD -- confirm against the caller).
        header: Variable names forwarded to ``pcmci_causality``.
        dt: Time axis forwarded to ``pcmci_causality``.
        t: Forwarded as ``T_data``.
        n: Forwarded as ``N_data``.

    Returns:
        Whatever ``collect()`` yields for the mapped partitions; the value is
        also printed to stdout.
    """
    def analyse_partition(partition_index, records):
        # Each partition is analysed independently; its index tags the
        # result rows and the per-partition output file.
        return pcmci_causality(records, dt, partition_index, header, t, n,
                               maxlag)

    collected = rdd.mapPartitionsWithIndex(analyse_partition).collect()
    print("!!!!!!!!!!")
    print(collected)
    return collected