archive_copy.py
155 lines (123 loc) · 5.53 KB
import logging
import sys
import time
from argparse import ArgumentParser
from functools import partial
from multiprocessing.pool import ThreadPool

import boto3
# Import the submodules explicitly instead of relying on `import botocore`
# pulling them in transitively via boto3.
import botocore.config
import botocore.exceptions
PROGRAM_DESCRIPTION = 'A tool that helps organize S3 archives'
PROGRAM_EPILOGUE = 'Have a nice day!'
S3_SERVICE_NAME = 's3'
LOG_FORMAT = '%(levelname)s: %(asctime)s: %(message)s'
MAX_POOL_CONNECTIONS = 100
NUMBER_OF_COPY_THREADS = 10
SOURCE_ARCHIVE_PROFILE_NAME = 'source_archive_profile'
RESTORE_DAYS = 7  # how long restored copies stay retrievable in S3
POLLING_INTERVAL_SECONDS = 300
TIER = 'Standard'  # Glacier retrieval tier: 'Expedited', 'Standard' or 'Bulk'
LOG_LEVEL = logging.INFO
FAST_ACCESS_FLAG_HELP_MESSAGE = ("don't use StorageClass=GLACIER; use StorageClass=STANDARD instead for faster access. "
                                 "Read more about Amazon S3 Storage Classes: "
                                 "https://aws.amazon.com/s3/storage-classes/")

def bucket_exists(s3_client, bucket_name):
    """Check that a bucket exists and is reachable with the given client."""
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except botocore.exceptions.ClientError as e:
        logging.debug(e)
        return False
    return True

def get_s3objects(source_client, bucket):
    """List every object in the bucket, following pagination."""
    paginator = source_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket)
    s3objects = []
    for page in pages:
        # 'Contents' is absent from the response when a page is empty
        s3objects.extend(page.get('Contents', []))
    return s3objects
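# For reference, each element returned above comes straight from the
# list_objects_v2 'Contents' list; abridged, an item looks like:
#   {'Key': 'path/to/object', 'Size': 12345, 'StorageClass': 'GLACIER',
#    'LastModified': datetime(...), 'ETag': '"..."'}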

def count_remaining_and_request_restores(s3_rsrc, bucket, s3objects):
    """Request restores for archived objects; return how many are still pending."""
    restored_count = 0
    pending_count = 0
    just_requested_count = 0
    not_glacier_count = 0
    logging.info('Checking storage class for the requested objects')
    logging.info('Objects in Glacier or Deep Archive need to be restored before they can be copied')
    logging.info('Legend: 📦 ready to copy 📤 requesting restore 📼 restoring 💪 restored')
    for s3object in s3objects:
        obj = s3_rsrc.Object(bucket, s3object['Key'])
        if obj.storage_class in ('GLACIER', 'DEEP_ARCHIVE'):
            if obj.restore is None:
                obj.restore_object(RestoreRequest={'Days': RESTORE_DAYS, 'GlacierJobParameters': {'Tier': TIER}})
                just_requested_count += 1
                print('📤', end='')
                sys.stdout.flush()
            elif 'ongoing-request="true"' in obj.restore:
                print('📼', end='')
                sys.stdout.flush()
                pending_count += 1
            elif 'ongoing-request="false"' in obj.restore:
                print('💪', end='')
                sys.stdout.flush()
                restored_count += 1
        else:
            print('📦', end='')
            sys.stdout.flush()
            not_glacier_count += 1
    print()
    logging.info(f'📦 Ready to copy: {not_glacier_count}')
    logging.info(f'📤 Requesting restore: {just_requested_count}')
    logging.info(f'📼 Restoring: {pending_count}')
    logging.info(f'💪 Restored: {restored_count}')
    return pending_count + just_requested_count
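# For reference, Object.restore above mirrors the x-amz-restore header:
#   'ongoing-request="true"'                       -> restore in progress
#   'ongoing-request="false", expiry-date="..."'   -> restored, ready to copy
#   None                                           -> no restore requested yet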

def copy_s3object(source_client, source_bucket, dest_bucket, copy_to_glacier, s3object):
    """Copy a single object between buckets, optionally archiving it to Glacier."""
    extra_args = None
    if copy_to_glacier:
        extra_args = {'StorageClass': 'GLACIER'}
    source_client.copy(
        CopySource={'Bucket': source_bucket, 'Key': s3object['Key']},
        Bucket=dest_bucket,
        Key=s3object['Key'],
        ExtraArgs=extra_args
    )
    print('💾', end='')
    sys.stdout.flush()
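# Note: client.copy() is boto3's managed transfer. It falls back to a
# multipart copy for large objects on its own, so object size needs no
# special handling here.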

def copy_s3objects(source_client, source_bucket, dest_bucket, s3objects, copy_to_glacier):
    """Copy all objects in parallel using a thread pool."""
    with ThreadPool(processes=NUMBER_OF_COPY_THREADS) as pool:
        pool.map(partial(copy_s3object, source_client, source_bucket, dest_bucket, copy_to_glacier), s3objects)
    print()
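# Design note: a thread pool (rather than processes) is sufficient because
# the copies are network-bound; the threads spend their time waiting on S3.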

def main():
    logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
    parser = ArgumentParser(
        description=PROGRAM_DESCRIPTION,
        epilog=PROGRAM_EPILOGUE
    )
    parser.add_argument('source_bucket')
    parser.add_argument('destination_bucket')
    parser.add_argument('-f', '--fast-access', action='store_true', help=FAST_ACCESS_FLAG_HELP_MESSAGE)
    args = parser.parse_args()
    source_bucket = args.source_bucket
    destination_bucket = args.destination_bucket
    copy_to_glacier = not args.fast_access
    botocore_config = botocore.config.Config(max_pool_connections=MAX_POOL_CONNECTIONS)
    # The destination bucket is reached with the default AWS profile,
    # the source bucket with the dedicated archive profile.
    destination_client = boto3.client(S3_SERVICE_NAME, config=botocore_config)
    source_session = boto3.Session(profile_name=SOURCE_ARCHIVE_PROFILE_NAME)
    source_client = source_session.client(S3_SERVICE_NAME, config=botocore_config)
    s3_rsrc = source_session.resource(S3_SERVICE_NAME)
    if not bucket_exists(source_client, source_bucket):
        logging.error(f"Bucket {source_bucket} doesn't exist")
        return 1
    if not bucket_exists(destination_client, destination_bucket):
        logging.error(f"Bucket {destination_bucket} doesn't exist")
        return 1
    logging.info('Populating list of objects...')
    s3objects = get_s3objects(source_client, source_bucket)
    # Poll until no restore is pending or freshly requested.
    while count_remaining_and_request_restores(s3_rsrc, source_bucket, s3objects) >= 1:
        logging.info(f'😴 Sleeping {POLLING_INTERVAL_SECONDS} seconds')
        time.sleep(POLLING_INTERVAL_SECONDS)
    logging.info(f'Copying objects from {source_bucket} to {destination_bucket}')
    copy_s3objects(source_client, source_bucket, destination_bucket, s3objects, copy_to_glacier)
    logging.info('Objects copied. Restoration complete.')
    return 0

if __name__ == '__main__':
sys.exit(main())