diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d17ff5c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,67 @@
+# AWS S3 Glacier Restore tool
+
+I created this tool to restore a huge number of objects stored in the Glacier tier of an AWS S3 bucket.
+
+## Problem
+AWS offers a storage tier called Glacier. It costs a fraction of the S3 Standard tier and is primarily used for storing long-term backups, but it has its specifics:
+
+1) A restore takes up to 48 hours before files become available for download
+2) You need to request retrieval for every single object (file) in the bucket
+3) You don't know which files are already ready for download, so you have to check them all again and again
+
+## Solution
+This tool:
+
+1) Can generate a list of all objects in the bucket if you need to restore the whole bucket, or you can supply your own list of objects to retrieve
+2) Saves the progress of where it left off and the list of files it has already requested for retrieval, which saves you time if you need to run it multiple times
+3) Is multithreaded, so it can request multiple files at once! (this was a huge boost, from days to hours!)
+4) Lets you check the status of requested files and keeps a list of files which are already ready for download so it does not check them again
+
+## Usage
+You always need to specify the `--bucket` parameter (name of the bucket).
+You can specify the `--aws-profile` parameter to use a specific profile from `~/.aws/config`.
+
+### Subcommands
+
+`generate-object-list` generates an object list into a `<bucket-name>.objects` file
+
+`request-objects-restore` uses the `<bucket-name>.objects` object list and saves the names of already requested objects to a `<bucket-name>.progress` file
+
+Parameters:
+`--retain-for` number of days you want to keep the objects restored [Required]
+`--retrieval-tier` Standard, Bulk, Expedited (default: Standard) [Optional]
+`--thread-count` (default: number of CPUs on your machine) [Optional]
+
+`check-objects-status` uses the `<bucket-name>.objects` object list, compares it to `<bucket-name>.available` and checks only the files which are not already ready for download
+
+Parameters:
+`--thread-count` (default: number of CPUs on your machine) [Optional]
+
+### Generating a list
+```
+./s3_restore.py --bucket <bucket-name> generate-object-list
+```
+If you have your own list of files you want to restore, save it to a file named `<bucket-name>.objects`.
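+The list is a plain text file with one S3 object key per line (just the key, no `s3://` prefix and no bucket name). A hypothetical `my-bucket.objects` might look like this (example keys only):
+```
+2018/06/10/file.txt
+2018/06/11/database-dump.sql.gz
+backups/2019/01/archive.tar
+```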
+
+### Requesting a retrieval
+```
+./s3_restore.py --bucket <bucket-name> request-objects-restore --retain-for 10
+```
+
+### Checking objects retrieval status
+```
+./s3_restore.py --bucket <bucket-name> check-objects-status
+```
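+Once a key shows up in `<bucket-name>.available`, that object is restored and can be downloaded as usual, for example with the AWS CLI (bucket and key below are placeholders):
+```
+aws s3 cp s3://my-bucket/2018/06/10/file.txt .
+```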
+
+## Benchmarks
+I did not create any formal benchmarks, but in my case requesting retrieval of 700 000 files took around 6-8 hours (doing it the naive way would have taken several days!)
+
+## Performance
+For best performance I recommend creating an EC2 instance (preferably in the same region as your bucket, and with many cores to make use of the multithreading) and running the tool from there; the latency is much lower and, as a result, the request rate is much higher :)
+
+The AWS S3 `POST` request limit is around 3500 req/s, so be aware that there are parallelization limits and set your thread count accordingly (if I remember correctly, the request rate of a single thread is around 5-10 req/s).
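+For example, a run on such an instance might look like this (bucket name, profile name, retention, and thread count are placeholders to adjust for your setup):
+```
+./s3_restore.py --bucket my-bucket --aws-profile my-profile request-objects-restore --retain-for 14 --retrieval-tier Bulk --thread-count 32
+```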
+
+## Remarks
+This tool is the fastest one available on the internet (or at least it was at the time of writing [mid 2019]).
+**If you found something better, let me know!**
\ No newline at end of file
diff --git a/s3_restore.py b/s3_restore.py
new file mode 100755
index 0000000..5260051
--- /dev/null
+++ b/s3_restore.py
@@ -0,0 +1,352 @@
+#!/usr/bin/python3
+
+import sys
+import time
+import boto3
+import queue
+import logging
+import argparse
+import datetime
+import threading
+import multiprocessing
+from os import path
+from botocore.exceptions import ClientError
+
+AWS_PROFILE = 'default'
+PERCENT_QUEUE = queue.Queue()
+
+
+def setup_logger(logfile):
+    logger = logging.getLogger(f's3_restore_{logfile}')
+    logger.setLevel(logging.DEBUG)
+    fh = logging.FileHandler(logfile)
+    logger.addHandler(fh)
+
+    return logger
+
+
+def read_file(fname):
+    '''read a file line by line into a list'''
+    lines = []
+    with open(fname) as f:
+        for line in f:
+            lines.append(line.replace('\n', ''))
+
+    return lines
+
+
+def chunks(lst, n):
+    '''generator, yield successive n-sized chunks from lst'''
+    for i in range(0, len(lst), n):
+        yield lst[i:i + n]
+
+
+def diff(first, second):
+    '''items of first that are not present in second'''
+    second = set(second)
+    return [item for item in first if item not in second]
+
+
+def refresh_credentials(thread_id=0):
+    session = boto3.session.Session(profile_name=AWS_PROFILE)
+    s3 = session.client('s3')
+    return s3
+
+
+def request_retrieval(progress_logger, availability_logger, files, bucket_name, retain_days, tier, chunk_index):
+    '''
+    request object retrieval for the supplied 'files' array
+    'files' array should contain s3 object keys, e.g. 2018/06/10/file.txt
+    '''
+
+    s3_client = refresh_credentials(chunk_index)
+    counter = 0
+    for f in files:
+        try:
+            response = s3_client.restore_object(
+                Bucket=bucket_name,
+                Key=f,
+                RestoreRequest={
+                    'Days': int(retain_days),
+                    'GlacierJobParameters': {
+                        'Tier': tier
+                    }
+                }
+            )
+
+            # 200 = object is already restored and available, 202 = restore request accepted
+            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
+                print(f'{f} already available to download')
+                availability_logger.info(f)
+            elif response['ResponseMetadata']['HTTPStatusCode'] == 202:
+                progress_logger.info(f)
+
+        except ClientError as e:
+            code = e.response['Error']['Code']
+            if code == 'NoSuchKey':
+                print(f'{f} not found, skipping')
+            elif code == 'RestoreAlreadyInProgress':
+                print(f'{f} restore already in progress, ignoring')
+                progress_logger.info(f)
+            elif code == 'ExpiredToken':
+                s3_client = refresh_credentials(chunk_index)
+            else:
+                print(f'{f}: {e}')
+
+        counter += 1
+        actual_percent = counter / len(files)
+        PERCENT_QUEUE.put([chunk_index, actual_percent])
+
+
+def check_files_availability(availability_logger, files, bucket_name, chunk_index):
+    '''does a HEAD request on each entry of the files array to check whether the s3 object restore is complete or still in progress'''
+    s3_client = refresh_credentials(chunk_index)
+
+    counter = 0
+    for f in files:
+        try:
+            response = s3_client.head_object(Bucket=bucket_name, Key=f)
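+            # Restore status is reported via the x-amz-restore response header, whose value
+            # looks roughly like: ongoing-request="false", expiry-date="Sun, 01 Sep 2019 00:00:00 GMT"
+            # (example value for illustration only)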
+            if 'x-amz-restore' in response['ResponseMetadata']['HTTPHeaders']:
+                x_amz_restore = response['ResponseMetadata']['HTTPHeaders']['x-amz-restore']
+                if 'ongoing-request="false"' in x_amz_restore:  # false = restore complete, true = restore still in progress
+                    availability_logger.info(f)
+
+        except ClientError as e:
+            code = e.response['Error']['Code']
+            if code == 'NoSuchKey':
+                print(f'{f} not found, skipping')
+            elif code == 'ExpiredToken':
+                s3_client = refresh_credentials(chunk_index)
+            else:
+                print(f'Exception occurred: {e}')
+
+        counter += 1
+        actual_percent = counter / len(files)
+        PERCENT_QUEUE.put([chunk_index, actual_percent])
+
+
+def print_percent_queue(percent_dict):
+    while not PERCENT_QUEUE.empty():
+        data = PERCENT_QUEUE.get(timeout=0.1)
+        percent_dict[data[0]] = data[1]
+
+    out_str = ''
+    total_percent = 0
+    for chunk_id, percent in percent_dict.items():
+        percent *= 100
+        total_percent += percent
+        out_str += f' T{chunk_id}: {percent:.0f}% '
+
+    if len(percent_dict) > 0:
+        total_percent /= len(percent_dict)
+        out_str = f'Total: {total_percent:.2f}% [{out_str}]'
+        print(out_str)
+
+
+def main_generate_list(bucket, output_filename):
+    '''generates an object list from the whole bucket (only objects in the GLACIER or DEEP_ARCHIVE storage class)'''
+
+    if path.exists(output_filename):
+        input_overwrite_continue = input(f'File {output_filename} already exists and will be overwritten\nContinue? y/[n]: ')
+        if input_overwrite_continue != 'y':
+            return
+
+    s3_client = refresh_credentials()
+
+    glacier_objects = []
+    print('Listing objects to file')
+    try:
+        paginator = s3_client.get_paginator('list_objects_v2')
+        pages = paginator.paginate(Bucket=bucket)
+        last_count = 0
+        for page in pages:
+            for obj in page.get('Contents', []):
+                if obj['StorageClass'] == 'GLACIER' or obj['StorageClass'] == 'DEEP_ARCHIVE':
+                    glacier_objects.append(obj['Key'])
+                    if len(glacier_objects) >= last_count + 1000:
+                        last_count = len(glacier_objects)
+                        print(f'Found: {last_count}')
+
+    except ClientError as e:
+        print(e)
+
+    print(f'Total count: {len(glacier_objects)} glacier/deep_archive objects saved to {output_filename}')
+
+    with open(output_filename, 'w') as output_list:
+        for obj in glacier_objects:
+            output_list.write(f'{obj}\n')
+
+
+def main_request_objects_restore(bucket, object_list_path, retain_for, retrieval_tier, thread_count):
+
+    progress_logfile = f'{bucket}.progress'
+    availability_logfile = f'{bucket}.available'
+    progress_logger = setup_logger(progress_logfile)
+    availability_logger = setup_logger(availability_logfile)
+
+    progress_log = []
+    if path.exists(progress_logfile):
+        progress_log = read_file(progress_logfile)
+    availability_log = []
+    if path.exists(availability_logfile):
+        availability_log = read_file(availability_logfile)
+
+    print('')
+    lines = read_file(object_list_path)
+    if len(progress_log) > 0:
+        prev_len = len(lines)
+        lines = diff(lines, progress_log)
+        print(f'Progress log found. Skipping {prev_len - len(lines)} entries')
+
+    if len(availability_log) > 0:
+        prev_len = len(lines)
+        lines = diff(lines, availability_log)
+        print(f'Availability log found. Skipping {prev_len - len(lines)} entries (restore is complete on these files)')
+
+    if len(lines) == 0:
+        print('All objects already requested, nothing to do')
+        sys.exit(1)
+
+    print(f'Will have to process {len(lines)} files')
+
+    if len(lines) < int(thread_count):
+        thread_count = len(lines)
+
+    split_by = max(int(len(lines) / int(thread_count)), 1)
+
+    est_hours = len(lines) / int(thread_count) / 5 / 60 / 60  # 5 -> a single thread can request approx 5 objects/s
+    est_hours_format = str(datetime.timedelta(hours=est_hours)).split('.')[0]
+    print(f'{thread_count} threads, {split_by} files per thread')
+    if input(f'This will take approximately {est_hours_format}\nContinue? (y/[n]): ') != 'y':
+        sys.exit(1)
+
+    threads = []
+    timer_start = time.time()
+    chunk_index = 0
+    for chunk in chunks(lines, split_by):
+        t = threading.Thread(target=request_retrieval, args=(progress_logger, availability_logger, chunk, bucket, retain_for, retrieval_tier, chunk_index), daemon=True)
+        t.start()
+        threads.append(t)
+        chunk_index += 1
+
+    percent_dict = {}
+    while any(thread.is_alive() for thread in threads):
+        print_percent_queue(percent_dict)
+        time.sleep(1)
+
+    print_percent_queue(percent_dict)
+
+    exec_time = str(time.time() - timer_start).split('.')[0]
+    print(f'Execution took {exec_time}s')
+
+
+def main_check_restore_status(bucket, object_list_path, thread_count):
+
+    availability_logfile = f'{bucket}.available'
+    availability_logger = setup_logger(availability_logfile)
+
+    availability_log = []
+    file_list = []
+
+    if not path.exists(object_list_path):
+        print(f'{object_list_path} not found. Cancelling')
+        print("If you don't have an object list file yet, run the `generate-object-list` subcommand first")
+        return
+
+    print('')
+    file_list = read_file(object_list_path)
+
+    if path.exists(availability_logfile):
+        availability_log = read_file(availability_logfile)
+
+    if len(availability_log) > 0:
+        prev_len = len(file_list)
+        file_list = diff(file_list, availability_log)
+        print(f'Availability log found. Skipping {prev_len - len(file_list)} entries (these files are ready for download)')
+
+    print(f'Will have to process {len(file_list)} files')
+
+    split_by = max(int(len(file_list) / int(thread_count)), 1)
+
+    est_hours = len(file_list) / int(thread_count) / 14 / 60 / 60  # 14 -> a single thread can check approx 14 objects/s
+    est_hours_format = str(datetime.timedelta(hours=est_hours)).split('.')[0]
+    print(f'{thread_count} threads, {split_by} files per thread')
+    if input(f'This will take approximately {est_hours_format}\nContinue? (y/[n]): ') != 'y':
+        sys.exit(1)
+
+    threads = []
+    timer_start = time.time()
+    chunk_index = 0
+    for chunk in chunks(file_list, split_by):
+        t = threading.Thread(target=check_files_availability, args=(availability_logger, chunk, bucket, chunk_index), daemon=True)
+        t.start()
+        threads.append(t)
+        chunk_index += 1
+
+    percent_dict = {}
+    while any(thread.is_alive() for thread in threads):
+        print_percent_queue(percent_dict)
+        time.sleep(0.1)
+
+    print_percent_queue(percent_dict)
+
+    exec_time = str(time.time() - timer_start).split('.')[0]
+    print(f'Execution took {exec_time}s')
+    print('')
+    new_availability_list = read_file(availability_logfile)
+    new_file_list = read_file(object_list_path)
+    print(f'{len(new_availability_list)} files are restored and ready for download')
+    print(f'{len(new_file_list) - len(new_availability_list)} files are still being restored')
+
+
+def main():
+    global AWS_PROFILE
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--bucket', required=True)
+    parser.add_argument('--aws-profile', default='default')
+
+    subparsers = parser.add_subparsers(dest='subcommand', required=True)
+
+    generate_parser = subparsers.add_parser('generate-object-list')
+    generate_parser.add_argument('--output-object-list-path')
+
+    request_parser = subparsers.add_parser('request-objects-restore')
+    request_parser.add_argument('--retain-for', required=True, help='How many days to keep the objects restored')
+    request_parser.add_argument('--object-list-path')
+    request_parser.add_argument('--retrieval-tier', default='Standard', choices=['Standard', 'Bulk', 'Expedited'])
+    request_parser.add_argument('--thread-count', default=int(multiprocessing.cpu_count()))
+
+    check_parser = subparsers.add_parser('check-objects-status')
+    check_parser.add_argument('--object-list-path')
+    check_parser.add_argument('--thread-count', default=int(multiprocessing.cpu_count()))
+
+    args = parser.parse_args()
+
+    AWS_PROFILE = args.aws_profile
+
+    if args.subcommand == 'generate-object-list':
+        print('Command: Generate list of objects to restore from the specified S3 bucket')
+
+        if args.output_object_list_path is None:
+            args.output_object_list_path = f'{args.bucket}.objects'
+
+        main_generate_list(args.bucket, args.output_object_list_path)
+
+    elif args.subcommand == 'request-objects-restore':
+        print('Command: Request restoration of objects')
+
+        if args.object_list_path is None:
+            args.object_list_path = f'{args.bucket}.objects'
+
+        main_request_objects_restore(args.bucket, args.object_list_path, args.retain_for, args.retrieval_tier, args.thread_count)
+
+    elif args.subcommand == 'check-objects-status':
+        print('Command: Check object status to verify restore completion')
+
+        if args.object_list_path is None:
+            args.object_list_path = f'{args.bucket}.objects'
+
+        main_check_restore_status(args.bucket, args.object_list_path, args.thread_count)
+
+
+if __name__ == '__main__':
+    main()