-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetcher.py
222 lines (198 loc) · 8.26 KB
/
fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
"""
file fetcher for cache manager client
"""
import os.path
import time
import threading
from shared import Message
from cmlogging import *
import settings
# Module metadata. The "$Rev: ... $" form looks like a version-control keyword
# expansion (presumably Subversion) -- TODO confirm.
__version__ = "$Rev: 837 $"
# NOTE(review): the address part of this string reads like an e-mail
# obfuscation placeholder rather than a real address -- verify against the
# upstream copy of this file before relying on it.
__author__ = "[email protected] (David Rybach)"
__copyright__ = "Copyright 2012, RWTH Aachen University"
class PingThread(threading.Thread):
    """Background thread that periodically pings the server.

    Sends an empty PING message over the given connection every
    ``interval`` seconds to keep the connection alive during long copies.
    The thread runs until stop() is called or sending a ping fails.
    """

    def __init__(self, conn, interval):
        """Store the connection and ping period; does not start the thread.

        conn: object providing sendMessage(message).
        interval: time in seconds to wait between two pings.
        """
        threading.Thread.__init__(self)
        self.conn = conn
        self.interval = interval
        # Set by stop(). run() waits on this event instead of sleeping so
        # that a stop request interrupts the wait immediately.
        self.finished = threading.Event()

    def run(self):
        """Send a PING every ``interval`` seconds until stopped.

        The first ping is sent only after one full interval has elapsed.
        Any exception raised while sending is reported through warning()
        and ends the thread.
        """
        while not self.finished.is_set():
            self.finished.wait(self.interval)
            if self.finished.is_set():
                # stop() was called while we were waiting: no further ping.
                break
            try:
                self.conn.sendMessage(Message(Message.PING, []))
            except Exception as e:
                warning("exception in PingThread: %s" % str(e))
                break

    def stop(self):
        """Ask the thread to terminate; returns without waiting for it."""
        self.finished.set()

    @staticmethod
    def create(conn, config):
        """Create and start a PingThread for conn.

        The ping period is half of config.SOCKET_TIMEOUT.
        Returns the running thread; the caller is responsible for calling
        stop() on it.
        """
        pt = PingThread(conn, config.SOCKET_TIMEOUT / 2)
        pt.start()
        return pt
class CacheFetcher:
    """Client-side file fetcher for the cache manager.

    Verifies local and remote copies of a file, copies files from other
    nodes or from the file server, and exchanges the corresponding protocol
    messages with the master over the given connection.

    A ``fileinfo`` record is a list whose element 0 is the file name.
    Elements 1 and 2 are numeric attributes compared against the stat
    results (presumably size and modification time -- TODO confirm).
    """

    def __init__(self, config, fileSystem, connection):
        """Store collaborators and create the remote file system accessor.

        config: configuration object (SOCKET_TIMEOUT is read when copying).
        fileSystem: local file system helper.
        connection: connection used for sendMessage / receiveMessage.
        """
        self.config = config
        self.fileSystem = fileSystem
        self.remoteSystem = settings.clientEnvironment().remoteFileSystem(config)
        self.conn = connection

    @staticmethod
    def _toInt(value):
        """Convert a numeric value or numeric string to int.

        Accepts plain integers ("123") as well as float notation ("123.0"),
        so that the local and the remote check parse a file info record in
        the same way. Integer strings are converted exactly, without a
        detour through float.
        """
        try:
            return int(value)
        except ValueError:
            # Not an integer literal, e.g. "123.0": truncate the float.
            return int(float(value))

    def brandFile(self, host, filename):
        """Delegate to remoteSystem.brandFile for filename on host."""
        debug("brandFile: %s:%s" % (host, filename))
        self.remoteSystem.brandFile(host, filename)

    def checkRemote(self, originalFileInfo, host, filename):
        """Check whether host:filename matches the original file info.

        Returns False if no stat result is available for the remote file,
        otherwise True iff elements 1 and 2 of originalFileInfo equal
        elements 0 and 1 of the remote stat result.
        """
        debug("checkRemote: " + filename)
        log("checking status of %s:%s" % (host, filename))
        stat = self.remoteSystem.getFileStat(host, filename)
        debug("stat: %s" % str(stat))
        debug("original: %s" % str(originalFileInfo))
        if stat is None:
            return False
        return (self._toInt(originalFileInfo[1]) == stat[0] and
                self._toInt(originalFileInfo[2]) == stat[1])

    def checkLocal(self, originalFileInfo, filename):
        """Check whether the local file matches the original file info.

        Returns False if filename is not an existing regular file,
        otherwise True iff elements 1 and 2 of originalFileInfo equal the
        same elements of the local file info (compared as integers).
        """
        debug("checkLocal " + filename)
        if not os.path.isfile(filename):
            debug("file not found")
            return False
        checkFileInfo = self.fileSystem.getFileInfo(filename)
        debug("local: " + str(checkFileInfo))
        return (self._toInt(originalFileInfo[1]) == self._toInt(checkFileInfo[1]) and
                self._toInt(originalFileInfo[2]) == self._toInt(checkFileInfo[2]))

    def copyFromNode(self, fileinfo, host, filename, destination):
        """Copy host:filename to destination.

        A PingThread keeps the master connection alive while copying.
        fileinfo is accepted for interface symmetry but not used here.
        Returns True on success, False if the copy reported a failure.
        """
        debug("copy from node: %s, %s, %s" % (host, filename, destination))
        log("start copying %s:%s" % (host, filename))
        pt = PingThread.create(self.conn, self.config)
        try:
            copyOK, msg = self.remoteSystem.copyFile(host, filename, destination)
        finally:
            # Always stop the ping thread, even if the copy raises;
            # otherwise it would keep pinging for the rest of the process.
            pt.stop()
        if not copyOK:
            log("%s" % msg)
            error("cannot copy %s:%s to %s" % (host, filename, destination))
            return False
        log("copied %s:%s" % (host, filename))
        self.fileSystem.setATime(destination)
        return True

    def copyFromServer(self, fileinfo, destination):
        """Copy the original file (fileinfo[0]) to destination.

        A PingThread keeps the master connection alive while copying.
        Returns True on success; any failure, including an exception
        raised while copying, is reported through error() and yields False.
        """
        filename = fileinfo[0]
        debug("copy from server: %s, %s" % (filename, destination))
        log("start copying %s" % filename)
        try:
            pt = PingThread.create(self.conn, self.config)
            try:
                copyOk, msg = self.remoteSystem.copyUsingCp(filename, destination)
            finally:
                # Stop pinging on every exit path, including exceptions.
                pt.stop()
            if not copyOk:
                raise Exception(msg)
            log("copied %s" % filename)
            self.fileSystem.setATime(destination)
            return True
        except Exception as e:
            error("cannot copy %s to %s: %s" % (filename, destination, str(e)))
            return False

    def requestFile(self, fileinfo, destination, locateLimit=99999):
        """Send a REQUEST_FILE message for fileinfo.

        The message carries fileinfo followed by the file server of
        fileinfo[0], the destination and locateLimit (as a string).
        Returns the result of conn.sendMessage.
        """
        debug("requestFile: " + str(fileinfo))
        fileserver = self.fileSystem.getFileServer(fileinfo[0])
        debug("file server: " + fileserver)
        content = fileinfo + [fileserver, destination, str(locateLimit)]
        return self.conn.sendMessage(Message(Message.REQUEST_FILE, content))

    def requestFileLocations(self, fileinfo, locateLimit=999999):
        """Send a GET_LOCATIONS message for fileinfo.

        Returns the result of conn.sendMessage.
        """
        debug("requestFileLocations: %s, %d locations" % (str(fileinfo), locateLimit))
        return self.conn.sendMessage(Message(Message.GET_LOCATIONS, fileinfo + [str(locateLimit)]))

    def sendFileLocation(self, fileinfo, destination):
        """Send a HAVE_FILE message for the copy of fileinfo at destination."""
        debug("sendFileLocation: %s, %s" % (str(fileinfo), destination))
        r = self.conn.sendMessage(Message(Message.HAVE_FILE, fileinfo + [destination]))
        debug(" => " + str(r))

    def sendFileRemoved(self, fileinfo, destination):
        """Send a DELETED_COPY message for the copy of fileinfo at destination."""
        debug("sendFileRemoved: %s, %s" % (str(fileinfo), destination))
        r = self.conn.sendMessage(Message(Message.DELETED_COPY, fileinfo + [destination]))
        debug(" => " + str(r))

    def sendExit(self):
        """Send an EXIT message."""
        debug("sendExit")
        r = self.conn.sendMessage(Message(Message.EXIT, []))
        debug(" => " + str(r))

    def sendKeepAlive(self):
        """Send a KEEP_ALIVE message."""
        debug("sendKeepAlive")
        r = self.conn.sendMessage(Message(Message.KEEP_ALIVE, []))
        debug(" => " + str(r))

    def isActive(self, destination):
        """Send IS_ACTIVE for destination and return the waiting time.

        Returns int(content[0]) of a WAIT reply. Returns 0 for any other
        reply and when no reply is received (reported as an error).
        """
        debug("isActive: %s" % destination)
        self.conn.sendMessage(Message(Message.IS_ACTIVE, [destination]))
        msg = self.conn.receiveMessage()
        if not msg:
            error("connection reset")
            return 0
        if msg.type == Message.WAIT:
            return int(msg.content[0])
        return 0

    def handleMessage(self, fileinfo, destination, msg):
        """Process one message from the master and send a reply if required.

        Returns a tuple (retFile, retval, terminate):
          retFile: the file to use -- the original fileinfo[0], an existing
              or freshly made copy -- or a (host, filename) tuple after a
              successful CHECK_REMOTE, or None if nothing was decided.
          retval: True once a file has been chosen, an EXIT was received or
              the message could not be handled; presumably tells the caller
              to stop processing further messages -- TODO confirm.
          terminate: True only when an EXIT message was received.
        """
        debug("handleMessage %s" % str(msg))
        debug("destination = %s" % destination)
        retval = False
        retFile = None
        reply = None
        terminate = False
        if msg is None:
            # Without a connection fall back to the original file.
            error("no connection to master")
            retFile = fileinfo[0]
            retval = True
        elif msg.type == Message.CHECK_LOCAL:
            if self.checkLocal(fileinfo, msg.content[0]):
                reply = Message(Message.FILE_OK)
                log("using existing copy %s" % msg.content[0])
                retFile = msg.content[0]
                retval = True
            else:
                reply = Message(Message.FILE_NOT_OK)
        elif msg.type == Message.CHECK_REMOTE:
            if self.checkRemote(fileinfo, msg.content[0], msg.content[1]):
                self.brandFile(msg.content[0], msg.content[1])
                reply = Message(Message.FILE_OK)
                # retval stays False here: only the location is reported.
                retFile = (msg.content[0], msg.content[1])
            else:
                reply = Message(Message.FILE_NOT_OK)
        elif msg.type == Message.COPY_FROM_NODE:
            if self.copyFromNode(fileinfo, msg.content[0], msg.content[1], destination):
                reply = Message(Message.COPY_OK, [destination])
                debug("OK")
                retFile = destination
                retval = True
            else:
                reply = Message(Message.COPY_FAILED)
        elif msg.type == Message.COPY_FROM_SERVER:
            if self.copyFromServer(fileinfo, destination):
                reply = Message(Message.COPY_OK, [destination])
                debug("OK")
                retFile = destination
                retval = True
            else:
                reply = Message(Message.COPY_FAILED, [])
        elif msg.type == Message.FALLBACK:
            log("no local cache available")
            retFile = fileinfo[0]
            retval = True
        elif msg.type == Message.WAIT:
            # Sleep for the time given in the message, then ask again.
            log("no copy slot available. waiting.")
            time.sleep(int(msg.content[0]))
            self.requestFile(fileinfo, destination)
        elif msg.type == Message.EXIT:
            debug("exit received")
            terminate = True
            retval = True
        else:
            error("unknown message received: %d" % msg.type)
            retFile = fileinfo[0]
            retval = True
        debug("reply: " + str(reply))
        debug("retval: " + str(retval))
        debug("retFile: " + str(retFile))
        if reply is not None:
            self.conn.sendMessage(reply)
        return (retFile, retval, terminate)