diff --git a/securedrop_client/crypto.py b/securedrop_client/crypto.py index 2174468a5..f44af1ab0 100644 --- a/securedrop_client/crypto.py +++ b/securedrop_client/crypto.py @@ -109,31 +109,30 @@ def _gpg_cmd_base(self) -> list: cmd.extend(['--trust-model', 'always']) return cmd - def import_key(self, source_uuid: UUID, key_data: str) -> None: + def import_key(self, source_uuid: UUID, key_data: str, fingerprint: str) -> None: local_source = self.session.query(Source).filter_by(uuid=source_uuid).one() - fingerprints = self._import(key_data) - if len(fingerprints) != 1: - raise RuntimeError('Expected exactly one fingerprint.') + self._import(key_data) - local_source.fingerprint = fingerprints.pop() + local_source.fingerprint = fingerprint self.session.add(local_source) self.session.commit() - def _import(self, key_data: str, is_private: bool = False) -> set: + def _import(self, key_data: str) -> None: '''Wrapper for `gpg --import-keys`''' - cmd = self._gpg_cmd_base() - if is_private: - cmd.append('--allow-secret-key-import') with tempfile.NamedTemporaryFile('w+') as temp_key, \ tempfile.NamedTemporaryFile('w+') as stdout, \ tempfile.NamedTemporaryFile('w+') as stderr: temp_key.write(key_data) temp_key.seek(0) - cmd.extend(['--import-options', 'import-show', - '--with-colons', '--import', - temp_key.name]) + if self.is_qubes: # pragma: no cover + cmd = ['qubes-gpg-import-key', temp_key.name] + else: + cmd = self._gpg_cmd_base() + cmd.extend(['--import-options', 'import-show', + '--with-colons', '--import', + temp_key.name]) try: subprocess.check_call(cmd, stdout=stdout, stderr=stderr) @@ -142,24 +141,6 @@ def _import(self, key_data: str, is_private: bool = False) -> set: logger.error('Could not import key: {}\n{}'.format(e, stderr.read())) raise CryptoError('Could not import key.') - stdout.seek(0) - # this is to ensure we only read the fingerprint attached to the public key - # and not a subkey - reading_pub = False - key_fingerprints = set() - for line in stdout: - if line.startswith('pub'): - reading_pub = True - continue - if not line.startswith('fpr'): - continue - if not reading_pub: - continue - key_fingerprints.add(line.split(':')[9]) - reading_pub = False - - return key_fingerprints - def encrypt_to_source(self, source_uuid: str, data: str) -> str: ''' :param data: A string of data to encrypt to a source. @@ -177,9 +158,14 @@ def encrypt_to_source(self, source_uuid: str, data: str) -> str: cmd.extend(['--encrypt', '-r', source.fingerprint, '-r', self.journalist_key_fingerprint, - '--armor', - '-o-', # write to stdout - content.name]) + '--armor']) + if not self.is_qubes: + # In Qubes, the ciphertext will go to stdout. + # In addition the option below cannot be passed + # through the gpg client wrapper. + cmd.extend(['-o-']) # write to stdout + cmd.extend([content.name]) + try: subprocess.check_call(cmd, stdout=stdout, stderr=stderr) except subprocess.CalledProcessError as e: diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 9e4c81bd8..d3f98b9b1 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -395,10 +395,11 @@ def on_sync_success(self, result) -> None: for source in remote_sources: if source.key and source.key.get('type', None) == 'PGP': pub_key = source.key.get('public', None) - if not pub_key: + fingerprint = source.key.get('fingerprint', None) + if not pub_key or not fingerprint: continue try: - self.gpg.import_key(source.uuid, pub_key) + self.gpg.import_key(source.uuid, pub_key, fingerprint) except CryptoError: logger.warning('Failed to import key for source {}'.format(source.uuid)) diff --git a/tests/test_crypto.py b/tests/test_crypto.py index 7c4e4d804..6696c6447 100644 --- a/tests/test_crypto.py +++ b/tests/test_crypto.py @@ -79,7 +79,7 @@ def test_import_key(homedir, config, source): Using the `config` fixture to ensure the config is written to disk. ''' helper = GpgHelper(homedir, is_qubes=False) - helper.import_key(source['uuid'], source['public_key']) + helper.import_key(source['uuid'], source['public_key'], source['fingerprint']) def test_import_key_gpg_call_fail(homedir, config, mocker): @@ -99,21 +99,6 @@ def test_import_key_gpg_call_fail(homedir, config, mocker): assert mock_call.called -def test_import_key_multiple_fingerprints(homedir, source, config, mocker): - ''' - Check that an error is raised if multiple fingerpints are found on key import. - Using the `config` fixture to ensure the config is written to disk. - ''' - helper = GpgHelper(homedir, is_qubes=False) - mock_import = mocker.patch.object(helper, '_import', returnvalue={'a', 'b'}) - - with pytest.raises(RuntimeError, match='Expected exactly one fingerprint\\.'): - helper.import_key(source['uuid'], source['public_key']) - - # ensure the mock was used - assert mock_import.called - - def test_encrypt(homedir, source, config, mocker): ''' Check that calling `encrypt` encrypts the message. @@ -123,7 +108,7 @@ def test_encrypt(homedir, source, config, mocker): # first we have to ensure the pubkeys are available helper._import(PUB_KEY) - helper._import(JOURNO_KEY, is_private=True) + helper._import(JOURNO_KEY) plaintext = 'bueller?' cyphertext = helper.encrypt_to_source(source['uuid'], plaintext) @@ -158,7 +143,6 @@ def test_encrypt_fail(homedir, source, config, mocker): # first we have to ensure the pubkeys are available helper._import(PUB_KEY) - helper._import(JOURNO_KEY, is_private=True) plaintext = 'bueller?' diff --git a/tests/test_logic.py b/tests/test_logic.py index 266db98bf..fdf2dc25a 100644 --- a/tests/test_logic.py +++ b/tests/test_logic.py @@ -533,6 +533,7 @@ def test_Controller_on_sync_success_with_key_import_fail(homedir, config, mocker mock_source.key = { 'type': 'PGP', 'public': PUB_KEY, + 'fingerprint': 'ABCDEFGHIJKLMAO' } mock_sources = [mock_source]