This repository has been archived by the owner on Aug 28, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtagdel.py
executable file
·64 lines (51 loc) · 2.57 KB
/
tagdel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/python
from json import loads
from time import strptime
from sys import exit
import requests
from sys import argv
import argparse as ap
# Command-line interface. Parsing happens at module level, so the classes
# further down read their settings from the global ``args`` namespace.
parser = ap.ArgumentParser(description="Delete tags from a docker trusted registry using REST API.")
parser.add_argument('-n', '--noop', action='store_true', help='Show a count of tags that are expected to be deleted and kept, then exit')
parser.add_argument('-k', '--keep', type=int, help='Number of newest tags to keep', default=25)
parser.add_argument('-b', '--baseurl', help='Base url to the registry, including protocol. Default: "https://dtr.nrk.no"',
                    default='https://dtr.nrk.no')
parser.add_argument('-r', '--repo', help='The repository you want to perform operations on - Ex: origo/folkemusikk', required=True)
parser.add_argument('-u', '--user', help='Username to authenticate as', required=True)
parser.add_argument('-t', '--token', help='Authentication token. Must be created in the registry.', required=True)
# default=[] matters: with action='append' and no default, argparse leaves the
# attribute as None when -i is never given, and a membership test such as
# ``name in args.ignore`` then raises TypeError.
parser.add_argument('-i', '--ignore', help='Tag to ignore/keep. Can be specified multiple times.', action='append',
                    default=[])
args = parser.parse_args()

# Root of the registry's v0 REST API; request URLs are built by appending to it.
baseurl = "%s/api/v0" % args.baseurl
# strptime pattern for the whole-second part of a tag's 'createdAt' value.
timeformat = "%Y-%m-%dT%H:%M:%S"
class TagData:
    """One tag of the repository, built from an entry of the tag-list JSON.

    Instances order by creation time (oldest first), which is what lets the
    tag list be sorted chronologically with a plain ``sorted()``.
    """

    def __init__(self, jsondata):
        """Copy the fields this script uses out of one tag record.

        jsondata -- mapping with at least the keys 'name', 'digest' and
                    'createdAt'.

        Raises KeyError if one of those keys is missing and ValueError if
        'createdAt' does not match the module-level ``timeformat``.
        """
        self.name = jsondata['name']
        self.digest = jsondata['digest']
        self.createdstr = jsondata['createdAt']
        self.created = self._parse_created(jsondata['createdAt'], timeformat)

    @staticmethod
    def _parse_created(stamp, fmt):
        """Parse a creation timestamp string into a ``time.struct_time``.

        Everything from the first '.' on (fractional seconds and whatever
        follows) is dropped before parsing. A trailing 'Z' is stripped as
        well, so a stamp that carries no fractional part but still ends in
        'Z' parses instead of failing with "unconverted data remains".
        """
        return strptime(stamp.split('.')[0].rstrip('Z'), fmt)

    def delete(self):
        """Delete this tag from the registry unless it must be kept.

        The tag is skipped when its name was passed via --ignore or when
        --noop is active. Returns True only if a DELETE request was sent and
        answered with HTTP 200; returns False otherwise, including when the
        tag was skipped.
        """
        # args.ignore is None when -i was never given; treat that as "ignore
        # nothing" instead of crashing on the membership test.
        ignored = args.ignore or ()
        if self.name in ignored or args.noop:
            print("Skipping tag %s from %s" % (self.name, args.repo))
            return False
        print("Deleting tag %s from %s" % (self.name, args.repo))
        d = requests.delete("%s/repositories/%s/tags/%s" % (baseurl, args.repo, self.name),
                            auth=(args.user, args.token))
        return d.status_code == 200

    def __lt__(self, other):
        """Order by creation time; this is the hook sorted() uses on Python 3."""
        return self.created < other.created

    def __cmp__(self, other):
        """Python 2 three-way comparison by creation time (-1, 0 or 1)."""
        # Spelled out rather than calling cmp(), which Python 3 removed.
        return (self.created > other.created) - (self.created < other.created)
class Controller:
    """Fetches the repository's tags and splits them into keep/delete sets.

    Attributes:
      tags     -- every tag of the repository, sorted oldest first
      tokeep   -- the newest ``keep`` tags
      todelete -- all older tags
    """

    def __init__(self, keep):
        """Download the tag list for ``args.repo`` and partition it.

        keep -- number of newest tags to retain; must be >= 0.

        Raises ValueError for a negative ``keep`` and
        requests.exceptions.HTTPError if the registry answers the tag-list
        request with an error status.
        """
        if keep < 0:
            raise ValueError("keep must be >= 0, got %s" % keep)
        tagreq = requests.get("%s/repositories/%s/tags?pageSize=100000" % (baseurl, args.repo),
                              auth=(args.user, args.token))
        # Fail here on e.g. bad credentials rather than trying to interpret an
        # error payload as a list of tags.
        tagreq.raise_for_status()
        self.tags = sorted(TagData(t) for t in tagreq.json())
        self.todelete, self.tokeep = self._partition(self.tags, keep)

    @staticmethod
    def _partition(tags, keep):
        """Split ``tags`` (oldest first) into ``(todelete, tokeep)``.

        Uses an explicit split index instead of negative slicing: ``tags[-0:]``
        is the whole list, so negative slices would keep everything when
        ``keep`` is 0. When ``keep`` exceeds the number of tags, nothing is
        deleted.
        """
        split = max(len(tags) - keep, 0)
        return tags[:split], tags[split:]

    def info(self):
        """Print how many tags fall into the delete and keep sets.

        The counts reflect only the age-based partition; individual tags in
        the delete set may still be skipped when ``delete()`` runs.
        """
        print("Will delete %s oldest tags, and keep latest %s tags" % (len(self.todelete), len(self.tokeep)))

    def delete(self):
        """Call ``delete()`` on every tag in the delete set, oldest first."""
        for t in self.todelete:
            t.delete()
# Script entry point: constructing the controller fetches the tag list, then
# the planned counts are printed and the deletion pass runs.
if __name__ == "__main__":
    controller = Controller(args.keep)
    controller.info()
    controller.delete()