From b1200350ad9ec5b66da7e78cf7e2f1bc2b0b00bc Mon Sep 17 00:00:00 2001
From: Jonathan Abrahamy
Date: Wed, 13 Sep 2023 13:46:18 +0200
Subject: [PATCH] Add pdfminer integration

---
 extra/optional_dependencies.txt             |  1 +
 lib/cuckoo/common/integrations/parse_pdf.py |  2 +
 lib/cuckoo/common/integrations/pdfminer.py  | 91 +++++++++++++++++++++
 3 files changed, 94 insertions(+)
 create mode 100644 lib/cuckoo/common/integrations/pdfminer.py

diff --git a/extra/optional_dependencies.txt b/extra/optional_dependencies.txt
index 5993cb48264..240ee6efe45 100644
--- a/extra/optional_dependencies.txt
+++ b/extra/optional_dependencies.txt
@@ -22,3 +22,4 @@ regex
 ruff
 scp
 urlextract==1.5.0
+pdfminer==20191125
diff --git a/lib/cuckoo/common/integrations/parse_pdf.py b/lib/cuckoo/common/integrations/parse_pdf.py
index 67fa4ed6ad8..3038f0287be 100644
--- a/lib/cuckoo/common/integrations/parse_pdf.py
+++ b/lib/cuckoo/common/integrations/parse_pdf.py
@@ -6,6 +6,7 @@
 import logging
 from typing import Any, Dict
 
+from lib.cuckoo.common.integrations.pdfminer import pdfminer_parse
 from lib.cuckoo.common.integrations.peepdf import peepdf_parse
 from lib.cuckoo.common.path_utils import path_exists
 
@@ -59,6 +60,7 @@ def _parse(self, filepath: str) -> Dict[str, Any]:
             "Keywords": {str(keyword["name"]): keyword["count"] for keyword in pdfid_data["pdfid"]["keywords"]["keyword"]},
         }
         pdfresult = peepdf_parse(self.file_path, pdfresult)
+        pdfresult = pdfminer_parse(self.file_path, pdfresult)
 
         return pdfresult
 
diff --git a/lib/cuckoo/common/integrations/pdfminer.py b/lib/cuckoo/common/integrations/pdfminer.py
new file mode 100644
index 00000000000..56a008831a4
--- /dev/null
+++ b/lib/cuckoo/common/integrations/pdfminer.py
@@ -0,0 +1,91 @@
+"""Extract URLs from PDF documents with the optional pdfminer package."""
+
+import logging
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import Set
+
+try:
+    from pdfminer import pdfparser
+    from pdfminer import pdfdocument
+    from pdfminer import pdftypes
+
+    HAVE_PDFMINER = True
+except ImportError:
+    HAVE_PDFMINER = False
+
+log = logging.getLogger(__name__)
+
+
+def _uri_to_text(value: Any) -> str:
+    """Return a /URI entry as text, or "" when it is not string-like.
+
+    The entry may be an indirect reference, so it is resolved first. Bytes are
+    decoded leniently so that one malformed URI cannot abort the scan of the
+    object that contains it.
+    """
+    value = pdftypes.resolve1(value)
+    if isinstance(value, bytes):
+        return value.decode("utf-8", errors="replace")
+    return value if isinstance(value, str) else ""
+
+
+def _search_for_url(obj: Any) -> Iterable[str]:
+    """Yield every textual /URI value nested inside a parsed PDF object.
+
+    Streams are searched through their attribute dictionary; lists and
+    dictionaries are walked recursively. Any other value yields nothing.
+    """
+    if isinstance(obj, pdftypes.PDFStream):
+        yield from _search_for_url(obj.attrs)
+    elif isinstance(obj, list):
+        for item in obj:
+            yield from _search_for_url(item)
+    elif isinstance(obj, dict):
+        for key, value in obj.items():
+            if key == "URI":
+                uri = _uri_to_text(value)
+                if uri:
+                    yield uri
+                    continue
+            # Not a textual URI (e.g. a nested dictionary): keep descending.
+            yield from _search_for_url(value)
+
+
+def _mine_for_urls(file_path: str) -> Set[str]:
+    """Collect the unique URIs referenced by the PDF at *file_path*.
+
+    Parsing is best effort: a failure on one object is logged and the scan
+    continues, and a failure to open or parse the document yields whatever
+    was collected so far.
+    """
+    urls: Set[str] = set()
+    try:
+        with open(file_path, "rb") as f:
+            doc = pdfdocument.PDFDocument(pdfparser.PDFParser(f))
+            # getobj() reads from the open file, so iterate inside the with.
+            for xref in doc.xrefs:
+                for object_id in xref.get_objids():
+                    try:
+                        urls.update(_search_for_url(doc.getobj(object_id)))
+                    except Exception:
+                        log.exception("pdfminer failed on object %s of %s", object_id, file_path)
+    except Exception:
+        log.exception("pdfminer failed to parse %s", file_path)
+
+    return urls
+
+
+def pdfminer_parse(filepath: str, pdfresult: Dict[str, Any]) -> Dict[str, Any]:
+    """Add the URIs found by pdfminer to *pdfresult* under "All_URLs".
+
+    Returns *pdfresult* unchanged when pdfminer is not installed; otherwise
+    the same dictionary is updated in place and returned. URLs are sorted so
+    that repeated analyses of the same file produce identical reports.
+    """
+    if not HAVE_PDFMINER:
+        return pdfresult
+
+    pdfresult["All_URLs"] = sorted(_mine_for_urls(filepath))
+    return pdfresult