-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdbutils.py
123 lines (84 loc) · 3.33 KB
/
dbutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import sqlite3
import os
class TraceDBError(Exception):
pass
class TraceDB:
def __init__(self, bv, dbpath):
self.should_translate = False
self.sqlite_handle = None
self.map_low = -1
self.map_high = -1
# Getting the segment offset if needed
self.exec_off = -1
if len(bv.segments) == 0:
raise TraceDBError("No segments in file")
if bv.segments[0].start == 0:
self.should_translate = True
# Does not handle multiple executable segments for now
for seg in bv.segments:
if seg.executable:
self.exec_off = seg.start
break
if self.exec_off < 0:
raise TraceDBError("Could not find executable segment")
# Loading the db
self.branch_count, self.mapping_count, self.hitcount_count = self._load(dbpath)
file_basename = os.path.basename(bv.file.original_filename)
if self.should_translate:
mappings = self.get_mappings()
for m in mappings:
if m[0].endswith(file_basename):
self.map_low = m[1]
self.map_high = m[2]
break
if self.map_low < 0:
raise TraceDBError("Could not find matching segment in trace")
def _load(self, dbpath):
self.sqlite_handle = sqlite3.connect(dbpath)
# Computing count
c = self.sqlite_handle.cursor()
c.execute("SELECT COUNT(*) FROM branches;")
branch_count = c.fetchone()[0]
c.execute("SELECT COUNT(*) FROM mappings;")
mapping_count = c.fetchone()[0]
c.execute("SELECT COUNT(*) FROM hitcounts;")
hitcount_count = c.fetchone()[0]
# Getting the addresses
c.close()
return (branch_count, mapping_count, hitcount_count)
def _pie_to_phys(self, addr):
if self.should_translate:
# Should not happen but we never know
if addr < self.map_low or addr >= self.map_high:
return addr
return (addr - self.map_low) + self.exec_off
return addr
def _phys_to_pie(self, addr):
if self.should_translate:
return (addr - self.exec_off) + self.map_low
return addr
def get_xrefs_from(self, addr):
c = self.sqlite_handle.cursor()
xrefs = {}
for xref in c.execute("SELECT destination FROM branches WHERE source=?;", (self._phys_to_pie(addr),)):
xref = self._pie_to_phys(xref[0])
if xref in xrefs:
xrefs[xref] += 1
else:
xrefs[xref] = 1
c.close()
xrefs = sorted(xrefs.items(), key=lambda a: a[1], reverse=True)
return list(xrefs)
def get_hitcounts(self):
c = self.sqlite_handle.cursor()
hitcounts = []
for address, hitcount in c.execute("SELECT address,hitcount FROM hitcounts ORDER BY hitcount DESC;"):
hitcounts.append((self._pie_to_phys(address), hitcount))
c.close()
return hitcounts
def get_mappings(self):
c = self.sqlite_handle.cursor()
mappings = []
for filename, start, end in c.execute("SELECT filename, start, end FROM mappings;"):
mappings.append((filename, start, end))
return mappings