Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type annotations to crypto_utils and config #5532

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,11 @@ disallow_untyped_defs = True
[mypy-journalist_app.api]
disallow_untyped_defs = True

[mypy-crypto_util]
disallow_untyped_defs = True

[mypy-sdconfig]
disallow_untyped_defs = True

[mypy-config]
ignore_errors = True
100 changes: 41 additions & 59 deletions securedrop/crypto_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-

from distutils.version import StrictVersion
from typing import Optional

import pretty_bad_protocol as gnupg
import os
import io
Expand All @@ -13,20 +15,14 @@
from flask import current_app
from pretty_bad_protocol._util import _is_stream, _make_binary_stream
from redis import Redis
from typing import List

from typing import Dict

import rm

from models import Source

import typing
# https://www.python.org/dev/peps/pep-0484/#runtime-or-type-checking
if typing.TYPE_CHECKING:
# flake8 can not understand type annotation yet.
# That is why all type annotation relative import
# statements has to be marked as noqa.
# http://flake8.pycqa.org/en/latest/user/error-codes.html?highlight=f401stream
from typing import Dict, List, Text # noqa: F401


# to fix GPG error #78 on production
os.environ['USERNAME'] = 'www-data'
Expand All @@ -41,7 +37,9 @@
'BCDEFGHIJKLMNOPQRSTUVWXYZ')


def monkey_patch_delete_handle_status(self, key, value):
def monkey_patch_delete_handle_status(
self: gnupg._parsers.DeleteResult, key: str, value: str
) -> None:
"""
Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
Expand Down Expand Up @@ -82,14 +80,14 @@ class CryptoUtil:
SOURCE_KEY_UID_RE = re.compile(r"(Source|Autogenerated) Key <[-A-Za-z0-9+/=_]+>")

def __init__(self,
scrypt_params,
scrypt_id_pepper,
scrypt_gpg_pepper,
securedrop_root,
word_list,
nouns_file,
adjectives_file,
gpg_key_dir):
scrypt_params: Dict[str, int],
scrypt_id_pepper: str,
scrypt_gpg_pepper: str,
securedrop_root: str,
word_list: str,
nouns_file: str,
adjectives_file: str,
gpg_key_dir: str) -> None:
self.__securedrop_root = securedrop_root
self.__word_list = word_list

Expand Down Expand Up @@ -119,7 +117,7 @@ def __init__(self,
self.gpg = gpg_binary

# map code for a given language to a localized wordlist
self.__language2words = {} # type: Dict[Text, List[str]]
self.__language2words = {} # type: Dict[str, List[str]]

with io.open(nouns_file) as f:
self.nouns = f.read().splitlines()
Expand All @@ -130,14 +128,14 @@ def __init__(self,
self.redis = Redis(decode_responses=True)

# Make sure these pass before the app can run
def do_runtime_tests(self):
def do_runtime_tests(self) -> None:
if self.scrypt_id_pepper == self.scrypt_gpg_pepper:
raise AssertionError('scrypt_id_pepper == scrypt_gpg_pepper')
# crash if we don't have a way to securely remove files
if not rm.check_secure_delete_capability():
raise AssertionError("Secure file deletion is not possible.")

def get_wordlist(self, locale: 'Text') -> 'List[str]':
def get_wordlist(self, locale: str) -> List[str]:
"""" Ensure the wordlist for the desired locale is read and available
in the words global variable. If there is no wordlist for the
desired local, fallback to the default english wordlist.
Expand Down Expand Up @@ -166,14 +164,12 @@ def get_wordlist(self, locale: 'Text') -> 'List[str]':
return self.__language2words[locale]

def genrandomid(self,
words_in_random_id=None,
locale='en'):
if words_in_random_id is None:
words_in_random_id = self.DEFAULT_WORDS_IN_RANDOM_ID
words_in_random_id: int = DEFAULT_WORDS_IN_RANDOM_ID,
locale: str = 'en') -> str:
return ' '.join(random.choice(self.get_wordlist(locale))
for x in range(words_in_random_id))

def display_id(self):
def display_id(self) -> str:
"""Generate random journalist_designation until we get an unused one"""

tries = 0
Expand All @@ -190,20 +186,19 @@ def display_id(self):

raise ValueError("Could not generate unique journalist designation for new source")

def hash_codename(self, codename, salt=None):
def hash_codename(self, codename: str, salt: Optional[str] = None) -> str:
"""Salts and hashes a codename using scrypt.

:param str codename: A source's codename.
:param str salt: The salt to mix with the codename when hashing.
:param codename: A source's codename.
:param salt: The salt to mix with the codename when hashing.
:returns: A base32 encoded string; the salted codename hash.
"""
if salt is None:
salt = self.scrypt_id_pepper
return b32encode(scrypt.hash(clean(codename),
salt,
**self.scrypt_params)).decode('utf-8')
_validate_name_for_diceware(codename)
return b32encode(scrypt.hash(codename, salt, **self.scrypt_params)).decode('utf-8')

def genkeypair(self, name, secret):
def genkeypair(self, name: str, secret: str) -> gnupg._parsers.GenKey:
"""Generate a GPG key through batch file key generation. A source's
codename is salted with SCRYPT_GPG_PEPPER and hashed with scrypt to
provide the passphrase used to encrypt their private key. Their name
Expand All @@ -215,15 +210,15 @@ def genkeypair(self, name, secret):
... u'P'
u'P'

:param str name: The source's filesystem id (their codename, salted
:param name: The source's filesystem id (their codename, salted
with SCRYPT_ID_PEPPER, and hashed with scrypt).
:param str secret: The source's codename.
:param secret: The source's codename.
:returns: a :class:`GenKey <gnupg._parser.GenKey>` object, on which
the ``__str__()`` method may be called to return the
generated key's fingeprint.

"""
name = clean(name)
rmol marked this conversation as resolved.
Show resolved Hide resolved
_validate_name_for_diceware(name)
secret = self.hash_codename(secret, salt=self.scrypt_gpg_pepper)
genkey_obj = self.gpg.gen_key(self.gpg.gen_key_input(
key_type=self.GPG_KEY_TYPE,
Expand All @@ -236,7 +231,7 @@ def genkeypair(self, name, secret):
))
return genkey_obj

def find_source_key(self, fingerprint: str) -> typing.Optional[typing.Dict]:
def find_source_key(self, fingerprint: str) -> Optional[Dict]:
"""
Searches the GPG keyring for a source key.

Expand All @@ -257,7 +252,7 @@ def find_source_key(self, fingerprint: str) -> typing.Optional[typing.Dict]:
return None
return None

def delete_reply_keypair(self, source_filesystem_id):
def delete_reply_keypair(self, source_filesystem_id: str) -> None:
fingerprint = self.get_fingerprint(source_filesystem_id)

# If this source was never flagged for review, they won't have a reply
Expand All @@ -279,7 +274,7 @@ def delete_reply_keypair(self, source_filesystem_id):
self.redis.hdel(self.REDIS_KEY_HASH, self.get_fingerprint(source_filesystem_id))
self.redis.hdel(self.REDIS_FINGERPRINT_HASH, source_filesystem_id)

def get_fingerprint(self, name):
def get_fingerprint(self, name: str) -> Optional[str]:
"""
Returns the fingerprint of the GPG key for the given name.

Expand All @@ -297,7 +292,7 @@ def get_fingerprint(self, name):

return None

def get_pubkey(self, name):
def get_pubkey(self, name: str) -> Optional[str]:
"""
Returns the GPG public key for the given name.

Expand All @@ -315,13 +310,11 @@ def get_pubkey(self, name):
self.redis.hset(self.REDIS_KEY_HASH, fingerprint, key)
return key

def encrypt(self, plaintext, fingerprints, output=None):
def encrypt(self, plaintext: str, fingerprints: List[str], output: Optional[str] = None) -> str:
# Verify the output path
if output:
current_app.storage.verify(output)

if not isinstance(fingerprints, (list, tuple)):
fingerprints = [fingerprints, ]
rmol marked this conversation as resolved.
Show resolved Hide resolved
# Remove any spaces from provided fingerprints GPG outputs fingerprints
# with spaces for readability, but requires the spaces to be removed
# when using fingerprints to specify recipients.
Expand All @@ -340,7 +333,7 @@ def encrypt(self, plaintext, fingerprints, output=None):
else:
raise CryptoException(out.stderr)

def decrypt(self, secret, ciphertext):
def decrypt(self, secret: str, ciphertext: bytes) -> str:
"""
>>> crypto = current_app.crypto_util
>>> key = crypto.genkeypair('randomid', 'randomid')
Expand All @@ -356,18 +349,7 @@ def decrypt(self, secret, ciphertext):
return data.decode('utf-8')


def clean(s, also=''):
"""
>>> clean("[]")
Traceback (most recent call last):
...
CryptoException: invalid input: []
>>> clean("Helloworld")
'Helloworld'
"""
for c in s:
if c not in DICEWARE_SAFE_CHARS and c not in also:
raise CryptoException("invalid input: {0}".format(s))
# scrypt.hash requires input of type str. Since the wordlist is all ASCII
# characters, this conversion is not problematic
return str(s)
def _validate_name_for_diceware(name: str) -> None:
rmol marked this conversation as resolved.
Show resolved Hide resolved
for char in name:
if char not in DICEWARE_SAFE_CHARS:
raise CryptoException("invalid input: {0}".format(name))
20 changes: 6 additions & 14 deletions securedrop/journalist_app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,28 @@ def create_app(config: 'SDConfig') -> Flask:
template_folder=config.JOURNALIST_TEMPLATES_DIR,
static_folder=path.join(config.SECUREDROP_ROOT, 'static'))

app.config.from_object(config.JournalistInterfaceFlaskConfig) # type: ignore
app.sdconfig = config
rmol marked this conversation as resolved.
Show resolved Hide resolved
app.config.from_object(config.JOURNALIST_APP_FLASK_CONFIG_CLS)
app.session_interface = JournalistInterfaceSessionInterface()

csrf = CSRFProtect(app)
Environment(app)

if config.DATABASE_ENGINE == "sqlite":
db_uri = (config.DATABASE_ENGINE + ":///" +
config.DATABASE_FILE)
else:
db_uri = (
config.DATABASE_ENGINE + '://' +
config.DATABASE_USERNAME + ':' +
config.DATABASE_PASSWORD + '@' +
config.DATABASE_HOST + '/' +
config.DATABASE_NAME
)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
app.config['SQLALCHEMY_DATABASE_URI'] = config.DATABASE_URI
db.init_app(app)

v2_enabled = path.exists(path.join(config.SECUREDROP_DATA_ROOT, 'source_v2_url'))
v3_enabled = path.exists(path.join(config.SECUREDROP_DATA_ROOT, 'source_v3_url'))
app.config.update(V2_ONION_ENABLED=v2_enabled, V3_ONION_ENABLED=v3_enabled)

# TODO: Attaching a Storage dynamically like this disables all type checking (and
# breaks code analysis tools) for code that uses current_app.storage; it should be refactored
app.storage = Storage(config.STORE_DIR,
config.TEMP_DIR,
config.JOURNALIST_KEY)

# TODO: Attaching a CryptoUtil dynamically like this disables all type checking (and
# breaks code analysis tools) for code that uses current_app.storage; it should be refactored
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is really hurts the effectiveness of type annotations and code analysis tools in general (especially for critical code like CryptoUtil).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a solution in mind? A subclass of Flask?

Copy link
Contributor Author

@nabla-c0d3 nabla-c0d3 Sep 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A subclass wouldn't work because the pattern I've seen in the code base is:

from flask import current_app
# [...]
codename = current_app.crypto_util.genrandomid()

There is no way (I think?) to change the class of flask.current_app.

One approach I thought of is to have a global variable with the "current" CryptoUtil or Storage instance (based on the config) so it would be like this:

crypto_util = CryptoUtil.get_current()
crypto_util.genrandomid()

I can make that change but it's a big refactor and should probably be done in a separate PR?

I have a poc branch where I did this with only the diceware / words lists logic (as a first step), that I took out of CryptoUtil: nabla-c0d3/securedrop@typing-annotations-i18n-store-and-misc...nabla-c0d3:refactor-diceware-generator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this?

class JournalistApp(Flask):
    crypto_util = None  # type: CryptoUtil
    storage = None  # type: Storage


def create_app(config: 'SDConfig') -> JournalistApp:
    app = JournalistApp(
        __name__,
        template_folder=config.JOURNALIST_TEMPLATES_DIR,
        static_folder=path.join(config.SECUREDROP_ROOT, 'static')
    )
    [...]

A quick run through tests/test_journalist.py passes, and type(current_app._get_current_object()) returns journalist_app.JournalistApp. You have to use the old comment annotations before 3.6, but could that solve the annotation/static analysis problems you saw?

At any rate, like you said, that change would merit its own PR. Certainly doesn't have to be done here.

Copy link
Contributor Author

@nabla-c0d3 nabla-c0d3 Sep 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it would work for the pattern I described; you can see it there for example: https://github.com/freedomofpress/securedrop/blob/develop/securedrop/journalist_app/main.py#L108

from flask import current_app
# [...]
current_app.crypto_util.encrypt(etc.)

flask.current_app is typed as a _FlaskLocalProxy and I don't see a way to change that.

I think fixing this will require changing how the code retrieves the CryptoUtil instance for the current app.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. With a Flask subclass in place, if I add these two lines there:

        current_app.logger.error(
            "current_app is a {}".format(type(current_app))
        )
        current_app.logger.error(
            "current_app proxies a {}".format(type(current_app._get_current_object()))
        )

The dev server logs this:

[2020-09-29 15:16:14,897] ERROR in main: current_app is a <class 'werkzeug.local.LocalProxy'>
[2020-09-29 15:16:14,897] ERROR in main: current_app proxies a <class 'journalist_app.JournalistApp'>

I was going by the Werkzeug docs. I'm wondering how you're seeing a different proxy class, but it is supposed to be some sort of proxy for an instance of Flask or a derivation of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type does get returned at runtime (as the actual object gets returned, after all), but I don't think this type information is available to mypy when its doing its (static-only) analysis. This is how the method is typed:

class _FlaskLocalProxy(Flask):
    def _get_current_object(self) -> Flask: ...

There is no way to change it to a subclass of Flask. Additionally:

  • The code base only use current_app and does not call _get_current_object() (and people will forget to call it I think).
  • I'm not familiar with it but _get_current_object() seems private.

Overall I think fixing this requires a new approach. I'm also not sure if the CryptoUtil or Storage instances need to be attached to the app - to me it just adds another layer of indirection/complexity compared to doing just CryptoUtil.get_current(). I might be missing something tho.

app.crypto_util = CryptoUtil(
scrypt_params=config.SCRYPT_PARAMS,
scrypt_id_pepper=config.SCRYPT_ID_PEPPER,
Expand Down
Loading