-
Notifications
You must be signed in to change notification settings - Fork 0
/
Python_image_loader.py
235 lines (208 loc) · 9.8 KB
/
Python_image_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from datetime import datetime, timedelta
from shutil import copyfile
from hashlib import sha256
from os import path, makedirs, walk, getenv, rename
from imghdr import what
from time import sleep
from sys import argv
from PIL import Image
from requests import post, HTTPError
from pandas import DataFrame, concat, read_csv
# Optional API key: a local (git-ignored) APIKey.py is expected to define
# AzureAPIKey.  When the file is absent the script still runs, but the
# Azure tagging step is disabled via the Azure flag.
try:
    from APIKey import AzureAPIKey
    Azure = True
except ImportError:
    # No key available: use an empty key and skip Azure processing.
    AzureAPIKey = ''
    Azure = False
class Python_image_loader:
    """Copies new Windows Spotlight wallpapers (1920x1080 jpegs) into a
    destination folder, tracking them in a SHA256-keyed copy log, and
    optionally renames them with captions from the Azure Vision API.
    """

    def __init__(self, sourceDir, destDir):
        """Initialise paths, the Azure endpoint, and the copy log.

        sourceDir -- folder scanned for new wallpaper candidates (must exist)
        destDir   -- folder the images are copied into (created if missing)
        """
        # 1. Initialise, grab variables
        self.subscription_key = AzureAPIKey
        self.vision_base_url = "https://eastus.api.cognitive.microsoft.com/vision/v1.0/"
        self.vision_analyze_url = self.vision_base_url + "analyze"
        self.sourceDir = sourceDir
        self.destinationDir = destDir
        self._copyLogFile = self.destinationDir + 'copyLog.dat'
        # 2. Source must already exist; the destination can be created.
        if not path.exists(self.sourceDir):
            print("[ERROR] Source directory doesn't exist")
            exit()
        if not path.exists(self.destinationDir):
            print("[+] Destination directory doesn't exist, creating...")
            makedirs(self.destinationDir)
        # 3. Build the copy log on first run, otherwise load the existing one.
        if not path.exists(self._copyLogFile):
            self.__reset_copy_log()
        else:
            # 4. Load source log data into memory
            self.__load_copy_log(self._copyLogFile)

    def __reset_copy_log(self):
        """Create a fresh log of every file already in the destination."""
        print('[+] Rebuilding copy log... this may take a few mins')
        existingFiles = self.get_hashes(self.destinationDir, True)
        # An empty destination still needs the log's column layout so that
        # later merges on 'SHA256' work.
        self._copyLog = existingFiles if not existingFiles.empty else DataFrame(
            columns=["SHA256", "Path", "Azure_Processed"])
        self._copyLog.to_csv(self._copyLogFile, quoting=1)
        print('[+] Copy Log Rebuilt')

    def __load_copy_log(self, logfile):
        """Load an existing copy log from disk into self._copyLog."""
        with open(logfile, 'r') as f:
            self._copyLog = read_csv(f, quoting=1, index_col=0)
        print('[!!] TODO: Perform Copy Log integrity Checks')
        # TODO: check that all of the files still exist, and if not find them

    def get_hashes(self, source, no_jpg_size_filter=True):
        """Return a DataFrame (SHA256, Path, Azure_Processed) of files
        under *source*.

        With no_jpg_size_filter=True every file is hashed.  With False,
        only jpeg files of exactly 1920x1080 (wallpaper candidates) are
        considered; other files are skipped without being opened by PIL.
        """
        BUF_SIZE = 65345  # read in chunks to bound memory on large files
        rows = []
        for root, dirs, files in walk(source):
            for name in files:
                filePath = path.join(root, name)
                if not no_jpg_size_filter:
                    # Check the type first so PIL never sees a non-image
                    # (Image.open raises on arbitrary files, e.g. the log).
                    if what(filePath) != 'jpeg':
                        continue
                    # Context manager releases the underlying file handle.
                    with Image.open(filePath) as im:
                        if im.size != (1920, 1080):
                            continue
                digest = sha256()
                with open(filePath, 'rb') as f:
                    while True:
                        data = f.read(BUF_SIZE)
                        if not data:
                            break
                        digest.update(data)
                rows.append({"SHA256": digest.hexdigest(),
                             "Path": filePath,
                             "Azure_Processed": 0})
        # Preserve the column layout even when nothing matched, so callers
        # merging on 'SHA256' never hit a missing-column KeyError.
        if not rows:
            return DataFrame(columns=["SHA256", "Path", "Azure_Processed"])
        return DataFrame(rows)

    def azureVisionUpdate(self, image_path):
        """Wrapper for the Azure Computer Vision 'analyze' API call.

        Returns a new file name built from the best caption and the
        highest-scoring category (always '.jpg').  Returns the unchanged
        basename when the file can't be read; saves the copy log and
        exits on an HTTP error from Azure.
        """
        try:
            with open(image_path, "rb") as fh:
                image_data = fh.read()
        except OSError:
            print("[!!] File not found: " + image_path)
            return path.basename(image_path)
        headers = {'Ocp-Apim-Subscription-Key': self.subscription_key,
                   "Content-Type": "application/octet-stream"}
        params = {'visualFeatures': 'Categories,Description,Color'}
        try:
            response = post(self.vision_analyze_url,
                            headers=headers,
                            params=params,
                            data=image_data)
            response.raise_for_status()
            analysis = response.json()
        except HTTPError as e:
            print('[ERROR] HTTP Error: {}'.format(e.response.text))
            print('[+] Saving Copy Log and exiting... ')
            self._copyLog.to_csv(self._copyLogFile, quoting=1)
            exit()
        try:
            image_caption = analysis["description"]["captions"][0]["text"].capitalize()
            main_category = analysis["categories"]
        except (KeyError, IndexError):
            print("[!] error in the image caption")
            print(analysis)
            image_caption = "ERROR_UNKNOWN"
            # Empty list (not a placeholder string) so the category check
            # below falls through to 'NoCat' instead of re-raising KeyError.
            main_category = []
        if len(main_category) > 0:
            # Highest-scoring category becomes the file-name prefix.
            prefix = max(main_category, key=lambda x: x['score'])['name']
        else:
            prefix = 'NoCat'
        return prefix.upper() + '_' + image_caption.replace(' ', '_') + '.jpg'

    def __unprocessedFileName(self, filename):
        """Return True for names Azure hasn't tagged yet: raw Spotlight
        names are 68 pseudorandom characters with no underscores, while
        tagged names always contain '_'.
        """
        return len(filename) == 68 and '_' not in filename

    def findPics(self, Azure):
        """Finds new files by hashing all of the eligible image
        files in the folder and checking them against a list
        of existing hashes.  New files are copied to the destination
        and logged.  When *Azure* is true, the destination is then
        scanned and any still-untagged files are renamed using the
        Azure Vision API.  The copy log is persisted afterwards.
        """
        # get the source Hashes
        print(
            '[+] Examining new files and checking for viable backgrounds (jpg, 1920x1080 only)')
        newHashes = self.get_hashes(self.sourceDir, False)
        # Left-merge against the log: rows present only on the left side
        # ('left_only' marker) are files we haven't copied yet.
        newHashes = newHashes.merge(self._copyLog, on=['SHA256'],
                                    how='left', indicator=True)
        newHashes = newHashes.loc[newHashes['_merge'] == 'left_only']
        if newHashes.shape[0] > 0:
            print('[+] Copying {} files over'.format(newHashes.shape[0]))
            # move new ones to destination
            for index, row in newHashes.iterrows():
                destPath = path.join(self.destinationDir,
                                     path.basename(row['Path_x'] + '.jpg'))
                copyfile(row['Path_x'], destPath)
                line = DataFrame([{"SHA256": row['SHA256'],
                                   "Path": destPath,
                                   "Azure_Processed": 0}])
                self._copyLog = concat([self._copyLog, line],
                                       axis=0, ignore_index=True)
            # Persist immediately so a later crash doesn't re-copy files.
            self._copyLog.to_csv(self._copyLogFile, quoting=1)
        else:
            print('[+] No new files found')
        print('[+] Checking files for taggable files')
        # check jpeg files for wierd 65 char file names and update
        if Azure:
            for index, row in self._copyLog.loc[self._copyLog['Azure_Processed'] == 0].iterrows():
                if self.__unprocessedFileName(path.basename(row['Path'])):
                    print(
                        '[+] Starting Azure image tagging for {} ...'.format(path.basename(row['Path'])))
                    # Crude rate limit: don't issue the next request until
                    # at least ~3 seconds after this one started.
                    endTime = datetime.now() + timedelta(seconds=3)
                    newFileName = self.azureVisionUpdate(row['Path'])
                    newFileName = newFileName.replace('__', '_')
                    while datetime.now() < endTime:
                        sleep(0.5)
                    newPath = path.join(path.dirname(row['Path']), newFileName)
                    # Avoid clobbering an existing file with the same caption.
                    if path.isfile(newPath):
                        newPath = newPath[:-4] + ' (1)' + newPath[-4:]
                    try:
                        rename(row['Path'], newPath)
                        self._copyLog.at[index, 'Path'] = newPath
                        self._copyLog.at[index, 'Azure_Processed'] = 1
                        print(
                            '[+] Azure image tagging finished, new name: {}'.format(newFileName))
                    except OSError:
                        print('[ERROR] Image {} not updated'.format(
                            path.basename(row['Path'])))
        self._copyLog.to_csv(self._copyLogFile, quoting=1)
        print('[+] Copy Log Saved')
def main():
    """Entry point: resolve source/destination directories from argv (or
    the Windows Spotlight defaults) and run the loader.

    Usage: PythonImageLoader.py [Source Destination]
    """
    if len(argv) < 2:
        # No arguments: default to the Windows Spotlight cache and a
        # 'Windows Spotlight' folder under the user's Pictures.
        # This may change between machines.. will need to be tested
        sourceDir = getenv(
            'LOCALAPPDATA') + "\\Packages\\Microsoft.Windows.ContentDeliveryManager_cw5n1h2txyewy\\LocalState\\Assets\\"
        destinationDir = getenv('HOMEPATH') + "\\Pictures\\Windows Spotlight\\"
    elif len(argv) == 3:
        # argv[0] is the script name; the real arguments start at argv[1].
        sourceDir = argv[1]
        destinationDir = argv[2]
        if not path.isdir(sourceDir):
            print("[ERROR] Source directory doesn't exist")
            exit()
    else:
        print(
            '[ERROR] wrong arguments given, format is: PythonImageLoader.py Source Destination')
        exit()
    pil = Python_image_loader(sourceDir, destinationDir)
    pil.findPics(Azure)
    print('[=] === Execution Finished ===')
    exit(0)
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()