Skip to content

Commit

Permalink
Prepare certbot module for mypy check untyped defs
Browse files Browse the repository at this point in the history
* Fix certbot#5952

* Bump mypy to version 0.600 and fix associated bugs

* Fix pylint bugs after introducing mypy
  • Loading branch information
dmfigol committed May 16, 2018
1 parent 722dac8 commit 91a5050
Show file tree
Hide file tree
Showing 36 changed files with 299 additions and 213 deletions.
4 changes: 2 additions & 2 deletions acme/acme/challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ class KeyAuthorizationChallenge(_TokenChallenge):
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`
that will be used to generate `response`.
:param typ: type of the challenge
"""

type = NotImplemented
response_cls = NotImplemented
thumbprint_hash_function = (
KeyAuthorizationChallengeResponse.thumbprint_hash_function)
Expand Down
21 changes: 15 additions & 6 deletions acme/acme/crypto_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from acme import errors
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Callable, Text, Union
from acme.magic_typing import Callable, Union, Tuple


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -135,14 +135,23 @@ def probe_sni(name, host, port=443, timeout=300,

socket_kwargs = {'source_address': source_address}

host_protocol_agnostic = None if host == '::' or host == '0' else host
host_protocol_agnostic = host
if host == '::' or host == '0':
# https://github.com/python/typeshed/pull/2136
# while PR is not merged, it can't be None
host_protocol_agnostic = ''

try:
# pylint: disable=star-args
logger.debug("Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(source_address[0], source_address[1]) if \
socket_kwargs else "")
sock = socket.create_connection((host_protocol_agnostic, port), **socket_kwargs)
logger.debug(
"Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(
source_address[0],
source_address[1]
) if socket_kwargs else ""
)
socket_tuple = (host_protocol_agnostic, port) # type: Tuple[str, int]
sock = socket.create_connection(socket_tuple, **socket_kwargs)
except socket.error as error:
raise errors.Error(error)

Expand Down
1 change: 1 addition & 0 deletions acme/acme/magic_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ def __getattr__(self, name):
try:
# mypy doesn't respect modifying sys.modules
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
from typing import IO # pylint: disable=unused-import
except ImportError:
sys.modules[__name__] = TypingClass()
22 changes: 11 additions & 11 deletions certbot/auth_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from acme import challenges
from acme import messages

from acme.magic_typing import DefaultDict, Dict, List, Set # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
from certbot import error_handler
Expand Down Expand Up @@ -117,7 +117,7 @@ def _has_challenges(self, aauthzrs):

def _solve_challenges(self, aauthzrs):
"""Get Responses for challenges from authenticators."""
resp = []
resp = [] # type: List[str]
all_achalls = self._get_all_achalls(aauthzrs)
try:
if all_achalls:
Expand All @@ -133,10 +133,9 @@ def _solve_challenges(self, aauthzrs):

def _get_all_achalls(self, aauthzrs):
"""Return all active challenges."""
all_achalls = []
all_achalls = [] # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
for aauthzr in aauthzrs:
all_achalls.extend(aauthzr.achalls)

return all_achalls

def _respond(self, aauthzrs, resp, best_effort):
Expand All @@ -146,7 +145,8 @@ def _respond(self, aauthzrs, resp, best_effort):
"""
# TODO: chall_update is a dirty hack to get around acme-spec #105
chall_update = dict()
chall_update = dict() \
# type: Dict[int, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
self._send_responses(aauthzrs, resp, chall_update)

# Check for updated status...
Expand Down Expand Up @@ -198,7 +198,7 @@ def _poll_challenges(self, aauthzrs, chall_update,
while indices_to_check and rounds < max_rounds:
# TODO: Use retry-after...
time.sleep(min_sleep)
all_failed_achalls = set()
all_failed_achalls = set() # type: Set[achallenges.KeyAuthorizationAnnotatedChallenge]
for index in indices_to_check:
comp_achalls, failed_achalls = self._handle_check(
aauthzrs, index, chall_update[index])
Expand Down Expand Up @@ -424,7 +424,7 @@ def _find_smart_path(challbs, preferences, combinations):

# max_cost is now equal to sum(indices) + 1

best_combo = []
best_combo = None
# Set above completing all of the available challenges
best_combo_cost = max_cost

Expand Down Expand Up @@ -479,7 +479,7 @@ def _report_no_chall_path(challbs):
msg += (
" You may need to use an authenticator "
"plugin that can do challenges over DNS.")
logger.fatal(msg)
logger.critical(msg)
raise errors.AuthorizationError(msg)


Expand Down Expand Up @@ -522,11 +522,11 @@ def _report_failed_challs(failed_achalls):
:class:`certbot.achallenges.AnnotatedChallenge`.
"""
problems = dict()
problems = collections.defaultdict(list)\
# type: DefaultDict[str, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
for achall in failed_achalls:
if achall.error:
problems.setdefault(achall.error.typ, []).append(achall)

problems[achall.error.typ].append(achall)
reporter = zope.component.getUtility(interfaces.IReporter)
for achalls in six.itervalues(problems):
reporter.add_message(
Expand Down
5 changes: 3 additions & 2 deletions certbot/cert_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import traceback
import zope.component

from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
Expand Down Expand Up @@ -226,7 +227,7 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
def find_matches(candidate_lineage, return_value, acceptable_matches):
"""Returns a list of matches using _search_lineages."""
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
acceptable_matches_rv = []
acceptable_matches_rv = [] # type: List[str]
for item in acceptable_matches:
if isinstance(item, list):
acceptable_matches_rv += item
Expand Down Expand Up @@ -340,7 +341,7 @@ def _report_human_readable(config, parsed_certs):

def _describe_certs(config, parsed_certs, parse_failures):
"""Print information about the certs we know about"""
out = []
out = [] # type: List[str]

notify = out.append

Expand Down
53 changes: 33 additions & 20 deletions certbot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
import configargparse
import six
import zope.component
import zope.interface

from zope.interface import interfaces as zope_interfaces

from acme import challenges
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Any, Dict, Optional

import certbot

Expand All @@ -33,7 +36,7 @@
logger = logging.getLogger(__name__)

# Global, to save us from a lot of argument passing within the scope of this module
helpful_parser = None
helpful_parser = None # type: Optional[HelpfulArgumentParser]

# For help strings, figure out how the user ran us.
# When invoked from letsencrypt-auto, sys.argv[0] is something like:
Expand Down Expand Up @@ -196,17 +199,17 @@ def set_by_cli(var):
(CLI or config file) including if the user explicitly set it to the
default. Returns False if the variable was assigned a default value.
"""
detector = set_by_cli.detector
if detector is None:
detector = set_by_cli.detector # type: ignore
if detector is None and helpful_parser is not None:
# Setup on first run: `detector` is a weird version of config in which
# the default value of every attribute is wrangled to be boolean-false
plugins = plugins_disco.PluginsRegistry.find_all()
# reconstructed_args == sys.argv[1:], or whatever was passed to main()
reconstructed_args = helpful_parser.args + [helpful_parser.verb]
detector = set_by_cli.detector = prepare_and_parse_args(
detector = set_by_cli.detector = prepare_and_parse_args( # type: ignore
plugins, reconstructed_args, detect_defaults=True)
# propagate plugin requests: eg --standalone modifies config.authenticator
detector.authenticator, detector.installer = (
detector.authenticator, detector.installer = ( # type: ignore
plugin_selection.cli_plugin_requests(detector))

if not isinstance(getattr(detector, var), _Default):
Expand All @@ -220,7 +223,10 @@ def set_by_cli(var):
return True

return False

# static housekeeping var
# functions attributed are not supported by mypy
# https://github.com/python/mypy/issues/2087
set_by_cli.detector = None # type: ignore


Expand All @@ -236,8 +242,10 @@ def has_default_value(option, value):
:rtype: bool
"""
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
if helpful_parser is not None:
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
return False


def option_was_set(option, value):
Expand All @@ -254,11 +262,12 @@ def option_was_set(option, value):


def argparse_type(variable):
"Return our argparse type function for a config variable (default: str)"
"""Return our argparse type function for a config variable (default: str)"""
# pylint: disable=protected-access
for action in helpful_parser.parser._actions:
if action.type is not None and action.dest == variable:
return action.type
if helpful_parser is not None:
for action in helpful_parser.parser._actions:
if action.type is not None and action.dest == variable:
return action.type
return str

def read_file(filename, mode="rb"):
Expand Down Expand Up @@ -291,10 +300,12 @@ def flag_default(name):

def config_help(name, hidden=False):
"""Extract the help message for an `.IConfig` attribute."""
# pylint: disable=no-member
if hidden:
return argparse.SUPPRESS
else:
return interfaces.IConfig[name].__doc__
field = interfaces.IConfig.__getitem__(name) # type: zope.interface.interface.Attribute
return field.__doc__


class HelpfulArgumentGroup(object):
Expand Down Expand Up @@ -473,7 +484,7 @@ def __init__(self, args, plugins, detect_defaults=False):
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]

plugin_names = list(plugins)
self.help_topics = HELP_TOPICS + plugin_names + [None]
self.help_topics = HELP_TOPICS + plugin_names + [None] # type: ignore

self.detect_defaults = detect_defaults
self.args = args
Expand All @@ -492,8 +503,11 @@ def __init__(self, args, plugins, detect_defaults=False):
short_usage = self._usage_string(plugins, self.help_arg)

self.visible_topics = self.determine_help_topics(self.help_arg)
self.groups = {} # elements are added by .add_group()
self.defaults = {} # elements are added by .parse_args()

# elements are added by .add_group()
self.groups = {} # type: Dict[str, argparse._ArgumentGroup]
# elements are added by .parse_args()
self.defaults = {} # type: Dict[str, Any]

self.parser = configargparse.ArgParser(
prog="certbot",
Expand Down Expand Up @@ -805,7 +819,6 @@ def add_group(self, topic, verbs=(), **kwargs):
if self.help_arg:
for v in verbs:
self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"])

return HelpfulArgumentGroup(self, topic)

def add_plugin_args(self, plugins):
Expand Down Expand Up @@ -1296,14 +1309,14 @@ def _paths_parser(helpful):
verb = helpful.help_arg

cph = "Path to where certificate is saved (with auth --csr), installed from, or revoked."
section = ["paths", "install", "revoke", "certonly", "manage"]
sections = ["paths", "install", "revoke", "certonly", "manage"]
if verb == "certonly":
add(section, "--cert-path", type=os.path.abspath,
add(sections, "--cert-path", type=os.path.abspath,
default=flag_default("auth_cert_path"), help=cph)
elif verb == "revoke":
add(section, "--cert-path", type=read_file, required=True, help=cph)
add(sections, "--cert-path", type=read_file, required=True, help=cph)
else:
add(section, "--cert-path", type=os.path.abspath, help=cph)
add(sections, "--cert-path", type=os.path.abspath, help=cph)

section = "paths"
if verb in ("install", "revoke"):
Expand Down
14 changes: 8 additions & 6 deletions certbot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import platform

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key # type: ignore
import josepy as jose
import OpenSSL
import zope.component
Expand Down Expand Up @@ -156,11 +156,11 @@ def register(config, account_storage, tos_cb=None):
logger.info("Registering without email!")

# Each new registration shall use a fresh new key
key = jose.JWKRSA(key=jose.ComparableRSAKey(
rsa.generate_private_key(
rsa_key = generate_private_key(
public_exponent=65537,
key_size=config.rsa_key_size,
backend=default_backend())))
backend=default_backend())
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa_key))
acme = acme_from_config_key(config, key)
# TODO: add phone?
regr = perform_registration(acme, config, tos_cb)
Expand Down Expand Up @@ -605,8 +605,10 @@ def validate_key_csr(privkey, csr=None):
if csr.form == "der":
csr_obj = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr.data)
csr = util.CSR(csr.file, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, csr_obj), "pem")
cert_buffer = OpenSSL.crypto.dump_certificate( # type: ignore
OpenSSL.crypto.FILETYPE_PEM, csr_obj
)
csr = util.CSR(csr.file, cert_buffer, "pem")

# If CSR is provided, it must be readable and valid.
if csr.data and not crypto_util.valid_csr(csr.data):
Expand Down
Loading

0 comments on commit 91a5050

Please sign in to comment.