Merge pull request #128 from rowingdude/devel_3.0.3
Devel_3.0.3
rowingdude authored Sep 4, 2024
2 parents 29821c1 + cef6594 commit a965cf9
Showing 5 changed files with 165 additions and 52 deletions.
35 changes: 28 additions & 7 deletions src/analyzeMFT/cli.py
@@ -1,30 +1,51 @@
import asyncio
from optparse import OptionParser
from optparse import OptionParser, OptionGroup
import sys
from .mft_analyzer import MftAnalyzer
from .constants import VERSION

async def main():
parser = OptionParser(usage="usage: %prog -f <mft_file> -o <output.csv> [-d] [-H]",
parser = OptionParser(usage="usage: %prog -f <mft_file> -o <output_file> [options]",
version=f"%prog {VERSION}")
parser.add_option("-f", "--file", dest="filename",
help="MFT file to analyze", metavar="FILE")
parser.add_option("-o", "--output", dest="csvfile",
help="Output CSV file", metavar="FILE")
parser.add_option("-o", "--output", dest="output_file",
help="Output file", metavar="FILE")

export_group = OptionGroup(parser, "Export Options")
export_group.add_option("--csv", action="store_const", const="csv", dest="export_format",
help="Export as CSV (default)")
export_group.add_option("--json", action="store_const", const="json", dest="export_format",
help="Export as JSON")
export_group.add_option("--xml", action="store_const", const="xml", dest="export_format",
help="Export as XML")
export_group.add_option("--excel", action="store_const", const="excel", dest="export_format",
help="Export as Excel")
export_group.add_option("--body", action="store_const", const="body", dest="export_format",
help="Export as body file (for mactime)")
export_group.add_option("--timeline", action="store_const", const="timeline", dest="export_format",
help="Export as TSK timeline")
export_group.add_option("--l2t", action="store_const", const="l2t", dest="export_format",
help="Export as log2timeline CSV")
parser.add_option_group(export_group)

parser.add_option("-d", "--debug", action="store_true", dest="debug",
help="Enable debug output", default=False)
parser.add_option("-H", "--hash", action="store_true", dest="compute_hashes",
help="Compute hashes (MD5, SHA256, SHA512, CRC32)", default=False)

(options, args) = parser.parse_args()

if not options.filename or not options.csvfile:
if not options.filename or not options.output_file:
parser.print_help()
sys.exit(1)

analyzer = MftAnalyzer(options.filename, options.csvfile, options.debug, options.compute_hashes)
if not options.export_format:
options.export_format = "csv" # Default to CSV if no format specified

analyzer = MftAnalyzer(options.filename, options.output_file, options.debug, options.compute_hashes, options.export_format)
await analyzer.analyze()
print(f"Analysis complete. Results written to {options.csvfile}")
print(f"Analysis complete. Results written to {options.output_file}")

if __name__ == "__main__":
asyncio.run(main())
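
A standalone sketch of the export-format flag pattern introduced above: each --csv/--json/--xml/... switch stores a constant into the shared export_format destination, so the last flag given wins and an unset destination is defaulted to "csv" by the caller. The argument list passed to parse_args here is hypothetical.

from optparse import OptionParser, OptionGroup

parser = OptionParser()
export_group = OptionGroup(parser, "Export Options")
export_group.add_option("--csv", action="store_const", const="csv", dest="export_format")
export_group.add_option("--json", action="store_const", const="json", dest="export_format")
parser.add_option_group(export_group)

# Parsing a hypothetical argument list: --json sets export_format to "json";
# with no export flag at all, export_format stays None and falls back to "csv".
options, args = parser.parse_args(["--json"])
print(options.export_format or "csv")   # -> json
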
88 changes: 88 additions & 0 deletions src/analyzeMFT/file_writers.py
@@ -0,0 +1,88 @@
import csv
import json
import xml.etree.ElementTree as ET
import asyncio
from typing import List, Dict, Any
from .mft_record import MftRecord

class FileWriters:
@staticmethod
async def write_csv(records: List[MftRecord], output_file: str) -> None:
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(CSV_HEADER)
for record in records:
writer.writerow(record.to_csv())
await asyncio.sleep(0)

@staticmethod
async def write_json(records: List[MftRecord], output_file: str) -> None:
json_data = [record.__dict__ for record in records]
with open(output_file, 'w', encoding='utf-8') as jsonfile:
json.dump(json_data, jsonfile, indent=2, default=str)
await asyncio.sleep(0)

@staticmethod
async def write_xml(records: List[MftRecord], output_file: str) -> None:
root = ET.Element("mft_records")
for record in records:
record_elem = ET.SubElement(root, "record")
for key, value in record.__dict__.items():
ET.SubElement(record_elem, key).text = str(value)
tree = ET.ElementTree(root)
tree.write(output_file, encoding='utf-8', xml_declaration=True)
await asyncio.sleep(0)

@staticmethod
async def write_excel(records: List[MftRecord], output_file: str) -> None:
try:
import openpyxl
except ImportError:
print("openpyxl is not installed. Please install it to use Excel export.")
return

wb = openpyxl.Workbook()
ws = wb.active
ws.append(CSV_HEADER)
for record in records:
ws.append(record.to_csv())
wb.save(output_file)
await asyncio.sleep(0)

@staticmethod
async def write_body(records: List[MftRecord], output_file: str) -> None:
with open(output_file, 'w', encoding='utf-8') as bodyfile:
for record in records:
# Format: MD5|name|inode|mode_as_string|UID|GID|size|atime|mtime|ctime|crtime
bodyfile.write(f"0|{record.filename}|{record.recordnum}|{record.flags:04o}|0|0|"
f"{record.filesize}|{record.fn_times['atime'].unixtime}|"
f"{record.fn_times['mtime'].unixtime}|{record.fn_times['ctime'].unixtime}|"
f"{record.fn_times['crtime'].unixtime}\n")
await asyncio.sleep(0)

@staticmethod
async def write_timeline(records: List[MftRecord], output_file: str) -> None:
with open(output_file, 'w', encoding='utf-8') as timeline:
for record in records:
# Format: Time|Source|Type|User|Host|Short|Desc|Version|Filename|Inode|Notes|Format|Extra
timeline.write(f"{record.fn_times['crtime'].unixtime}|MFT|CREATE|||||{record.filename}|{record.recordnum}||||\n")
timeline.write(f"{record.fn_times['mtime'].unixtime}|MFT|MODIFY|||||{record.filename}|{record.recordnum}||||\n")
timeline.write(f"{record.fn_times['atime'].unixtime}|MFT|ACCESS|||||{record.filename}|{record.recordnum}||||\n")
timeline.write(f"{record.fn_times['ctime'].unixtime}|MFT|CHANGE|||||{record.filename}|{record.recordnum}||||\n")
await asyncio.sleep(0)

@staticmethod
async def write_l2t(records: List[MftRecord], output_file: str) -> None:
with open(output_file, 'w', newline='', encoding='utf-8') as l2tfile:
writer = csv.writer(l2tfile)
writer.writerow(['date', 'time', 'timezone', 'MACB', 'source', 'sourcetype', 'type', 'user', 'host', 'short', 'desc', 'version', 'filename', 'inode', 'notes', 'format', 'extra'])
for record in records:
for time_type, time_obj in record.fn_times.items():
macb = 'M' if time_type == 'mtime' else 'A' if time_type == 'atime' else 'C' if time_type == 'ctime' else 'B'
date_str = time_obj.dt.strftime('%m/%d/%Y') if time_obj.dt else ''
time_str = time_obj.dt.strftime('%H:%M:%S') if time_obj.dt else ''
writer.writerow([
date_str, time_str, 'UTC', macb, 'MFT', 'FILESYSTEM', time_type, '', '', '',
f"{record.filename} {time_type}", '', record.filename, record.recordnum, '', '', ''
])
await asyncio.sleep(0)
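
A minimal usage sketch for the new writers: each is a coroutine taking a list of records and an output path, and write_json relies only on a record's __dict__, so a hypothetical stub object stands in for MftRecord here. Note that write_csv and write_excel also expect a CSV_HEADER name to be visible in this module (e.g. imported from .constants, as mft_analyzer.py does).

import asyncio
from analyzeMFT.file_writers import FileWriters

class StubRecord:
    # Hypothetical stand-in for MftRecord; write_json serializes record.__dict__.
    def __init__(self, recordnum, filename):
        self.recordnum = recordnum
        self.filename = filename

records = [StubRecord(0, "$MFT"), StubRecord(5, "Windows")]
asyncio.run(FileWriters.write_json(records, "records.json"))
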
47 changes: 25 additions & 22 deletions src/analyzeMFT/mft_analyzer.py
@@ -3,17 +3,19 @@
import io
import sys
import traceback
from typing import Dict, Set, List, Optional, Any
from .constants import *
from .mft_record import MftRecord

class MftAnalyzer:

def __init__(self, mft_file, output_file, debug=False, compute_hashes=False):
def __init__(self, mft_file: str, output_file: str, debug: bool = False, compute_hashes: bool = False, export_format: str = "csv") -> None:
self.mft_file = mft_file
self.output_file = output_file
self.debug = debug
self.compute_hashes = compute_hashes
self.mft_records = {}
self.export_format = export_format
self.mft_records = []
self.interrupt_flag = asyncio.Event()
self.csv_writer = None
self.csvfile = None
@@ -32,30 +34,19 @@ def __init__(self, mft_file, output_file, debug=False, compute_hashes=False):
})


async def analyze(self):
async def analyze(self) -> None:
try:
self.csvfile = io.open(self.output_file, 'w', newline='', encoding='utf-8')
self.csv_writer = csv.writer(self.csvfile)
header = CSV_HEADER.copy()
if self.compute_hashes:
header.extend(['MD5', 'SHA256', 'SHA512', 'CRC32'])
self.csv_writer.writerow(header)

self.handle_interrupt()
await self.process_mft()

await self.write_output()
except Exception as e:
print(f"An unexpected error occurred: {e}")
if self.debug:
traceback.print_exc()
finally:
await self.write_remaining_records()
self.print_statistics()
if self.csvfile:
self.csvfile.close()
print(f"Analysis complete. Results written to {self.output_file}")

async def process_mft(self):

async def process_mft(self) -> None:
try:
with open(self.mft_file, 'rb') as f:
while not self.interrupt_flag.is_set():
@@ -100,7 +91,7 @@ async def process_mft(self):
if self.debug:
traceback.print_exc()

def handle_interrupt(self):
def handle_interrupt(self) -> None:
if sys.platform == "win32":
# Windows-specific interrupt handling
import win32api
@@ -120,7 +111,7 @@ def unix_handler():
getattr(signal, signame),
unix_handler)

async def write_csv_block(self):
async def write_csv_block(self) -> None:
try:
for record in self.mft_records.values():
filepath = self.build_filepath(record)
@@ -143,11 +134,11 @@ async def write_csv_block(self):
traceback.print_exc()


async def write_remaining_records(self):
async def write_remaining_records(self) -> None:
await self.write_csv_block()
self.mft_records.clear()

def build_filepath(self, record):
def build_filepath(self, record: MftRecord) -> str:
path_parts = []
current_record = record
max_depth = 255
@@ -179,7 +170,7 @@ def build_filepath(self, record):

return '\\'.join(path_parts)

def print_statistics(self):
def print_statistics(self) -> None:
print("\nMFT Analysis Statistics:")
print(f"Total records processed: {self.stats['total_records']}")
print(f"Active records: {self.stats['active_records']}")
@@ -191,3 +182,15 @@ def print_statistics(self):
print(f"Unique SHA512 hashes: {len(self.stats['unique_sha512'])}")
print(f"Unique CRC32 hashes: {len(self.stats['unique_crc32'])}")


async def write_output(self) -> None:
if self.export_format == "csv":
await FileWriters.write_csv(self.mft_records, self.output_file)
elif self.export_format == "json":
await FileWriters.write_json(self.mft_records, self.output_file)
elif self.export_format == "xml":
await FileWriters.write_xml(self.mft_records, self.output_file)
elif self.export_format == "excel":
await FileWriters.write_excel(self.mft_records, self.output_file)
else:
print(f"Unsupported export format: {self.export_format}")
