Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement of ScanOcr Scanner and Strelka Scanners with IOC Changes #417

Merged
merged 9 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions configs/python/backend/backend.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 2023-07-07-update
version: 2024.02.01.01
logging_cfg: '/etc/strelka/logging.yaml'
limits:
max_files: 5000
Expand Down Expand Up @@ -380,9 +380,11 @@ scanners:
priority: 5
options:
extract_text: False
split_words: True
tmp_directory: '/dev/shm/'
tmp_directory: "/dev/shm/"
pdf_to_png: True
remove_formatting: True
create_thumbnail: True
thumbnail_size: [250, 250]
'ScanOle':
- positive:
flavors:
Expand Down
76 changes: 39 additions & 37 deletions src/python/strelka/scanners/scan_iqy.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,64 @@
# Description #
# This scanner is looking for iqy files used with excel.
#
# author: Tasha Taylor
# date: 10/30/2023

import re

from strelka import strelka


class ScanIqy(strelka.Scanner):
"""
Extract URLs from IQY files.
Strelka scanner for extracting URLs from IQY (Excel Web Query Internet Inquire) files.

IQY files are typically used to import data into Excel from the web. They often contain URLs
that specify the data source. This scanner aims to extract these URLs and process them for IOCs.

IQY files, or Excel Web Query Internet Inquire files, are typically created from a VBA Web Query output.
The following is a typical format:
WEB
1
[URL]
[optional parameters]
Additional properties can be found at: https://learn.microsoft.com/en-us/office/vba/api/excel.querytable
The following is a typical format of an IQY file:
WEB
1
[URL]
[optional parameters]

Reference for IQY file format: https://learn.microsoft.com/en-us/office/vba/api/excel.querytable
"""

def scan(self, data, file, options, expire_at):
"""
Processes the provided IQY data to extract URLs.

Attempts to decode the data and applies a regex pattern to identify and extract URLs.
Extracted URLs are added to the scanner's IOC list.

Args:
data (bytes): Data associated with the IQY file to be scanned.
file (strelka.File): File object associated with the data.
options (dict): Options to be applied during the scan.
expire_at (int): Expiration timestamp for extracted files.
"""
try:
# Regular expression for detecting a URL-like pattern
# Compile regex pattern for URL detection
address_pattern = re.compile(
r"\b(?:http|https|ftp|ftps|file|smb)://\S+|"
r"\\{2}\w+\\(?:[\w$]+\\)*[\w$]+",
re.IGNORECASE,
)

# Attempt UTF-8 decoding first, fall back to latin-1 if necessary
# Attempt to decode the data
try:
data = data.decode("utf-8")
decoded_data = data.decode("utf-8")
except UnicodeDecodeError:
data = data.decode("latin-1")

# Split lines to review each record separately
data_lines = data.splitlines()
decoded_data = data.decode("latin-1")

addresses = set()
# For each line, check if the line matches the address pattern.
# In a typical IQY file, the "WEB" keyword is at the beginning of the file,
# and what follows is usually just one URL with optional additional parameters.
# However, because we are already iterating over every line, let's also check for any additional addresses.
for entry in data_lines[1:]:
match = address_pattern.search(entry)
if match:
address = match.group().strip()
if address:
addresses.add(address)

# Evaluate if any addresses were found and assign the boolean result.
self.event["address_found"] = bool(addresses)
# Extract addresses from the data
addresses = set(
match.group().strip()
for line in decoded_data.splitlines()
if (match := address_pattern.search(line))
)

# Send all addresses to the IOC parser.
self.add_iocs(list(addresses), self.type.url)
# Add extracted URLs to the scanner's IOC list
if addresses:
self.event["address_found"] = True
self.add_iocs(list(addresses))
else:
self.event["address_found"] = False

except UnicodeDecodeError as e:
self.flags.append(f"Unicode decoding error: {e}")
Expand Down
75 changes: 54 additions & 21 deletions src/python/strelka/scanners/scan_ocr.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,101 @@
import base64
import io
import os
import subprocess
import tempfile

import fitz
from PIL import Image

from strelka import strelka


class ScanOcr(strelka.Scanner):
"""Collects metadata and extracts optical text from image files.
"""Extracts optical text from image files and creates a thumbnail.

This scanner extracts text from image files using OCR (Optical Character Recognition) and
generates a base64-encoded thumbnail. It supports direct image files and converting PDFs
to images for OCR.

Options:
extract_text: Boolean that determines if optical text should be
extracted as a child file.
Defaults to False.
tmp_directory: Location where tempfile writes temporary files.
Defaults to '/tmp/'.
extract_text: If True, extracted text is emitted as a child file. (default: False)
split_words: If True, splits the OCR text into words and stores an array. (default: True)
remove_formatting: If True, strips carriage returns, newlines, and form feeds from the OCR text. Only applies when split_words is False. (default: True)
tmp_directory: Directory for temporary files. (default: '/tmp/')
pdf_to_png: If True, converts PDFs to PNG for OCR. (default: False)
create_thumbnail: If True, creates a thumbnail for the image. (default: False)
thumbnail_size: Size of the thumbnail to create. (default: (250, 250))
"""

def scan(self, data, file, options, expire_at):
extract_text = options.get("extract_text", False)
split_words = options.get("split_words", True)
remove_formatting = options.get("remove_formatting", True)
tmp_directory = options.get("tmp_directory", "/tmp/")
pdf_to_png = options.get("pdf_to_png", False)
create_thumbnail = options.get("create_thumbnail", False)
thumbnail_size = options.get("thumbnail_size", (250, 250))

# Convert PDF to PNG if required.
if pdf_to_png and "application/pdf" in file.flavors.get("mime", []):
doc = fitz.open(stream=data, filetype="pdf")
data = doc.get_page_pixmap(0).tobytes("png")

try:
doc = fitz.open(stream=data, filetype="pdf")
data = doc.get_page_pixmap(0).tobytes("png")
except Exception as e:
self.flags.append(
f"{self.__class__.__name__}: image_pdf_error: {str(e)[:50]}"
)

# Create a thumbnail from the image.
# Stores as a base64 value in the key: base64_thumbnail
if create_thumbnail:
try:
image = Image.open(io.BytesIO(data))
image.thumbnail(thumbnail_size, Image.Resampling.BILINEAR)
buffered = io.BytesIO()
image.save(buffered, format="WEBP", quality=70, optimize=True)
base64_image = base64.b64encode(buffered.getvalue()).decode("utf-8")
self.event["base64_thumbnail"] = base64_image
except Exception as e:
self.flags.append(
f"{self.__class__.__name__}: image_thumbnail_error: {str(e)[:50]}"
)
# Perform OCR on the image data.
with tempfile.NamedTemporaryFile(dir=tmp_directory) as tmp_data:
tmp_data.write(data)
tmp_data.flush()

with tempfile.NamedTemporaryFile(dir=tmp_directory) as tmp_tess:
try:
tess_txt_name = f"{tmp_tess.name}.txt"

completed_process = subprocess.run(
subprocess.run(
["tesseract", tmp_data.name, tmp_tess.name],
capture_output=True,
check=True,
)

_ = completed_process

with open(tess_txt_name, "rb") as tess_txt:
ocr_file = tess_txt.read()

if ocr_file:
if split_words:
self.event["text"] = ocr_file.split()
else:
self.event["text"] = (
ocr_file.replace(b"\r", b"")
.replace(b"\n", b"")
.replace(b"\f", b"")
)

if remove_formatting:
self.event["text"] = (
ocr_file.replace(b"\r", b"")
.replace(b"\n", b"")
.replace(b"\f", b"")
)
else:
self.event["text"] = ocr_file
if extract_text:
# Send extracted file back to Strelka
self.emit_file(ocr_file, name="text")

os.remove(tess_txt_name)

except subprocess.CalledProcessError as e:
self.flags.append("tesseract_process_error")
self.flags.append(
f"{self.__class__.__name__}: tesseract_process_error: {str(e)[:50]}"
)
raise strelka.ScannerException(e.stderr)
60 changes: 38 additions & 22 deletions src/python/strelka/scanners/scan_xl4ma.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,49 @@


class ScanXl4ma(strelka.Scanner):
"""Extracts Excel 4 cell contents and attempts to extract IOCs"""
"""
Strelka scanner for extracting Excel 4 cell contents and IOCs.

This scanner uses the xl4ma analyzer to extract data from Excel files.
It attempts to decode Excel 4 cell contents and extract any potential IOCs.
Extracted data is added to the scanner's event, and IOCs are processed
using the scanner's IOC processing capabilities.

Attributes inherited from strelka.Scanner:
- name (str): Name of the scanner class.
- key (str): Metadata key used to identify scanner metadata in scan results.
- event (dict): Dictionary containing the result of the scan.
- flags (list): List of flags raised during scanning.
- iocs (list): List of IOCs extracted during scanning.
"""

def scan(self, data, file, options, expire_at):
results = {}
"""
Overridable scan method from strelka.Scanner.

# Attempt to process Excel data using analyzer
Processes the provided data using the xl4ma analyzer and extracts
relevant information and IOCs.

Args:
data (bytes): Data associated with the file to be scanned.
file (strelka.File): File object associated with the data.
options (dict): Options to be applied during the scan.
expire_at (int): Expiration timestamp for extracted files.
"""
# Attempt to process Excel data using the xl4ma analyzer
try:
# Process Excel data and store the results
results = analyzer.process_data(data=data, filename=file.name)

# Check if decoding and IOCs are present in the results
if "decoded" in results:
self.event["decoded"] = results["decoded"]
if "iocs" in results:
self.event["iocs"] = results["iocs"]
self.add_iocs(results["iocs"])
except strelka.ScannerTimeout:
# Propagate the timeout exception
raise
except Exception as e:
self.flags.append(str(e))
print(str(e))
return

# If processing successful, extract keys and apply to IOC scanner.
if results:
self.event["decoded"] = results.get("decoded", [])
self.event["iocs"] = results.get("iocs", [])

try:
self.add_iocs(
results.get("iocs", []),
self.type.url,
description="extracted from excel4 macro",
)
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("xl4ma_ioc_processing_error")
# Append exception message to flags for diagnostic purposes
self.flags.append(f"xl4ma_processing_exception: {str(e)}")
Loading