Skip to content

Commit

Permalink
Support for WinZip AES in ScanZip / ScanEncryptedZip
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanohoro committed Dec 23, 2022
1 parent e460d1a commit 1d865cd
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 47 deletions.
2 changes: 2 additions & 0 deletions build/python/backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ pygments==2.9.0
pylzma==0.5.0
pytesseract==0.3.7
pytest-mock==3.10.0
pytest-unordered==0.5.2
pytest==7.2.0
python-docx==0.8.11
python-magic==0.4.22
pytz>=2022.1
pyxlsb2==0.0.8
pyyaml>=5.4.1
pyzbar==0.1.8
pyzipper==0.3.6
rarfile==4.0
redis==3.5.3
requests==2.25.1
Expand Down
2 changes: 2 additions & 0 deletions src/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ pygments==2.9.0
pylzma==0.5.0
pytesseract==0.3.7
pytest-mock==3.10.0
pytest-unordered==0.5.2
pytest==7.2.0
python-docx==0.8.11
python-magic==0.4.22
pytz>=2022.1
pyxlsb2==0.0.8
pyyaml>=5.4.1
pyzbar==0.1.8
pyzipper==0.3.6
rarfile==4.0
redis==3.5.3
requests==2.25.1
Expand Down
42 changes: 32 additions & 10 deletions src/python/strelka/scanners/scan_encrypted_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import tempfile
import io
import os
import zipfile
import pyzipper
import zlib

from strelka import strelka
Expand Down Expand Up @@ -60,9 +60,16 @@ def crack_zip(
stderr=subprocess.DEVNULL,
).communicate(timeout=scanner_timeout)

if stdout.split(b"\n")[1]:
self.flags.append("cracked_by_wordlist")
return stdout.split(b"\n")[1].split()[0]
# ZipCrypto
if b"PKZIP" in stdout.split(b"\n")[0]:
if stdout.split(b"\n")[1]:
self.flags.append("cracked_by_wordlist")
return stdout.split(b"\n")[1].split()[0]
# WinZip AES
elif b"WinZip" in stdout.split(b"\n")[0]:
if stdout.split(b"\n")[2]:
self.flags.append("cracked_by_wordlist")
return stdout.split(b"\n")[2].split()[0]
if brute:
(stdout, stderr) = subprocess.Popen(
[
Expand Down Expand Up @@ -115,7 +122,22 @@ def scan(self, data, file, options, expire_at):

with io.BytesIO(data) as zip_io:
try:
with zipfile.ZipFile(zip_io) as zip_obj:

is_aes = False
with pyzipper.ZipFile(zip_io) as zip_obj:

file_list = zip_obj.filelist # .filelist
for file_list_item in file_list:
if not file_list_item.is_dir():
# Check for the AES compression type
if file_list_item.compress_type == 99:
is_aes = True
break

with pyzipper.AESZipFile(zip_io) if is_aes else pyzipper.ZipFile(
zip_io
) as zip_obj:

file_list = zip_obj.filelist # .filelist
for file_list_item in file_list:
if not file_list_item.is_dir():
Expand All @@ -139,14 +161,14 @@ def scan(self, data, file, options, expire_at):
if log_extracted_pws:
self.event["cracked_password"] = extracted_pw

for i, file_item in enumerate(file_list):
if not file_item.filename.endswith("/"):
for file_item in file_list:
if not file_item.is_dir():
if self.event["total"]["extracted"] >= file_limit:
break

try:
extract_data = zip_obj.read(
file_item.filename, extracted_pw
file_item.filename, pwd=extracted_pw
)

if extract_data:
Expand All @@ -165,7 +187,7 @@ def scan(self, data, file, options, expire_at):
self.files.append(extract_file)
self.event["total"]["extracted"] += 1

except NotImplementedError:
except NotImplementedError as e:
self.flags.append("unsupported_compression")
except RuntimeError:
self.flags.append("runtime_error")
Expand All @@ -174,5 +196,5 @@ def scan(self, data, file, options, expire_at):
except zlib.error:
self.flags.append("zlib_error")

except zipfile.BadZipFile:
except pyzipper.BadZipFile:
self.flags.append("bad_zip")
102 changes: 65 additions & 37 deletions src/python/strelka/scanners/scan_zip.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
import os
import zipfile
import pyzipper
import zlib

from strelka import strelka
Expand All @@ -18,31 +18,45 @@ class ScanZip(strelka.Scanner):
password_file: Location of passwords file for zip archives.
Defaults to /etc/strelka/passwords.dat.
"""

def scan(self, data, file, options, expire_at):
file_limit = options.get('limit', 100)
password_file = options.get('password_file', '/etc/strelka/passwords.dat')
file_limit = options.get("limit", 100)
password_file = options.get("password_file", "/etc/strelka/passwords.dat")
passwords = []

# Gather count and list of files to be extracted
self.event['total'] = {'files': 0, 'extracted': 0}
self.event['files'] = []
self.event["total"] = {"files": 0, "extracted": 0}
self.event["files"] = []

# Temporary top level compression metrics
compress_size_total = 0
file_size_total = 0

if os.path.isfile(password_file):
with open(password_file, 'rb') as f:
with open(password_file, "rb") as f:
for line in f:
passwords.append(line.strip())

with io.BytesIO(data) as zip_io:
try:
with zipfile.ZipFile(zip_io) as zip_obj:

is_aes = False
with pyzipper.ZipFile(zip_io) as zip_obj:
filelist = zip_obj.filelist
for file in filelist:
if not file.is_dir():
# Check for the AES compression type
if file.compress_type == 99:
is_aes = True
break

with pyzipper.ZipFile(zip_io) if is_aes else pyzipper.ZipFile(
zip_io
) as zip_obj:
filelist = zip_obj.filelist
for file in filelist:
if not file.is_dir():
self.event['total']['files'] += 1
self.event["total"]["files"] += 1

# For each file in zip, gather metadata metrics and pass back to Strelka for recursive extraction.
for i, name in enumerate(filelist):
Expand All @@ -52,45 +66,57 @@ def scan(self, data, file, options, expire_at):
file_size_total += name.file_size

size_difference = name.file_size - name.compress_size
compression_rate = (size_difference * 100.0) / name.file_size
self.event['files'].append({
"file_name": name.filename,
"file_size": name.file_size,
"compression_size": name.compress_size,
"compression_rate": round(compression_rate, 2)
})

if self.event['total']['extracted'] >= file_limit:
compression_rate = (
size_difference * 100.0
) / name.file_size
self.event["files"].append(
{
"file_name": name.filename,
"file_size": name.file_size,
"compression_size": name.compress_size,
"compression_rate": round(compression_rate, 2),
}
)

if self.event["total"]["extracted"] >= file_limit:
break

try:
extract_data = b''
extract_data = b""
zinfo = zip_obj.getinfo(name.filename)

if zinfo.flag_bits & 0x1:
if 'encrypted' not in self.flags:
self.flags.append('encrypted')
if "encrypted" not in self.flags:
self.flags.append("encrypted")

if passwords:
for pw in passwords:
try:
extract_data = zip_obj.read(name.filename, pw)
self.event['password'] = pw.decode("utf-8")

except (RuntimeError, zipfile.BadZipFile, zlib.error):
extract_data = zip_obj.read(
name.filename, pw
)
self.event["password"] = pw.decode(
"utf-8"
)

except (
RuntimeError,
pyzipper.BadZipFile,
zlib.error,
):
pass
else:
try:
extract_data = zip_obj.read(name.filename)
except RuntimeError:
self.flags.append('runtime_error')
except zipfile.BadZipFile:
self.flags.append('bad_zip')
self.flags.append("runtime_error")
except pyzipper.BadZipFile:
self.flags.append("bad_zip")
except zlib.error:
self.flags.append('zlib_error')
self.flags.append("zlib_error")

# Suppress sending to coordinator in favor of ScanEncryptedZip
if extract_data and 'encrypted' not in self.flags:
if extract_data and "encrypted" not in self.flags:
extract_file = strelka.File(
name=name.filename,
source=self.name,
Expand All @@ -104,20 +130,22 @@ def scan(self, data, file, options, expire_at):
)

self.files.append(extract_file)
self.event['total']['extracted'] += 1
self.event["total"]["extracted"] += 1

except NotImplementedError:
self.flags.append('unsupported_compression')
self.flags.append("unsupported_compression")
except RuntimeError:
self.flags.append('runtime_error')
self.flags.append("runtime_error")
except ValueError:
self.flags.append('value_error')
self.flags.append("value_error")
except zlib.error:
self.flags.append('zlib_error')
self.flags.append("zlib_error")

# Top level compression metric
size_difference_total = file_size_total - compress_size_total
self.event['compression_rate'] = round((size_difference_total * 100.0) / file_size_total, 2)
self.event["compression_rate"] = round(
(size_difference_total * 100.0) / file_size_total, 2
)

except zipfile.BadZipFile:
self.flags.append('bad_zip')
except pyzipper.BadZipFile:
self.flags.append("bad_zip")
Binary file not shown.
22 changes: 22 additions & 0 deletions src/python/strelka/tests/test_scan_encrypted_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,25 @@ def test_scan_encrypted_zip(mocker):

TestCase.maxDiff = None
TestCase().assertDictEqual(test_scan_event, scanner_event)


def test_scan_encrypted_zip_aes256(mocker):
"""
Pass: Sample event matches output of scanner.
Failure: Unable to load file or sample event fails to match.
"""

test_scan_event = {
"elapsed": mock.ANY,
"flags": ["cracked_by_wordlist"],
"total": {"files": 4, "extracted": 4},
}

scanner_event = run_test_scan(
mocker=mocker,
scan_class=ScanUnderTest,
fixture_path=Path(__file__).parent / "fixtures/test_aes256_password.zip",
)

TestCase.maxDiff = None
TestCase().assertDictEqual(test_scan_event, scanner_event)
49 changes: 49 additions & 0 deletions src/python/strelka/tests/test_scan_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,52 @@ def test_scan_zip(mocker):

TestCase.maxDiff = None
TestCase().assertDictEqual(test_scan_event, scanner_event)


def test_scan_zip_aes256(mocker):
"""
Pass: Sample event matches output of scanner.
Failure: Unable to load file or sample event fails to match.
"""

test_scan_event = {
"elapsed": mock.ANY,
"flags": ["encrypted"],
"total": {"files": 4, "extracted": 0},
"files": [
{
"file_name": "hidden/lorem-hidden.txt",
"file_size": 4015,
"compression_size": 1453,
"compression_rate": 63.81,
},
{
"file_name": "hidden/lorem-readonly.txt",
"file_size": 4015,
"compression_size": 1453,
"compression_rate": 63.81,
},
{
"file_name": "hidden/lorem.txt",
"file_size": 4015,
"compression_size": 1453,
"compression_rate": 63.81,
},
{
"file_name": "lorem.txt",
"file_size": 4015,
"compression_size": 1453,
"compression_rate": 63.81,
},
],
"compression_rate": 63.81,
}

scanner_event = run_test_scan(
mocker=mocker,
scan_class=ScanUnderTest,
fixture_path=Path(__file__).parent / "fixtures/test_aes256_password.zip",
)

TestCase.maxDiff = None
TestCase().assertDictEqual(test_scan_event, scanner_event)

0 comments on commit 1d865cd

Please sign in to comment.