From 7e7512ff5ad93ec2a9c8f0ce6d3bdf45ed051127 Mon Sep 17 00:00:00 2001 From: Ondrej Tuma Date: Tue, 23 Jan 2024 12:45:53 +0100 Subject: [PATCH] Linter fixes and pre-commit update --- .github/workflows/rebase.yml | 4 +- .pre-commit-config.yaml | 13 +- doc/documentation.rst | 2 +- examples/http_digest.py | 161 +++++++---------- examples/large_file.py | 16 +- examples/metrics.py | 1 + examples/openapi3.py | 4 +- examples/put_file.py | 10 +- examples/simple.py | 284 +++++++++++++++++------------- examples/simple_json.py | 8 +- examples/websocket.py | 12 +- poorwsgi/results.py | 20 +-- setup.py | 253 ++++++++++++-------------- tests/test_digest.py | 25 ++- tests_integrity/openapi.py | 13 +- tests_integrity/test_digest.py | 13 +- tests_integrity/test_json.py | 13 +- tests_integrity/test_profile.py | 3 +- tests_integrity/test_simple.py | 39 ++-- tests_integrity/test_websocket.py | 6 +- 20 files changed, 465 insertions(+), 435 deletions(-) diff --git a/.github/workflows/rebase.yml b/.github/workflows/rebase.yml index 95a670e..b03480d 100644 --- a/.github/workflows/rebase.yml +++ b/.github/workflows/rebase.yml @@ -7,9 +7,9 @@ jobs: name: Rebase runs-on: ubuntu-latest if: >- - github.event.issue.pull_request != '' && + github.event.issue.pull_request != '' && ( - contains(github.event.comment.body, '/rebase') || + contains(github.event.comment.body, '/rebase') || contains(github.event.comment.body, '/autosquash') ) steps: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b77cdd7..c7b8d56 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,17 +1,17 @@ --- repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.6.0 hooks: - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit - rev: 'v0.0.270' + rev: 'v0.5.0' hooks: - id: ruff - repo: https://github.com/pycqa/flake8 - rev: 6.0.0 + rev: 7.1.0 hooks: - id: flake8 - repo: https://github.com/Lucas-C/pre-commit-hooks-markup @@ -27,3 +27,10 @@ repos: rev: v3.0.0a5 hooks: - id: pylint + language: system + - repo: https://github.com/pre-commit/mirrors-mypy + rev: 'v1.10.1' # Use the sha / tag you want to point at + hooks: + - id: mypy + args: [--explicit-package-bases] + language: system diff --git a/doc/documentation.rst b/doc/documentation.rst index 1857e93..a3b79b1 100644 --- a/doc/documentation.rst +++ b/doc/documentation.rst @@ -795,7 +795,7 @@ CachedInput ~~~~~~~~~~~ When HTTP Forms are base64 encoded, FieldStorage use readline on request input -file. This is not so optimal. So there is CachedInput class, which is returned +file. This is not so optimal. So there is CachedInput class, which is returned Proccess variables ~~~~~~~~~~~~~~~~~~ diff --git a/examples/http_digest.py b/examples/http_digest.py index 922eaca..7bef4b3 100644 --- a/examples/http_digest.py +++ b/examples/http_digest.py @@ -1,19 +1,18 @@ """HTTP WWW-Authenticate Digest Example.""" -from wsgiref.simple_server import make_server -from sys import path as python_path -from os import path +import logging from hashlib import sha256 +from os import path +from sys import path as python_path from time import time +from wsgiref.simple_server import make_server -import logging - -python_path.insert(0, path.abspath( - path.join(path.dirname(__file__), path.pardir))) +python_path.insert( + 0, path.abspath(path.join(path.dirname(__file__), path.pardir))) # pylint: disable=wrong-import-position -from poorwsgi import Application, state # noqa +from poorwsgi import Application, state # noqa +from poorwsgi.digest import PasswordMap, check_digest, hexdigest # noqa from poorwsgi.response import EmptyResponse, redirect # noqa -from poorwsgi.digest import check_digest, PasswordMap, hexdigest # noqa FILE = path.join(path.dirname(__file__), 'test.digest') @@ -38,151 +37,119 @@ def get_header(title): """Return HTML header list of lines.""" return ( - "", - "", + "", "", '', - "%s - %s" % (__file__, title), - "", - "", - "

%s - %s

" % (__file__, title) - ) + f"{__file__} - {title}", "", "", + f"

{__file__} - {title}

") def get_footer(): """Return HTML footer list of lines.""" - return ( - "
", - "Copyright (c) 2020 Ondřej Tůma. See ", - 'poorhttp.zeropage.cz' - '.', - "", - "" - ) + return ("
", "Copyright (c) 2020 Ondřej Tůma. See ", + 'poorhttp.zeropage.cz' + '.', "", "") def get_link(href, text=None, title=None): """Return HTML anchor.""" text = text or title or href title = title or text - return '%s' % (href, title, text) + return f'{text}' @app.route('/') def root(req): """Return Root (Index) page.""" - body = ( - '' - ) + body = ('') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/admin_zone') @check_digest(ADMIN) def admin_zone(req): """Page only for ADMIN realm.""" - body = ( - '

%s test for %s algorithm.

' % (ADMIN, app.auth_algorithm), - '' - ) + body = (f'

{ADMIN} test for {app.auth_algorithm} algorithm.

', + '') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/user_zone') @check_digest(USER) def user_zone(req): """Page for USER realm.""" - body = ( - '

%s test for %s algorithm.

' % (USER, app.auth_algorithm), - 'User: %s' % req.user, - '' - ) + body = (f'

{USER} test for {app.auth_algorithm} algorithm.

', + f'User: {req.user}', '') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/user') @check_digest(USER, 'user') def user_only(req): """Page for user only.""" - body = ( - '

User test for %s algorithm.

' % app.auth_algorithm, - 'User: %s' % req.user, - '' - ) + body = (f'

User test for {app.auth_algorithm} algorithm.

', + f'User: {req.user}', '') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/foo') @check_digest(USER, 'foo') def foo_only(req): """Page for foo user only.""" - body = ( - '

Foo test for %s algorithm.

' % app.auth_algorithm, - 'User: %s' % req.user, - '', - '
', - '', - '', - '
' - ) + body = (f'

Foo test for {app.auth_algorithm} algorithm.

', + f'User: {req.user}', '', '
', + '', + '', '
') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/user/utf-8') @check_digest(USER, 'Ondřej') def utf8_chars(req): """Page for user only.""" - body = ( - '

User test for %s algorithm.

' % app.auth_algorithm, - 'User: %s' % req.user, - '' - ) + body = (f'

User test for {app.auth_algorithm} algorithm.

', + f'User: {req.user}', '') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/foo/passwd', method=state.METHOD_POST) @check_digest(USER, 'foo') def foo_password(req): """Change foo's password.""" - digest = hexdigest(req.user, USER, req.form.get('password'), - app.auth_hash) + digest = hexdigest(req.user, USER, req.form.get('password'), app.auth_hash) app.auth_map.set(USER, req.user, digest) redirect('/foo') @@ -196,17 +163,13 @@ def unknown_endpoint(req): def generic_response(url, user): """Return generic response""" - body = ( - '

%s test for %s algorithm.

' % (USER, app.auth_algorithm), - 'User: %s' % user, - '' - ) + body = (f'

{USER} test for {app.auth_algorithm} algorithm.

', + f'User: {user}', '') for line in get_header("Root") + body + get_footer(): - yield line.encode()+b'\n' + yield line.encode() + b'\n' @app.route('/spaces in url') diff --git a/examples/large_file.py b/examples/large_file.py index 2ebd4a8..a9aaf01 100644 --- a/examples/large_file.py +++ b/examples/large_file.py @@ -1,4 +1,5 @@ """Large file upload test.""" +# pylint: disable=duplicate-code from wsgiref.simple_server import make_server, WSGIServer from socketserver import ThreadingMixIn @@ -14,11 +15,11 @@ sys.path.insert(0, os.path.abspath( os.path.join(EXAMPLES_PATH, os.path.pardir))) -# pylint: disable=import-error, disable=wrong-import-position +# pylint: disable=wrong-import-position from poorwsgi import Application, state # noqa from poorwsgi.request import FieldStorage # noqa from poorwsgi.response import HTTPException # noqa -from poorwsgi.results import hbytes # noqa +from poorwsgi.results import hbytes # noqa logger = log.getLogger() logger.setLevel("DEBUG") @@ -31,6 +32,7 @@ class Blackhole: """Dummy File Object""" + def __init__(self, filename): log.debug("Start uploading file: %s", filename) self.uploaded = 0 @@ -56,6 +58,7 @@ def hexdigest(self): class Temporary: """Temporary file""" + def __init__(self, filename): log.debug("Start uploading file: %s", filename) self.uploaded = 0 @@ -152,11 +155,10 @@ def html_form(req, file_callback): hexdigest = form['file'].file.hexdigest() end = time() - start - args = (hbytes(bytes_read) + - (int(end),) + - hbytes(bytes_read / end) + - (hexdigest,)) - stats = ("Upload: %.2f%s in %ds -> %.2f%sps SHA256: %s" % args) + size = hbytes(bytes_read) + speed = hbytes(bytes_read / end) + stats = (f"Upload: {size[0]:.2f}{size[1]} in {end}s -> " + f"{speed[0]:.2f}{speed[1]}ps SHA256: {hexdigest}") log.info(stats) if bytes_read != req.content_length: diff --git a/examples/metrics.py b/examples/metrics.py index 53b9dc2..d3fef1e 100644 --- a/examples/metrics.py +++ b/examples/metrics.py @@ -9,6 +9,7 @@ python_path.insert(0, os.path.abspath( os.path.join(EXAMPLES_PATH, os.path.pardir))) +# pylint: disable=wrong-import-position from poorwsgi import Application, state # noqa from poorwsgi.response import JSONResponse # noqa diff --git a/examples/openapi3.py b/examples/openapi3.py index 51b9107..16fb10d 100644 --- a/examples/openapi3.py +++ b/examples/openapi3.py @@ -31,7 +31,7 @@ from poorwsgi import Application, state # noqa from poorwsgi.response import Response, abort, HTTPException, \ JSONResponse # noqa -from poorwsgi.request import Request +from poorwsgi.request import Request # noqa from poorwsgi.openapi_wrapper import OpenAPIRequest, \ OpenAPIResponse # noqa from poorwsgi.session import PoorSession # noqa @@ -111,7 +111,7 @@ def after_each_response(req, res): return res except OpenAPIError as error: log.error("API output error: %s", str(error)) - assert False, f"OpenAPI Error {str(error)}" + raise return res diff --git a/examples/put_file.py b/examples/put_file.py index 27b2398..0e72ab1 100644 --- a/examples/put_file.py +++ b/examples/put_file.py @@ -15,16 +15,18 @@ # pylint: disable=import-error, disable=wrong-import-position from poorwsgi import Application, state # noqa -from poorwsgi.response import JSONResponse +from poorwsgi.response import JSONResponse # noqa logger = log.getLogger() logger.setLevel("DEBUG") app = application = Application("large_file") app.debug = True +# pylint: disable=duplicate-code + @app.route('/blackhole/', method=state.METHOD_PUT) -def blackhole_put(req, filename:str): +def blackhole_put(req, filename: str): """Upload file via PUT method like in webdav""" checksum = sha256() uploaded = 0 @@ -46,7 +48,7 @@ def blackhole_put(req, filename:str): @app.route('/temporary/', method=state.METHOD_PUT) -def temporary_put(req, filename:str): +def temporary_put(req, filename: str): """Upload file via PUT method like in webdav""" checksum = sha256() uploaded = 0 @@ -143,5 +145,5 @@ class ThreadingWSGIServer(ThreadingMixIn, WSGIServer): if __name__ == '__main__': ADDRESS = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1' httpd = make_server(ADDRESS, 8080, app, ThreadingWSGIServer) - print("Starting to serve on http://%s:8080" % ADDRESS) + print(f"Starting to serve on http://{ADDRESS}:8080") httpd.serve_forever() diff --git a/examples/simple.py b/examples/simple.py index df573a2..84109c7 100644 --- a/examples/simple.py +++ b/examples/simple.py @@ -4,32 +4,33 @@ licence as PoorWSGI. So enjoy it ;) """ -from wsgiref.simple_server import make_server +import logging as log +import os from base64 import decodebytes, encodebytes, urlsafe_b64encode from collections import OrderedDict -from io import FileIO as file, BytesIO -from os.path import getctime -from sys import path as python_path from functools import wraps from hashlib import md5 +from io import BytesIO +from io import FileIO as file +from os.path import getctime from random import choices - -import os -import logging as log +from sys import path as python_path +from wsgiref.simple_server import make_server EXAMPLES_PATH = os.path.dirname(__file__) -python_path.insert(0, os.path.abspath( - os.path.join(EXAMPLES_PATH, os.path.pardir))) +python_path.insert( + 0, os.path.abspath(os.path.join(EXAMPLES_PATH, os.path.pardir))) # pylint: disable=import-error, wrong-import-position -from poorwsgi import Application, state, request, redirect # noqa -from poorwsgi.headers import http_to_time, time_to_http, parse_range # noqa +from poorwsgi import Application, redirect, request, state # noqa +from poorwsgi.headers import http_to_time, parse_range, time_to_http # noqa +from poorwsgi.response import FileResponse # noqa +from poorwsgi.response import HTTPException # noqa +from poorwsgi.response import (FileObjResponse, GeneratorResponse, # noqa + NoContentResponse, NotModifiedResponse, + PartialResponse, RedirectResponse, Response) +from poorwsgi.results import html_escape, not_modified # noqa from poorwsgi.session import PoorSession, SessionError # noqa -from poorwsgi.response import Response, RedirectResponse, \ - FileObjResponse, FileResponse, GeneratorResponse, \ - NoContentResponse, NotModifiedResponse, PartialResponse, \ - HTTPException # noqa -from poorwsgi.results import not_modified # noqa try: import uwsgi # type: ignore @@ -37,14 +38,13 @@ except ModuleNotFoundError: uwsgi = None # pylint: disable=invalid-name - logger = log.getLogger() logger.setLevel("DEBUG") app = application = Application("simple") app.debug = True app.document_root = '.' app.document_index = True -app.secret_key = os.urandom(32) # random key each run +app.secret_key = os.urandom(32) # random key each run class MyValueError(ValueError): @@ -52,17 +52,24 @@ class MyValueError(ValueError): class Storage(file): + """File storage class created by StorageFactory.""" + def __init__(self, directory, filename): log.debug("directory: %s; filename: %s", directory, filename) self.path = directory + '/' + filename if os.access(self.path, os.F_OK): - raise Exception("File %s exist yet" % filename) + msg = f"File {filename} exist yet" + raise OSError(msg) super().__init__(self.path, 'w+b') class StorageFactory: + """Storage Factory do some code before creating file.""" + + # pylint: disable=too-few-public-methods + def __init__(self, directory): self.directory = directory if not os.access(directory, os.R_OK): @@ -80,6 +87,7 @@ def create(self, filename): @app.before_response() def log_request(req): + """Log each request before processing.""" log.info("Before response") log.info("Data: %s", req.data) @@ -93,7 +101,8 @@ def auto_form(req): factory = StorageFactory('./upload') try: req.form = request.FieldStorage( - req, keep_blank_values=app.keep_blank_values, + req, + keep_blank_values=app.keep_blank_values, strict_parsing=app.strict_parsing, file_callback=factory.create) except Exception as err: # pylint: disable=broad-except @@ -102,54 +111,41 @@ def auto_form(req): def get_crumbnav(req): + """Create crumb navigation from url.""" navs = [req.hostname] if req.uri == '/': navs.append('/') else: navs.append('/') - navs.append('%s' % req.uri) + navs.append(f'{req.uri}') return " » ".join(navs) -def html(s): - s = str(s) - s = s.replace('&', '&') - s = s.replace('>', '>') - s = s.replace('<', '<') - return s - - def get_header(title): + """Return HTML header.""" return ( - "", - "", + "", "", '', - "Simple.py - %s" % title, - '', - "", - "", - "

Simple.py - %s

" % title - ) + f"Simple.py - {title}", + '', "", "", + f"

Simple.py - {title}

") def get_footer(): - return ( - "
", - "Copyright (c) 2013-2021 Ondřej Tůma. See ", - 'poorhttp.zeropage.cz' - '.', - "", - "" - ) + """Return HTML footer.""" + return ("
", "Copyright (c) 2013-2021 Ondřej Tůma. See ", + 'poorhttp.zeropage.cz' + '.', "", "") def get_variables(req): + """Return some environment variables and it's values.""" usable = ("REQUEST_METHOD", "QUERY_STRING", "SERVER_NAME", "SERVER_PORT", "REMOTE_ADDR", "REMOTE_HOST", "PATH_INFO") - return sorted(tuple( - (key, html(val)) for key, val in req.environ.items() - if key.startswith("wsgi.") or key.startswith("poor_") or - key in usable)) + return sorted( + tuple((key, html_escape(repr(val))) + for key, val in req.environ.items() if key.startswith("wsgi.") + or key.startswith("poor_") or key in usable)) app.set_filter('email', r'[\w\.\-]+@[\w\.\-]+') @@ -157,6 +153,7 @@ def get_variables(req): def check_login(fun): """Check session cookie.""" + @wraps(fun) def handler(req): session = PoorSession(app.secret_key) @@ -166,17 +163,22 @@ def handler(req): pass if 'login' not in session.data: log.info('Login cookie not found.') - redirect("/", message="Login required",) + redirect( + "/", + message="Login required", + ) return fun(req) + return handler @app.route('/') def root(req): + """Return root index.""" buff = get_header("Index") + ( get_crumbnav(req), "", - ) + get_footer() + ) + get_footer() response = Response() for line in buff: response.write(line + '\n') @@ -214,7 +216,8 @@ def root(req): @app.route('/favicon.ico') -def favicon(req): +def favicon(_): + """Return favicon.""" icon = b""" AAABAAEAEBAAAAEAIABoBAAAFgAAACgAAAAQAAAAIAAAAAEAIAAAAAAAAAAAAAAAAAAAAAAAAAAA AAAAAAD///8A////AP///wD///8AFRX/Bw8P/24ICP/IAgL/7wAA/+oAAP/GAAD/bQAA/wj///8A @@ -242,7 +245,8 @@ def favicon(req): @app.route('/style.css') -def style(req): +def style(_): + """Return stylesheet.""" buff = """ body { width: 90%; max-width: 900px; margin: auto; padding-top: 30px; } @@ -264,12 +268,15 @@ def style(req): @app.route('/test/') @app.route('/test/static') def test_dynamic(req, variable=None): + """Test dynamics values.""" if not variable and req.headers.get('E-Tag') == 'W/"0123"': return not_modified(req) - var_info = {'type': type(variable), - 'value': variable, - 'uri_rule': req.uri_rule} + var_info = { + 'type': html_escape(repr(type(variable))), + 'value': html_escape(repr(variable)), + 'uri_rule': html_escape(req.uri_rule), + } title = "Variable" if variable is not None else "Static" @@ -277,18 +284,18 @@ def test_dynamic(req, variable=None): (get_crumbnav(req), "

Variable

", "") + \ - tuple("" % - (key, html(val)) for key, val in var_info.items()) + \ + tuple(f"" + for key, val in var_info.items()) + \ ("
%s:%s
{key}:{val}
", "

Browser Headers

", "") + \ - tuple("" % - (key, val) for key, val in req.headers.items()) + \ + tuple(f"" + for key, val in req.headers.items()) + \ ("
%s:%s
{key}:{val}
", "

Request Variables

", "") + \ - tuple("" % - (key, val) for key, val in get_variables(req)) + \ + tuple(f"" + for key, val in get_variables(req)) + \ ("
%s:%s
{key}:{val}
",) + \ get_footer() @@ -301,26 +308,27 @@ def test_dynamic(req, variable=None): @app.route('/test/') @app.route('/test///') def test_varargs(req, *args): - var_info = {'len': len(args), - 'uri_rule': req.uri_rule} - var_info.update(req.groups) + """Handler for variable path agrs""" + var_info = {'len': len(args), 'uri_rule': html_escape(req.uri_rule)} + for key, val in req.path_args.items(): + var_info[key] = html_escape(repr(val)) buff = get_header("Variable args test") + \ (get_crumbnav(req), "

Variables

", "") + \ - tuple("" % - (key, html(val)) for key, val in var_info.items()) + \ + tuple(f"" + for key, val in var_info.items()) + \ ("
%s:%s
{key}:{val}
", "

Browser Headers

", "") + \ - tuple("" % - (key, val) for key, val in req.headers.items()) + \ + tuple(f"" + for key, val in req.headers.items()) + \ ("
%s:%s
{key}:{val}
", "

Request Variables

", "") + \ - tuple("" % - (key, val) for key, val in get_variables(req)) + \ + tuple(f"" + for key, val in get_variables(req)) + \ ("
%s:%s
{key}:{val}
",) + \ get_footer() @@ -332,6 +340,7 @@ def test_varargs(req, *args): @app.route('/login') def login(req): + """Create login session cookie.""" log.debug("Input cookies: %s", repr(req.cookies)) cookie = PoorSession(app.secret_key) cookie.data['login'] = True @@ -342,6 +351,7 @@ def login(req): @app.route('/logout') def logout(req): + """Destroy login session cookie.""" log.debug("Input cookies: %s", repr(req.cookies)) cookie = PoorSession(app.secret_key) cookie.destroy() @@ -353,24 +363,26 @@ def logout(req): @app.route('/test/form', method=state.METHOD_GET_POST) @check_login def test_form(req): + """Form example""" + # pylint: disable=consider-using-f-string # get_var_info = {'len': len(args)} var_info = OrderedDict(( - ('form_keys', req.form.keys()), - ('form_values', ', '.join(tuple(str(req.form.getvalue(key)) - for key in req.form.keys()))), - ('form_getfirst', '%s,%s' % (req.form.getfirst('pname'), - req.form.getfirst('px'))), - ('form_getlist', '%s,%s' % (list(req.form.getlist('pname')), - list(req.form.getlist('px')))), + ('form_keys', ','.join(req.form.keys())), + ('form_values', ', '.join( + tuple(str(req.form.getvalue(key)) for key in req.form.keys()))), + ('form_getfirst', + '%s,%s' % (req.form.getfirst('pname'), req.form.getfirst('px'))), + ('form_getlist', '%s,%s' % + (list(req.form.getlist('pname')), list(req.form.getlist('px')))), ('', ''), - ('args_keys', req.args.keys()), - ('args_values', ', '.join(tuple(str(req.args[key]) - for key in req.args.keys()))), - ('args_getfirst', '%s,%s' % (req.args.getfirst('gname'), - req.args.getfirst('gx'))), - ('args_getlist', '%s,%s' % (list(req.args.getlist('gname')), - list(req.args.getlist('gx')))), - )) + ('args_keys', ','.join(req.args.keys())), + ('args_values', + ', '.join(tuple(str(req.args[key]) for key in req.args.keys()))), + ('args_getfirst', + '%s,%s' % (req.args.getfirst('gname'), req.args.getfirst('gx'))), + ('args_getlist', '%s,%s' % + (list(req.args.getlist('gname')), list(req.args.getlist('gx')))), + )) buff = get_header("HTTP Form args test") + \ (get_crumbnav(req), @@ -395,7 +407,7 @@ def test_form(req): "

Variables

", "") + \ tuple("" % - (key, html(val)) for key, val in var_info.items()) + \ + (key, html_escape(val)) for key, val in var_info.items()) + \ ("
%s:%s
", "

Browser Headers

", "") + \ @@ -418,35 +430,46 @@ def test_form(req): @app.route('/test/upload', method=state.METHOD_GET_POST) @check_login def test_upload(req): + """Upload file example.""" var_info = OrderedDict(( ('form_keys', req.form.keys()), - ('form_value_names', ', '.join(tuple(req.form[key].name - for key in req.form.keys()))), - ('form_value_types', ', '.join(tuple(req.form[key].type - for key in req.form.keys()))), - ('form_value_fnames', ', '.join(tuple(str(req.form[key].filename) - for key in req.form.keys()))), - ('form_value_lenghts', ', '.join(tuple(str(req.form[key].length) - for key in req.form.keys()))), - ('form_value_files', ', '.join(tuple(str(req.form[key].file) - for key in req.form.keys()))), - ('form_value_lists', ', '.join(tuple( - 'Yes' if req.form[key].list else 'No' - for key in req.form.keys()))), - )) + ('form_value_names', ', '.join( + tuple(html_escape(req.form[key].name) + for key in req.form.keys()))), + ('form_value_types', ', '.join( + tuple(html_escape(req.form[key].type) + for key in req.form.keys()))), + ('form_value_fnames', ', '.join( + tuple( + html_escape(str(req.form[key].filename)) + for key in req.form.keys()))), + ('form_value_lenghts', + ', '.join(tuple(str(req.form[key].length) + for key in req.form.keys()))), + ('form_value_files', ', '.join( + tuple( + html_escape(str(req.form[key].file)) + for key in req.form.keys()))), + ('form_value_lists', ', '.join( + tuple('Yes' if req.form[key].list else 'No' + for key in req.form.keys()))), + )) files = [] for key in req.form.keys(): if req.form[key].filename: - files.append("

%s

" % req.form[key].filename) - files.append("%s" % req.form[key].type) + files.append(f"

{req.form[key].filename}

") + files.append(f"{req.form[key].type}") if req.form[key].type.startswith('text/'): - files.append("
%s
" % - html(req.form.getvalue(key).decode('utf-8'))) + files.append( + "
" +
+                    html_escape(req.form.getvalue(key).decode('utf-8')) +
+                    "
") else: - files.append("
%s
" % - encodebytes(req.form.getvalue(key)).decode()) - os.remove("./upload/%s" % (req.form[key].filename)) + files.append("
" +
+                             encodebytes(req.form.getvalue(key)).decode() +
+                             "
") + os.remove("./upload/" + req.form[key].filename) buff = get_header('HTTP file upload test') + \ (get_crumbnav(req), @@ -459,8 +482,8 @@ def test_upload(req): '', "

Uploaded File

", "
") + \ - tuple("" % - (key, html(val)) for key, val in var_info.items()) + \ + tuple("" + for key, val in var_info.items()) + \ ("
%s:%s
{key}:{val}
",) + \ tuple(files) + \ get_footer() @@ -473,6 +496,7 @@ def test_upload(req): @app.http_state(state.HTTP_NOT_FOUND) def not_found(req): + """Not found example response.""" buff = ( "", "", @@ -483,7 +507,7 @@ def not_found(req): "", "

404 - Page Not Found

", get_crumbnav(req), - "

Your reqeuest %s was not found.

" % req.uri, + f"

Your reqeuest {req.uri} was not found.

", ) + get_footer() response = Response(status_code=state.HTTP_NOT_FOUND) @@ -502,6 +526,8 @@ def value_error_handler(req, error): @app.route('/test/empty') def test_empty(req): + """No content response""" + assert req res = NoContentResponse() res.add_header("Super-Header", "SuperValue") return res @@ -509,6 +535,7 @@ def test_empty(req): @app.route('/test/partial/unicodes') def test_partial_unicodes(req): + """Partial response test.""" ranges = {} if 'Range' in req.headers: ranges = parse_range(req.headers['Range']) @@ -516,14 +543,18 @@ def test_partial_unicodes(req): start = start or 100 end = end or 199 if end <= start: - start = end+100 - res = PartialResponse(''.join(choices("ěščřžýáíé", k=end+1-start))) + start = end + 100 + # ruff: noqa: S311 + res = PartialResponse(''.join( + choices( # nosec + "ěščřžýáíé", k=end + 1 - start))) res.make_range({(start, end)}, "unicodes") return res @app.route('/test/partial/empty') def test_partial_empty(req): + """Partial empty response test.""" res = Response() ranges = {} if 'Range' in req.headers: @@ -534,6 +565,8 @@ def test_partial_empty(req): @app.route('/test/partial/generator') def test_partial_generator(req): + """Partial response generator test.""" + def gen(): for i in range(10): yield b"line %d\n" % i @@ -547,18 +580,20 @@ def gen(): @app.route('/yield') -def yielded(req): +def yielded(_): """Simple response generator by yield.""" for i in range(10): yield b"line %d\n" % i @app.route('/chunked') -def chunked(req): +def chunked(_): """Generator response with Response class.""" + def gen(): for i in range(10): yield b"line %d\n" % i + return GeneratorResponse(gen(), headers={'Transfer-Encoding': 'chanked'}) @@ -591,7 +626,9 @@ def simple(req): def simple_py(req): """Return simple.py with FileResponse""" last_modified = int(getctime(__file__)) - weak = urlsafe_b64encode(md5(last_modified.to_bytes(4, "big")).digest()) + weak = urlsafe_b64encode( + md5( # nosec + last_modified.to_bytes(4, "big")).digest()) etag = f'W/"{weak.decode()}"' if 'If-None-Match' in req.headers: @@ -613,19 +650,21 @@ def simple_py(req): @app.after_response() -def log_response(req, res): +def log_response(_, res): + """Log after response created.""" log.info("After response") return res @app.route('/internal-server-error') -def method_raises_errror(req): +def method_raises_errror(_): + """Own internal server error test""" raise RuntimeError('Test of internal server error') @app.route('/none-error') -def none_error_handler(req): - return None +def none_error_handler(_): + """Test for None response.""" @app.route('/bad-request') @@ -637,15 +676,18 @@ def bad_request(req): @app.route('/forbidden') def forbidden(req): + """Test forbiden exception.""" raise HTTPException(state.HTTP_FORBIDDEN) @app.route('/not-implemented') def not_implemented(req): + """Test not implemented exception""" raise HTTPException(state.HTTP_NOT_IMPLEMENTED) if __name__ == '__main__': httpd = make_server('127.0.0.1', 8080, app) print("Starting to serve on http://127.0.0.1:8080") + httpd.serve_forever() diff --git a/examples/simple_json.py b/examples/simple_json.py index 74cf453..c3dd9b9 100644 --- a/examples/simple_json.py +++ b/examples/simple_json.py @@ -3,6 +3,7 @@ This sample testing example is free to use, modify and study under same BSD licence as PoorWSGI. So enjoy it ;) """ +# pylint: disable=duplicate-code from wsgiref.simple_server import make_server from sys import path as python_path @@ -68,7 +69,7 @@ def test_json_generator(req): @app.route('/profile') -def get_profile(req): +def get_profile(_): """Returun PROFILE env variable""" return JSONResponse(PROFILE=PROFILE) @@ -80,13 +81,12 @@ def get_timestamp(req): @app.route('/unicode') -def get_unicode(req): +def get_unicode(_): """Return simple JSON with contain raw unicode characters.""" return JSONResponse(name="Ondřej Tůma", encoder_kwargs={"ensure_ascii": False}) - @app.route('/test/headers') def test_headers(req): """Request headers response.""" @@ -110,5 +110,5 @@ def test_headers(req): if __name__ == '__main__': ADDRESS = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1' httpd = make_server(ADDRESS, 8080, app) - print("Starting to serve on http://%s:8080" % ADDRESS) + print(f"Starting to serve on http://{ADDRESS}:8080") httpd.serve_forever() diff --git a/examples/websocket.py b/examples/websocket.py index 1928c05..62299ef 100644 --- a/examples/websocket.py +++ b/examples/websocket.py @@ -38,6 +38,7 @@ """ # pylint: disable=consider-using-f-string +# pylint: disable=duplicate-code from wsgiref.simple_server import make_server, WSGIServer from socketserver import ThreadingMixIn @@ -59,11 +60,17 @@ try: import uwsgi # type: ignore + # pylint: disable=invalid-name WebSocketError = OSError + def WSocketApp(var): # noqa: N802 + """Compatible with wsocket WSocketApp""" + return var + class WebSocket(): """Compatibility class.""" # pylint: disable=no-self-use + def __init__(self): uwsgi.websocket_handshake() @@ -81,7 +88,7 @@ def send(self, msg): uwsgi = None # pylint: disable=invalid-name from wsocket import (WSocketApp, # type: ignore - WebSocketError, FixedHandler) + WebSocketError) def get_websocket(environment): @@ -109,7 +116,7 @@ def receive(self): poor = Application(__name__) poor.debug = True -app = application = poor if uwsgi else WSocketApp(poor) +app = application = WSocketApp(poor) @poor.route('/') @@ -220,6 +227,7 @@ class ThreadingWSGIServer(ThreadingMixIn, WSGIServer): if __name__ == '__main__': + from wsocket import FixedHandler httpd = make_server('127.0.0.1', 8080, app, ThreadingWSGIServer, FixedHandler) print("Starting to serve on http://127.0.0.1:8080") diff --git a/poorwsgi/results.py b/poorwsgi/results.py index 434f8f2..974d146 100644 --- a/poorwsgi/results.py +++ b/poorwsgi/results.py @@ -42,9 +42,9 @@ # pylint: disable=consider-using-f-string -def html_escape(s): +def html_escape(text: str): """Escape to html entities.""" - return ''.join(HTML_ESCAPE_TABLE.get(c, c) for c in s) + return ''.join(HTML_ESCAPE_TABLE.get(c, c) for c in text) def hbytes(val: float): @@ -131,17 +131,17 @@ def internal_server_error(req, **kwargs): if req.uri_handler: handler["module"] = req.uri_handler.__module__ handler["name"] = req.uri_handler.__name__ + uri_rule = html_escape(req.uri_rule) res.write( "

Response detail

\n" - " remote host: {req.remote_host}
\n" - " remote addr: {req.remote_addr}
\n" - " method: {req.method}
\n" - " uri: {req.uri}
\n" - " uri_rule: {req.uri_rule}
\n" + f" remote host: {req.remote_host}
\n" + f" remote addr: {req.remote_addr}
\n" + f" method: {req.method}
\n" + f" uri: {req.uri}
\n" + f" uri_rule: {uri_rule}
\n" " uri_handler: " - "{handler[module]}.{handler[name]}" - "
\n" - "".format(req=req, handler=handler)) + f"{handler['module']}.{handler['name']}" + "
\n") res.write( "

Exception Traceback

\n" diff --git a/setup.py b/setup.py index c62bc78..fb32f3a 100644 --- a/setup.py +++ b/setup.py @@ -1,57 +1,54 @@ """PoorWSGI setup.py""" -from distutils.command.install_data import install_data # type: ignore -from distutils.dir_util import remove_tree - -from os import path, makedirs, walk, environ -from shutil import copyfile -from subprocess import call - import logging +from os import environ, makedirs, path, walk, listdir +from shutil import copyfile, rmtree +from subprocess import call +from typing import ClassVar from setuptools import Command, setup # type: ignore -from setuptools.command.test import test # type: ignore from poorwsgi.state import __version__ - environ.update({'PYTHONPATH': 'poorwsgi'}) +# pylint: disable=missing-function-docstring + def find_data_files(directory, target_folder=""): """Find files in directory, and prepare tuple for setup.""" retval = [] for root, _, files in walk(directory): if target_folder: - retval.append( - (target_folder, - list(root+'/'+f - for f in files if f[0] != '.' and f[-1] != '~'))) + retval.append((target_folder, + list(root + '/' + f for f in files + if f[0] != '.' and f[-1] != '~'))) else: - retval.append( - (root, - list(root+'/'+f - for f in files if f[0] != '.' and f[-1] != '~'))) + retval.append((root, + list(root + '/' + f for f in files + if f[0] != '.' and f[-1] != '~'))) return retval class BuildDoc(Command): """Build html documentation.""" description = "build html documentation, need jinja24doc >= 1.1.0" - user_options = [ - ('build-base=', 'b', - "base build directory (default: 'build.build-base')"), - ('html-temp=', 't', "temporary documentation directory"), - ('public', 'p', "build as part of public poorhttp web") - ] + user_options: ClassVar[list[tuple]] = [ + ("build-base=", "b", + "base build directory (default: 'build.build-base')"), + ('html-temp=', 't', "temporary documentation directory"), + ('public', 'p', "build as part of public poorhttp web") + ] + + build_base: str | None + html_temp: str | None + public: bool = False def initialize_options(self): self.build_base = None self.html_temp = None - self.public = False def finalize_options(self): - self.set_undefined_options('build', - ('build_base', 'build_base')) + self.set_undefined_options('build', ('build_base', 'build_base')) if self.html_temp is None: self.html_temp = path.join(self.build_base, 'html') @@ -59,11 +56,13 @@ def page(self, in_name, out_name=None): """Generate page.""" if out_name is None: out_name = in_name - if call(['jinja24doc', '-v', '--var', f'public={self.public}', - f'_{in_name}.html', 'doc'], - stdout=open(f'{self.html_temp}/{out_name}.html', 'w', - encoding="utf-8")): - raise IOError(1, 'jinja24doc failed') + with open(f'{self.html_temp}/{out_name}.html', 'w', + encoding="utf-8") as stdout: + call([ + 'jinja24doc', '-v', '--var', f'public={self.public}', + f'_{in_name}.html', 'doc' + ], + stdout=stdout) def run(self): logging.info("building html documentation") @@ -79,95 +78,79 @@ def run(self): self.page('documentation') self.page('poorwsgi_api', 'api') self.page('licence') - copyfile('doc/style.css', self.html_temp+'/style.css') - copyfile('doc/web.css', self.html_temp+'/web.css') - copyfile('doc/small-logo.png', self.html_temp+'/small-logo.png') + copyfile('doc/style.css', self.html_temp + '/style.css') + copyfile('doc/web.css', self.html_temp + '/web.css') + copyfile('doc/small-logo.png', self.html_temp + '/small-logo.png') class CleanDoc(Command): """Clean temporary files from build_doc command.""" description = "clean up temporary files from 'build_doc' command" - user_options = [ - ('build-base=', 'b', - "base build directory (default: 'build-html.build-base')"), - ('html-temp=', 't', - "temporary documentation directory") - ] + user_options: ClassVar[list[tuple]] = [ + ('build-base=', 'b', + "base build directory (default: 'build-html.build-base')"), + ('html-temp=', 't', "temporary documentation directory") + ] + + build_base: str | None + html_temp: str | None def initialize_options(self): self.build_base = None self.html_temp = None def finalize_options(self): - self.set_undefined_options('build_doc', - ('build_base', 'build_base'), + self.set_undefined_options('build_doc', ('build_base', 'build_base'), ('html_temp', 'html_temp')) def run(self): if path.exists(self.html_temp): - remove_tree(self.html_temp, dry_run=self.dry_run) + if self.dry_run: + return + rmtree(self.html_temp) else: - logging.warn("'%s' does not exist -- can't clean it", self.html_temp) + logging.warning("'%s' does not exist -- can't clean it", + self.html_temp) -class InstallDoc(install_data): +class InstallDoc(Command): """Install documentation files.""" description = "install html documentation" - user_options = install_data.user_options + [ + user_options: ClassVar[list[tuple]] = [ ('build-base=', 'b', "base build directory (default: 'build-html.build-base')"), - ('html-temp=', 't', - "temporary documentation directory"), - ('skip-build', None, "skip the build step"), - ] + ('html-temp=', 't', "temporary documentation directory"), + ] + + build_base: str | None + html_temp: str | None + dest_dir: str | None + data_files: list[tuple] def initialize_options(self): self.build_base = None self.html_temp = None - self.skip_build = None - install_data.initialize_options(self) def finalize_options(self): - self.set_undefined_options('build_doc', - ('build_base', 'build_base'), + self.set_undefined_options('build_doc', ('build_base', 'build_base'), ('html_temp', 'html_temp')) - self.set_undefined_options('install', - ('skip_build', 'skip_build')) - install_data.finalize_options(self) + if self.dest_dir is None: + self.dest_dir = path.join(self.install_data, "share", "doc", + "poorwsgi", "html") def run(self): - if not self.skip_build: - self.run_command('build_doc') - self.data_files = find_data_files(self.html_temp, - 'share/doc/poorwsgi/html') - install_data.run(self) - - -class PyTest(test): - """Run tests.""" - user_options = [('pytest-args=', - 'a', 'Arguments to pass to py.test.'), - ('test-suite=', - 't', 'Test suite/module::Class::test')] - - def initialize_options(self): - test.initialize_options(self) - self.pytest_args = [] - - def finalize_options(self): - test.finalize_options(self) - if isinstance(self.pytest_args, (str)): - self.pytest_args = self.pytest_args.split(' ') + self.mkpath(self.dest_dir) + for page in listdir(self.html_temp): + src = f"{self.html_temp}/{page}" + (out, _) = self.copy_file(src, self.dest_dir) + self.outfiles.append(out) - self.pytest_args.append(self.test_suite or 'tests') - if self.verbose: - self.pytest_args.insert(0, '-v') + def get_inputs(self): + # pylint: disable=no-self-use + return [] - def run_tests(self): - # import here, cause outside the eggs aren't loaded - import pytest - if pytest.main(self.pytest_args) != 0: - raise RuntimeError("Test failed") + def get_outputs(self): + return self.outfiles def doc(): @@ -176,54 +159,48 @@ def doc(): return readme.read().strip() -setup( - name="PoorWSGI", - version=__version__, - description="Poor WSGI connector for Python", - author="Ondřej Tůma", - author_email="mcbig@zeropage.cz", - maintainer="Ondrej Tuma", - maintainer_email="mcbig@zeropage.cz", - url="http://poorhttp.zeropage.cz/poorwsgi", - project_urls={ - 'Documentation': 'http://poorhttp.zeropage.cz/poorwsgi', - 'Funding': 'https://github.com/sponsors/ondratu', - 'Source': 'https://github.com/poorHttp/PoorWSGI', - 'Tracker': 'https://github.com/PoorHttp/PoorWSGI/issues'}, - packages=['poorwsgi'], - package_data={'': ['py.typed']}, - data_files=[ - ('share/doc/poorwsgi', - ['doc/ChangeLog', 'doc/licence.txt', 'README.rst', - 'CONTRIBUTION.rst']) - ] + find_data_files("examples", "share/poorwsgi/examples"), - license="BSD", - license_files='doc/licence.txt', - long_description=doc(), - long_description_content_type="text/x-rst", - keywords='web wsgi development', - classifiers=[ - "Development Status :: 5 - Production/Stable", - "Environment :: Web Environment", - "Intended Audience :: Developers", - "License :: OSI Approved :: BSD License", - "Natural Language :: English", - "Natural Language :: Czech", - "Operating System :: MacOS :: MacOS X", - "Operating System :: POSIX", - "Operating System :: POSIX :: BSD", - "Operating System :: POSIX :: Linux", - "Programming Language :: Python :: 3 :: Only", - "Topic :: Internet :: WWW/HTTP :: Dynamic Content", - "Topic :: Internet :: WWW/HTTP :: WSGI :: Middleware", - "Topic :: Software Development :: Libraries :: Python Modules" - ], - python_requires=">=3.8", - cmdclass={'build_doc': BuildDoc, - 'clean_doc': CleanDoc, - 'install_doc': InstallDoc, - 'test': PyTest}, - tests_require=['pytest', 'requests', 'openapi-core', 'simplejson'], - extras_require={ - 'JSONGeneratorResponse': ['simplejson']} -) +setup(name="PoorWSGI", + version=__version__, + description="Poor WSGI connector for Python", + author="Ondřej Tůma", + author_email="mcbig@zeropage.cz", + maintainer="Ondrej Tuma", + maintainer_email="mcbig@zeropage.cz", + url="http://poorhttp.zeropage.cz/poorwsgi", + project_urls={ + 'Documentation': 'http://poorhttp.zeropage.cz/poorwsgi', + 'Funding': 'https://github.com/sponsors/ondratu', + 'Source': 'https://github.com/poorHttp/PoorWSGI', + 'Tracker': 'https://github.com/PoorHttp/PoorWSGI/issues' + }, + packages=['poorwsgi'], + package_data={'': ['py.typed']}, + data_files=[('share/doc/poorwsgi', [ + 'doc/ChangeLog', 'doc/licence.txt', 'README.rst', 'CONTRIBUTION.rst' + ])] + find_data_files("examples", "share/poorwsgi/examples"), + license="BSD", + license_files='doc/licence.txt', + long_description=doc(), + long_description_content_type="text/x-rst", + keywords='web wsgi development', + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Environment :: Web Environment", "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Natural Language :: English", "Natural Language :: Czech", + "Operating System :: MacOS :: MacOS X", "Operating System :: POSIX", + "Operating System :: POSIX :: BSD", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3 :: Only", + "Topic :: Internet :: WWW/HTTP :: Dynamic Content", + "Topic :: Internet :: WWW/HTTP :: WSGI :: Middleware", + "Topic :: Software Development :: Libraries :: Python Modules" + ], + python_requires=">=3.8", + cmdclass={ + 'build_doc': BuildDoc, + 'clean_doc': CleanDoc, + 'install_doc': InstallDoc, + }, + tests_require=['pytest', 'requests', 'openapi-core', 'simplejson'], + extras_require={'JSONGeneratorResponse': ['simplejson']}) diff --git a/tests/test_digest.py b/tests/test_digest.py index f62f3fd..4ab9beb 100644 --- a/tests/test_digest.py +++ b/tests/test_digest.py @@ -1,6 +1,6 @@ """Tests for digest functionality.""" -from os.path import join, dirname, pardir from collections import defaultdict +from os.path import dirname, join, pardir from pytest import fixture @@ -19,13 +19,21 @@ response="0dcecfef979808a940c34af8a3bc7ee4", opaque="5af87f1f655f3e60b744462318d3bbcaae8079df107dbcb08168e1a93fd943d0", qop=auth, nc=00000002, cnonce="b636b6204f836fdc"''' -DICT = dict( # Dictionary from HEADER values - type='Digest', username='user', realm='User Zone', - nonce='1bcb61aa4910f2d462db474efd51a49018413d811536d746538715480e7e730a', - uri='/user', algorithm='MD5-sess', - response='0dcecfef979808a940c34af8a3bc7ee4', - opaque='5af87f1f655f3e60b744462318d3bbcaae8079df107dbcb08168e1a93fd943d0', - qop='auth', nc='00000002', cnonce='b636b6204f836fdc') +DICT = { # Dictionary from HEADER values + "type": 'Digest', + "username": 'user', + "realm": 'User Zone', + "nonce": + '1bcb61aa4910f2d462db474efd51a49018413d811536d746538715480e7e730a', + "uri": '/user', + "algorithm": 'MD5-sess', + "response": '0dcecfef979808a940c34af8a3bc7ee4', + "opaque": + '5af87f1f655f3e60b744462318d3bbcaae8079df107dbcb08168e1a93fd943d0', + "qop": 'auth', + "nc": '00000002', + "cnonce": 'b636b6204f836fdc' +} # pylint: disable=missing-function-docstring # pylint: disable=no-self-use @@ -55,6 +63,7 @@ def req(app): class TestMap(): """Test for PasswordMap class.""" + def test_set(self, pmap): assert REALM in pmap assert USER in pmap[REALM] diff --git a/tests_integrity/openapi.py b/tests_integrity/openapi.py index 4db2d47..a0f27bc 100644 --- a/tests_integrity/openapi.py +++ b/tests_integrity/openapi.py @@ -1,6 +1,6 @@ """requests library object Wrapper for openapi_core library objects.""" from urllib.parse import urlparse, parse_qs -from cgi import parse_header +from email.message import Message import json @@ -25,8 +25,9 @@ def __init__(self, request, path_pattern=None): self.request = request self.data = request.data - ctype = parse_header(request.headers.get('Content-Type', '')) - self.mimetype = ctype[0] + msg = Message() + msg['content-type'] = request.headers.get('Content-Type', '') + self.mimetype = msg.get_content_type() self.parameters = RequestParameters( path=args, @@ -52,7 +53,9 @@ class OpenAPIResponse(): def __init__(self, response): self.response = response - self.ctype = parse_header(response.headers.get('Content-Type', '')) + msg = Message() + msg['content-type'] = response.headers.get('Content-Type', '') + self.content_type = msg.get_content_type() @property def data(self): @@ -67,7 +70,7 @@ def status_code(self): @property def mimetype(self): """Response Content-Type""" - return self.ctype[0] + return self.content_type @property def headers(self): diff --git a/tests_integrity/test_digest.py b/tests_integrity/test_digest.py index 5cb0ce0..58c42f6 100644 --- a/tests_integrity/test_digest.py +++ b/tests_integrity/test_digest.py @@ -2,10 +2,10 @@ from os import environ from os.path import dirname, join, pardir -from requests.auth import HTTPDigestAuth from pytest import fixture, mark +from requests.auth import HTTPDigestAuth -from . support import start_server, check_url +from .support import check_url, start_server # pylint: disable=inconsistent-return-statements # pylint: disable=missing-function-docstring @@ -46,6 +46,7 @@ def utf8_auth(): class TestDigest: """Test http_digest example.""" + def test_unauthorized(self, url): check_url(url+'/admin_zone', status_code=401) check_url(url+'/user_zone', status_code=401) @@ -54,22 +55,22 @@ def test_unauthorized(self, url): def test_admin(self, url, admin_auth): check_url(url+'/admin_zone', auth=admin_auth) - check_url(url+'/admin_zone', params=dict(arg=42), auth=admin_auth) + check_url(url+'/admin_zone', params={"arg": 42}, auth=admin_auth) def test_user(self, url, user_auth): check_url(url+'/user_zone', auth=user_auth) - check_url(url+'/user_zone', params=dict(param='text'), auth=user_auth) + check_url(url+'/user_zone', params={"param": 'text'}, auth=user_auth) check_url(url+'/user', auth=user_auth) @mark.skip("https://github.com/psf/requests/issues/6102") def test_utf8(self, url, utf8_auth): """Check UTF-8 characters in username.""" check_url(url+'/user/utf-8', auth=utf8_auth) - check_url(url+'/user/utf-8', params=dict(param='text'), auth=utf8_auth) + check_url(url+'/user/utf-8', params={"param": 'text'}, auth=utf8_auth) def test_foo(self, url): auth = HTTPDigestAuth('foo', 'bar') - check_url(url+'/foo', params=dict(x=123), auth=auth) + check_url(url+'/foo', params={"x": 123}, auth=auth) check_url(url+'/foo', auth=auth) check_url(url+'/foo/passwd', method='POST', auth=auth, diff --git a/tests_integrity/test_json.py b/tests_integrity/test_json.py index cc578a9..f76471d 100644 --- a/tests_integrity/test_json.py +++ b/tests_integrity/test_json.py @@ -16,7 +16,7 @@ @fixture(scope="module") def server(request): - value = environ.get("TEST_SIMPLE_URL", "").strip('/') + value = environ.get("TEST_SIMPLE_JSON_URL", "").strip('/') if value: return value @@ -31,6 +31,7 @@ def server(request): class TestHeaders: """Test right headers in response.""" + def test_headers_empty(self, server): res = check_url(server+"/test/headers") assert "X-Powered-By" in res.headers @@ -71,7 +72,7 @@ def test_json_request(self, server): data = [{"x": 124.2, "y": 100.1}] res = check_url(server+"/test/json", status_code=418, method="POST", json=data, timeout=1) - assert res.json() == {"message": "I\'m teapot :-)", + assert res.json() == {"message": "I'm teapot :-)", "numbers": [0, 1, 2, 3, 4], "request": data} @@ -86,7 +87,7 @@ def generator(): res = check_url(server+"/test/json", status_code=418, method="POST", data=generator(), headers={'Content-Type': 'application/json'}) - assert res.json() == {"message": "I\'m teapot :-)", + assert res.json() == {"message": "I'm teapot :-)", "numbers": [0, 1, 2, 3, 4], "request": [{"0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "end": None}]} @@ -94,15 +95,16 @@ def generator(): class TestResponse: """Test right responses.""" + def test_json_response(self, server): res = check_url(server+"/test/json", status_code=418, timeout=1) - assert res.json() == {"message": "I\'m teapot :-)", + assert res.json() == {"message": "I'm teapot :-)", "numbers": [0, 1, 2, 3, 4], "request": {}} def test_json_generator_response(self, server): res = check_url(server+"/test/json-generator", status_code=418) - assert res.json() == {"message": "I\'m teapot :-)", + assert res.json() == {"message": "I'm teapot :-)", "numbers": [0, 1, 2, 3, 4], "request": {}} @@ -124,7 +126,6 @@ def test_raw_unicode(self, server): assert res.text == data assert int(res.headers['Content-Length']) == len(data.encode("utf-8")) - def test_bad_json_response(self, server): check_url(server+"/test/json", status_code=400, method="POST", data=b"abraka crash", diff --git a/tests_integrity/test_profile.py b/tests_integrity/test_profile.py index ce536b6..3d56a91 100644 --- a/tests_integrity/test_profile.py +++ b/tests_integrity/test_profile.py @@ -11,11 +11,12 @@ # pylint: disable=redefined-outer-name # pylint: disable=no-self-use # pylint: disable=consider-using-f-string +# pylint: disable=duplicate-code @fixture(scope="module") def server(request): - value = environ.get("TEST_SIMPLE_URL", "").strip('/') + value = environ.get("TEST_SIMPLE_JSON_URL", "").strip('/') if value: return value diff --git a/tests_integrity/test_simple.py b/tests_integrity/test_simple.py index 600a001..4650bcc 100644 --- a/tests_integrity/test_simple.py +++ b/tests_integrity/test_simple.py @@ -7,6 +7,7 @@ from . support import start_server, check_url +# pylint: disable=inconsistent-return-statements # pylint: disable=missing-function-docstring # pylint: disable=no-self-use # pylint: disable=redefined-outer-name @@ -15,6 +16,7 @@ @fixture(scope="module") def url(request): + """URL (server fixture in fact).""" url = environ.get("TEST_SIMPLE_URL", "").strip('/') if url: return url @@ -40,6 +42,8 @@ def session(url): class TestSimple(): + """Test for routes.""" + def test_root(self, url): check_url(url) @@ -73,6 +77,7 @@ def test_debug_info(self, url): class TestRequest: """Test for requests.""" # pylint: disable=too-few-public-methods + def test_stream_request(self, url): def generator(): for i in range(5): @@ -83,6 +88,7 @@ def generator(): class TestResponses(): """Tests for Responses""" + def test_yield(self, url): """yield function is done by GeneratorResponse.""" check_url(url+"/yield") @@ -160,6 +166,8 @@ def test_unicodes(self, url): class TestSession(): + """Session tests.""" + def test_login(self, url): check_url(url+"/login", status_code=302, allow_redirects=False) @@ -181,27 +189,30 @@ def test_form_post(self, url, session): allow_redirects=False) def test_form_upload(self, url, session): - files = {'file_0': ('testfile.py', open(__file__, 'rb'), - 'text/x-python', {'Expires': '0'})} - res = check_url(url+"/test/upload", method="POST", session=session, - allow_redirects=False, files=files) - assert 'testfile.py' in res.text - assert __doc__ in res.text - assert 'anything' in res.text + with open(__file__, 'rb') as _file: + files = {'file_0': ('testfile.py', _file, + 'text/x-python', {'Expires': '0'})} + res = check_url(url+"/test/upload", method="POST", session=session, + allow_redirects=False, files=files) + assert 'testfile.py' in res.text + assert __doc__ in res.text + assert 'anything' in res.text def test_form_upload_small(self, url, session): manifest = join(dirname(__file__), pardir, 'MANIFEST.in') - files = {'file_0': ('MANIFEST.in', open(manifest, 'rb'), - 'text/plain', {'Expires': '0'})} - res = check_url(url+"/test/upload", method="POST", session=session, - allow_redirects=False, files=files) - assert 'MANIFEST.in' in res.text - assert 'graft' in res.text - assert 'global-exclude' in res.text + with open(manifest, 'rb') as _file: + files = {'file_0': ('MANIFEST.in', _file, + 'text/plain', {'Expires': '0'})} + res = check_url(url+"/test/upload", method="POST", session=session, + allow_redirects=False, files=files) + assert 'MANIFEST.in' in res.text + assert 'graft' in res.text + assert 'global-exclude' in res.text class TestErrors(): """Integrity tests for native http state handlers.""" + def test_internal_server_error(self, url): check_url(url+"/internal-server-error", status_code=500) diff --git a/tests_integrity/test_websocket.py b/tests_integrity/test_websocket.py index 7f5086d..101afff 100644 --- a/tests_integrity/test_websocket.py +++ b/tests_integrity/test_websocket.py @@ -6,7 +6,7 @@ from pytest import fixture # websocket-client -from websocket import WebSocket # type: ignore +from websocket import WebSocket from . support import start_server, check_url @@ -18,7 +18,7 @@ @fixture(scope="module") def server(request): - value = environ.get("TEST_SIMPLE_URL", "").strip('/') + value = environ.get("TEST_WEBSOCKET_URL", "").strip('/') if value: return value @@ -54,6 +54,8 @@ def test_upgrade(self, http_url): encodebytes(uuid).decode().strip()}) def test_websocket(self, ws_url): + # python websocket library breaks usage websocket-client with pylint + # pylint: disable=no-member wsck = WebSocket() wsck.connect(ws_url+"/ws") msg = wsck.recv()