-
Notifications
You must be signed in to change notification settings - Fork 690
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Migrate public keys from GPG to database-backed storage
Add an alembic migration that iterates over the GPG keyring, identifies source keys, exports them from GPG and saves them into the database. The main failure risks are the interactions with GPG. We already run `gpg.list_keys()` on startup, so it's unlikely that's broken (and if it is, we have bigger problems). So the main concern is exporting the key might fail. The export operation is wrapped in a try/except and we validate the exported key we get from GPG. In theory we could remove the GPG fallbacks from `Source.fingerprint` and `Source.public_key` but since that will require reworking a number of tests it would be better to do it after the secret key migration is in place too. Fixes #6800.
- Loading branch information
Showing
3 changed files
with
207 additions
and
1 deletion.
There are no files selected for viewing
92 changes: 92 additions & 0 deletions
92
securedrop/alembic/versions/17c559a7a685_pgp_public_keys.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""PGP public keys | ||
Revision ID: 17c559a7a685 | ||
Revises: 811334d7105f | ||
Create Date: 2023-09-21 12:33:56.550634 | ||
""" | ||
|
||
import traceback | ||
|
||
import pretty_bad_protocol as gnupg | ||
import sqlalchemy as sa | ||
from alembic import op | ||
from encryption import EncryptionManager | ||
from sdconfig import SecureDropConfig | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "17c559a7a685" | ||
down_revision = "811334d7105f" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade() -> None: | ||
""" | ||
Migrate public keys from the GPG keyring into the SQLite database | ||
We iterate over all the secret keys in the keyring, identify the | ||
corresponding source, export the key and save it in the database. | ||
""" | ||
config = SecureDropConfig.get_current() | ||
gpg = gnupg.GPG( | ||
binary="gpg2", | ||
homedir=str(config.GPG_KEY_DIR), | ||
options=["--pinentry-mode loopback", "--trust-model direct"], | ||
) | ||
# Source keys all have a secret key, so we can filter on that | ||
for keyinfo in gpg.list_keys(secret=True): | ||
if len(keyinfo["uids"]) > 1: | ||
# Source keys should only have one UID | ||
continue | ||
uid = keyinfo["uids"][0] | ||
search = EncryptionManager.SOURCE_KEY_UID_RE.search(uid) | ||
if not search: | ||
# Didn't match at all | ||
continue | ||
filesystem_id = search.group(2) | ||
# Check that it's a valid ID | ||
conn = op.get_bind() | ||
result = conn.execute( | ||
sa.text( | ||
""" | ||
SELECT pgp_public_key, pgp_fingerprint | ||
FROM sources | ||
WHERE filesystem_id=:filesystem_id | ||
""" | ||
).bindparams(filesystem_id=filesystem_id) | ||
).first() | ||
if result != (None, None): | ||
# Either not in the database or there's already some data in the DB. | ||
# In any case, skip. | ||
continue | ||
fingerprint = keyinfo["fingerprint"] | ||
try: | ||
public_key = gpg.export_keys(fingerprint) | ||
except: # noqa: E722 | ||
traceback.print_exc() | ||
continue | ||
if not public_key.startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----"): | ||
# FIXME: can we have a stronger well-formedness check here? | ||
continue | ||
# Save to database | ||
op.execute( | ||
sa.text( | ||
""" | ||
UPDATE sources | ||
SET pgp_public_key=:pgp_public_key, pgp_fingerprint=:pgp_fingerprint | ||
WHERE filesystem_id=:filesystem_id | ||
""" | ||
).bindparams( | ||
pgp_public_key=public_key, | ||
pgp_fingerprint=fingerprint, | ||
filesystem_id=filesystem_id, | ||
) | ||
) | ||
|
||
|
||
def downgrade() -> None: | ||
""" | ||
This is a non-destructive operation, so it's not worth implementing a | ||
migration from database storage to GPG. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import uuid | ||
|
||
import pretty_bad_protocol as gnupg | ||
from db import db | ||
from journalist_app import create_app | ||
from sqlalchemy import text | ||
|
||
|
||
class UpgradeTester: | ||
def __init__(self, config): | ||
"""This function MUST accept an argument named `config`. | ||
You will likely want to save a reference to the config in your | ||
class so you can access the database later. | ||
""" | ||
self.config = config | ||
self.app = create_app(self.config) | ||
self.gpg = gnupg.GPG( | ||
binary="gpg2", | ||
homedir=str(config.GPG_KEY_DIR), | ||
options=["--pinentry-mode loopback", "--trust-model direct"], | ||
) | ||
self.fingerprint = None | ||
# random, chosen by fair dice roll | ||
self.filesystem_id = ( | ||
"HAR5WIY3C4K3MMIVLYXER7DMTYCL5PWZEPNOCR2AIBCVWXDZQDMDFUHEFJM" | ||
"Z3JW5D6SLED3YKCBDAKNMSIYOKWEJK3ZRJT3WEFT3S5Q=" | ||
) | ||
|
||
def load_data(self): | ||
"""Create a source and GPG key pair""" | ||
with self.app.app_context(): | ||
source = { | ||
"uuid": str(uuid.uuid4()), | ||
"filesystem_id": self.filesystem_id, | ||
"journalist_designation": "psychic webcam", | ||
"interaction_count": 0, | ||
} | ||
sql = """\ | ||
INSERT INTO sources (uuid, filesystem_id, journalist_designation, | ||
interaction_count) | ||
VALUES (:uuid, :filesystem_id, :journalist_designation, | ||
:interaction_count)""" | ||
db.engine.execute(text(sql), **source) | ||
# Generate the GPG key pair | ||
gen_key_input = self.gpg.gen_key_input( | ||
passphrase="correct horse battery staple", | ||
name_email=self.filesystem_id, | ||
key_type="RSA", | ||
key_length=4096, | ||
name_real="Source Key", | ||
creation_date="2013-05-14", | ||
expire_date="0", | ||
) | ||
key = self.gpg.gen_key(gen_key_input) | ||
self.fingerprint = str(key.fingerprint) | ||
|
||
def check_upgrade(self): | ||
"""Verify PGP fields have been populated""" | ||
with self.app.app_context(): | ||
query_sql = """\ | ||
SELECT pgp_fingerprint, pgp_public_key, pgp_secret_key | ||
FROM sources | ||
WHERE filesystem_id = :filesystem_id""" | ||
source = db.engine.execute( | ||
text(query_sql), | ||
filesystem_id=self.filesystem_id, | ||
).fetchone() | ||
assert source[0] == self.fingerprint | ||
assert source[1].startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----") | ||
assert source[2] is None | ||
|
||
|
||
class DowngradeTester: | ||
def __init__(self, config): | ||
self.config = config | ||
self.app = create_app(self.config) | ||
self.uuid = str(uuid.uuid4()) | ||
|
||
def load_data(self): | ||
"""Create a source with a PGP key pair already migrated""" | ||
with self.app.app_context(): | ||
source = { | ||
"uuid": self.uuid, | ||
"filesystem_id": "1234", | ||
"journalist_designation": "mucky pine", | ||
"interaction_count": 0, | ||
"pgp_fingerprint": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", | ||
"pgp_public_key": "very public", | ||
"pgp_secret_key": None, | ||
} | ||
sql = """\ | ||
INSERT INTO sources (uuid, filesystem_id, journalist_designation, | ||
interaction_count, pgp_fingerprint, pgp_public_key, pgp_secret_key) | ||
VALUES (:uuid, :filesystem_id, :journalist_designation, | ||
:interaction_count, :pgp_fingerprint, :pgp_public_key, :pgp_secret_key)""" | ||
db.engine.execute(text(sql), **source) | ||
|
||
def check_downgrade(self): | ||
"""Verify the downgrade does nothing, i.e. the two PGP fields are still populated""" | ||
with self.app.app_context(): | ||
sql = """\ | ||
SELECT pgp_fingerprint, pgp_public_key, pgp_secret_key | ||
FROM sources | ||
WHERE uuid = :uuid""" | ||
source = db.engine.execute( | ||
text(sql), | ||
uuid=self.uuid, | ||
).fetchone() | ||
print(source) | ||
assert source == ( | ||
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", | ||
"very public", | ||
None, | ||
) |