diff --git a/.gitignore b/.gitignore index abdec692..0caf67a9 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,8 @@ __pycache__ *.tfstate.backup credentials.json token.json +.hash +.pot _ignore* *.tfvars !tests/fixtures/*/** diff --git a/Brewfile b/Brewfile index ad5613cd..08f66307 100644 --- a/Brewfile +++ b/Brewfile @@ -5,3 +5,4 @@ brew "tesseract" brew "terraform" brew "precommit" brew "make" +brew "john-jumbo" diff --git a/monopoly/banks/hsbc.py b/monopoly/banks/hsbc.py index 793b900d..4eb63075 100644 --- a/monopoly/banks/hsbc.py +++ b/monopoly/banks/hsbc.py @@ -24,7 +24,8 @@ class Hsbc(BankBase): ) pdf_config = PdfConfig( - password=settings.hsbc_pdf_password, + password=settings.hsbc_pdf_password_prefix, page_range=(0, -1), page_bbox=(0, 0, 379, 842), + brute_force_mask="?d?d?d?d?d?d", ) diff --git a/monopoly/config.py b/monopoly/config.py index 7cf02de2..575d537e 100644 --- a/monopoly/config.py +++ b/monopoly/config.py @@ -8,7 +8,7 @@ class Settings(BaseSettings): secret_id: str = "" gcs_bucket: str = "" ocbc_pdf_password: str = "" - hsbc_pdf_password: str = "" + hsbc_pdf_password_prefix: str = "" trusted_user_emails: list = [] model_config = SettingsConfigDict(env_file=".env", extra="allow") diff --git a/monopoly/helpers/pdf2john.py b/monopoly/helpers/pdf2john.py new file mode 100644 index 00000000..4d517536 --- /dev/null +++ b/monopoly/helpers/pdf2john.py @@ -0,0 +1,355 @@ +# pylint: skip-file +#!/usr/bin/env python + +# Copyright (c) 2013 Shane Quigley, < shane at softwareontheside.info > + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above 
class PdfHashExtractor:
    """Build a John the Ripper ``$pdf$`` hash string from an encrypted PDF.

    The whole file is read into memory, and the trailer and ``/Encrypt``
    dictionary are located with byte-level regular expressions; no general
    PDF parsing is performed. The implementation targets Python 3, where
    indexing ``bytes`` yields ``int``.
    """

    def __init__(self, file_name):
        """Load *file_name* and look for the ``PDF-x.y`` version marker.

        When the marker is missing, a message is written to stderr and
        ``self.process`` is set to False, which turns ``parse`` into a no-op.
        """
        self.file_name = file_name
        # A context manager closes the handle even if read() raises.
        with open(file_name, "rb") as pdf_file:
            self.encrypted = pdf_file.read()
        self.process = True
        version = re.search(rb"PDF-\d\.\d", self.encrypted)
        if version:
            self.pdf_spec = version.group(0)
        else:
            sys.stderr.write("%s is not a PDF file!\n" % file_name)
            self.process = False

    def parse(self):
        """Return the ``$pdf$...`` hash string for the loaded file.

        The hash is also printed to stdout. ``None`` is returned when the
        file is not a PDF, or when ``get_trailer`` fails (its message is
        written to stderr).

        Raises:
            RuntimeError: if the encryption object id, ``/V``, ``/R``,
                ``/P`` or ``/ID`` cannot be found.
        """
        if not self.process:
            return None

        try:
            trailer = self.get_trailer()
        except RuntimeError as error:
            sys.stderr.write("%s : %s\n" % (self.file_name, str(error)))
            return None

        object_id = self.get_object_id(b"Encrypt", trailer)
        if len(object_id) == 0:
            raise RuntimeError("Could not find object id")
        encryption_dictionary = self.get_encryption_dictionary(object_id)

        version = re.search(rb"/V (\d)", encryption_dictionary)
        if version is None:
            raise RuntimeError("Could not find /V")
        revision = re.search(rb"/R (\d)", encryption_dictionary)
        if revision is None:
            # Previously an uncaught IndexError; callers only catch RuntimeError.
            raise RuntimeError("Could not find /R")

        # According to the docs:
        # Length : (Optional; PDF 1.4; only if V is 2 or 3). Default value: 40
        length = b"40"
        longest = 0
        for digits in re.findall(rb"/Length (\d+)", encryption_dictionary):
            if int(digits) > longest:
                longest = int(digits)
                length = digits

        permissions = re.search(rb"/P (-?\d+)", encryption_dictionary)
        if permissions is None:
            raise RuntimeError("Could not find /P")

        meta = "1" if self.is_meta_data_encrypted(encryption_dictionary) else "0"

        # The trailer must carry a two-element /ID array, written either as
        # hex strings <...> or as literal strings (...).
        has_hex_id = re.search(rb"/ID\s*\[\s*<\w+>\s*<\w+>\s*\]", trailer)
        has_literal_id = re.search(rb"/ID\s*\[\s*\(\w+\)\s*\(\w+\)\s*\]", trailer)
        if not (has_hex_id or has_literal_id):
            raise RuntimeError("Could not find /ID tag")

        hex_id = re.search(rb"<(\w+)>", trailer)
        if hex_id:
            i_d = hex_id.group(1).lower().decode("ascii")
        else:
            # A literal-string ID holds raw bytes; the hash format expects
            # hex, so encode it instead of emitting "(...)" verbatim.
            i_d = re.search(rb"\((\w+)\)", trailer).group(1).hex()

        passwords = self.get_passwords_for_JtR(encryption_dictionary)
        output = "*".join(
            [
                "$pdf$" + version.group(1).decode("ascii"),
                revision.group(1).decode("ascii"),
                length.decode("ascii"),
                permissions.group(1).decode("ascii"),
                meta,
                str(len(i_d) // 2),
                i_d,
                passwords,
            ]
        )
        # Kept so the module still works as a command-line script.
        print(output)
        return output

    @staticmethod
    def _closing_paren_is_escaped(pas):
        """Return True if the final ``)`` of *pas* follows an odd run of backslashes."""
        body = pas[:-1]
        backslashes = len(body) - len(body.rstrip(b"\\"))
        return backslashes % 2 == 1

    def get_passwords_for_JtR(self, encryption_dictionary):
        """Return the ``len*hex`` fields for the U/O (and UE/OE) entries.

        Entries are looked up first as literal strings ``/U(...)`` and then
        as hex strings ``U<...>``; entries that are absent are skipped. The
        fields are joined with ``*``.
        """
        output = ""
        letters = [b"U", b"O"]
        if b"1.7" in self.pdf_spec:
            letters = [b"U", b"O", b"UE", b"OE"]
        for let in letters:
            pr_str = rb"/" + let + rb"\s*\([^)]+\)"
            found = re.findall(pr_str, encryption_dictionary)
            if found:
                pas = found[0]
                # The pattern stops at the first ")". If that ")" is escaped
                # it belongs to the string, so widen the pattern to the next
                # one. (Indexing bytes yields an int on Python 3, so the old
                # ``pas[-2] == b"\\"`` test could never succeed.)
                while self._closing_paren_is_escaped(pas):
                    pr_str += rb"[^)]*\)"
                    try:
                        pas = re.findall(pr_str, encryption_dictionary)[0]
                    except IndexError:
                        break
                output += self.get_password_from_byte_string(pas) + "*"
            else:
                found = re.findall(let + rb"\s*<\w+>", encryption_dictionary)
                if not found:
                    continue
                pas = re.findall(rb"<\w+>", found[0])[0]
                pas = pas.replace(b"<", b"").replace(b">", b"")
                output += str(len(pas) // 2) + "*" + str(pas.lower(), "ascii") + "*"
        return output[:-1]

    def is_meta_data_encrypted(self, encryption_dictionary):
        """Return False only when ``/EncryptMetadata false`` is present."""
        flag = re.search(rb"/EncryptMetadata\s(\w+)", encryption_dictionary)
        if flag is None:
            return True
        return flag.group(1) != b"false"

    def parse_meta_data(self, trailer):
        """Return the XMP values of the object referenced by Root -> Metadata."""
        root_object_id = self.get_object_id(b"Root", trailer)
        root_object = self.get_pdf_object(root_object_id)
        object_id = self.get_object_id(b"Metadata", root_object)
        xmp_metadata_object = self.get_pdf_object(object_id)
        return self.get_xmp_values(xmp_metadata_object)

    def get_xmp_values(self, xmp_metadata_object):
        """Return title, creator, description, subject and creation year.

        The values are joined with spaces and stripped of colons. An empty
        string is returned when the stream content is not parseable XML.
        """
        xmp_metadata_object = xmp_metadata_object.partition(b"stream")[2]
        xmp_metadata_object = xmp_metadata_object.partition(b"endstream")[0]
        try:
            # NOTE(review): minidom is not hardened against malicious XML;
            # the stream comes straight from the input file.
            xml_metadata = minidom.parseString(xmp_metadata_object)
        except Exception:
            return ""
        values = [
            self.get_dc_value(tag, xml_metadata)
            for tag in ("title", "creator", "description", "subject")
        ]
        created_year = xml_metadata.getElementsByTagName("xmp:CreateDate")
        if len(created_year) > 0:
            created_year = created_year[0].firstChild.data[0:4]
            values.append(str(created_year))
        return " ".join(values).replace(":", "")

    def get_dc_value(self, value, xml_metadata):
        """Return the text of the first ``rdf:li`` under ``dc:<value>``, or ""."""
        output = xml_metadata.getElementsByTagName("dc:" + value)
        if len(output) > 0:
            output = output[0]
            output = output.getElementsByTagName("rdf:li")[0]
            if output.firstChild:
                return output.firstChild.data
        return ""

    def get_encryption_dictionary(self, object_id):
        """Return the text of object *object_id* up to (excluding) ``endobj``."""
        encryption_dictionary = self.get_pdf_object(object_id)
        for chunk in encryption_dictionary.split(b"endobj"):
            if object_id + b" obj" in chunk:
                encryption_dictionary = chunk
        return encryption_dictionary

    def get_object_id(self, name, trailer):
        """Return ``b"<num> <gen>"`` from the ``/<name> <num> <gen> R`` reference.

        An empty bytes object is returned when *trailer* holds no such
        reference, so the result is always bytes.
        """
        reference = re.search(rb"/" + name + rb"\s\d+\s\d\sR", trailer)
        if reference is None:
            return b""
        return re.findall(rb"\d+ \d", reference.group(0))[0]

    def get_pdf_object(self, object_id):
        """Return the bytes from ``<object_id> obj`` through the next ``endobj``.

        The object header is searched after a carriage return first and
        after a line feed second.
        """
        header = object_id + b" obj"
        output = header + self.encrypted.partition(b"\r" + header)[2]
        if output == header:
            output = header + self.encrypted.partition(b"\n" + header)[2]
        return output.partition(b"endobj")[0] + b"endobj"

    def get_trailer(self):
        """Return the trailer text, falling back to a ``DecodeParms`` section.

        Raises:
            RuntimeError: "Can't find trailer" when neither section exists,
                "File not encrypted" when the section has no ``Encrypt``.
        """
        trailer = self.get_data_between(b"trailer", b">>", b"/ID")
        if trailer == b"":
            trailer = self.get_data_between(b"DecodeParms", b"stream", b"")
            # Compare against bytes: a bytes value never equals the str "".
            if trailer == b"":
                raise RuntimeError("Can't find trailer")
        if trailer.find(b"Encrypt") == -1:
            raise RuntimeError("File not encrypted")
        return trailer

    def get_data_between(self, s1, s2, tag):
        """Concatenate lines from the one containing *s1* to one containing *s2*.

        When *tag* is non-empty the collected text must contain it;
        otherwise the search restarts after that line. Line terminators are
        dropped. Returns ``b""`` when nothing qualifies.
        """
        output = b""
        inside_first = False
        for line in re.split(rb"\n|\r", self.encrypted):
            inside_first = inside_first or s1 in line
            if not inside_first:
                continue
            output += line
            if s2 in line:
                if tag == b"" or tag in output:
                    break
                output = b""
                inside_first = False
        return output

    def get_hex_byte(self, o_or_u, i):
        """Return byte *i* of *o_or_u* as lowercase hex without zero padding."""
        return hex(o_or_u[i]).replace("0x", "")

    def get_password_from_byte_string(self, o_or_u):
        """Convert a literal entry such as ``b"/U(...)"`` to ``"<len>*<hex>"``.

        Backslash escapes are decoded through ``unescape``. The closing
        parenthesis is dropped from the hex output.
        """
        # Skip "/U(" (3 bytes), or "/UE(" (4 bytes) for UE & OE in 1.7 spec.
        start = 3 if o_or_u[2] == 40 else 4
        pas = ""
        escape_seq = False
        escapes = 0
        for byte in o_or_u[start:]:
            if escape_seq:
                esc = self.unescape("\\" + chr(byte))
                pas += "%02x" % ord(esc[0])
                escape_seq = False
            elif byte == 92:  # backslash starts an escape sequence
                escape_seq = True
                escapes += 1
            else:
                pas += "%02x" % byte
        output = len(o_or_u) - (start + 1) - escapes
        # The last two hex digits are the closing ")".
        return str(output) + "*" + pas[:-2]

    def unescape(self, esc):
        """Map a two-character escape such as ``\\n`` to its replacement.

        Raises KeyError for sequences not in the table (octal ``\\ddd``
        escapes other than ``\\0`` are not decoded).
        """
        escape_seq_map = {
            "\\n": "\n",
            # "\s" and "\e" are not Python escapes; the values are spelled
            # out so they stay backslash + letter without a syntax warning.
            "\\s": "\\s",
            "\\e": "\\e",
            "\\r": "\r",
            "\\t": "\t",
            "\\v": "\v",
            "\\f": "\f",
            "\\b": "\b",
            "\\a": "\a",
            "\\)": ")",
            "\\(": "(",
            "\\\\": "\\",
            "\\0": "\0",
        }

        return escape_seq_map[esc]
PdfHashExtractor(filename) + try: + unlocker.parse() + except RuntimeError: + e = sys.exc_info()[1] + sys.stderr.write("%s : %s\n" % (filename, str(e))) diff --git a/monopoly/pdf.py b/monopoly/pdf.py index 315b43fc..74dc482c 100644 --- a/monopoly/pdf.py +++ b/monopoly/pdf.py @@ -1,10 +1,13 @@ import logging +import subprocess from dataclasses import dataclass import fitz import pytesseract from PIL import Image +from monopoly.helpers.pdf2john import PdfHashExtractor + logger = logging.getLogger(__name__) @@ -21,6 +24,7 @@ def lines(self) -> list: @dataclass class PdfConfig: + brute_force_mask: str = None password: str = None page_range: tuple = (None, None) page_bbox: tuple = None @@ -41,16 +45,46 @@ def __init__(self, file_path: str, config: PdfConfig = None): self.password = config.password self.page_range = slice(*config.page_range) self.page_bbox: tuple = config.page_bbox + self.brute_force_mask = config.brute_force_mask self.remove_vertical_text = True def open(self): logger.info("Opening pdf from path %s", self.file_path) document = fitz.Document(self.file_path) - document.authenticate(self.password) - if document.is_encrypted: - raise ValueError("Wrong password - document is encrypted") - return document + if not document.is_encrypted: + return document + + if self.password and not self.brute_force_mask: + document.authenticate(self.password) + + if document.is_encrypted: + raise ValueError("Wrong password - document is encrypted") + + return document + + # This attempts to unlock statements based on a common password, + # followed by the last few digits of a card + if document.is_encrypted and self.brute_force_mask: + logger.info("Unlocking PDF using a string prefix with mask") + password = self.unlock_pdf( + pdf_file_path=self.file_path, + static_string=self.password, + mask=self.brute_force_mask, + ) + + document.authenticate(password) + + if not document.is_encrypted: + logger.info("Successfully authenticated with password") + return document + + # If no 
@staticmethod
def unlock_pdf(pdf_file_path: str, static_string: str, mask: str):
    """Recover a PDF password with John the Ripper's mask mode.

    The hash produced by ``PdfHashExtractor.parse`` is written to ``.hash``
    in the current directory. ``john`` is then run with
    ``--mask=<static_string><mask>`` and ``--pot=.pot``, and the cracked
    password is read back from ``john --show``.

    Args:
        pdf_file_path: path of the encrypted PDF.
        static_string: fixed password prefix; ``None`` is treated as empty.
        mask: mask appended to the prefix (e.g. ``"?d?d?d"``).

    Returns:
        The recovered password.

    Raises:
        ValueError: if no hash could be extracted, a ``john`` invocation
            exits non-zero, or the hash was not cracked.
    """
    pdf_hash = PdfHashExtractor(pdf_file_path).parse()
    if not pdf_hash:
        # parse() returns None when the file is not a PDF or has no trailer;
        # writing None to the hash file would raise an unrelated TypeError.
        raise ValueError(f"Could not extract a password hash from {pdf_file_path}")

    hash_path = ".hash"
    pot_option = "--pot=.pot"
    with open(hash_path, "w", encoding="utf-8") as file:
        file.write(pdf_hash)

    # Pass an argument list and no shell: the prefix comes from configuration
    # and must never be interpreted by a shell (quotes, spaces, ";", "$").
    mask_command = [
        "john",
        "--format=PDF",
        f"--mask={static_string or ''}{mask}",
        hash_path,
        pot_option,
    ]
    process = subprocess.run(mask_command, check=False)

    if process.returncode != 0:
        raise ValueError(f"Return code is not 0: {process}")

    show_command = ["john", "--show", hash_path, pot_option]
    output = subprocess.run(
        show_command, capture_output=True, text=True, check=False
    )

    if output.returncode != 0:
        raise ValueError(f"Return code is not 0: {output}")

    if "1 password hash cracked, 0 left" not in output.stdout:
        raise ValueError(f"PDF was not unlocked: {output}")

    # The first line has the form "<login>:<password>"; split on the first
    # colon only so a password that itself contains ":" is kept whole.
    password = output.stdout.split("\n")[0].partition(":")[2]

    return password
class PdfParser:
    """Print a John the Ripper ``$pdf$`` hash line for an encrypted PDF.

    The whole file is read into memory, and the trailer and ``/Encrypt``
    dictionary are located with byte-level regular expressions; no general
    PDF parsing is performed. The implementation targets Python 3, where
    indexing ``bytes`` yields ``int``.
    """

    def __init__(self, file_name):
        """Load *file_name* and look for the ``PDF-x.y`` version marker.

        When the marker is missing, a message is written to stderr and
        ``self.process`` is set to False, which turns ``parse`` into a no-op.
        """
        self.file_name = file_name
        # A context manager closes the handle even if read() raises.
        with open(file_name, "rb") as pdf_file:
            self.encrypted = pdf_file.read()
        self.process = True
        version = re.search(rb"PDF-\d\.\d", self.encrypted)
        if version:
            self.pdf_spec = version.group(0)
        else:
            sys.stderr.write("%s is not a PDF file!\n" % file_name)
            self.process = False

    def parse(self):
        """Write one ``name:hash:::gecos::path`` line to stdout.

        The gecos field is filled from the XMP metadata only when the
        metadata is not encrypted. Nothing is written when the file is not
        a PDF or when ``get_trailer`` fails (its message goes to stderr).

        Raises:
            RuntimeError: if the encryption object id, ``/V``, ``/R``,
                ``/P`` or ``/ID`` cannot be found.
        """
        if not self.process:
            return

        try:
            trailer = self.get_trailer()
        except RuntimeError as error:
            sys.stderr.write("%s : %s\n" % (self.file_name, str(error)))
            return

        object_id = self.get_object_id(b"Encrypt", trailer)
        if len(object_id) == 0:
            raise RuntimeError("Could not find object id")
        encryption_dictionary = self.get_encryption_dictionary(object_id)

        version = re.search(rb"/V (\d)", encryption_dictionary)
        if version is None:
            raise RuntimeError("Could not find /V")
        revision = re.search(rb"/R (\d)", encryption_dictionary)
        if revision is None:
            # Previously an uncaught IndexError; callers only catch RuntimeError.
            raise RuntimeError("Could not find /R")

        # According to the docs:
        # Length : (Optional; PDF 1.4; only if V is 2 or 3). Default value: 40
        length = b"40"
        longest = 0
        for digits in re.findall(rb"/Length (\d+)", encryption_dictionary):
            if int(digits) > longest:
                longest = int(digits)
                length = digits

        permissions = re.search(rb"/P (-?\d+)", encryption_dictionary)
        if permissions is None:
            raise RuntimeError("Could not find /P")

        metadata_encrypted = self.is_meta_data_encrypted(encryption_dictionary)
        meta = "1" if metadata_encrypted else "0"

        # The trailer must carry a two-element /ID array, written either as
        # hex strings <...> or as literal strings (...).
        has_hex_id = re.search(rb"/ID\s*\[\s*<\w+>\s*<\w+>\s*\]", trailer)
        has_literal_id = re.search(rb"/ID\s*\[\s*\(\w+\)\s*\(\w+\)\s*\]", trailer)
        if not (has_hex_id or has_literal_id):
            raise RuntimeError("Could not find /ID tag")

        hex_id = re.search(rb"<(\w+)>", trailer)
        if hex_id:
            i_d = hex_id.group(1).lower().decode("ascii")
        else:
            # A literal-string ID holds raw bytes; the hash format expects
            # hex, so encode it instead of emitting "(...)" verbatim.
            i_d = re.search(rb"\((\w+)\)", trailer).group(1).hex()

        passwords = self.get_passwords_for_JtR(encryption_dictionary)
        output = "*".join(
            [
                "$pdf$" + version.group(1).decode("ascii"),
                revision.group(1).decode("ascii"),
                length.decode("ascii"),
                permissions.group(1).decode("ascii"),
                meta,
                str(len(i_d) // 2),
                i_d,
                passwords,
            ]
        )

        # Format str values directly: "%s" applied to bytes prints the
        # b'...' repr on Python 3, which corrupts the hash line.
        base_name = os.path.basename(self.file_name)
        if metadata_encrypted:
            sys.stdout.write(
                "%s:%s:::::%s\n" % (base_name, output, self.file_name)
            )
        else:
            gecos = self.parse_meta_data(trailer)
            sys.stdout.write(
                "%s:%s:::%s::%s\n" % (base_name, output, gecos, self.file_name)
            )

    @staticmethod
    def _closing_paren_is_escaped(pas):
        """Return True if the final ``)`` of *pas* follows an odd run of backslashes."""
        body = pas[:-1]
        backslashes = len(body) - len(body.rstrip(b"\\"))
        return backslashes % 2 == 1

    def get_passwords_for_JtR(self, encryption_dictionary):
        """Return the ``len*hex`` fields for the U/O (and UE/OE) entries.

        Entries are looked up first as literal strings ``/U(...)`` and then
        as hex strings ``U<...>``; entries that are absent are skipped. The
        fields are joined with ``*``.
        """
        output = ""
        letters = [b"U", b"O"]
        if b"1.7" in self.pdf_spec:
            letters = [b"U", b"O", b"UE", b"OE"]
        for let in letters:
            pr_str = rb"/" + let + rb"\s*\([^)]+\)"
            found = re.findall(pr_str, encryption_dictionary)
            if found:
                pas = found[0]
                # The pattern stops at the first ")". If that ")" is escaped
                # it belongs to the string, so widen the pattern to the next
                # one. (Indexing bytes yields an int on Python 3, so the old
                # ``pas[-2] == b"\\"`` test could never succeed.)
                while self._closing_paren_is_escaped(pas):
                    pr_str += rb"[^)]*\)"
                    try:
                        pas = re.findall(pr_str, encryption_dictionary)[0]
                    except IndexError:
                        break
                output += self.get_password_from_byte_string(pas) + "*"
            else:
                found = re.findall(let + rb"\s*<\w+>", encryption_dictionary)
                if not found:
                    continue
                pas = re.findall(rb"<\w+>", found[0])[0]
                pas = pas.replace(b"<", b"").replace(b">", b"")
                output += str(len(pas) // 2) + "*" + str(pas.lower(), "ascii") + "*"
        return output[:-1]

    def is_meta_data_encrypted(self, encryption_dictionary):
        """Return False only when ``/EncryptMetadata false`` is present."""
        flag = re.search(rb"/EncryptMetadata\s(\w+)", encryption_dictionary)
        if flag is None:
            return True
        return flag.group(1) != b"false"

    def parse_meta_data(self, trailer):
        """Return the XMP values of the object referenced by Root -> Metadata."""
        root_object_id = self.get_object_id(b"Root", trailer)
        root_object = self.get_pdf_object(root_object_id)
        object_id = self.get_object_id(b"Metadata", root_object)
        xmp_metadata_object = self.get_pdf_object(object_id)
        return self.get_xmp_values(xmp_metadata_object)

    def get_xmp_values(self, xmp_metadata_object):
        """Return title, creator, description, subject and creation year.

        The values are joined with spaces and stripped of colons. An empty
        string is returned when the stream content is not parseable XML.
        """
        xmp_metadata_object = xmp_metadata_object.partition(b"stream")[2]
        xmp_metadata_object = xmp_metadata_object.partition(b"endstream")[0]
        try:
            # NOTE(review): minidom is not hardened against malicious XML;
            # the stream comes straight from the input file.
            xml_metadata = minidom.parseString(xmp_metadata_object)
        except Exception:
            return ""
        values = [
            self.get_dc_value(tag, xml_metadata)
            for tag in ("title", "creator", "description", "subject")
        ]
        created_year = xml_metadata.getElementsByTagName("xmp:CreateDate")
        if len(created_year) > 0:
            created_year = created_year[0].firstChild.data[0:4]
            values.append(str(created_year))
        return " ".join(values).replace(":", "")

    def get_dc_value(self, value, xml_metadata):
        """Return the text of the first ``rdf:li`` under ``dc:<value>``, or ""."""
        output = xml_metadata.getElementsByTagName("dc:" + value)
        if len(output) > 0:
            output = output[0]
            output = output.getElementsByTagName("rdf:li")[0]
            if output.firstChild:
                return output.firstChild.data
        return ""

    def get_encryption_dictionary(self, object_id):
        """Return the text of object *object_id* up to (excluding) ``endobj``."""
        encryption_dictionary = self.get_pdf_object(object_id)
        for chunk in encryption_dictionary.split(b"endobj"):
            if object_id + b" obj" in chunk:
                encryption_dictionary = chunk
        return encryption_dictionary

    def get_object_id(self, name, trailer):
        """Return ``b"<num> <gen>"`` from the ``/<name> <num> <gen> R`` reference.

        An empty bytes object is returned when *trailer* holds no such
        reference, so the result is always bytes.
        """
        reference = re.search(rb"/" + name + rb"\s\d+\s\d\sR", trailer)
        if reference is None:
            return b""
        return re.findall(rb"\d+ \d", reference.group(0))[0]

    def get_pdf_object(self, object_id):
        """Return the bytes from ``<object_id> obj`` through the next ``endobj``.

        The object header is searched after a carriage return first and
        after a line feed second.
        """
        header = object_id + b" obj"
        output = header + self.encrypted.partition(b"\r" + header)[2]
        if output == header:
            output = header + self.encrypted.partition(b"\n" + header)[2]
        return output.partition(b"endobj")[0] + b"endobj"

    def get_trailer(self):
        """Return the trailer text, falling back to a ``DecodeParms`` section.

        Raises:
            RuntimeError: "Can't find trailer" when neither section exists,
                "File not encrypted" when the section has no ``Encrypt``.
        """
        trailer = self.get_data_between(b"trailer", b">>", b"/ID")
        if trailer == b"":
            trailer = self.get_data_between(b"DecodeParms", b"stream", b"")
            # Compare against bytes: a bytes value never equals the str "".
            if trailer == b"":
                raise RuntimeError("Can't find trailer")
        if trailer.find(b"Encrypt") == -1:
            raise RuntimeError("File not encrypted")
        return trailer

    def get_data_between(self, s1, s2, tag):
        """Concatenate lines from the one containing *s1* to one containing *s2*.

        When *tag* is non-empty the collected text must contain it;
        otherwise the search restarts after that line. Line terminators are
        dropped. Returns ``b""`` when nothing qualifies.
        """
        output = b""
        inside_first = False
        for line in re.split(rb"\n|\r", self.encrypted):
            inside_first = inside_first or s1 in line
            if not inside_first:
                continue
            output += line
            if s2 in line:
                if tag == b"" or tag in output:
                    break
                output = b""
                inside_first = False
        return output

    def get_hex_byte(self, o_or_u, i):
        """Return byte *i* of *o_or_u* as lowercase hex without zero padding."""
        return hex(o_or_u[i]).replace("0x", "")

    def get_password_from_byte_string(self, o_or_u):
        """Convert a literal entry such as ``b"/U(...)"`` to ``"<len>*<hex>"``.

        Backslash escapes are decoded through ``unescape``. The closing
        parenthesis is dropped from the hex output.
        """
        # Skip "/U(" (3 bytes), or "/UE(" (4 bytes) for UE & OE in 1.7 spec.
        start = 3 if o_or_u[2] == 40 else 4
        pas = ""
        escape_seq = False
        escapes = 0
        for byte in o_or_u[start:]:
            if escape_seq:
                esc = self.unescape("\\" + chr(byte))
                pas += "%02x" % ord(esc[0])
                escape_seq = False
            elif byte == 92:  # backslash starts an escape sequence
                escape_seq = True
                escapes += 1
            else:
                pas += "%02x" % byte
        output = len(o_or_u) - (start + 1) - escapes
        # The last two hex digits are the closing ")".
        return str(output) + "*" + pas[:-2]

    def unescape(self, esc):
        """Map a two-character escape such as ``\\n`` to its replacement.

        Raises KeyError for sequences not in the table (octal ``\\ddd``
        escapes other than ``\\0`` are not decoded).
        """
        escape_seq_map = {
            "\\n": "\n",
            # "\s" and "\e" are not Python escapes; the values are spelled
            # out so they stay backslash + letter without a syntax warning.
            "\\s": "\\s",
            "\\e": "\\e",
            "\\r": "\r",
            "\\t": "\t",
            "\\v": "\v",
            "\\f": "\f",
            "\\b": "\b",
            "\\a": "\a",
            "\\)": ")",
            "\\(": "(",
            "\\\\": "\\",
            "\\0": "\0",
        }

        return escape_seq_map[esc]