Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix gpg key import / replies / encrypt to source on qubes #377

Merged
merged 2 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 19 additions & 33 deletions securedrop_client/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,30 @@ def _gpg_cmd_base(self) -> list:
cmd.extend(['--trust-model', 'always'])
return cmd

def import_key(self, source_uuid: UUID, key_data: str) -> None:
def import_key(self, source_uuid: UUID, key_data: str, fingerprint: str) -> None:
local_source = self.session.query(Source).filter_by(uuid=source_uuid).one()

fingerprints = self._import(key_data)
if len(fingerprints) != 1:
raise RuntimeError('Expected exactly one fingerprint.')
self._import(key_data)

local_source.fingerprint = fingerprints.pop()
local_source.fingerprint = fingerprint
self.session.add(local_source)
self.session.commit()

def _import(self, key_data: str, is_private: bool = False) -> set:
def _import(self, key_data: str) -> None:
'''Wrapper for `gpg --import-keys`'''
cmd = self._gpg_cmd_base()
if is_private:
cmd.append('--allow-secret-key-import')

with tempfile.NamedTemporaryFile('w+') as temp_key, \
tempfile.NamedTemporaryFile('w+') as stdout, \
tempfile.NamedTemporaryFile('w+') as stderr:
temp_key.write(key_data)
temp_key.seek(0)
cmd.extend(['--import-options', 'import-show',
'--with-colons', '--import',
temp_key.name])
if self.is_qubes: # pragma: no cover
cmd = ['qubes-gpg-import-key', temp_key.name]
else:
cmd = self._gpg_cmd_base()
cmd.extend(['--import-options', 'import-show',
'--with-colons', '--import',
temp_key.name])

try:
subprocess.check_call(cmd, stdout=stdout, stderr=stderr)
Expand All @@ -142,24 +141,6 @@ def _import(self, key_data: str, is_private: bool = False) -> set:
logger.error('Could not import key: {}\n{}'.format(e, stderr.read()))
raise CryptoError('Could not import key.')

stdout.seek(0)
# this is to ensure we only read the fingerprint attached to the public key
# and not a subkey
reading_pub = False
key_fingerprints = set()
for line in stdout:
if line.startswith('pub'):
reading_pub = True
continue
if not line.startswith('fpr'):
continue
if not reading_pub:
continue
key_fingerprints.add(line.split(':')[9])
reading_pub = False

return key_fingerprints

def encrypt_to_source(self, source_uuid: str, data: str) -> str:
'''
:param data: A string of data to encrypt to a source.
Expand All @@ -177,9 +158,14 @@ def encrypt_to_source(self, source_uuid: str, data: str) -> str:
cmd.extend(['--encrypt',
'-r', source.fingerprint,
'-r', self.journalist_key_fingerprint,
'--armor',
'-o-', # write to stdout
content.name])
'--armor'])
if not self.is_qubes:
# In Qubes, the ciphertext will go to stdout.
# In addition the option below cannot be passed
# through the gpg client wrapper.
cmd.extend(['-o-']) # write to stdout
cmd.extend([content.name])

try:
subprocess.check_call(cmd, stdout=stdout, stderr=stderr)
except subprocess.CalledProcessError as e:
Expand Down
5 changes: 3 additions & 2 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,11 @@ def on_sync_success(self, result) -> None:
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
if not pub_key:
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
continue
try:
self.gpg.import_key(source.uuid, pub_key)
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))

Expand Down
20 changes: 2 additions & 18 deletions tests/test_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_import_key(homedir, config, source):
Using the `config` fixture to ensure the config is written to disk.
'''
helper = GpgHelper(homedir, is_qubes=False)
helper.import_key(source['uuid'], source['public_key'])
helper.import_key(source['uuid'], source['public_key'], source['fingerprint'])


def test_import_key_gpg_call_fail(homedir, config, mocker):
Expand All @@ -99,21 +99,6 @@ def test_import_key_gpg_call_fail(homedir, config, mocker):
assert mock_call.called


def test_import_key_multiple_fingerprints(homedir, source, config, mocker):
'''
Check that an error is raised if multiple fingerpints are found on key import.
Using the `config` fixture to ensure the config is written to disk.
'''
helper = GpgHelper(homedir, is_qubes=False)
mock_import = mocker.patch.object(helper, '_import', returnvalue={'a', 'b'})

with pytest.raises(RuntimeError, match='Expected exactly one fingerprint\\.'):
helper.import_key(source['uuid'], source['public_key'])

# ensure the mock was used
assert mock_import.called


def test_encrypt(homedir, source, config, mocker):
'''
Check that calling `encrypt` encrypts the message.
Expand All @@ -123,7 +108,7 @@ def test_encrypt(homedir, source, config, mocker):

# first we have to ensure the pubkeys are available
helper._import(PUB_KEY)
helper._import(JOURNO_KEY, is_private=True)
helper._import(JOURNO_KEY)

plaintext = 'bueller?'
cyphertext = helper.encrypt_to_source(source['uuid'], plaintext)
Expand Down Expand Up @@ -158,7 +143,6 @@ def test_encrypt_fail(homedir, source, config, mocker):

# first we have to ensure the pubkeys are available
helper._import(PUB_KEY)
helper._import(JOURNO_KEY, is_private=True)

plaintext = 'bueller?'

Expand Down
1 change: 1 addition & 0 deletions tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ def test_Controller_on_sync_success_with_key_import_fail(homedir, config, mocker
mock_source.key = {
'type': 'PGP',
'public': PUB_KEY,
'fingerprint': 'ABCDEFGHIJKLMAO'
}

mock_sources = [mock_source]
Expand Down