import argparse
import logging
import os
from typing import Any, List

import pandas as pd

from config import DataConfig
from data_store import DataStore, DataQuery

logger = logging.getLogger(__name__)
logger.setLevel(os.getenv('MERGELOGGING', 'INFO'))

config = DataConfig()

def set_config(new_config: DataConfig):
    global config
    config = new_config


def print_or_quiet(*args, **kwargs):
    # Only prints in dry-run mode, so --no-dry-run output stays script-clean.
    if config.DRY_RUN:
        print(*args, **kwargs)

def list_sessions(query: DataQuery):
    query.select_clause = 'session_id, count(*)'
    query.from_clause = 'files'
    query.group_clause = 'session_id'
    return query

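# The query built above renders to roughly this SQL (an assumption: DataQuery
# is taken to emit its clauses in the usual SELECT / FROM / GROUP BY order):
#   SELECT session_id, count(*) FROM files GROUP BY session_id
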
def list_duplicated(query: DataQuery, session_ids):
    query_inner = DataQuery()
    query_inner.select_clause = 'file_hash as file_hash, file_size as file_size, COUNT(*) as cnt'
    query_inner.from_clause = 'files'
    if len(session_ids) > 0:
        query_inner.where_clause = [query_inner.format_query_in_clause('session_id', session_ids)]
    else:
        query_inner.where_clause = []
    query_inner.where_clause.append(f'file_hash != "{config.UNDER_THRESHOLD_TEXT}"')
    query_inner.group_clause = 'file_size, file_hash'
    query_inner.having_clause = 'cnt > 1'
    query.select_clause = 'f.file_hash, f.file_size, f.file_name'
    query.from_clause = (f'files AS f INNER JOIN ({query_inner.format_query()}) AS q '
                         f'ON f.file_hash = q.file_hash AND f.file_size = q.file_size')
    query.order_clause = 'f.file_hash, f.file_size'
    return query

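# For reference, with sessions selected the query above should render to
# roughly this SQL (again assuming DataQuery emits clauses in the usual
# SELECT / FROM / WHERE / GROUP BY / HAVING / ORDER BY order):
#
#   SELECT f.file_hash, f.file_size, f.file_name
#   FROM files AS f INNER JOIN (
#       SELECT file_hash, file_size, COUNT(*) AS cnt
#       FROM files
#       WHERE session_id IN (...) AND file_hash != "<UNDER_THRESHOLD_TEXT>"
#       GROUP BY file_size, file_hash
#       HAVING cnt > 1
#   ) AS q ON f.file_hash = q.file_hash AND f.file_size = q.file_size
#   ORDER BY f.file_hash, f.file_size
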
def list_duplicatedpaths(query: DataQuery, session_ids):
    query.select_clause = 'file_name, COUNT(*) as cnt'
    query.from_clause = 'files'
    if len(session_ids) > 0:
        query.where_clause = query.format_query_in_clause('session_id', session_ids)
    else:
        query.where_clause = None
    query.group_clause = 'file_size, file_hash'
    query.having_clause = 'cnt > 1'
    return query

def list_files(query: DataQuery, session_ids):
    query.select_clause = '*'
    query.from_clause = 'files'
    if len(session_ids) > 0:
        query.where_clause = [query.format_query_in_clause('session_id', session_ids)]
    else:
        query.where_clause = []
    query.where_clause.append(f'file_hash != "{config.UNDER_THRESHOLD_TEXT}"')
    return query

def count_sessions(query: DataQuery):
    query.select_clause = 'COUNT(DISTINCT session_id)'
    query.from_clause = 'files'
    return query

def count_files(query: DataQuery, session_ids):
    query.select_clause = 'COUNT(*)'
    query.from_clause = 'files'
    query.where_clause = [query.format_query_in_clause('session_id', session_ids)]
    return query

# Filters out all records whose (file_size, file_hash) combination occurs
# only once, so only rows that have at least one duplicate remain.
def delete_non_duplicated(df: pd.DataFrame):
    assert isinstance(df, pd.DataFrame)
    # keep=False marks every member of a duplicated (file_size, file_hash)
    # group, so singleton groups are dropped; copy() detaches the slice so
    # callers can add columns without pandas chained-assignment warnings.
    result_df = df[df.duplicated(subset=['file_size', 'file_hash'], keep=False)].copy()
    logger.debug("kept %d of %d rows", len(result_df), len(df))
    return result_df

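# Example with hypothetical rows: given (file_size, file_hash) pairs
#   (10, 'aa'), (10, 'aa'), (20, 'bb')
# the (20, 'bb') row is dropped because its pair occurs only once, and the
# two (10, 'aa') rows are returned.
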
# Filters out all records whose file_name contains any of the strings in the
# exclusion list. include_list is accepted but not applied yet (see the
# --include TODO in the argument parser below).
def filter_df(df_orig, include_list, exclude_list):
    assert isinstance(df_orig, pd.DataFrame)
    df = df_orig
    for i in exclude_list or []:
        print_or_quiet(f"Excluding: {i}")
        # regex=False treats the exclusion entry as a plain substring
        df = df[~df['file_name'].str.contains(i.strip(), regex=False)]
    return df

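# For instance, with a hypothetical exclude_list = ['/.thumbnails/'], every
# row whose file_name contains that substring is dropped.
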
# Depending on the DRY_RUN setting, this reports the duplicated files.
# For each (file_hash, file_size) combination it marks the first file_name
# (in lexicographic order) as duplicated = False and the rest as
# duplicated = True, then prints the list. If DRY_RUN is set, it prints the
# file_size and hash as a title, with the files belonging to that group
# below it. If DRY_RUN is false, it emits a series of Unix commands instead:
# rm {file_name} when duplicated == True, echo {file_name} when False.
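# Example output for one pair of identical files (hypothetical names, size
# and hash). With DRY_RUN:
#   4096 9f2c...
#        /backup/a.jpg False
#        /photos/a.jpg True
# Without DRY_RUN:
#   echo "/backup/a.jpg"
#   rm "/photos/a.jpg"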
def show_duplicated(results: List[Any], ds: DataStore, include_list: List = None, exclude_list: List = None):
    def print_duplicates(df):
        last_file_size = None
        last_file_hash = None
        for i, r in df.iterrows():
            logger.debug("%s %s", i, r)
            if r['file_size'] != last_file_size or r['file_hash'] != last_file_hash:
                if config.DRY_RUN:
                    print(f"{r['file_size']} {r['file_hash']}")
                last_file_size = r['file_size']
                last_file_hash = r['file_hash']
            if config.DRY_RUN:
                print(f"\t {r['file_name']} {r['duplicated']}")
            else:
                if r['duplicated']:
                    print(f'rm "{r["file_name"]}"')
                else:
                    print(f'echo "{r["file_name"]}"')

    df = pd.DataFrame.from_records(results, columns=ds.headers())
    df = filter_df(df, include_list, exclude_list)
    df = df.sort_values(['file_hash', 'file_size', 'file_name'])
    duplicated_df = delete_non_duplicated(df)
    duplicated_df['duplicated'] = duplicated_df.duplicated(subset=['file_hash', 'file_size'], keep='first')
    # Total bytes kept (duplicated == False) vs. removable (duplicated == True)
    total_size = duplicated_df.groupby('duplicated')['file_size'].sum()
    print_or_quiet(total_size)
    print_duplicates(duplicated_df)

def run(args):
    task = args.task
    target = args.target
    session_ids = args.sessions
    include_list = []
    exclude_list = []
    # if args.include:
    #     try:
    #         with open(args.include) as f:
    #             for line in f:
    #                 include_list.append(line)
    #     except Exception as e:
    #         print(e)
    #         return
    if args.exclude:
        try:
            with open(args.exclude) as f:
                exclude_list = f.readlines()
        except Exception as e:
            print(e)
            return
    print_or_quiet(config.show_config())
    ds = DataStore(config.DATASTORE)
    query = None
    if task == 'list':
        if target == 'sessions':
            query = list_sessions(DataQuery())
        elif target == 'duplicatedpaths':
            query = list_duplicatedpaths(DataQuery(), session_ids)
        elif target == 'duplicated':
            query = list_duplicated(DataQuery(), session_ids)
        elif target == 'files':
            query = list_files(DataQuery(), session_ids)
    elif task == 'count':
        if target == 'sessions':
            query = count_sessions(DataQuery())
        elif target == 'files':
            query = count_files(DataQuery(), session_ids)
    if query is not None:
        try:
            results = ds.exec_query(query)
            if task == 'list' and target == 'duplicated':
                show_duplicated(results, ds, include_list, exclude_list)
                return
            for i in results:
                print_or_quiet(i)
        except Exception:
            print(query)
            raise
    else:
        print("Nothing to show")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="data",
        description="Lets you handle the file data store",
        epilog="Use carefully",
    )
    parser.add_argument("task")
    parser.add_argument("target")
    parser.add_argument("-s", "--session", action='append', dest='sessions', default=[])
    # parser.add_argument("-i", "--include", action='store', dest='include', default=None)  # TODO: Solve how to manage include tasks
    parser.add_argument("-x", "--exclude", action='store', dest='exclude', default=None)
    parser.add_argument("-p", "--prefer", action='store', dest='prefer', default=None)
    parser.add_argument("--no-dry-run", action='store_false', dest='dry_run', default=True,
                        help="In dry-run mode (the default) the program shows the list of "
                             "hashes, sizes and files. With --no-dry-run it generates the "
                             "rm commands instead.")
    args = parser.parse_args()
    config.DRY_RUN = args.dry_run
    run(args)
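
# Example invocations (a sketch; these assume a data store already populated
# at config.DATASTORE and, optionally, a plain-text exclusion file with one
# substring per line):
#   python data.py count sessions
#   python data.py list sessions
#   python data.py list files -s <session_id>
#   python data.py list duplicated -s <session_id> -x exclude.txt
#   python data.py list duplicated --no-dry-run > cleanup.sh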