Skip to content

Commit

Permalink
Added a FS to parse arbitrary files
Browse files Browse the repository at this point in the history
  • Loading branch information
KOLANICH committed Jul 4, 2018
1 parent aceba3e commit 446f7b9
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 1 deletion.
25 changes: 24 additions & 1 deletion kaitaifs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,41 @@
HeroesOfMightAndMagicAggFS
)
from .fs.iso9660_fs import Iso9660FS
from .fs.AnyKSFS import AnyKSFS

from fuse import FUSE

from functools import partial

from kaitaifs.parser.rar import Rar

FILESYSTEMS = {
'rar': RarFS,
'quake_pak': QuakePakFS,
'heroes_of_might_and_magic_agg': HeroesOfMightAndMagicAggFS,
'iso9660': Iso9660FS,
}

def getParserCtor(moduleName:str):
import importlib
import ast
from pathlib import Path
try:
mod=importlib.import_module("."+moduleName, package="kaitaifs.parser")
except:
mod=importlib.import_module(moduleName)
p=Path(mod.__file__)
a=ast.parse(p.read_text())
for el in a.body:
if isinstance(el, ast.ClassDef):
return getattr(mod, el.name)


def parse_args():
parser = argparse.ArgumentParser(
description='Mount a filesystem based on Kaitai Struct spec.'
)
parser.add_argument('--any', dest='any', action='store_true', default=False, help='use any KaitaiStruct parser class')
parser.add_argument(
'fstype',
metavar='TYPE',
Expand All @@ -46,7 +66,10 @@ def parse_args():

def main():
args = parse_args()
fs_class = FILESYSTEMS[args.fstype]
if args.any:
fs_class = partial(AnyKSFS, getParserCtor(args.fstype))
else:
fs_class = FILESYSTEMS[args.fstype]
FUSE(
fs_class(args.image_file),
args.mount_point,
Expand Down
156 changes: 156 additions & 0 deletions kaitaifs/fs/AnyKSFS.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
__all__=("dumpStruct",)
from fuse import FuseOSError
import errno

from pprint import pprint

from pathlib import Path, PurePath

import os
from kaitaistruct import KaitaiStruct
from collections import OrderedDict
try:
import json5 as json
except:
import json

from ..kaitai_tree_fs import KaitaiTreeFS
from datetime import *

someServiceMethodsAndProps={
'close',
'from_bytes',
'from_file',
'from_io'
}
def isFinalProductPropertyName(name):
return (name[0].islower() and name not in someServiceMethodsAndProps)

from fuse import Operations

class AnyKSFS(KaitaiTreeFS):
def __init__(self, ctor, filename:Path):
filename=Path(filename)
st=filename.lstat()
self.ATTR_DIR={
'st_atime': st.st_atime,
'st_ctime': st.st_ctime,
'st_mtime': st.st_mtime,
}
self.obj = ctor.from_file(str(filename))
super().__init__()

def obj_by_path(self, path):
print("obj_by_path", path)
tree = self.obj
for comp in path:
tree = self.find_name_in_dir(tree, comp)
return tree

def find_name_in_dir(self, cur_dir, name):
print("find_name_in_dir", cur_dir, name)
(plainProps, dirProps, binaryProps) = __class__.splitProps(cur_dir)
pprint(plainProps)
if name == "__dict__.json":
return json.dumps(plainProps, ensure_ascii=False, indent="\t").encode("utf-8")

if name in binaryProps:
return binaryProps[name]
if name in dirProps:
return dirProps[name]
raise FuseOSError(errno.ENOENT)


def splitProps(s):
plainProps=OrderedDict()
dirProps=OrderedDict()
binaryProps=OrderedDict()
def classifyProp(name, prop):
if callable(prop):
return
if isinstance(prop, (int, float, str)):
plainProps[name]=prop
elif isinstance(s, KaitaiStruct):
dirProps[name]=prop
else:
binaryProps[name]=prop

if isinstance(s, list):
for i, item in enumerate(s):
classifyProp(str(i), item)
elif isinstance(s, KaitaiStruct):
for name in filter(isFinalProductPropertyName, dir(s)):
classifyProp(name, getattr(s, name))
return (plainProps, dirProps, binaryProps)


def readdir(self, path, fh):
obj = self.obj_by_pathstr(path)
for r in ['.', '..']:
yield r
for r in self.list_files(obj):
yield r

def list_files(self, cur_dir):
print("list_files", cur_dir)
(plainProps, dirProps, binaryProps) = __class__.splitProps(cur_dir)

allFiles=["__dict__.json"]
allFiles.extend(plainProps.keys())
allFiles.extend(dirProps.keys())
allFiles.extend(binaryProps.keys())
pprint(allFiles)
yield from allFiles

"""
if plainProps:
yield "__dict__.json"
yield from dirProps.keys()
yield from binaryProps.keys()
"""

def getattr(self, path, fh=None):
if path == "/":
return self.ATTR_DIR

obj = self.obj_by_pathstr(path)
res=type(self.ATTR_DIR)(self.ATTR_DIR)
res.update(self.get_file_attrs(obj))
return res

def get_file_attrs(self, obj):
print("get_file_attrs", obj)
#(plainProps, dirProps, binaryProps)=splitProps(obj)

# Directory or file?
if isinstance(obj, (list, KaitaiStruct)):
# Directory
mode = 0o040755
print(obj, " is dir")
else:
# Regular file
mode = 0o100644

if isinstance(obj, (bytes, bytearray)):
size=len(obj)
else:
size=0

res={
'st_nlink': 1,
'st_mode': mode,
'st_size': size,
'st_gid': 0,
'st_uid': 0,
}
#pprint(res)
return res

def read(self, path, length, offset, fh):
obj = self.openfiles[fh]
data = self.get_file_body(obj, offset, length)
return data

def get_file_body(self, obj, offset, length):
print("get_file_body", obj, offset, length)
return obj[offset:offset + length]

0 comments on commit 446f7b9

Please sign in to comment.