optionally delete from S3 what was NOT uploaded (#3117)
* optionally delete from S3 what was NOT uploaded

Part of #2224

* Apply suggestions from code review

Co-authored-by: Ryan Johnson <[email protected]>

* more feedback addressed

* rename the option properly

* python lint

* Revert "python lint"

This reverts commit 4bfd17c.

Co-authored-by: Ryan Johnson <[email protected]>
peterbe and escattone authored Mar 11, 2021
1 parent 7ad6c7e commit 671cbc0
Showing 4 changed files with 140 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev-build.yml
@@ -203,7 +203,7 @@ jobs:
# XXX would be nice to validate here that $DEPLOYER_BUCKET_PREFIX is truthy
echo "DEPLOYER_BUCKET_PREFIX=$DEPLOYER_BUCKET_PREFIX"
- poetry run deployer upload ../client/build
+ poetry run deployer upload --prune ../client/build
poetry run deployer update-lambda-functions ./aws-lambda
# TODO
# Execute command to tell the Dev CloudFront distribution to use the
2 changes: 1 addition & 1 deletion .github/workflows/stage-build.yml
@@ -236,7 +236,7 @@ jobs:
# XXX would be nice to validate here that $DEPLOYER_BUCKET_PREFIX is truthy
echo "DEPLOYER_BUCKET_PREFIX=$DEPLOYER_BUCKET_PREFIX"
- poetry run deployer upload ../client/build
+ poetry run deployer upload --prune ../client/build
poetry run deployer update-lambda-functions ./aws-lambda
# TODO: Depending on how long the upload takes, consider switching to
8 changes: 8 additions & 0 deletions deployer/src/deployer/main.py
@@ -143,6 +143,14 @@ def whatsdeployed(ctx, directory: Path, output: str):
show_default=True,
is_flag=True,
)
@click.option(
"--prune",
help="Delete keys that were not uploaded this time (including those that didn't "
"need to be uploaded)",
default=False,
show_default=True,
is_flag=True,
)
@click.argument("directory", type=click.Path(), callback=validate_directory)
@click.pass_context
def upload(ctx, directory: Path, **kwargs):
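As an aside on the option registered above: the following is a minimal, self-contained click sketch (not the deployer's actual `upload` command, which takes a context and `**kwargs`) showing how a flag declared with `is_flag=True` reaches the command as a plain boolean:

```python
# Hypothetical standalone example; only the --prune option mirrors the diff above.
import click


@click.command()
@click.option(
    "--prune",
    help="Delete keys that were not uploaded this time",
    default=False,
    show_default=True,
    is_flag=True,
)
@click.argument("directory", type=click.Path())
def upload(directory, prune):
    # prune is False unless --prune was passed on the command line.
    click.echo(f"prune={prune} directory={directory}")


if __name__ == "__main__":
    upload()
```

Invoked as `python example.py --prune ../client/build`, `prune` comes through as `True`; in the deployer it arrives via `**kwargs` and is later read as `config["prune"]` in `upload_content` (see the upload.py diff below).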
130 changes: 130 additions & 0 deletions deployer/src/deployer/upload.py
@@ -1,4 +1,5 @@
import concurrent.futures
import datetime
import hashlib
import mimetypes
import re
@@ -10,6 +11,7 @@
import boto3
import click
from boto3.s3.transfer import S3TransferConfig
from dateutil.tz import UTC

from .constants import (
DEFAULT_CACHE_CONTROL,
@@ -49,6 +51,7 @@ class Totals:
uploaded_files: int = 0
uploaded_redirects: int = 0
uploaded_files_size: int = 0
deleted_files: int = 0

def count(self, task):
if task.skipped:
@@ -57,6 +60,8 @@ def count(self, task):
self.failed += 1
elif task.is_redirect:
self.uploaded_redirects += 1
elif task.is_deletion:
self.deleted_files += 1
else:
self.uploaded_files += 1
self.uploaded_files_size += task.size
@@ -109,6 +114,7 @@ class UploadTask:
error = None
skipped = False
is_redirect = False
is_deletion = False

def upload(self):
raise NotImplementedError()
@@ -249,6 +255,31 @@ def upload(self, bucket_manager):
)


class DeleteTask(UploadTask):
"""
Class for doing deletion by key tasks.
"""

is_deletion = True

def __init__(self, key, dry_run=False):
self.key = key
self.dry_run = dry_run

def __repr__(self):
return f"{self.__class__.__name__}({self.key})"

def __str__(self):
return self.key

def delete(self, bucket_manager):
if not self.dry_run:
bucket_manager.client.delete_object(
Key=str(self.key),
Bucket=bucket_manager.bucket_name,
)


class BucketManager:
def __init__(self, bucket_name, bucket_prefix):
self.bucket_name = bucket_name
@@ -290,6 +321,15 @@ def get_bucket_objects(self):
result = {}
continuation_token = None
while True:
# Note! You can set a `MaxKeys` parameter here.
# The default is 1,000. Any number larger than 1,000 is ignored
# and it will just fall back to 1,000.
# (Peterbe's note) I've experimented with different numbers (
# e.g. 500 or 100) and the total time difference is insignificant.
# A large MaxKeys means larger batches and fewer network requests
# which has a reduced risk of network failures (automatically retried)
# and there doesn't appear to be any benefit in setting it to a lower
# number. So leave it at 1,000 which is what you get when it's not set.
kwargs = dict(Bucket=self.bucket_name)
if self.key_prefix:
kwargs["Prefix"] = self.key_prefix
@@ -364,6 +404,10 @@ def iter_redirect_tasks(
dry_run=dry_run,
)

def iter_delete_tasks(self, keys, dry_run=False):
for key in keys:
yield DeleteTask(key, dry_run=dry_run)

def count_file_tasks(self, build_directory):
return sum(self.iter_file_tasks(build_directory, for_counting_only=True))

@@ -400,7 +444,19 @@ def upload(
task.skipped = True
if on_task_complete:
on_task_complete(task)

# Before continuing, pop it from the existing dict because
# we no longer need it after the ETag comparison has been
# done.
existing_bucket_objects.pop(task.key, None)
continue

if existing_bucket_objects:
# Independent of if we benefitted from the knowledge of the
# key already existing or not, remove it from the dict
# so we can figure out what remains later.
existing_bucket_objects.pop(task.key, None)

future = executor.submit(task.upload, self)
futures[future] = task

@@ -416,6 +472,31 @@

return timer

def delete(self, keys, on_task_complete=None, dry_run=False):
"""Delete doesn't care if it's a redirect or a regular file."""
with concurrent.futures.ThreadPoolExecutor(
max_workers=MAX_WORKERS_PARALLEL_UPLOADS
) as executor, StopWatch() as timer:
# Deletion order doesn't matter here; each key is deleted independently.
task_iter = self.iter_delete_tasks(keys, dry_run=dry_run)
futures = {}
for task in task_iter:
future = executor.submit(task.delete, self)
futures[future] = task

for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
task.error = future.exception()
except concurrent.futures.CancelledError as cancelled:
task.error = cancelled

if on_task_complete:
on_task_complete(task)

return timer


def upload_content(build_directory, content_roots, config):
full_timer = StopWatch().start()
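The `delete()` method above mirrors the thread-pool pattern already used for uploads: submit one task per key and collect errors as futures complete. A hedged, standalone sketch of that pattern, with placeholder bucket name, keys, and worker count:

```python
# Standalone illustration only; bucket name, keys, and max_workers are placeholders.
import concurrent.futures

import boto3

client = boto3.client("s3")
BUCKET = "example-bucket"


def delete_key(key):
    client.delete_object(Bucket=BUCKET, Key=key)
    return key


keys = ["main/old-page/index.html", "main/old-page/index.json"]
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(delete_key, key): key for key in keys}
    for future in concurrent.futures.as_completed(futures):
        key = futures[future]
        error = future.exception()
        print(f"{key}: {'failed: ' + repr(error) if error else 'deleted'}")
```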
@@ -426,6 +507,7 @@ def upload_content(build_directory, content_roots, config):
force_refresh = config["force_refresh"]
show_progress_bar = not config["no_progressbar"]
upload_redirects = not config["no_redirects"]
prune = config["prune"]

log.info(f"Upload files from: {build_directory}")
if upload_redirects:
@@ -492,6 +574,54 @@ def on_task_complete(task):
log.info(f"Total uploaded redirects: {totals.uploaded_redirects:,} ")
log.info(f"Total skipped files: {totals.skipped:,} matched existing S3 objects")
log.info(f"Total upload/skip time: {upload_timer}")

if prune:
# By now `existing_bucket_objects` has been mutated to contain only the keys
# that were neither uploaded nor skipped as already up to date during this
# run. In other words, all the S3 keys that existed before but are no longer
# recognized. For example, things that were once built but have since been
# deleted.
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
delete_keys = []
for key in existing_bucket_objects:
if key.startswith(f"{bucket_prefix}/_whatsdeployed/"):
# These are special and wouldn't have been uploaded
continue

if key.startswith(f"{bucket_prefix}/static/"):
# Careful with these!
# Static assets such as `main/static/js/8.0b83949c.chunk.js`
# are aggressively cached and they might still be referenced
# from within HTML pages that are still in the CDN cache.
# Suppose someone gets a copy of yesterday's HTML from the CDN
# and it refers to `/static/js/foo.abc123.js` which is not in their
# browser cache or the CDN's cache, what might happen is that
# their browser requests it even though
# `/static/js/foo.def456.js` is now the latest and greatest.
# To be safe, only delete if it's considered "old".
delta = now - existing_bucket_objects[key]["LastModified"]
if delta.days < 30:
continue

assert key.startswith(bucket_prefix)

delete_keys.append(key)

log.info(f"Total pending task deletions: {len(delete_keys):,}")

with DisplayProgress(len(delete_keys), show_progress_bar) as progress:

def on_task_complete(task):
progress.update(task)
totals.count(task)

mgr.delete(delete_keys, on_task_complete=on_task_complete, dry_run=dry_run)

if dry_run:
log.info("No deletions. Dry run!")
else:
log.info(f"Total deleted keys: {totals.deleted_files:,}")

log.info(f"Done in {full_timer.stop()}.")

if totals.failed:
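Putting the upload.py changes together — existing keys are popped from `existing_bucket_objects` as they are uploaded or skipped, and the prune pass then walks whatever is left — the selection logic reduces to roughly the sketch below. Function and parameter names here are illustrative, not the deployer's actual API:

```python
# Condensed, hypothetical paraphrase of the prune selection in this commit.
import datetime

from dateutil.tz import UTC


def keys_to_prune(existing_bucket_objects, bucket_prefix):
    # Whatever is still in the dict was neither uploaded nor skipped this run.
    now = datetime.datetime.utcnow().replace(tzinfo=UTC)
    delete_keys = []
    for key, metadata in existing_bucket_objects.items():
        if key.startswith(f"{bucket_prefix}/_whatsdeployed/"):
            # Special keys that aren't part of the upload, so never pruned.
            continue
        if key.startswith(f"{bucket_prefix}/static/"):
            # Hashed assets may still be referenced by cached HTML pages,
            # so only prune them once they are comfortably old.
            if (now - metadata["LastModified"]).days < 30:
                continue
        delete_keys.append(key)
    return delete_keys
```

The deletions themselves reuse the same thread-pool and task machinery as uploads — one `delete_object` call per key — which is why `DeleteTask` subclasses `UploadTask` and reports through the same `Totals` counting and progress bar.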
