Skip to content

Commit

Permalink
decrypt encrypted files
Browse files Browse the repository at this point in the history
  • Loading branch information
carderne committed Aug 16, 2024
1 parent 4c36022 commit 3111deb
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 39 deletions.
144 changes: 112 additions & 32 deletions sigexport/files.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,82 @@
import base64
import hashlib
import hmac
import shutil
from collections.abc import Iterable
from datetime import datetime
from pathlib import Path

from typer import colors
from Crypto.Cipher import AES
from typer import colors, secho

from sigexport import models
from sigexport.logging import log

CIPHER_KEY_SIZE = 32
IV_SIZE = AES.block_size
MAC_KEY_SIZE = 32
MAC_SIZE = hashlib.sha256().digest_size


def decrypt_attachment(att: dict[str, str], src_path: Path, dst_path: Path) -> None:
"""Decrypt attachment and save to `dst_path`.
Code adapted from:
https://github.com/tbvdm/sigtop
"""
try:
keys = base64.b64decode(att["localKey"])
except KeyError:
raise ValueError("No key in attachment")
except Exception as e:
raise ValueError(f"Cannot decode keys: {str(e)}")

if len(keys) != CIPHER_KEY_SIZE + MAC_KEY_SIZE:
raise ValueError("Invalid keys length")

cipher_key = keys[:CIPHER_KEY_SIZE]
mac_key = keys[CIPHER_KEY_SIZE:]

try:
with open(src_path, "rb") as fp:
data = fp.read()
except Exception as e:
raise ValueError(f"Failed to read file: {str(e)}")

if len(data) < IV_SIZE + MAC_SIZE:
raise ValueError("Attachment data too short")

iv = data[:IV_SIZE]
their_mac = data[-MAC_SIZE:]
data = data[IV_SIZE:-MAC_SIZE]

if len(data) % AES.block_size != 0:
raise ValueError("Invalid attachment data length")

m = hmac.new(mac_key, iv + data, hashlib.sha256)
our_mac = m.digest()

if not hmac.compare_digest(our_mac, their_mac):
raise ValueError("MAC mismatch")

try:
cipher = AES.new(cipher_key, AES.MODE_CBC, iv)
decrypted_data = cipher.decrypt(data)
except Exception as e:
raise ValueError(f"Decryption failed: {str(e)}")

if len(decrypted_data) < int(att["size"]):
raise ValueError("Invalid attachment data length")

data_decrypted = decrypted_data[: att["size"]]
with open(dst_path, "wb") as fp:
fp.write(data_decrypted)


def copy_attachments(
src: Path, dest: Path, convos: models.Convos, contacts: models.Contacts
) -> Iterable[tuple[Path, Path]]:
) -> None:
"""Copy attachments and reorganise in destination directory."""
src_att = Path(src) / "attachments.noindex"
src_root = Path(src) / "attachments.noindex"
dest = Path(dest)

for key, messages in convos.items():
Expand All @@ -22,8 +85,8 @@ def copy_attachments(
# some contact names are None
if not name:
name = "None"
contact_path = dest / name / "media"
contact_path.mkdir(exist_ok=True, parents=True)
dst_root = dest / name / "media"
dst_root.mkdir(exist_ok=True, parents=True)
for msg in messages:
if msg.attachments:
attachments = msg.attachments
Expand All @@ -33,35 +96,52 @@ def copy_attachments(
.replace(":", "-")
)
for i, att in enumerate(attachments):
# Account for no fileName key
file_name = str(att["fileName"]) if "fileName" in att else "None"
# Sometimes the key is there but it is None, needs extension
if "." not in file_name:
content_type = att.get("contentType", "").split("/")
if len(content_type) > 1:
ext = content_type[1]
else:
ext = content_type[0]
file_name += "." + ext
att["fileName"] = (
f"{date}_{i:02}_{file_name}".replace(" ", "_")
.replace("/", "-")
.replace(",", "")
.replace(":", "-")
.replace("|", "-")
)
# account for erroneous backslash in path
try:
# Account for no fileName key
file_name = (
str(att["fileName"]) if "fileName" in att else "None"
)
# Sometimes the key is there but it is None, needs extension
if "." not in file_name:
content_type = att["contentType"].split("/")
try:
ext = content_type[1]
except IndexError:
ext = content_type[0]
file_name += "." + ext
att["fileName"] = (
f"{date}_{i:02}_{file_name}".replace(" ", "_")
.replace("/", "-")
.replace(",", "")
.replace(":", "-")
.replace("|", "-")
)
# account for erroneous backslash in path
att_path = str(att["path"]).replace("\\", "/")
yield src_att / att_path, contact_path / att["fileName"]
except KeyError:
p = att["path"] if "path" in att else ""
log(f"\t\tBroken attachment:\t{name}\t{p}")
except FileNotFoundError:
p = att["path"] if "path" in att else ""
log(f"\t\tAttachment not found:\t{name}\t{p}")
log(f"\t\tBroken attachment:\t{name}")
continue
src_path = src_root / att_path
dst_path = dst_root / att["fileName"]
if int(att.get("version", 0)) >= 2:
try:
decrypt_attachment(att, src_path, dst_path)
except ValueError as e:
secho(
f"Failed to decrypt {src_path} error {e}, skipping",
fg=colors.MAGENTA,
)
else:
try:
shutil.copy2(src_path, dst_path)
except FileNotFoundError:
secho(
f"No file to copy at {src_path}, skipping!",
fg=colors.MAGENTA,
)
except OSError as exc:
secho(
f"Error copying file {src_path}, skipping!\n{exc}",
fg=colors.MAGENTA,
)
else:
msg.attachments = []

Expand Down
8 changes: 1 addition & 7 deletions sigexport/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,7 @@ def main(
contacts = utils.fix_names(contacts)

secho("Copying and renaming attachments")
for att_src, att_dst in files.copy_attachments(source_dir, dest, convos, contacts):
try:
shutil.copy2(att_src, att_dst)
except FileNotFoundError:
secho(f"No file to copy at {att_src}, skipping!", fg=colors.MAGENTA)
except OSError as exc:
secho(f"Error copying file {att_src}, skipping!\n{exc}", fg=colors.MAGENTA)
files.copy_attachments(source_dir, dest, convos, contacts)

if json_output and old:
secho(
Expand Down

0 comments on commit 3111deb

Please sign in to comment.