diff --git a/src/whitenoise/storage.py b/src/whitenoise/storage.py index ae3d7de7..029b0ff8 100644 --- a/src/whitenoise/storage.py +++ b/src/whitenoise/storage.py @@ -4,6 +4,10 @@ import os import re import textwrap +from typing import Any +from typing import Iterator +from typing import Tuple +from typing import Union from django.conf import settings from django.contrib.staticfiles.storage import ManifestStaticFilesStorage @@ -11,52 +15,33 @@ from .compress import Compressor +_PostProcessT = Iterator[Union[Tuple[str, str, bool], Tuple[str, None, RuntimeError]]] -class CompressedStaticFilesMixin: + +class CompressedStaticFilesStorage(StaticFilesStorage): """ - Wraps a StaticFilesStorage instance to compress output files + StaticFilesStorage subclass that compresses output files. """ - def post_process(self, *args, **kwargs): - super_post_process = getattr( - super(), - "post_process", - self.fallback_post_process, - ) - files = super_post_process(*args, **kwargs) - if not kwargs.get("dry_run"): - files = self.post_process_with_compression(files) - return files - - # Only used if the class we're wrapping doesn't implement its own - # `post_process` method - def fallback_post_process(self, paths, dry_run=False, **options): - if not dry_run: - for path in paths: - yield path, None, False + def post_process( + self, paths: dict[str, Any], dry_run: bool = False, **options: Any + ) -> _PostProcessT: + if dry_run: + return - def create_compressor(self, **kwargs): - return Compressor(**kwargs) - - def post_process_with_compression(self, files): extensions = getattr(settings, "WHITENOISE_SKIP_COMPRESS_EXTENSIONS", None) compressor = self.create_compressor(extensions=extensions, quiet=True) - for name, hashed_name, processed in files: - yield name, hashed_name, processed - if isinstance(processed, Exception): - continue - unique_names = set(filter(None, [name, hashed_name])) - for name in unique_names: - if compressor.should_compress(name): - path = self.path(name) - prefix_len = len(path) - len(name) - for compressed_path in compressor.compress(path): - compressed_name = compressed_path[prefix_len:] - yield name, compressed_name, True - - -class CompressedStaticFilesStorage(CompressedStaticFilesMixin, StaticFilesStorage): - pass + + for path in paths: + if compressor.should_compress(path): + full_path = self.path(path) + prefix_len = len(full_path) - len(path) + for compressed_path in compressor.compress(full_path): + compressed_name = compressed_path[prefix_len:] + yield path, compressed_name, True + + def create_compressor(self, **kwargs: Any) -> Compressor: + return Compressor(**kwargs) class MissingFileError(ValueError): diff --git a/tests/test_storage.py b/tests/test_storage.py index baf043e3..fca9a60f 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -48,7 +48,7 @@ def _compressed_storage(setup): storages = {"STATICFILES_STORAGE": backend} with override_settings(**storages): - call_command("collectstatic", verbosity=0, interactive=False) + yield @pytest.fixture() @@ -68,12 +68,22 @@ def _compressed_manifest_storage(setup): call_command("collectstatic", verbosity=0, interactive=False) -def test_compressed_files_are_created(_compressed_storage): +def test_compressed_static_files_storage(_compressed_storage): + call_command("collectstatic", verbosity=0, interactive=False) + for name in ["styles.css.gz", "styles.css.br"]: path = os.path.join(settings.STATIC_ROOT, name) assert os.path.exists(path) +def test_compressed_static_files_storage_dry_run(_compressed_storage): + call_command("collectstatic", "--dry-run", verbosity=0, interactive=False) + + for name in ["styles.css.gz", "styles.css.br"]: + path = os.path.join(settings.STATIC_ROOT, name) + assert not os.path.exists(path) + + def test_make_helpful_exception(_compressed_manifest_storage): class TriggerException(HashedFilesMixin): def exists(self, path):