-
Notifications
You must be signed in to change notification settings - Fork 1
/
function_analysis.py
160 lines (133 loc) · 5.2 KB
/
function_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import csv
from datetime import datetime
import json
import os
import subprocess
# When True, the functions below print extra diagnostic output.
DEBUG = True

# File paths (module-level globals shared by the functions below).
current_working_dir = os.getcwd()
semgrep_dir = "semgrep_analysis_input"
input_fn_dir = f"{semgrep_dir}/sca_analysis_fns"

# The output directory name embeds the start time of this run.
now = datetime.now()
date_time = now.strftime("%m-%d-%Y_%H:%M")
testing_output_dir = f"testing-output-{date_time}"
tmp_output_file_name = f"{current_working_dir}/{testing_output_dir}/tmp_output.txt"

# Function names that are excluded when counting state-modifying functions.
invalid_fn_names = ["foreach"]
################################################################
# HELPER FUNCTIONS
################################################################
def initial_setup():
    """Create this run's output directory inside the working directory.

    The directory is created directly instead of shelling out to ``mkdir``,
    so the name is never interpreted by a shell and a failure to create it
    is reported immediately rather than being silently ignored.

    Raises:
        OSError: If the directory cannot be created.
    """
    output_path = os.path.join(current_working_dir, testing_output_dir)
    # exist_ok: a second run within the same minute reuses the directory.
    os.makedirs(output_path, exist_ok=True)
################################################################
# PROCESSING FUNCTIONS
################################################################
def get_extn_name(file):
    """Return ``file`` with its last seven characters removed.

    NOTE(review): presumably this strips a fixed-length suffix shared by all
    input file names -- confirm against the files in the input directory.
    """
    suffix_length = 7
    return file[:-suffix_length]
def get_fn_info(result_obj):
    """Extract the function name and its starting line from one result.

    Reads ``result_obj["extra"]["metavars"]["$FUNC"]`` and returns a tuple of
    its ``"abstract_content"`` value and its ``["start"]["line"]`` value.

    Raises:
        RuntimeError: If any of those keys is missing.
    """

    def _require(container, key):
        # Same membership test at every level; one failure path for all.
        if key in container:
            return container[key]
        raise RuntimeError("Could not extract function name from results object")

    func_obj = _require(_require(_require(result_obj, "extra"), "metavars"), "$FUNC")
    fn_name = _require(func_obj, "abstract_content")
    start_line = _require(_require(func_obj, "start"), "line")
    return fn_name, start_line
def get_semgrep_command(source_file, yml_file):
    """Build the shell command that scans one source file with one rule file.

    Args:
        source_file: File name inside the input-functions directory.
        yml_file: Rule file name inside the semgrep directory.

    Returns:
        A single command string: ``semgrep scan --json`` with the rule file
        as ``--config``, the temporary output file as ``--output`` and the
        full path of the source file as the scan target.
    """
    target_path = f"{current_working_dir}/{input_fn_dir}/{source_file}"
    pieces = [
        "semgrep scan --json",
        f"--config={semgrep_dir}/{yml_file}",
        f"--output={tmp_output_file_name}",
        target_path,
    ]
    return " ".join(pieces)
# This function exists so that random segfaults in semgrep are handled properly.
def run_semgrep_command(semgrep_command, max_attempts=25):
    """Run a semgrep shell command, retrying until it produces its output file.

    An attempt counts as successful when neither captured stream contains the
    segmentation-fault message and the temporary output file exists.

    Args:
        semgrep_command: Command string executed through the shell.
        max_attempts: Upper bound on retries. Without a bound, a persistent
            failure (for example semgrep not being installed) would make
            this function loop forever.

    Raises:
        RuntimeError: If no attempt succeeded within ``max_attempts`` tries.
    """
    if DEBUG:
        print(semgrep_command)

    segfault_marker = "Segmentation fault (core dumped)"
    last_stderr = ""
    for _ in range(max_attempts):
        res = subprocess.run(
            semgrep_command,
            shell=True,
            cwd=current_working_dir,
            capture_output=True,
        )
        # errors="replace": undecodable bytes in tool output must not abort the run.
        stdout_text = res.stdout.decode("utf-8", errors="replace")
        last_stderr = res.stderr.decode("utf-8", errors="replace")
        # The crash message may arrive on either stream, so check both.
        crashed = segfault_marker in stdout_text or segfault_marker in last_stderr
        if not crashed and os.path.isfile(tmp_output_file_name):
            return

    raise RuntimeError(
        "semgrep did not produce " + tmp_output_file_name
        + " after " + str(max_attempts) + " attempts; last stderr:\n" + last_stderr
    )
def process_semgrep_results():
    """Load the JSON in the temporary output file, then delete that file.

    Returns:
        The parsed JSON content of the temporary output file.

    Raises:
        OSError: If the file cannot be opened or removed.
        json.JSONDecodeError: If the file does not contain valid JSON. In
            that case the file is left in place.
    """
    # The context manager closes the handle even when parsing fails.
    # JSON interchange text is UTF-8, so do not depend on the locale default.
    with open(tmp_output_file_name, "r", encoding="utf-8") as results:
        results_json = json.load(results)
    # Delete the file so the next run's existence check cannot see stale output.
    os.remove(tmp_output_file_name)
    return results_json
def store_state_results(extn_name, results_json):
    """Write ``results_json`` to ``<extn_name>_state_results.json``.

    The file is written into this run's output directory with two-space
    indentation.

    Args:
        extn_name: Prefix used for the output file name.
        results_json: JSON-serialisable object to store.
    """
    results_file_name = extn_name + "_state_results.json"
    # Anchor the path on the recorded working directory, as the rest of the
    # script does for its other files.
    results_path = os.path.join(current_working_dir, testing_output_dir, results_file_name)
    # Opening with "w" creates the file, and the context manager closes it, so
    # no separate `touch` process or explicit close() is needed.
    with open(results_path, "w") as results_file:
        json.dump(results_json, results_file, indent=2)
def get_line_start(fn_obj):
    """Return the smaller of the ``$FUNC`` and ``$TY`` starting line numbers.

    Both values are read from ``fn_obj["extra"]["metavars"]``.
    """
    captured = fn_obj["extra"]["metavars"]
    return min(captured[name]["start"]["line"] for name in ("$FUNC", "$TY"))
def get_line_end(fn_obj):
    """Return the line number stored under ``fn_obj["end"]["line"]``."""
    end_position = fn_obj["end"]
    return end_position["line"]
def get_csv_output(file):
    """Run both semgrep rule files on one input file and summarise the results.

    Args:
        file: File name inside the input-functions directory.

    Returns:
        A tuple ``(extn_name, total_loc, num_copied_fns, num_state_fns,
        num_state_instances)`` where ``total_loc`` sums the inclusive line
        span of every ``function.yml`` result, ``num_copied_fns`` counts
        those results, ``num_state_instances`` counts the ``state.yml``
        results and ``num_state_fns`` counts the distinct (name, line) pairs
        among them whose name is not in ``invalid_fn_names``.
    """
    extn_name = get_extn_name(file)
    print(f"Running function analysis on {extn_name}")

    # Pass 1: function.yml -- count the matches and add up their line spans.
    run_semgrep_command(get_semgrep_command(file, "function.yml"))
    copied_fns = process_semgrep_results()["results"]
    total_loc = 0
    for match in copied_fns:
        if DEBUG:
            print(json.dumps(match, indent=2))
        last_line = get_line_end(match)
        first_line = get_line_start(match)
        total_loc += last_line - first_line + 1

    # Pass 2: state.yml -- keep the raw results on disk, then count instances
    # and the distinct functions they occur in.
    run_semgrep_command(get_semgrep_command(file, "state.yml"))
    state_json = process_semgrep_results()
    store_state_results(extn_name, state_json)
    state_matches = state_json["results"]
    distinct_state_fns = set()
    for match in state_matches:
        fn_name, fn_line = get_fn_info(match)
        if DEBUG:
            print(fn_name)
            print(fn_line)
        if fn_name in invalid_fn_names:
            continue
        distinct_state_fns.add((fn_name, fn_line))

    return (
        extn_name,
        total_loc,
        len(copied_fns),
        len(distinct_state_fns),
        len(state_matches),
    )
################################################################
# MAIN ROUTINE
################################################################
if __name__ == '__main__':
    # Entry point: analyse every file in the input directory, in sorted name
    # order, and write one summary row per file to function_analysis.csv.
    initial_setup()
    test_files_dir = current_working_dir + "/" + input_fn_dir

    # newline="" is what the csv module expects for files it writes; without
    # it some platforms emit an extra carriage return per row. The context
    # manager closes the file even if an analysis step raises.
    with open("function_analysis.csv", "w", newline="") as csv_file:
        csv_file_writer = csv.writer(csv_file)
        csv_file_writer.writerow([
            "Extension Name",
            "Total LOC",
            "Num Copied Fns",
            "Num Copied + Modified Fns",
            "Num State Modifications"
        ])

        for file in sorted(os.listdir(test_files_dir)):
            extn_name, num_lines, copied_fns, state_fns, state_instances = get_csv_output(file)
            output_list = [extn_name, str(num_lines), str(copied_fns), str(state_fns), str(state_instances)]
            csv_file_writer.writerow(output_list)