-
Notifications
You must be signed in to change notification settings - Fork 10
/
arcticdb.py
208 lines (155 loc) · 5.94 KB
/
arcticdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
""""Interface between the JS Download script and the Database storage"""
import os
from datetime import datetime as dt
import time
import json
import argparse
from Naked.toolshed.shell import execute_js, muterun_js
import pandas as pd
from database import instance as db
from tools import exp_to_int
# Module-level configuration for the download helpers below.
blockSeries = 5000  # number of blocks requested per downloader call in downloadBlockchain
attemptsThreshold = 10  # consecutive failed reads tolerated by downloadBlockchain before it aborts
logsSeparate = False  # NOTE(review): not referenced in the visible part of this file - confirm it is still needed
parseToInt = False  # when True, processRawBlockchainData runs parseInt on every element
tempFilename = 'data/temp.json'  # file handed to the JS downloader, then read back and removed here
# Passed to execute_js as the "script path"; the leading option is presumably
# a Node.js heap-size flag that ends up ahead of the script name - TODO confirm.
dataDownloaderScript = '--max-old-space-size=16384 data-downloader.js'
def loadRawData(filepath):
    """Load the JSON document stored at ``filepath``.

    Prints how long the load took when it succeeds.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or None when the file does not exist or does
        not hold complete, valid UTF-8 JSON (for example because the program
        writing it was interrupted).
    """
    start = time.time()
    try:
        # JSON is UTF-8 by specification; do not depend on the platform locale.
        with open(filepath, encoding='utf-8') as json_data:
            loadedData = json.load(json_data)
    except FileNotFoundError:
        return None
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        # A truncated or corrupt file is as unusable as a missing one: report
        # it and let the caller decide whether to download again.
        print("Could not parse " + str(filepath) + ": " + str(err))
        return None
    print("Loading the data took "+str(time.time() - start)+" seconds")
    return loadedData
def convertTimestamp(x):
    """Replace the UNIX timestamp of record ``x`` with a datetime under 'date'.

    The timestamp is taken from the 'date' key when present, otherwise from
    'time'. That key is removed and the converted value (a naive datetime
    expressed in UTC) is stored under 'date'. ``x`` is modified in place.

    Raises:
        ValueError: if ``x`` has neither a 'date' nor a 'time' key.
    """
    for candidate in ('date', 'time'):
        if candidate in x:
            break
    else:
        raise ValueError('Unsupported timestamp format in given data %s.' % x.keys())
    stamp = x.pop(candidate, None)  #take the old key out before 'date' is (re)written
    x['date'] = dt.utcfromtimestamp(stamp)
def parseInt(data):
    """Convert integer-looking strings inside ``data`` to int, in place.

    ``data`` may be a dict (its values are inspected) or a list (its items
    are inspected); any other type is left alone. Strings starting with '0x'
    are read as base 16, all others as base 10. Strings that do not parse
    and values that are not plain strings stay unchanged.
    """
    if type(data) is dict:
        slots = list(data.items())
    elif type(data) is list:
        slots = list(enumerate(data))
    else:
        return
    for slot, value in slots:
        if type(value) is not str:
            continue
        radix = 16 if value.startswith('0x') else 10
        try:
            data[slot] = int(value, base=radix)
        except ValueError:
            pass  #not a number in that base: keep the original string
def processRawCourseData(data):
    """Convert the timestamp of every record in ``data`` to a datetime, in place.

    Args:
        data: Iterable of records accepted by convertTimestamp.

    Raises:
        ValueError: propagated from convertTimestamp when a record has no
            supported timestamp key.
    """
    for record in data:
        convertTimestamp(record)
def downloadCourse():
    """Download the course data through the JS script and save it in the DB as 'tick'.

    The downloader writes to ``tempFilename``; the file is read back, removed,
    its timestamps are converted, and the result is stored as a DataFrame
    indexed by 'date'.

    Raises:
        RuntimeError: if the downloader left no readable data behind.
    """
    callDataDownloaderCourse(tempFilename)
    data = loadRawData(tempFilename) #get it
    if data is None:
        # Without this guard the failure surfaces as an unrelated
        # FileNotFoundError from os.remove below.
        raise RuntimeError("Course download produced no readable data in " + tempFilename)
    os.remove(tempFilename)
    print("Downloaded data with length "+str(len(data))+" ticks") #debug
    processRawCourseData(data) #process a bit to make it suitable for storage
    df = pd.DataFrame(data)
    df.set_index('date', inplace=True)
    print(df.head())
    print(df.index[0], type(df.index[0]))
    db.save('tick', df) # save to db
def callDataDownloaderCourse(filename):
    """Run the JS downloader in course mode, asking it to write to ``filename``.

    Prints a message when execute_js reports failure; returns nothing.
    """
    cliArgs = '--course --filename '+filename
    if execute_js(dataDownloaderScript, cliArgs):
        return
    print("Failed to execute js")
def callDataDownloaderBlockchain(start, count, filename):
    """Run the JS downloader in blockchain mode, asking it to write to ``filename``.

    ``start`` and ``count`` are forwarded, in that order, after the
    '--blockchain' flag.
    NOTE(review): downloadBlockchain passes the last block of the range as
    ``count`` - confirm which meaning the script expects.

    Prints a message when execute_js reports failure. Always returns None,
    including when launching the script raises OSError.
    """
    cliArgs = ' '.join(['--blockchain', str(start), str(count), '--filename', filename])
    try:
        launched = execute_js(dataDownloaderScript, cliArgs)
    except OSError:
        #likely an out-of-memory error. Return and try again later
        return None
    if not launched:
        print("Failed to execute js")
def downloadBlockchain(start=0, targetBlock=None):
    """Calls the JS script to download a certain block range and saves the result in the DB

    Downloading resumes right after the latest block already stored; when the
    DB holds no block yet it begins at ``start``. Blocks are fetched in chunks
    of at most ``blockSeries`` and never past ``targetBlock`` (exclusive).

    Args:
        start: First block to download when no block is stored yet.
        targetBlock: Block number to stop before. Defaults to a hard-coded
            value when None.

    Raises:
        RuntimeError: after more than ``attemptsThreshold`` consecutive
            downloader calls that left no readable data.
    """
    latestBlock = getLatestBlock()
    # getLatestBlock() reports -1 when nothing is stored. Testing the value
    # before adding 1 is what lets `start` take effect on an empty DB.
    currentBlock = start if latestBlock < 0 else latestBlock + 1 #add 1 for the next block to download

    if targetBlock is None:
        targetBlock = 5528000-blockSeries #TODO: Have automatic detection of latest block

    print("Starting to download blocks after", currentBlock, " and with target ", targetBlock)

    attempts = 0
    while currentBlock < targetBlock:
        # Clamp every chunk, not just the first, so the last one stops at the target.
        series = min(blockSeries, targetBlock - currentBlock)
        nextTargetBlock = currentBlock + series-1
        print('Calling js to download '+str(series)+' blocks from '+str(currentBlock))
        callDataDownloaderBlockchain(currentBlock, nextTargetBlock, tempFilename)

        data = loadRawData(tempFilename) #get it
        if data is None:
            print("Failed reading", tempFilename, ", redownloading...")
            attempts += 1
            if attempts > attemptsThreshold:
                raise RuntimeError("Too many failed data-downloader calls, aborting operation.")
            time.sleep(30 * attempts) #delay before retrying. Most issues are solved that way.
            continue

        attempts = 0
        os.remove(tempFilename)
        _saveBlockchainSeries(processRawBlockchainData(data))
        currentBlock += series


def _saveBlockchainSeries(data):
    """Store every non-empty time series of ``data`` in the DB under its key."""
    for key, rows in data.items():
        if not rows:
            continue
        df = pd.DataFrame(rows)
        if key == 'trace':
            for k in ['gasUsed', 'gas']: #debug output: first value of the column and its type
                fr = df.iloc[0][k]
                print(fr, type(fr))
        df.set_index('date', inplace=True)
        if key == 'block':
            #TODO: Ugly fix
            #the following keys are problematic. Due to a bug in the JS implementation,
            #some values may be in exponential form, which is more string chars than the max allowance of the DB
            #so we need to parse them to normal numbers and then convert to string
            for k in ['totalDifficulty', 'difficulty']: #problematic keys
                df[k] = df[k].map(lambda x: str(exp_to_int(x)))
        db.save(key, df)
def getBlockchainFile(arg1, arg2): #the resulting file from the download script should match the requested arguments
    """Return the path 'data/blocks <arg1>-<arg2>.json' for the given arguments."""
    pieces = ['data/blocks ', str(arg1), '-', str(arg2), '.json']
    return ''.join(pieces)
def processRawBlockchainData(data):
    """Prepare downloaded blockchain data for storage, in place.

    ``data`` maps a time-series name to its elements. Every element has its
    UNIX timestamp turned into a date object (used to filter and manage the
    DB) and, when ``parseToInt`` is enabled, its numeric strings parsed to int.

    Returns:
        The same ``data`` object that was passed in.
    """
    for series in data.values(): #each time series
        for entry in series: #each element of that series
            convertTimestamp(entry)
            if parseToInt:
                parseInt(entry)
    return data
def getLatestBlock():
    """Return the number of the latest block stored in the DB.

    Returns:
        The 'number' value of the latest 'block' row, or -1 when it cannot be
        obtained (for example when no block has been stored yet).
    """
    try:
        tmp = db.getLatestRow('block') #get a dataframe with only the latest row
        # Find the column by label: searchsorted only gives the right position
        # when the columns happen to be sorted.
        return tmp.values[0, tmp.columns.get_loc('number')] #extract the block number from it
    except Exception:
        # Deliberately broad, but no longer swallows KeyboardInterrupt/SystemExit.
        return -1
if __name__ == "__main__": #if this is the main file, parse the command args
    parser = argparse.ArgumentParser(description="Module that downloads and stores blockchain and course data.")
    # store_true flags already default to False, so no set_defaults is needed.
    parser.add_argument('--course', dest='course', action="store_true", help="Downloads and saves or upgrades historical course data.")
    parser.add_argument('--blockchain', dest='blockchain', action="store_true", help="Downloads and saves or upgrades blockchain data.")
    parser.add_argument('--start', type=int, default=0, help='From which block to start downloading.')
    parser.add_argument('--end', type=int, default=None, help='Until which block to download.')

    args, _ = parser.parse_known_args() #unknown arguments are ignored

    db.open()
    try:
        if args.course:
            downloadCourse()
        if args.blockchain:
            downloadBlockchain(args.start, args.end)
    finally:
        # Close the DB even when a download fails part-way through.
        db.close()