Skip to content

Commit

Permalink
[s3] Add support for signing Cloudfront URLs (#587)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencehonles authored May 3, 2020
1 parent 960c1ae commit b116e3a
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 3 deletions.
77 changes: 74 additions & 3 deletions storages/backends/s3boto3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import posixpath
import threading
import warnings
from datetime import datetime, timedelta
from gzip import GzipFile
from tempfile import SpooledTemporaryFile

Expand Down Expand Up @@ -33,10 +34,55 @@
from boto3 import __version__ as boto3_version
from botocore.client import Config
from botocore.exceptions import ClientError
from botocore.signers import CloudFrontSigner
except ImportError as e:
raise ImproperlyConfigured("Could not load Boto3's S3 bindings. %s" % e)


# NOTE: these are defined as functions so both can be tested
def _use_cryptography_signer():
# https://cryptography.io as an RSA backend
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key
)

def _cloud_front_signer_from_pem(key_id, pem):
key = load_pem_private_key(
pem, password=None, backend=default_backend())

return CloudFrontSigner(
key_id, lambda x: key.sign(x, padding.PKCS1v15(), hashes.SHA1()))

return _cloud_front_signer_from_pem


def _use_rsa_signer():
# https://stuvel.eu/rsa as an RSA backend
import rsa

def _cloud_front_signer_from_pem(key_id, pem):
key = rsa.PrivateKey.load_pkcs1(pem)
return CloudFrontSigner(key_id, lambda x: rsa.sign(x, key, 'SHA-1'))

return _cloud_front_signer_from_pem


for _signer_factory in (_use_cryptography_signer, _use_rsa_signer):
try:
_cloud_front_signer_from_pem = _signer_factory()
break
except ImportError:
pass
else:
def _cloud_front_signer_from_pem(key_id, pem):
raise ImproperlyConfigured(
'An RSA backend is required for signing cloudfront URLs.\n'
'Supported backends are packages: cryptography and rsa.')


boto3_version_info = tuple([int(i) for i in boto3_version.split('.')])


Expand Down Expand Up @@ -307,7 +353,23 @@ def __init__(self, acl=None, bucket=None, **settings):
"set AWS_DEFAULT_ACL."
)

def get_cloudfront_signer(self, key_id, key):
return _cloud_front_signer_from_pem(key_id, key)

def get_default_settings(self):
cloudfront_key_id = setting('AWS_CLOUDFRONT_KEY_ID')
cloudfront_key = setting('AWS_CLOUDFRONT_KEY')
if bool(cloudfront_key_id) ^ bool(cloudfront_key):
raise ImproperlyConfigured(
'Both AWS_CLOUDFRONT_KEY_ID and AWS_CLOUDFRONT_KEY must be '
'provided together.'
)

if cloudfront_key_id:
cloudfront_signer = self.get_cloudfront_signer(cloudfront_key_id, cloudfront_key)
else:
cloudfront_signer = None

return {
"access_key": setting('AWS_S3_ACCESS_KEY_ID', setting('AWS_ACCESS_KEY_ID')),
"secret_key": setting('AWS_S3_SECRET_ACCESS_KEY', setting('AWS_SECRET_ACCESS_KEY')),
Expand All @@ -324,6 +386,7 @@ def get_default_settings(self):
"location": setting('AWS_LOCATION', ''),
"encryption": setting('AWS_S3_ENCRYPTION', False),
"custom_domain": setting('AWS_S3_CUSTOM_DOMAIN'),
"cloudfront_signer": cloudfront_signer,
"addressing_style": setting('AWS_S3_ADDRESSING_STYLE'),
"secure_urls": setting('AWS_S3_SECURE_URLS', True),
"file_name_charset": setting('AWS_S3_FILE_NAME_CHARSET', 'utf-8'),
Expand Down Expand Up @@ -670,12 +733,20 @@ def _strip_signing_parameters(self, url):
def url(self, name, parameters=None, expire=None, http_method=None):
# Preserve the trailing slash after normalizing the path.
name = self._normalize_name(self._clean_name(name))
if self.custom_domain:
return "{}//{}/{}".format(self.url_protocol,
self.custom_domain, filepath_to_uri(name))
if expire is None:
expire = self.querystring_expire

if self.custom_domain:
url = "{}//{}/{}".format(
self.url_protocol, self.custom_domain, filepath_to_uri(name))

if self.cloudfront_signer:
expiration = datetime.utcnow() + timedelta(seconds=expire)

return self.cloudfront_signer.generate_presigned_url(url, date_less_than=expiration)

return url

params = parameters.copy() if parameters else {}
params['Bucket'] = self.bucket.name
params['Key'] = self._encode_name(name)
Expand Down
36 changes: 36 additions & 0 deletions tests/test_s3boto3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
import warnings
from datetime import datetime
from textwrap import dedent
from unittest import skipIf

from botocore.exceptions import ClientError
Expand Down Expand Up @@ -581,6 +582,41 @@ def test_storage_url(self):
HttpMethod=custom_method,
)

def test_storage_url_custom_domain_signed_urls(self):
key_id = 'test-key'
filename = 'file.txt'
pem = dedent(
'''\
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQCXVuwcMk+JmVSKuQ1K4dZx4Z1dEcRQgTlqvhAyljIpttXlZh2/
fD3GkJCiqfwEmo+cdNK/LFzRj/CX8Wz1z1lH2USONpG6sAkotkatCbejiItDu5y6
janGJHfuWXu6B/o9gwZylU1gIsePY3lLNk+r9QhXUO4jXw6zLJftVwKPhQIDAQAB
AoGAbpkRV9HUmoQ5al+uPSkp5HOy4s8XHpYxdbaMc8ubwSxiyJCF8OhE5RXE/Xso
N90UUox1b0xmUKfWddPzgvgTD/Ub7D6Ukf+nVWDX60tWgNxICAUHptGL3tWweaAy
H+0+vZ0TzvTt9r00vW0FzO7F8X9/Rs1ntDRLtF3RCCxdq0kCQQDHFu+t811lCvy/
67rMEKGvNsNNSTrzOrNr3PqUrCnOrzKazjFVjsKv5VzI/U+rXGYKWJsMpuCFiHZ3
DILUC09TAkEAwpm2S6MN6pzn9eY6pmhOxZ+GQGGRUkKZfC1GDxaRSRb8sKTjptYw
WSemJSxiDzdj3Po2hF0lbhkpJgUq6xnCxwJAZgHHfn5CLSJrDD7Q7/vZi/foK3JJ
BRTfl3Wa4pAvv5meuRjKyEakVBGV79lyd5+ZHNX3Y40hXunjoO3FHrZIxwJAdRzu
waxahrRxQOKSr20c4wAzWnGddIUSO9I/VHs/al5EKsbBHrnOlQkwizSfuwqZtfZ7
csNf8FeCFRiNELoLJwJAZxWBE2+8J9VW9AQ0SE7j4FyM/B8FvRhF5PLAAsw/OxHO
SxiFP7Ptdac1tm5H5zOqaqSHWphI19HNNilXKmxuCA==
-----END RSA PRIVATE KEY-----'''
).encode('ascii')

url = 'https://mock.cloudfront.net/file.txt?Expires=3600&Signature=DbqVgh3FHtttQxof214tSAVE8Nqn3Q4Ii7eR3iykbOqAPbV89HC3EB~0CWxarpLNtbfosS5LxiP5EutriM7E8uR4Gm~UVY-PFUjPcwqdnmAiKJF0EVs7koJcMR8MKDStuWfFKVUPJ8H7ORYTOrixyHBV2NOrpI6SN5UX6ctNM50_&Key-Pair-Id=test-key' # noqa

self.storage.custom_domain = "mock.cloudfront.net"

for pem_to_signer in (
s3boto3._use_cryptography_signer(),
s3boto3._use_rsa_signer()):
self.storage.cloudfront_signer = pem_to_signer(key_id, pem)

with mock.patch('storages.backends.s3boto3.datetime') as mock_datetime:
mock_datetime.utcnow.return_value = datetime.utcfromtimestamp(0)
self.assertEqual(self.storage.url(filename), url)

def test_generated_url_is_encoded(self):
self.storage.custom_domain = "mock.cloudfront.net"
filename = "whacky & filename.mp4"
Expand Down
2 changes: 2 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ deps =
django30: Django>=3.0,<3.1
djangomaster: https://github.com/django/django/archive/master.tar.gz
py27: mock
cryptography
pytest
pytest-cov
rsa
extras =
azure
boto3
Expand Down

0 comments on commit b116e3a

Please sign in to comment.