diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 8bfeaa72..a0353730 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -14,48 +14,27 @@ repos:
     rev: 1.8.0
     hooks:
       - id: pyproject-fmt
-  - repo: https://github.com/rstcheck/rstcheck
-    rev: v6.2.0
-    hooks:
-      - id: rstcheck
-        additional_dependencies:
-          - sphinx==6.1.3
-          - tomli==2.0.1
-  - repo: https://github.com/sphinx-contrib/sphinx-lint
-    rev: v0.9.1
-    hooks:
-      - id: sphinx-lint
   - repo: https://github.com/asottile/pyupgrade
     rev: v3.15.2
     hooks:
       - id: pyupgrade
         args: [--py39-plus]
+  - repo: https://github.com/hadialqattan/pycln
+    rev: "v2.4.0"
+    hooks:
+      - id: pycln
+        args: [--all]
   - repo: https://github.com/adamchainz/django-upgrade
     rev: 1.16.0
     hooks:
       - id: django-upgrade
         args: [--target-version, "3.2"]
-  - repo: https://github.com/psf/black-pre-commit-mirror
-    rev: 24.4.2
-    hooks:
-      - id: black
-  - repo: https://github.com/adamchainz/blacken-docs
-    rev: 1.16.0
-    hooks:
-      - id: blacken-docs
-        additional_dependencies:
-          - black==23.1.0
-  - repo: https://github.com/pycqa/isort
-    rev: 5.13.2
-    hooks:
-      - id: isort
-        name: isort (python)
-  - repo: https://github.com/PyCQA/flake8
-    rev: 7.0.0
-    hooks:
-      - id: flake8
-        additional_dependencies:
-          - flake8-bugbear
-          - flake8-comprehensions
-          - flake8-logging
-          - flake8-tidy-imports
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    # Ruff version.
+    rev: v0.6.6
+    hooks:
+      # Run the linter.
+      - id: ruff
+        args: [--fix]
+      # Run the formatter.
+      - id: ruff-format
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 350aa31f..f2987832 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,13 +40,13 @@ Using the following categories, list your changes in this order:
 
 ### Fixed
 
-- Fix crash when running `manage.py collectstatic` if Django's `settings.py:STATIC_ROOT` is a `Path` object.
+- Fix crash when running `manage.py collectstatic` when Django's `settings.py:STATIC_ROOT` is a `Path` object.
 
 ## [2.0.0](https://github.com/Archmonger/ServeStatic/compare/1.2.0...2.0.0) - 2024-09-12
 
 ### Added
 
-- Django `settings.py:SERVESTATIC_USE_MANIFEST` utilize the Django manifest rather than scanning the filesystem.
+- Django `settings.py:SERVESTATIC_USE_MANIFEST` will allow ServeStatic to use the Django manifest rather than scanning the filesystem.
     - When also using ServeStatic's `CompressedManifestStaticFilesStorage` backend, ServeStatic will no longer need to call `os.stat`.
 
 ### Changed
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 49ea5488..737aed7f 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -98,9 +98,9 @@ watch:
   - ../LICENSE.md
 
 site_name: ServeStatic
-site_author: Archmonger
+site_author: Mark Bakhit (Archmonger)
 site_description: Production-grade static file server for Python web apps.
-copyright: '© Archmonger'
+copyright: '© Mark Bakhit (Archmonger)'
 repo_url: https://github.com/Archmonger/ServeStatic
 site_url: https://archmonger.github.io/ServeStatic/
 repo_name: ServeStatic (GitHub)
diff --git a/docs/src/contributing.md b/docs/src/contributing.md
index d92726de..007407eb 100644
--- a/docs/src/contributing.md
+++ b/docs/src/contributing.md
@@ -1,112 +1,59 @@
-# Contributing
+## Creating a development environment
 
-## Tool Setup
+If you plan to make code changes to this repository, you will need to install the following dependencies first:
 
-### `python`
+- [Git](https://git-scm.com/downloads)
+- [Python 3.9+](https://www.python.org/downloads/)
+- [Hatch](https://hatch.pypa.io/latest/)
 
-To contribute code or documentation updates, an installation of Python 3 is required.
+Once you finish installing these dependencies, you can clone this repository:
 
-### `hatch`
-
-This project utilizes [`hatch`](https://hatch.pypa.io/latest/) to manage Python environments for development and testing. Follow
-[the `hatch` installation instructions](https://hatch.pypa.io/latest/install/) before continuing through this document.
-
-### `pre-commit`
-
-Additionally, this project uses [`pre-commit`](https://pre-commit.com/) Git hooks to run linting and formatting checks against each commit. See [the `pre-commit` installation instructions](https://pre-commit.com/#install) for how to install this tool.
-
-Once installed, run `pre-commit install` to set up the git hook scripts:
-
-``` shell
-$ pre-commit install
-pre-commit installed at .git/hooks/pre-commit
-```
-
-### Git
-
-Clone the repository using:
-
-``` shell
+```shell
 git clone https://github.com/Archmonger/ServeStatic.git
 cd ServeStatic
 ```
 
-All example commands are expected to be run from the `ServeStatic` folder.
-
-## Code Contributions
-
-Ensure you have followed the [tool setup](#tool-setup) instructions before following the instructions below.
-
-### Development
-
-#### Linting
-
-The project uses `flake8` and `isort` for linting and uses `black` to format code. To run the all linters:
-
-``` shell
-hatch run lint:check
-```
-
-Or select a specific linter:
+## Executing test environment commands
 
-``` shell
-hatch run lint:flake8
-```
-
-!!! tip
-
-    Linting is likely to see an update in the near future to use `ruff` for linting and formatting.
-
-### Testing
-
-Tests are run across a matrix of Python and Django versions to ensure full compatibility with all supported versions.
+By utilizing `hatch`, the following commands are available to manage the development environment.
 
-#### Full Test Suite
+### Tests
 
-To run the full test suite, using the system Python:
+| Command | Description |
+| --- | --- |
+| `hatch test` | Run Python tests using the current environment's Python version |
+| `hatch test --all` | Run tests using all compatible Python and Django versions |
+| `hatch test --python 3.9` | Run tests using a specific Python version |
+| `hatch test --include "django=5.1"` | Run tests using a specific Django version |
+| `hatch test -k test_get_js_static_file` | Run only a specific test |
 
-``` shell
-hatch test
-```
-
-To select a particular Python version:
-
-``` shell
-hatch test --python 3.9
-```
+??? question "What other arguments are available to me?"
 
-!!! tip
+    The `hatch test` command is a wrapper for `pytest`. Hatch "intercepts" a handful of arguments, which can be previewed by typing `hatch test --help`.
- `hatch` can manage Python versions for you, for example installing Python 3.9: `hatch python install 3.9` - - See the [hatch documentation](https://hatch.pypa.io/latest/tutorials/python/manage/) - -To select a particular Django version: - -``` shell -hatch test --include "django=5.1" -``` - -#### Specific Test(s) - -To run only a specific test: - -``` shell -hatch test -k test_get_js_static_file -``` + Any additional arguments in the `test` command are directly passed on to pytest. See the [pytest documentation](https://docs.pytest.org/en/8.3.x/reference/reference.html#command-line-flags) for what additional arguments are available. -!!! tip +### Linting and Formatting - Additional arguments are passed on to pytest. +| Command | Description | +| --- | --- | +| `hatch fmt` | Run all linters and formatters | +| `hatch fmt --check` | Run all linters and formatters, but do not save fixes to the disk | +| `hatch fmt --linter` | Run only linters | +| `hatch fmt --formatter` | Run only formatters | +| `hatch run precommit:check` | Run all [`pre-commit`](https://pre-commit.com/) checks configured within this repository | +| `hatch run precommit:update` | Update the [`pre-commit`](https://pre-commit.com/) hooks configured within this repository | - See the [pytest documentation](https://docs.pytest.org/en/8.3.x/reference/reference.html#command-line-flags) for options +### Documentation -## Documentation Contributions +| Command | Description | +| --- | --- | +| `hatch run docs:serve` | Start the `mkdocs` server to view documentation locally | +| `hatch run docs:build` | Build the documentation | +| `hatch run docs:linkcheck` | Check for broken links in the documentation | -Ensure you have followed the [tool setup](#tool-setup) instructions before following the instructions below. +??? tip "Configure your IDE for linting" -### Modifying Documentation + This repository uses `hatch fmt` for linting and formatting, which is a [modestly customized](https://hatch.pypa.io/latest/config/internal/static-analysis/#default-settings) version of [`ruff`](https://github.com/astral-sh/ruff). -1. Start the `mkdocs` server by running `hatch run docs:serve` -1. Visit [the documentation site](http://localhost:8000/) in your preferred browser -1. Edit the documentation. The site will load change as documentation files change. + You can install `ruff` as a plugin to your preferred code editor to create a similar environment. diff --git a/docs/src/dictionary.txt b/docs/src/dictionary.txt index a42378ec..bebe042d 100644 --- a/docs/src/dictionary.txt +++ b/docs/src/dictionary.txt @@ -35,3 +35,4 @@ linter linters linting pytest +formatters diff --git a/docs/src/django-settings.md b/docs/src/django-settings.md index 8efbf6ce..054f7d4b 100644 --- a/docs/src/django-settings.md +++ b/docs/src/django-settings.md @@ -30,7 +30,7 @@ Find and serve files using Django's manifest file. This is the most efficient way to determine what files are available, but it requires that you are using a [manifest-compatible](https://docs.djangoproject.com/en/stable/ref/contrib/staticfiles/#manifeststaticfilesstorage) storage backend. -When using ServeStatic's [`CompressedManifestStaticFilesStorage`](./django.md#step-2-add-compression-and-caching-support) storage backend, ServeStatic will no longer need to call `os.stat` on each file during startup which improves startup speeds. 
+When using ServeStatic's [`CompressedManifestStaticFilesStorage`](./django.md#step-2-add-compression-and-caching-support) storage backend, ServeStatic will no longer need to call `os.stat` on each file during startup. --- @@ -138,8 +138,9 @@ def force_download_pdfs(headers, path, url): url: The host-relative URL of the file e.g. `/static/styles/app.css` """ - if path.endswith('.pdf'): - headers['Content-Disposition'] = 'attachment' + if path.endswith(".pdf"): + headers["Content-Disposition"] = "attachment" + SERVESTATIC_ADD_HEADERS_FUNCTION = force_download_pdfs ``` @@ -167,7 +168,8 @@ def immutable_file_test(path, url): """ # Match filename with 12 hex digits before the extension # e.g. app.db8f2edc0c8a.js - return re.match(r'^.+\.[0-9a-f]{12}\..+$', url) + return re.match(r"^.+\.[0-9a-f]{12}\..+$", url) + SERVESTATIC_IMMUTABLE_FILE_TEST = immutable_file_test ``` diff --git a/docs/src/django.md b/docs/src/django.md index 7d52344a..2dc4a3ee 100644 --- a/docs/src/django.md +++ b/docs/src/django.md @@ -10,10 +10,9 @@ Edit your `settings.py` file and add ServeStatic to the `MIDDLEWARE` list. The S ```python linenums="0" MIDDLEWARE = [ - ..., "django.middleware.security.SecurityMiddleware", "servestatic.middleware.ServeStaticMiddleware", - ..., + # ... ] ``` @@ -29,7 +28,7 @@ ServeStatic comes with a storage backend which compresses your files and hashes ```python linenums="0" STORAGES = { - ..., + # ... "staticfiles": { "BACKEND": "servestatic.storage.CompressedManifestStaticFilesStorage", }, @@ -146,9 +145,9 @@ You can disable Django's static file handling and allow ServeStatic to take over ```python linenums="0" INSTALLED_APPS = [ - ..., + # ... "servestatic.runserver_nostatic", "django.contrib.staticfiles", - ..., + # ... ] ``` diff --git a/docs/src/quick-start.md b/docs/src/quick-start.md index a896290a..1bc374af 100644 --- a/docs/src/quick-start.md +++ b/docs/src/quick-start.md @@ -16,7 +16,6 @@ Edit your `settings.py` file and add `ServeStatic` to the `MIDDLEWARE` list, abo ```python linenums="0" MIDDLEWARE = [ - # ... "django.middleware.security.SecurityMiddleware", "servestatic.middleware.ServeStaticMiddleware", # ... diff --git a/docs/src/servestatic.md b/docs/src/servestatic.md index 48589780..0632a913 100644 --- a/docs/src/servestatic.md +++ b/docs/src/servestatic.md @@ -58,7 +58,7 @@ If you want to something other than `index.html` as the index file, then you can A dictionary mapping file extensions (lowercase) to the mimetype for that extension. For example: ```python linenums="0" -{'.foo': 'application/x-foo'} +{".foo": "application/x-foo"} ``` Note that ServeStatic ships with its own default set of mimetypes and does not use the system-supplied ones (e.g. `/etc/mime.types`). This ensures that it behaves consistently regardless of the environment in which it's run. View the defaults in the `media_types.py` file. @@ -114,13 +114,14 @@ def force_download_pdfs(headers, path, url): None. Changes should be made by modifying the headers \ dictionary directly. """ - if path.endswith('.pdf'): - headers['Content-Disposition'] = 'attachment' + if path.endswith(".pdf"): + headers["Content-Disposition"] = "attachment" + application = ServeStatic( application, add_headers_function=force_download_pdfs, - ) +) ``` --- @@ -151,7 +152,7 @@ def immutable_file_test(path, url): """ # Match filename with 12 hex digits before the extension # e.g. 
app.db8f2edc0c8a.js - return re.match(r'^.+\.[0-9a-f]{12}\..+$', url) + return re.match(r"^.+\.[0-9a-f]{12}\..+$", url) ``` ## Compression Support diff --git a/pyproject.toml b/pyproject.toml index 845e1f4f..66e0b2b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,7 @@ keywords = [ "Django", ] license = "MIT" -authors = [ - { name = "Mark Bakhit" }, -] +authors = [{ name = "Mark Bakhit" }] requires-python = ">=3.9" classifiers = [ "Development Status :: 5 - Production/Stable", @@ -48,19 +46,13 @@ urls.Homepage = "https://github.com/Archmonger/ServeStatic" path = "src/servestatic/__init__.py" [tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", -] +include = ["/src", "/tests"] [tool.hatch.build.targets.wheel] -nclude = [ - "/src", - "/tests", -] +nclude = ["/src", "/tests"] [tool.hatch.metadata] -license-files = { paths = [ "LICENSE" ] } +license-files = { paths = ["LICENSE"] } [tool.hatch.envs.default] installer = "uv" @@ -69,52 +61,60 @@ installer = "uv" # Testing # [tool.hatch.envs.hatch-test] -extra-dependencies = [ - "pytest-sugar", - "requests", - "brotli", -] +extra-dependencies = ["pytest-sugar", "requests", "brotli"] randomize = true matrix-name-format = "{variable}-{value}" # Django 3.2 LTS [[tool.hatch.envs.hatch-test.matrix]] -python = [ "3.9", "3.10"] -django = [ "3.2" ] +python = ["3.9", "3.10"] +django = ["3.2"] # Django 4.0 [[tool.hatch.envs.hatch-test.matrix]] -python = [ "3.9", "3.10" ] -django = [ "4.0" ] +python = ["3.9", "3.10"] +django = ["4.0"] # Django 4.1 [[tool.hatch.envs.hatch-test.matrix]] -python = [ "3.9", "3.10", "3.11"] -django = [ "4.1" ] +python = ["3.9", "3.10", "3.11"] +django = ["4.1"] # Django 4.2 [[tool.hatch.envs.hatch-test.matrix]] -python = [ "3.9", "3.10", "3.11", "3.12"] -django = [ "4.2" ] +python = ["3.9", "3.10", "3.11", "3.12"] +django = ["4.2"] # Django 5.0 [[tool.hatch.envs.hatch-test.matrix]] -python = [ "3.10", "3.11", "3.12"] -django = [ "5.0" ] +python = ["3.10", "3.11", "3.12"] +django = ["5.0"] # Django 5.1 [[tool.hatch.envs.hatch-test.matrix]] -python = [ "3.10", "3.11", "3.12"] -django = [ "5.1" ] +python = ["3.10", "3.11", "3.12"] +django = ["5.1"] [tool.hatch.envs.hatch-test.overrides] matrix.django.dependencies = [ - { if = [ "3.2" ], value = "django~=3.2" }, - { if = [ "4.0" ], value = "django~=4.0" }, - { if = [ "4.1" ], value = "django~=4.1" }, - { if = [ "4.2" ], value = "django~=4.2" }, - { if = [ "5.0" ], value = "django~=5.0" }, - { if = [ "5.1" ], value = "django~=5.1" }, + { if = [ + "3.2", + ], value = "django~=3.2" }, + { if = [ + "4.0", + ], value = "django~=4.0" }, + { if = [ + "4.1", + ], value = "django~=4.1" }, + { if = [ + "4.2", + ], value = "django~=4.2" }, + { if = [ + "5.0", + ], value = "django~=5.0" }, + { if = [ + "5.1", + ], value = "django~=5.1" }, ] # @@ -124,77 +124,38 @@ matrix.django.dependencies = [ template = "docs" detached = true dependencies = [ - "mkdocs", - "mkdocs-git-revision-date-localized-plugin", - "mkdocs-material", - "mkdocs-include-markdown-plugin", - "linkcheckmd", - "mkdocs-spellcheck[all]", - "mkdocs-git-authors-plugin", - "mkdocs-minify-plugin", - "mike", + "mkdocs", + "mkdocs-git-revision-date-localized-plugin", + "mkdocs-material", + "mkdocs-include-markdown-plugin", + "linkcheckmd", + "mkdocs-spellcheck[all]", + "mkdocs-git-authors-plugin", + "mkdocs-minify-plugin", + "mike", ] [tool.hatch.envs.docs.scripts] +serve = ["cd docs && mkdocs serve"] +build = ["cd docs && mkdocs build --strict"] linkcheck = [ - "linkcheckMarkdown docs/ -v -r", - 
"linkcheckMarkdown README.md -v -r", - "linkcheckMarkdown CHANGELOG.md -v -r", -] -build = [ "cd docs && mkdocs build --strict" ] -deploy_latest = [ - "cd docs && mike deploy --push --update-aliases {args} latest", -] -deploy_develop = [ - "cd docs && mike deploy --push develop", -] -serve = [ - "cd docs && mkdocs serve" -] - -# -# Linting -# -[tool.hatch.envs.lint] -template = "lint" -detached = true -dependencies = [ - "black", - "isort", - "flake8", - "flake8-bugbear", - "flake8-comprehensions", - "flake8-logging", - "flake8-tidy-imports" + "linkcheckMarkdown docs/ -v -r", + "linkcheckMarkdown README.md -v -r", + "linkcheckMarkdown CHANGELOG.md -v -r", ] +deploy_latest = ["cd docs && mike deploy --push --update-aliases {args} latest"] +deploy_develop = ["cd docs && mike deploy --push develop"] -[tool.hatch.envs.lint.scripts] -check = [ - "flake8 .", - "isort .", - "black ." -] -flake8 = [ - "flake8 .", -] -isort = [ - "isort ." -] -fmt = [ - "black ." -] # # pre-commit # -[tool.hatch.envs.pre-commit] +[tool.hatch.envs.precommit] template = "pre-commit" detached = true -dependencies = [ - "pre-commit>=3,<4", -] +dependencies = ["pre-commit>=3,<4"] -[tool.hatch.envs.pre-commit.scripts] +[tool.hatch.envs.precommit.scripts] check = ["pre-commit run --all-files"] update = ["pre-commit autoupdate"] @@ -203,7 +164,26 @@ update = ["pre-commit autoupdate"] # [tool.black] -target-version = [ 'py39' ] +target-version = ['py39'] + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +extend-ignore = [ + "FBT001", # Boolean-typed positional argument in function definition + "FBT002", # Boolean default positional argument in function definition + "ARG001", # Unused function argument + "ARG002", # Unused method argument + "ARG004", # Unused static method argument + "SLF001", # Private member accessed + "PLR2004", # Magic value used in comparison + "SIM115", # Use context handler for opening files +] +preview = true + +[tool.ruff.format] +preview = true [tool.pytest.ini_options] addopts = """\ @@ -214,15 +194,10 @@ addopts = """\ [tool.coverage.run] branch = true parallel = true -source = [ - "src/", - "tests/", -] +source = ["src/", "tests/"] [tool.coverage.paths] -source = [ - "src/", -] +source = ["src/"] [tool.coverage.report] show_missing = true diff --git a/scripts/generate_default_media_types.py b/scripts/generate_default_media_types.py index 462b7ef5..0b870ed1 100755 --- a/scripts/generate_default_media_types.py +++ b/scripts/generate_default_media_types.py @@ -28,9 +28,8 @@ def main() -> int: if args.check: print("Would write changes") return 1 - else: - print(f"Writing {media_types_py}") - media_types_py.write_text(new_text) + print(f"Writing {media_types_py}") + media_types_py.write_text(new_text) return 0 @@ -66,10 +65,7 @@ def default_types() -> dict[str, str]: def get_default_types_function() -> str: types_map = get_types_map() - lines = [ - f' "{suffix}": "{media_type}",' # noqa: B028 - for suffix, media_type in types_map.items() - ] + lines = [f' "{suffix}": "{media_type}",' for suffix, media_type in types_map.items()] return FUNCTION_TEMPLATE.format(entries="\n".join(lines)) @@ -96,7 +92,8 @@ def get_nginx_data() -> str: with closing(conn): conn.request("GET", "/nginx/nginx/master/conf/mime.types") response = conn.getresponse() - assert response.status == 200 + if response.status != 200: + raise AssertionError return response.read().decode() diff --git a/src/servestatic/__init__.py b/src/servestatic/__init__.py index 6d182d15..fc059c81 100644 --- a/src/servestatic/__init__.py +++ 
b/src/servestatic/__init__.py @@ -5,4 +5,4 @@ __version__ = "2.0.1" -__all__ = ["ServeStaticASGI", "ServeStatic"] +__all__ = ["ServeStatic", "ServeStaticASGI"] diff --git a/src/servestatic/asgi.py b/src/servestatic/asgi.py index 12f9e41f..91e88b37 100644 --- a/src/servestatic/asgi.py +++ b/src/servestatic/asgi.py @@ -45,25 +45,22 @@ async def __call__(self, scope, receive, send): # Convert ASGI headers into WSGI headers. Allows us to reuse all of our WSGI # header logic inside of aget_response(). wsgi_headers = { - "HTTP_" + key.decode().upper().replace("-", "_"): value.decode() - for key, value in scope["headers"] + "HTTP_" + key.decode().upper().replace("-", "_"): value.decode() for key, value in scope["headers"] } # Get the ServeStatic file response response = await self.static_file.aget_response(scope["method"], wsgi_headers) # Start a new HTTP response for the file - await send( - { - "type": "http.response.start", - "status": response.status, - "headers": [ - # Convert headers back to ASGI spec - (key.lower().encode(), value.encode()) - for key, value in response.headers - ], - } - ) + await send({ + "type": "http.response.start", + "status": response.status, + "headers": [ + # Convert headers back to ASGI spec + (key.lower().encode(), value.encode()) + for key, value in response.headers + ], + }) # Head responses have no body, so we terminate early if response.file is None: @@ -75,12 +72,10 @@ async def __call__(self, scope, receive, send): while True: chunk = await async_file.read(self.block_size) more_body = bool(chunk) - await send( - { - "type": "http.response.body", - "body": chunk, - "more_body": more_body, - } - ) + await send({ + "type": "http.response.body", + "body": chunk, + "more_body": more_body, + }) if not more_body: break diff --git a/src/servestatic/base.py b/src/servestatic/base.py index ab829f5f..2c16e665 100644 --- a/src/servestatic/base.py +++ b/src/servestatic/base.py @@ -129,6 +129,7 @@ def find_file(self, url): for path in self.candidate_paths_for_url(url): with contextlib.suppress(MissingFileError): return self.find_file_at_path(path, url) + return None def candidate_paths_for_url(self, url): for root, prefix in self.directories: @@ -145,7 +146,7 @@ def find_file_at_path(self, path, url): if url.endswith("/"): path = os.path.join(path, self.index_file) return self.get_static_file(path, url) - elif url.endswith(f"/{self.index_file}"): + if url.endswith(f"/{self.index_file}"): if os.path.isfile(path): return self.redirect(url, url[: -len(self.index_file)]) else: @@ -173,12 +174,11 @@ def url_is_canonical(url): @staticmethod def is_compressed_variant(path, stat_cache=None): - if path[-3:] in (".gz", ".br"): + if path[-3:] in {".gz", ".br"}: uncompressed_path = path[:-3] if stat_cache is None: return os.path.isfile(uncompressed_path) - else: - return uncompressed_path in stat_cache + return uncompressed_path in stat_cache return False def get_static_file(self, path, url, stat_cache=None): @@ -201,10 +201,7 @@ def get_static_file(self, path, url, stat_cache=None): def add_mime_headers(self, headers, path, url): media_type = self.media_types.get_type(path) - if media_type.startswith("text/"): - params = {"charset": str(self.charset)} - else: - params = {} + params = {"charset": str(self.charset)} if media_type.startswith("text/") else {} headers.add_header("Content-Type", str(media_type), **params) def add_cache_headers(self, headers, path, url): @@ -236,9 +233,7 @@ def redirect(self, from_url, to_url): elif from_url == to_url + self.index_file: relative_url = 
"./" else: - raise ValueError(f"Cannot handle redirect: {from_url} > {to_url}") - if self.max_age is not None: - headers = {"Cache-Control": f"max-age={self.max_age}, public"} - else: - headers = {} + msg = f"Cannot handle redirect: {from_url} > {to_url}" + raise ValueError(msg) + headers = {"Cache-Control": f"max-age={self.max_age}, public"} if self.max_age is not None else {} return Redirect(relative_url, headers=headers) diff --git a/src/servestatic/compress.py b/src/servestatic/compress.py index db6ad1e7..08589a8d 100644 --- a/src/servestatic/compress.py +++ b/src/servestatic/compress.py @@ -52,9 +52,7 @@ class Compressor: "wmv", ) - def __init__( - self, extensions=None, use_gzip=True, use_brotli=True, log=print, quiet=False - ): + def __init__(self, extensions=None, use_gzip=True, use_brotli=True, log=print, quiet=False): if extensions is None: extensions = self.SKIP_COMPRESS_EXTENSIONS self.extension_re = self.get_extension_re(extensions) @@ -66,9 +64,7 @@ def __init__( def get_extension_re(extensions): if not extensions: return re.compile("^$") - return re.compile( - rf'\.({"|".join(map(re.escape, extensions))})$', re.IGNORECASE - ) + return re.compile(rf'\.({"|".join(map(re.escape, extensions))})$', re.IGNORECASE) def should_compress(self, filename): return not self.extension_re.search(filename) @@ -98,9 +94,7 @@ def compress_gzip(data): output = BytesIO() # Explicitly set mtime to 0 so gzip content is fully determined # by file content (0 = "no timestamp" according to gzip spec) - with gzip.GzipFile( - filename="", mode="wb", fileobj=output, compresslevel=9, mtime=0 - ) as gz_file: + with gzip.GzipFile(filename="", mode="wb", fileobj=output, compresslevel=9, mtime=0) as gz_file: gz_file.write(data) return output.getvalue() @@ -116,14 +110,13 @@ def is_compressed_effectively(self, encoding_name, path, orig_size, data): ratio = compressed_size / orig_size is_effective = ratio <= 0.95 if is_effective: - self.log( - f"{encoding_name} compressed {path} ({orig_size // 1024}K -> {compressed_size // 1024}K)" - ) + self.log(f"{encoding_name} compressed {path} ({orig_size // 1024}K -> {compressed_size // 1024}K)") else: self.log(f"Skipping {path} ({encoding_name} compression not effective)") return is_effective - def write_data(self, path, data, suffix, stat_result): + @staticmethod + def write_data(path, data, suffix, stat_result): filename = path + suffix with open(filename, "wb") as f: f.write(data) @@ -144,9 +137,7 @@ def main(argv=None): "'.gz' and '.br' suffixes (as long as this results in a " "smaller file)" ) - parser.add_argument( - "-q", "--quiet", help="Don't produce log output", action="store_true" - ) + parser.add_argument("-q", "--quiet", help="Don't produce log output", action="store_true") parser.add_argument( "--no-gzip", help="Don't produce gzip '.gz' files", @@ -164,10 +155,7 @@ def main(argv=None): parser.add_argument( "extensions", nargs="*", - help=( - "File extensions to exclude from compression " - + f"(default: {default_exclude})" - ), + help=("File extensions to exclude from compression " + f"(default: {default_exclude})"), default=Compressor.SKIP_COMPRESS_EXTENSIONS, ) args = parser.parse_args(argv) diff --git a/src/servestatic/middleware.py b/src/servestatic/middleware.py index ca45990b..b513d630 100644 --- a/src/servestatic/middleware.py +++ b/src/servestatic/middleware.py @@ -47,21 +47,18 @@ class ServeStaticMiddleware(ServeStaticBase): def __init__(self, get_response=None, settings=django_settings): if not iscoroutinefunction(get_response): - raise 
ValueError( - "ServeStaticMiddleware requires an async compatible version of Django." - ) + msg = "ServeStaticMiddleware requires an async compatible version of Django." + raise ValueError(msg) markcoroutinefunction(self) self.get_response = get_response - debug = getattr(settings, "DEBUG") + debug = settings.DEBUG autorefresh = getattr(settings, "SERVESTATIC_AUTOREFRESH", debug) max_age = getattr(settings, "SERVESTATIC_MAX_AGE", 0 if debug else 60) allow_all_origins = getattr(settings, "SERVESTATIC_ALLOW_ALL_ORIGINS", True) charset = getattr(settings, "SERVESTATIC_CHARSET", "utf-8") mimetypes = getattr(settings, "SERVESTATIC_MIMETYPES", None) - add_headers_function = getattr( - settings, "SERVESTATIC_ADD_HEADERS_FUNCTION", None - ) + add_headers_function = getattr(settings, "SERVESTATIC_ADD_HEADERS_FUNCTION", None) self.index_file = getattr(settings, "SERVESTATIC_INDEX_FILE", None) immutable_file_test = getattr(settings, "SERVESTATIC_IMMUTABLE_FILE_TEST", None) self.use_finders = getattr(settings, "SERVESTATIC_USE_FINDERS", debug) @@ -72,9 +69,7 @@ def __init__(self, get_response=None, settings=django_settings): ) self.static_prefix = getattr(settings, "SERVESTATIC_STATIC_PREFIX", None) self.static_root = getattr(settings, "STATIC_ROOT", None) - self.keep_only_hashed_files = getattr( - django_settings, "SERVESTATIC_KEEP_ONLY_HASHED_FILES", False - ) + self.keep_only_hashed_files = getattr(django_settings, "SERVESTATIC_KEEP_ONLY_HASHED_FILES", False) force_script_name = getattr(settings, "FORCE_SCRIPT_NAME", None) static_url = getattr(settings, "STATIC_URL", None) root = getattr(settings, "SERVESTATIC_ROOT", None) @@ -125,19 +120,12 @@ async def __call__(self, request): if static_file is not None: return await self.aserve(static_file, request) - if django_settings.DEBUG and request.path.startswith( - django_settings.STATIC_URL - ): + if django_settings.DEBUG and request.path.startswith(django_settings.STATIC_URL): current_finders = finders.get_finders() - app_dirs = [ - storage.location - for finder in current_finders - for storage in finder.storages.values() - ] + app_dirs = [storage.location for finder in current_finders for storage in finder.storages.values()] app_dirs = "\n• ".join(sorted(app_dirs)) - raise MissingFileError( - f"ServeStatic did not find the file '{request.path.lstrip(django_settings.STATIC_URL)}' within the following paths:\n• {app_dirs}" - ) + msg = f"ServeStatic did not find the file '{request.path.lstrip(django_settings.STATIC_URL)}' within the following paths:\n• {app_dirs}" + raise MissingFileError(msg) return await self.get_response(request) @@ -160,14 +148,12 @@ def add_files_from_finders(self): for finder in finders.get_finders(): for path, storage in finder.list(None): prefix = (getattr(storage, "prefix", None) or "").strip("/") - url = "".join( - ( - self.static_prefix, - prefix, - "/" if prefix else "", - path.replace("\\", "/"), - ) - ) + url = "".join(( + self.static_prefix, + prefix, + "/" if prefix else "", + path.replace("\\", "/"), + )) # Use setdefault as only first matching file should be used files.setdefault(url, storage.path(path)) self.insert_directory(storage.location, self.static_prefix) @@ -178,10 +164,8 @@ def add_files_from_finders(self): def add_files_from_manifest(self): if not isinstance(staticfiles_storage, ManifestStaticFilesStorage): - raise ValueError( - "SERVESTATIC_USE_MANIFEST is set to True but " - "staticfiles storage is not using a manifest." 
- ) + msg = "SERVESTATIC_USE_MANIFEST is set to True but staticfiles storage is not using a manifest." + raise TypeError(msg) staticfiles: dict = staticfiles_storage.hashed_files stat_cache = None @@ -189,10 +173,7 @@ def add_files_from_manifest(self): if hasattr(staticfiles_storage, "load_manifest_stats"): manifest_stats: dict = staticfiles_storage.load_manifest_stats() if manifest_stats: - stat_cache = { - staticfiles_storage.path(k): os.stat_result(v) - for k, v in manifest_stats.items() - } + stat_cache = {staticfiles_storage.path(k): os.stat_result(v) for k, v in manifest_stats.items()} # Add files to ServeStatic for unhashed_name, hashed_name in staticfiles.items(): @@ -241,7 +222,8 @@ def immutable_file_test(self, path, url): # versioned filename return bool(static_url and basename(static_url) == basename(url)) - def get_name_without_hash(self, filename): + @staticmethod + def get_name_without_hash(filename): """ Removes the version hash from a filename e.g, transforms 'css/application.f3ea4bcc2.css' into 'css/application.css' @@ -254,7 +236,8 @@ def get_name_without_hash(self, filename): name = os.path.splitext(name_with_hash)[0] return name + ext - def get_static_url(self, name): + @staticmethod + def get_static_url(name): with contextlib.suppress(ValueError): return staticfiles_storage.url(name) diff --git a/src/servestatic/responders.py b/src/servestatic/responders.py index c97bbcb6..7b4e8a02 100644 --- a/src/servestatic/responders.py +++ b/src/servestatic/responders.py @@ -16,7 +16,7 @@ class Response: - __slots__ = ("status", "headers", "file") + __slots__ = ("file", "headers", "status") def __init__(self, status, headers, file): self.status = status @@ -114,7 +114,7 @@ def __init__(self, path, headers, encodings=None, stat_cache=None): self.alternatives = self.get_alternatives(headers, files) def get_response(self, method, request_headers): - if method not in ("GET", "HEAD"): + if method not in {"GET", "HEAD"}: return NOT_ALLOWED_RESPONSE if self.is_not_modified(request_headers): return self.not_modified_response @@ -132,7 +132,7 @@ def get_response(self, method, request_headers): async def aget_response(self, method, request_headers): """Variant of `get_response` that works with async HTTP requests. 
To minimize code duplication, `request_headers` conforms to WSGI header spec.""" - if method not in ("GET", "HEAD"): + if method not in {"GET", "HEAD"}: return NOT_ALLOWED_RESPONSE if self.is_not_modified(request_headers): return self.not_modified_response @@ -146,9 +146,7 @@ async def aget_response(self, method, request_headers): # just ignore it and return the standard response (this # behaviour is allowed by the spec) with contextlib.suppress(ValueError): - return await self.aget_range_response( - range_header, headers, file_handle - ) + return await self.aget_range_response(range_header, headers, file_handle) return Response(HTTPStatus.OK, headers, file_handle) def get_range_response(self, range_header, base_headers, file_handle): @@ -163,12 +161,10 @@ def get_range_response(self, range_header, base_headers, file_handle): return self.get_range_not_satisfiable_response(file_handle, size) if file_handle is not None: file_handle = SlicedFile(file_handle, start, end) - headers.extend( - ( - ("Content-Range", f"bytes {start}-{end}/{size}"), - ("Content-Length", str(end - start + 1)), - ) - ) + headers.extend(( + ("Content-Range", f"bytes {start}-{end}/{size}"), + ("Content-Length", str(end - start + 1)), + )) return Response(HTTPStatus.PARTIAL_CONTENT, headers, file_handle) async def aget_range_response(self, range_header, base_headers, file_handle): @@ -184,12 +180,10 @@ async def aget_range_response(self, range_header, base_headers, file_handle): return await self.aget_range_not_satisfiable_response(file_handle, size) if file_handle is not None: file_handle = AsyncSlicedFile(file_handle, start, end) - headers.extend( - ( - ("Content-Range", f"bytes {start}-{end}/{size}"), - ("Content-Length", str(end - start + 1)), - ) - ) + headers.extend(( + ("Content-Range", f"bytes {start}-{end}/{size}"), + ("Content-Length", str(end - start + 1)), + )) return Response(HTTPStatus.PARTIAL_CONTENT, headers, file_handle) def get_byte_range(self, range_header, size): @@ -203,12 +197,12 @@ def get_byte_range(self, range_header, size): def parse_byte_range(range_header): units, _, range_spec = range_header.strip().partition("=") if units != "bytes": - raise ValueError() + raise ValueError # Only handle a single range spec. 
Multiple ranges will trigger a # ValueError below which will result in the Range header being ignored start_str, sep, end_str = range_spec.strip().partition("-") if not sep: - raise ValueError() + raise ValueError if not start_str: start = -int(end_str) end = None @@ -250,7 +244,8 @@ def get_file_stats(path, encodings, stat_cache): continue return files - def get_headers(self, headers_list, files): + @staticmethod + def get_headers(headers_list, files): headers = Headers(headers_list) main_file = files[None] if len(files) > 1: @@ -270,12 +265,8 @@ def get_headers(self, headers_list, files): @staticmethod def get_not_modified_response(headers): - not_modified_headers = [ - (key, headers[key]) for key in NOT_MODIFIED_HEADERS if key in headers - ] - return Response( - status=HTTPStatus.NOT_MODIFIED, headers=not_modified_headers, file=None - ) + not_modified_headers = [(key, headers[key]) for key in NOT_MODIFIED_HEADERS if key in headers] + return Response(status=HTTPStatus.NOT_MODIFIED, headers=not_modified_headers, file=None) @staticmethod def get_alternatives(base_headers, files): @@ -287,7 +278,7 @@ def get_alternatives(base_headers, files): headers["Content-Length"] = str(file_entry.size) if encoding: headers["Content-Encoding"] = encoding - encoding_re = re.compile(r"\b%s\b" % encoding) + encoding_re = re.compile(rf"\b{encoding}\b") else: encoding_re = re.compile("") alternatives.append((encoding_re, file_entry.path, headers.items())) @@ -313,9 +304,14 @@ def get_path_and_headers(self, request_headers): if accept_encoding == "*": accept_encoding = "" # These are sorted by size so first match is the best - for encoding_re, path, headers in self.alternatives: - if encoding_re.search(accept_encoding): - return path, headers + return next( + ( + (path, headers) + for encoding_re, path, headers in self.alternatives + if encoding_re.search(accept_encoding) + ), + None, + ) class Redirect: @@ -344,7 +340,7 @@ class IsDirectoryError(MissingFileError): class FileEntry: - __slots__ = ("path", "size", "mtime") + __slots__ = ("mtime", "path", "size") def __init__(self, path, stat_cache=None): self.path = path @@ -364,11 +360,13 @@ def stat_regular_file(path, stat_function): except KeyError as exc: raise MissingFileError(path) from exc except OSError as exc: - if exc.errno in (errno.ENOENT, errno.ENAMETOOLONG): + if exc.errno in {errno.ENOENT, errno.ENAMETOOLONG}: raise MissingFileError(path) from exc raise if not stat.S_ISREG(stat_result.st_mode): if stat.S_ISDIR(stat_result.st_mode): - raise IsDirectoryError(f"Path is a directory: {path}") - raise NotARegularFileError(f"Not a regular file: {path}") + msg = f"Path is a directory: {path}" + raise IsDirectoryError(msg) + msg = f"Not a regular file: {path}" + raise NotARegularFileError(msg) return stat_result diff --git a/src/servestatic/runserver_nostatic/management/commands/runserver.py b/src/servestatic/runserver_nostatic/management/commands/runserver.py index c791f40d..98e0878b 100644 --- a/src/servestatic/runserver_nostatic/management/commands/runserver.py +++ b/src/servestatic/runserver_nostatic/management/commands/runserver.py @@ -9,6 +9,7 @@ from __future__ import annotations +import contextlib from importlib import import_module from django.apps import apps @@ -19,11 +20,10 @@ def get_next_runserver_command(): Return the next highest priority "runserver" command class """ for app_name in get_lower_priority_apps(): - module_path = "%s.management.commands.runserver" % app_name - try: + module_path = f"{app_name}.management.commands.runserver" + 
with contextlib.suppress(ImportError, AttributeError): return import_module(module_path).Command - except (ImportError, AttributeError): - pass + return None def get_lower_priority_apps(): @@ -48,7 +48,4 @@ def add_arguments(self, parser): super().add_arguments(parser) if parser.get_default("use_static_handler") is True: parser.set_defaults(use_static_handler=False) - parser.description += ( - "\n(Wrapped by 'servestatic.runserver_nostatic' to always" - " enable '--nostatic')" - ) + parser.description += "\n(Wrapped by 'servestatic.runserver_nostatic' to always" " enable '--nostatic')" diff --git a/src/servestatic/storage.py b/src/servestatic/storage.py index adef7ef5..ba81cf09 100644 --- a/src/servestatic/storage.py +++ b/src/servestatic/storage.py @@ -7,7 +7,8 @@ import os import re import textwrap -from typing import Any, Iterator, Tuple, Union +from collections.abc import Iterator +from typing import Any, Union from django.conf import settings from django.contrib.staticfiles.storage import ( @@ -19,7 +20,7 @@ from servestatic.compress import Compressor from servestatic.utils import stat_files -_PostProcessT = Iterator[Union[Tuple[str, str, bool], Tuple[str, None, RuntimeError]]] +_PostProcessT = Iterator[Union[tuple[str, str, bool], tuple[str, None, RuntimeError]]] class CompressedStaticFilesStorage(StaticFilesStorage): @@ -29,9 +30,7 @@ class CompressedStaticFilesStorage(StaticFilesStorage): compressor: Compressor | None - def post_process( - self, paths: dict[str, Any], dry_run: bool = False, **options: Any - ) -> _PostProcessT: + def post_process(self, paths: dict[str, Any], dry_run: bool = False, **options: Any) -> _PostProcessT: if dry_run: return @@ -40,9 +39,7 @@ def post_process( to_compress = (path for path in paths if self.compressor.should_compress(path)) with concurrent.futures.ThreadPoolExecutor() as executor: - futures = ( - executor.submit(self._compress_one, path) for path in to_compress - ) + futures = (executor.submit(self._compress_one, path) for path in to_compress) for compressed_paths in concurrent.futures.as_completed(futures): yield from compressed_paths.result() @@ -51,12 +48,11 @@ def _compress_one(self, path: str) -> list[tuple[str, str, bool]]: full_path = self.path(path) prefix_len = len(full_path) - len(path) compressed.extend( - (path, compressed_path[prefix_len:], True) - for compressed_path in self.compressor.compress(full_path) + (path, compressed_path[prefix_len:], True) for compressed_path in self.compressor.compress(full_path) ) return compressed - def create_compressor(self, **kwargs: Any) -> Compressor: + def create_compressor(self, **kwargs: Any) -> Compressor: # noqa: PLR6301 return Compressor(**kwargs) @@ -89,7 +85,7 @@ def post_process(self, *args, **kwargs): # Make exception messages helpful for name, hashed_name, processed in files: if isinstance(processed, Exception): - processed = self.make_helpful_exception(processed, name) + processed = self.make_helpful_exception(processed, name) # noqa: PLW2901 yield name, hashed_name, processed self.add_stats_to_manifest() @@ -118,9 +114,7 @@ def stat_static_root(self): file_paths = [] for root, _, files in os.walk(static_root): - file_paths.extend( - os.path.join(root, f) for f in files if f != self.manifest_name - ) + file_paths.extend(os.path.join(root, f) for f in files if f != self.manifest_name) stats = stat_files(file_paths) # Remove the static root folder from the path @@ -134,7 +128,8 @@ def load_manifest_stats(self): with contextlib.suppress(json.JSONDecodeError): stored = 
json.loads(content) return stored.get("stats", {}) - raise ValueError(f"Couldn't load stats from manifest '{self.manifest_name}'") + msg = f"Couldn't load stats from manifest '{self.manifest_name}'" + raise ValueError(msg) def post_process_with_compression(self, files): # Files may get hashed multiple times, we want to keep track of all the @@ -186,7 +181,7 @@ def delete_files(self, files_to_delete): if e.errno != errno.ENOENT: raise - def create_compressor(self, **kwargs): + def create_compressor(self, **kwargs): # noqa: PLR6301 return Compressor(**kwargs) def compress_files(self, names): @@ -195,9 +190,7 @@ def compress_files(self, names): to_compress = (name for name in names if self.compressor.should_compress(name)) with concurrent.futures.ThreadPoolExecutor() as executor: - futures = ( - executor.submit(self._compress_one, name) for name in to_compress - ) + futures = (executor.submit(self._compress_one, name) for name in to_compress) for compressed_paths in concurrent.futures.as_completed(futures): yield from compressed_paths.result() @@ -205,10 +198,7 @@ def _compress_one(self, name: str) -> list[tuple[str, str]]: compressed: list[tuple[str, str]] = [] path = self.path(name) prefix_len = len(path) - len(name) - compressed.extend( - (name, compressed_path[prefix_len:]) - for compressed_path in self.compressor.compress(path) - ) + compressed.extend((name, compressed_path[prefix_len:]) for compressed_path in self.compressor.compress(path)) return compressed def make_helpful_exception(self, exception, name): diff --git a/src/servestatic/utils.py b/src/servestatic/utils.py index 857b9c85..1131cf2e 100644 --- a/src/servestatic/utils.py +++ b/src/servestatic/utils.py @@ -7,8 +7,11 @@ import os import threading from concurrent.futures import ThreadPoolExecutor -from io import IOBase -from typing import AsyncIterable, Callable +from typing import TYPE_CHECKING, Callable + +if TYPE_CHECKING: + from collections.abc import AsyncIterable + from io import IOBase # This is the same size as wsgiref.FileWrapper ASGI_BLOCK_SIZE = 8192 @@ -65,30 +68,28 @@ def __init__(self, iterator: AsyncIterable): def __iter__(self): # Create a dedicated event loop to run the async iterator on. loop = asyncio.new_event_loop() - thread_executor = concurrent.futures.ThreadPoolExecutor( - max_workers=1, thread_name_prefix="ServeStatic" - ) + thread_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="ServeStatic") # Convert from async to sync by stepping through the async iterator and yielding # the result of each step. generator = self.iterator.__aiter__() with contextlib.suppress(GeneratorExit, StopAsyncIteration): while True: - yield thread_executor.submit( - loop.run_until_complete, generator.__anext__() - ).result() + yield thread_executor.submit(loop.run_until_complete, generator.__anext__()).result() loop.close() thread_executor.shutdown(wait=True) def open_lazy(f): """Decorator that ensures the file is open before calling a function. - This can be turned into a @staticmethod on `AsyncFile` once we drop Python 3.9 compatibility.""" + This can be turned into a @staticmethod on `AsyncFile` once we drop Python 3.9 compatibility. + """ @functools.wraps(f) - async def wrapper(self: "AsyncFile", *args, **kwargs): + async def wrapper(self: AsyncFile, *args, **kwargs): if self.closed: - raise ValueError("I/O operation on closed file.") + msg = "I/O operation on closed file." 
+ raise ValueError(msg) if self.file_obj is None: self.file_obj = await self._execute(open, *self.open_args) return await f(self, *args, **kwargs) @@ -124,9 +125,7 @@ def __init__( opener, ) self.loop: asyncio.AbstractEventLoop | None = None - self.executor = ThreadPoolExecutor( - max_workers=1, thread_name_prefix="ServeStatic-AsyncFile" - ) + self.executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="ServeStatic-AsyncFile") self.lock = threading.Lock() self.file_obj: None | IOBase = None self.closed = False diff --git a/src/servestatic/wsgi.py b/src/servestatic/wsgi.py index 7211afa1..eb44e602 100644 --- a/src/servestatic/wsgi.py +++ b/src/servestatic/wsgi.py @@ -9,14 +9,11 @@ class ServeStatic(ServeStaticBase): def __call__(self, environ, start_response): path = decode_path_info(environ.get("PATH_INFO", "")) - if self.autorefresh: - static_file = self.find_file(path) - else: - static_file = self.files.get(path) + static_file = self.find_file(path) if self.autorefresh else self.files.get(path) if static_file is None: return self.application(environ, start_response) - else: - return self.serve(static_file, environ, start_response) + + return self.serve(static_file, environ, start_response) @staticmethod def serve(static_file, environ, start_response): diff --git a/tests/conftest.py b/tests/conftest.py index 320b227d..2b39c58d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,4 +10,3 @@ def django_setup(): os.environ["DJANGO_SETTINGS_MODULE"] = "tests.django_settings" django.setup() - yield diff --git a/tests/middleware.py b/tests/middleware.py index e2c16cdf..cce6cf4d 100644 --- a/tests/middleware.py +++ b/tests/middleware.py @@ -4,8 +4,7 @@ @sync_only_middleware def sync_middleware_1(get_response): def middleware(request): - response = get_response(request) - return response + return get_response(request) return middleware @@ -13,8 +12,7 @@ def middleware(request): @async_only_middleware def async_middleware_1(get_response): async def middleware(request): - response = await get_response(request) - return response + return await get_response(request) return middleware @@ -22,8 +20,7 @@ async def middleware(request): @sync_only_middleware def sync_middleware_2(get_response): def middleware(request): - response = get_response(request) - return response + return get_response(request) return middleware @@ -31,7 +28,6 @@ def middleware(request): @async_only_middleware def async_middleware_2(get_response): async def middleware(request): - response = await get_response(request) - return response + return await get_response(request) return middleware diff --git a/tests/test_asgi.py b/tests/test_asgi.py index 6d109727..6489e407 100644 --- a/tests/test_asgi.py +++ b/tests/test_asgi.py @@ -5,12 +5,13 @@ import pytest +from servestatic import utils as servestatic_utils from servestatic.asgi import ServeStaticASGI from .utils import AsgiReceiveEmulator, AsgiScopeEmulator, AsgiSendEmulator, Files -@pytest.fixture() +@pytest.fixture def test_files(): return Files( js=str(Path("static") / "app.js"), @@ -23,20 +24,17 @@ def application(request, test_files): async def asgi_app(scope, receive, send): if scope["type"] != "http": - raise RuntimeError("Incorrect response type!") - - await send( - { - "type": "http.response.start", - "status": 404, - "headers": [[b"content-type", b"text/plain"]], - } - ) + msg = "Incorrect response type!" 
+ raise RuntimeError(msg) + + await send({ + "type": "http.response.start", + "status": 404, + "headers": [[b"content-type", b"text/plain"]], + }) await send({"type": "http.response.body", "body": b"Not Found"}) - return ServeStaticASGI( - asgi_app, root=test_files.directory, autorefresh=request.param - ) + return ServeStaticASGI(asgi_app, root=test_files.directory, autorefresh=request.param) def test_get_js_static_file(application, test_files): @@ -82,19 +80,16 @@ def test_small_block_size(application, test_files): scope = AsgiScopeEmulator({"path": "/static/app.js"}) receive = AsgiReceiveEmulator() send = AsgiSendEmulator() - from servestatic import utils - default_block_size = utils.ASGI_BLOCK_SIZE - utils.ASGI_BLOCK_SIZE = 10 + default_block_size = servestatic_utils.ASGI_BLOCK_SIZE + servestatic_utils.ASGI_BLOCK_SIZE = 10 asyncio.run(application(scope, receive, send)) assert send[1]["body"] == test_files.js_content[:10] - utils.ASGI_BLOCK_SIZE = default_block_size + servestatic_utils.ASGI_BLOCK_SIZE = default_block_size def test_request_range_response(application, test_files): - scope = AsgiScopeEmulator( - {"path": "/static/app.js", "headers": [(b"range", b"bytes=0-13")]} - ) + scope = AsgiScopeEmulator({"path": "/static/app.js", "headers": [(b"range", b"bytes=0-13")]}) receive = AsgiReceiveEmulator() send = AsgiSendEmulator() asyncio.run(application(scope, receive, send)) @@ -102,9 +97,7 @@ def test_request_range_response(application, test_files): def test_out_of_range_error(application, test_files): - scope = AsgiScopeEmulator( - {"path": "/static/app.js", "headers": [(b"range", b"bytes=10000-11000")]} - ) + scope = AsgiScopeEmulator({"path": "/static/app.js", "headers": [(b"range", b"bytes=10000-11000")]}) receive = AsgiReceiveEmulator() send = AsgiSendEmulator() asyncio.run(application(scope, receive, send)) diff --git a/tests/test_compress.py b/tests/test_compress.py index f14c8603..cf76c699 100644 --- a/tests/test_compress.py +++ b/tests/test_compress.py @@ -24,21 +24,19 @@ def files_dir(): tmp = tempfile.mkdtemp() timestamp = 1498579535 for path, contents in TEST_FILES.items(): - path = os.path.join(tmp, path.lstrip("/")) + current_path = os.path.join(tmp, path.lstrip("/")) with contextlib.suppress(FileExistsError): - os.makedirs(os.path.dirname(path)) - with open(path, "wb") as f: + os.makedirs(os.path.dirname(current_path)) + with open(current_path, "wb") as f: f.write(contents) - os.utime(path, (timestamp, timestamp)) + os.utime(current_path, (timestamp, timestamp)) compress_main([tmp, "--quiet"]) yield tmp shutil.rmtree(tmp) def test_compresses_file(files_dir): - with contextlib.closing( - gzip.open(os.path.join(files_dir, f"{COMPRESSABLE_FILE}.gz"), "rb") - ) as f: + with contextlib.closing(gzip.open(os.path.join(files_dir, f"{COMPRESSABLE_FILE}.gz"), "rb")) as f: contents = f.read() assert TEST_FILES[COMPRESSABLE_FILE] == contents @@ -74,11 +72,9 @@ def test_custom_log(): def test_compress(): compressor = Compressor(use_brotli=False, use_gzip=False) - assert [] == list(compressor.compress("tests/test_files/static/styles.css")) + assert list(compressor.compress("tests/test_files/static/styles.css")) == [] def test_compressed_effectively_no_orig_size(): compressor = Compressor(quiet=True) - assert not compressor.is_compressed_effectively( - "test_encoding", "test_path", 0, "test_data" - ) + assert not compressor.is_compressed_effectively("test_encoding", "test_path", 0, "test_data") diff --git a/tests/test_django_servestatic.py b/tests/test_django_servestatic.py index 
3a19cd65..e576035c 100644 --- a/tests/test_django_servestatic.py +++ b/tests/test_django_servestatic.py @@ -41,21 +41,21 @@ def get_url_path(base, url): return urlparse(urljoin(base, url)).path -@pytest.fixture() +@pytest.fixture def static_files(): files = Files("static", js="app.js", nonascii="nonascii\u2713.txt") with override_settings(STATICFILES_DIRS=[files.directory]): yield files -@pytest.fixture() +@pytest.fixture def root_files(): files = Files("root", robots="robots.txt") with override_settings(SERVESTATIC_ROOT=files.directory): yield files -@pytest.fixture() +@pytest.fixture def tmp(): tmp_dir = tempfile.mkdtemp() with override_settings(STATIC_ROOT=tmp_dir): @@ -63,54 +63,53 @@ def tmp(): shutil.rmtree(tmp_dir) -@pytest.fixture() +@pytest.fixture def _collect_static(static_files, root_files, tmp): reset_lazy_object(storage.staticfiles_storage) call_command("collectstatic", verbosity=0, interactive=False) -@pytest.fixture() +@pytest.fixture def application(_collect_static): return get_wsgi_application() -@pytest.fixture() +@pytest.fixture def asgi_application(_collect_static): return get_asgi_application() -@pytest.fixture() +@pytest.fixture def server(application): app_server = AppServer(application) with closing(app_server): yield app_server -def test_get_root_file(server, root_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_get_root_file(server, root_files): response = server.get(root_files.robots_url) assert response.content == root_files.robots_content @override_settings(SERVESTATIC_USE_MANIFEST=False) -def test_get_root_file_no_manifest(server, root_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_get_root_file_no_manifest(server, root_files): response = server.get(root_files.robots_url) assert response.content == root_files.robots_content -def test_versioned_file_cached_forever(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_versioned_file_cached_forever(server, static_files): url = storage.staticfiles_storage.url(static_files.js_path) response = server.get(url) assert response.content == static_files.js_content - assert ( - response.headers.get("Cache-Control") - == f"max-age={ServeStaticMiddleware.FOREVER}, public, immutable" - ) + assert response.headers.get("Cache-Control") == f"max-age={ServeStaticMiddleware.FOREVER}, public, immutable" @pytest.mark.skipif(django.VERSION >= (5, 0), reason="Django <5.0 only") -def test_asgi_versioned_file_cached_forever_brotli( - asgi_application, static_files, _collect_static -): +@pytest.mark.usefixtures("_collect_static") +def test_asgi_versioned_file_cached_forever_brotli(asgi_application, static_files): url = storage.staticfiles_storage.url(static_files.js_path) scope = AsgiScopeEmulator({"path": url, "headers": [(b"accept-encoding", b"br")]}) receive = AsgiReceiveEmulator() @@ -126,9 +125,8 @@ def test_asgi_versioned_file_cached_forever_brotli( @pytest.mark.skipif(django.VERSION < (5, 0), reason="Django 5.0+ only") -def test_asgi_versioned_file_cached_forever_brotli_2( - asgi_application, static_files, _collect_static -): +@pytest.mark.usefixtures("_collect_static") +def test_asgi_versioned_file_cached_forever_brotli_2(asgi_application, static_files): url = storage.staticfiles_storage.url(static_files.js_path) scope = AsgiScopeEmulator({"path": url, "headers": [(b"accept-encoding", b"br")]}) @@ -151,14 +149,16 @@ async def executor(): assert headers.get(b"Vary") == b"Accept-Encoding" -def 
test_unversioned_file_not_cached_forever(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_unversioned_file_not_cached_forever(server, static_files): url = settings.STATIC_URL + static_files.js_path response = server.get(url) assert response.content == static_files.js_content assert response.headers.get("Cache-Control") == "max-age=60, public" -def test_get_gzip(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_get_gzip(server, static_files): url = storage.staticfiles_storage.url(static_files.js_path) response = server.get(url, headers={"Accept-Encoding": "gzip"}) assert response.content == static_files.js_content @@ -166,7 +166,8 @@ def test_get_gzip(server, static_files, _collect_static): assert response.headers["Vary"] == "Accept-Encoding" -def test_get_brotli(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_get_brotli(server, static_files): url = storage.staticfiles_storage.url(static_files.js_path) response = server.get(url, headers={"Accept-Encoding": "gzip, br"}) assert response.content == static_files.js_content @@ -174,14 +175,16 @@ def test_get_brotli(server, static_files, _collect_static): assert response.headers["Vary"] == "Accept-Encoding" -def test_no_content_type_when_not_modified(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_no_content_type_when_not_modified(server, static_files): last_mod = "Fri, 11 Apr 2100 11:47:06 GMT" url = settings.STATIC_URL + static_files.js_path response = server.get(url, headers={"If-Modified-Since": last_mod}) assert "Content-Type" not in response.headers -def test_get_nonascii_file(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_get_nonascii_file(server, static_files): url = settings.STATIC_URL + static_files.nonascii_path response = server.get(url) assert response.content == static_files.nonascii_content @@ -201,18 +204,19 @@ def finder_static_files(request): yield files -def test_no_content_disposition_header(server, static_files, _collect_static): +@pytest.mark.usefixtures("_collect_static") +def test_no_content_disposition_header(server, static_files): url = settings.STATIC_URL + static_files.js_path response = server.get(url) assert response.headers.get("content-disposition") is None -@pytest.fixture() +@pytest.fixture def finder_application(finder_static_files, application): return application -@pytest.fixture() +@pytest.fixture def finder_server(finder_application): app_server = AppServer(finder_application) with closing(app_server): @@ -234,13 +238,13 @@ def test_file_served_from_static_dir_no_manifest(finder_static_files, finder_ser def test_non_ascii_requests_safely_ignored(finder_server): response = finder_server.get(settings.STATIC_URL + "test\u263a") - assert 404 == response.status_code + assert response.status_code == 404 def test_requests_for_directory_safely_ignored(finder_server): url = f"{settings.STATIC_URL}directory" response = finder_server.get(url) - assert 404 == response.status_code + assert response.status_code == 404 def test_index_file_served_at_directory_path(finder_static_files, finder_server): @@ -250,9 +254,7 @@ def test_index_file_served_at_directory_path(finder_static_files, finder_server) @override_settings(SERVESTATIC_USE_MANIFEST=False) -def test_index_file_served_at_directory_path_no_manifest( - finder_static_files, finder_server -): +def 
     path = finder_static_files.index_path.rpartition("/")[0] + "/"
     response = finder_server.get(settings.STATIC_URL + path)
     assert response.content == finder_static_files.index_content
@@ -267,9 +269,7 @@ def test_index_file_path_redirected(finder_static_files, finder_server):
     assert location == settings.STATIC_URL + directory_path


-def test_directory_path_without_trailing_slash_redirected(
-    finder_static_files, finder_server
-):
+def test_directory_path_without_trailing_slash_redirected(finder_static_files, finder_server):
     directory_path = finder_static_files.index_path.rpartition("/")[0] + "/"
     directory_url = settings.STATIC_URL + directory_path.rstrip("/")
     response = finder_server.get(directory_url, allow_redirects=False)
@@ -287,7 +287,8 @@ def test_servestatic_file_response_has_only_one_header():


 @override_settings(STATIC_URL="static/")
-def test_relative_static_url(server, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_relative_static_url(server, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)
     response = server.get(url)
     assert response.content == static_files.js_content
@@ -299,10 +300,7 @@ def test_404_in_prod(server):
     response_content = html.unescape(response_content)

     assert response.status_code == 404
-    assert (
-        "ServeStatic did not find the file 'garbage' within the following paths:"
-        not in response_content
-    )
+    assert "ServeStatic did not find the file 'garbage' within the following paths:" not in response_content


 @override_settings(DEBUG=True)
@@ -314,16 +312,14 @@ def test_error_message(server):
     # Beautify for easier debugging
     response_content = response_content[response_content.index("ServeStatic") :]

-    assert (
-        "ServeStatic did not find the file 'garbage' within the following paths:"
-        in response_content
-    )
+    assert "ServeStatic did not find the file 'garbage' within the following paths:" in response_content
     assert "•" in response_content
     assert str(Path(__file__).parent / "test_files" / "static") in response_content


 @override_settings(FORCE_SCRIPT_NAME="/subdir", STATIC_URL="static/")
-def test_force_script_name(server, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_force_script_name(server, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)
     assert url.startswith("/subdir/static/")
     response = server.get(url)
@@ -332,9 +328,8 @@ def test_force_script_name(server, static_files, _collect_static):


 @override_settings(FORCE_SCRIPT_NAME="/subdir", STATIC_URL="/subdir/static/")
-def test_force_script_name_with_matching_static_url(
-    server, static_files, _collect_static
-):
+@pytest.mark.usefixtures("_collect_static")
+def test_force_script_name_with_matching_static_url(server, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)
     assert url.startswith("/subdir/static/")
     response = server.get(url)
@@ -342,7 +337,8 @@ def test_force_script_name_with_matching_static_url(
     assert response.content == static_files.js_content


-def test_range_response(server, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_range_response(server, static_files):
     ...  # FIXME: This test is not working, seemingly due to bugs with AppServer.

@@ -358,23 +354,22 @@ def test_range_response(server, static_files, _collect_static):


 @pytest.mark.skipif(django.VERSION >= (5, 0), reason="Django <5.0 only")
-def test_asgi_range_response(asgi_application, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_asgi_range_response(asgi_application, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)
     scope = AsgiScopeEmulator({"path": url, "headers": [(b"range", b"bytes=0-13")]})
     receive = AsgiReceiveEmulator()
     send = AsgiSendEmulator()
     asyncio.run(AsgiAppServer(asgi_application)(scope, receive, send))
     assert send.body == static_files.js_content[:14]
-    assert (
-        send.headers[b"Content-Range"]
-        == b"bytes 0-13/" + str(len(static_files.js_content)).encode()
-    )
+    assert send.headers[b"Content-Range"] == b"bytes 0-13/" + str(len(static_files.js_content)).encode()
     assert send.headers[b"Content-Length"] == b"14"
     assert send.status == 206


 @pytest.mark.skipif(django.VERSION < (5, 0), reason="Django 5.0+ only")
-def test_asgi_range_response_2(asgi_application, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_asgi_range_response_2(asgi_application, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)

     scope = AsgiScopeEmulator({"path": url, "headers": [(b"range", b"bytes=0-13")]})
@@ -389,22 +384,21 @@ async def executor():
     headers = dict(response["headers"])

     assert response["body"] == static_files.js_content[:14]
-    assert (
-        headers[b"Content-Range"]
-        == b"bytes 0-13/" + str(len(static_files.js_content)).encode()
-    )
+    assert headers[b"Content-Range"] == b"bytes 0-13/" + str(len(static_files.js_content)).encode()
     assert headers[b"Content-Length"] == b"14"
     assert response["status"] == 206


-def test_out_of_range_error(server, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_out_of_range_error(server, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)
     response = server.get(url, headers={"Range": "bytes=900-999"})
     assert response.status_code == 416


 @pytest.mark.skipif(django.VERSION >= (5, 0), reason="Django <5.0 only")
-def test_asgi_out_of_range_error(asgi_application, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_asgi_out_of_range_error(asgi_application, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)
     scope = AsgiScopeEmulator({"path": url, "headers": [(b"range", b"bytes=900-999")]})
     receive = AsgiReceiveEmulator()
@@ -414,7 +408,8 @@ def test_asgi_out_of_range_error(asgi_application, static_files, _collect_static


 @pytest.mark.skipif(django.VERSION < (5, 0), reason="Django 5.0+ only")
-def test_asgi_out_of_range_error_2(asgi_application, static_files, _collect_static):
+@pytest.mark.usefixtures("_collect_static")
+def test_asgi_out_of_range_error_2(asgi_application, static_files):
     url = storage.staticfiles_storage.url(static_files.js_path)

     scope = AsgiScopeEmulator({"path": url, "headers": [(b"range", b"bytes=900-999")]})
@@ -427,6 +422,4 @@ async def executor():
     response = asyncio.run(executor())

     assert response["status"] == 416
-    assert dict(response["headers"])[b"Content-Range"] == b"bytes */%d" % len(
-        static_files.js_content
-    )
+    assert dict(response["headers"])[b"Content-Range"] == b"bytes */%d" % len(static_files.js_content)
diff --git a/tests/test_servestatic.py b/tests/test_servestatic.py
index f99a04d2..dd9c3119 100644
--- a/tests/test_servestatic.py
+++ b/tests/test_servestatic.py
@@ -8,6 +8,7 @@
 import tempfile
 import warnings
 from contextlib import closing
+from pathlib import Path
 from urllib.parse import urljoin
 from wsgiref.headers import Headers
 from wsgiref.simple_server import demo_app
@@ -99,7 +100,7 @@ def test_get_accept_star(server, files):
 def test_get_accept_missing(server, files):
     response = server.get(
         files.gzip_url,
-        # Using None is required to override requests’ default Accept-Encoding
+        # Using None is required to override requests' default Accept-Encoding
         headers={"Accept-Encoding": None},
     )
     assert response.content == files.gzip_content
@@ -296,7 +297,7 @@ def test_warn_about_missing_directories(application):


 def test_handles_missing_path_info_key(application):
-    response = application(environ={}, start_response=lambda *args: None)
+    response = application(environ={}, start_response=lambda *_args: None)
     assert response


@@ -328,30 +329,24 @@ def test_immutable_file_test_accepts_regex():

 @pytest.mark.skipif(sys.version_info < (3, 4), reason="Pathlib was added in Python 3.4")
 def test_directory_path_can_be_pathlib_instance():
-    from pathlib import Path
-
     root = Path(Files("root").directory)
     # Check we can construct instance without it blowing up
     ServeStatic(None, root=root, autorefresh=True)


-def fake_stat_entry(
-    st_mode: int = stat.S_IFREG, st_size: int = 1024, st_mtime: int = 0
-) -> os.stat_result:
-    return os.stat_result(
-        (
-            st_mode,
-            0,  # st_ino
-            0,  # st_dev
-            0,  # st_nlink
-            0,  # st_uid
-            0,  # st_gid
-            st_size,
-            0,  # st_atime
-            st_mtime,
-            0,  # st_ctime
-        )
-    )
+def fake_stat_entry(st_mode: int = stat.S_IFREG, st_size: int = 1024, st_mtime: int = 0) -> os.stat_result:
+    return os.stat_result((
+        st_mode,
+        0,  # st_ino
+        0,  # st_dev
+        0,  # st_nlink
+        0,  # st_uid
+        0,  # st_gid
+        st_size,
+        0,  # st_atime
+        st_mtime,
+        0,  # st_ctime
+    ))


 def test_last_modified_not_set_when_mtime_is_zero():
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 6c617bd4..5144c0ae 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -19,7 +19,7 @@
 from .utils import Files


-@pytest.fixture()
+@pytest.fixture
 def setup():
     staticfiles_storage._wrapped = empty
     files = Files("static")
@@ -33,7 +33,7 @@ def setup():
     shutil.rmtree(tmp)


-@pytest.fixture()
+@pytest.fixture
 def _compressed_storage(setup):
     backend = "servestatic.storage.CompressedStaticFilesStorage"
     if django.VERSION >= (4, 2):
@@ -50,7 +50,7 @@ def _compressed_storage(setup):
     yield


-@pytest.fixture()
+@pytest.fixture
 def _compressed_manifest_storage(setup):
     backend = "servestatic.storage.CompressedManifestStaticFilesStorage"
     if django.VERSION >= (4, 2):
@@ -67,7 +67,8 @@ def _compressed_manifest_storage(setup):
     call_command("collectstatic", verbosity=0, interactive=False)


-def test_compressed_static_files_storage(_compressed_storage):
+@pytest.mark.usefixtures("_compressed_storage")
+def test_compressed_static_files_storage():
     call_command("collectstatic", verbosity=0, interactive=False)

     for name in ["styles.css.gz", "styles.css.br"]:
@@ -75,7 +76,8 @@ def test_compressed_static_files_storage(_compressed_storage):
         assert os.path.exists(path)


-def test_compressed_static_files_storage_dry_run(_compressed_storage):
+@pytest.mark.usefixtures("_compressed_storage")
+def test_compressed_static_files_storage_dry_run():
     call_command("collectstatic", "--dry-run", verbosity=0, interactive=False)

     for name in ["styles.css.gz", "styles.css.br"]:
@@ -83,7 +85,8 @@ def test_compressed_static_files_storage_dry_run(_compressed_storage):
         assert not os.path.exists(path)


-def test_make_helpful_exception(_compressed_manifest_storage):
+@pytest.mark.usefixtures("_compressed_manifest_storage")
+def test_make_helpful_exception():
     class TriggerException(HashedFilesMixin):
         def exists(self, path):
             return False
@@ -93,24 +96,22 @@ def exists(self, path):
         TriggerException().hashed_name("/missing/file.png")
     except ValueError as e:
         exception = e
-    helpful_exception = CompressedManifestStaticFilesStorage().make_helpful_exception(
-        exception, "styles/app.css"
-    )
+    helpful_exception = CompressedManifestStaticFilesStorage().make_helpful_exception(exception, "styles/app.css")
     assert isinstance(helpful_exception, MissingFileError)


-def test_unversioned_files_are_deleted(_compressed_manifest_storage):
+@pytest.mark.usefixtures("_compressed_manifest_storage")
+def test_unversioned_files_are_deleted():
     name = "styles.css"
     versioned_url = staticfiles_storage.url(name)
     versioned_name = basename(versioned_url)
     name_pattern = re.compile("^" + name.replace(".", r"\.([0-9a-f]+\.)?") + "$")
-    remaining_files = [
-        f for f in os.listdir(settings.STATIC_ROOT) if name_pattern.match(f)
-    ]
+    remaining_files = [f for f in os.listdir(settings.STATIC_ROOT) if name_pattern.match(f)]
    assert [versioned_name] == remaining_files


-def test_manifest_file_is_left_in_place(_compressed_manifest_storage):
+@pytest.mark.usefixtures("_compressed_manifest_storage")
+def test_manifest_file_is_left_in_place():
     manifest_file = os.path.join(settings.STATIC_ROOT, "staticfiles.json")
     assert os.path.exists(manifest_file)
diff --git a/tests/utils.py b/tests/utils.py
index 2255177d..3b8f8730 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -20,9 +20,7 @@ class AppServer:

     def __init__(self, application):
         self.application = application
-        self.server = make_server(
-            "127.0.0.1", 0, self.serve_under_prefix, handler_class=WSGIRequestHandler
-        )
+        self.server = make_server("127.0.0.1", 0, self.serve_under_prefix, handler_class=WSGIRequestHandler)

     def serve_under_prefix(self, environ, start_response):
         prefix = shift_path_info(environ)
@@ -54,7 +52,8 @@ def __init__(self, application):

     async def __call__(self, scope, receive, send):
         if scope["type"] != "http":
-            raise RuntimeError("Incorrect response type!")
+            msg = "Incorrect response type!"
+            raise RuntimeError(msg)

         # Remove the prefix from the path
         scope["path"] = scope["path"].replace(f"/{AppServer.PREFIX}", "", 1)
@@ -132,7 +131,7 @@ class AsgiReceiveEmulator:
     be emulate HTTP events."""

     def __init__(self, *events):
-        self.events = [{"type": "http.connect"}] + list(events)
+        self.events = [{"type": "http.connect"}, *list(events)]

     async def __call__(self):
         return self.events.pop(0) if self.events else {"type": "http.disconnect"}
@@ -153,9 +152,7 @@ def __getitem__(self, index):
     @property
     def body(self):
         """Combine all HTTP body messages into a single bytestring."""
-        return b"".join(
-            [message["body"] for message in self.message if message.get("body")]
-        )
+        return b"".join([message["body"] for message in self.message if message.get("body")])

     @property
     def headers(self):