Skip to content

Commit

Permalink
Merge pull request #483 from chisholm/is_functions
Browse files Browse the repository at this point in the history
add is_sdo() et al functions
  • Loading branch information
clenk authored Jan 29, 2021
2 parents 03b3423 + 3878788 commit 5e2d888
Show file tree
Hide file tree
Showing 15 changed files with 987 additions and 177 deletions.
77 changes: 13 additions & 64 deletions stix2/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from . import registry
from .exceptions import ParseError
from .utils import _get_dict
from .utils import _get_dict, detect_spec_version


def parse(data, allow_custom=False, version=None):
Expand Down Expand Up @@ -42,47 +42,6 @@ def parse(data, allow_custom=False, version=None):
return obj


def _detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A string in "vXX" format, where "XX" indicates the spec version,
e.g. "v20", "v21", etc.
"""

obj_type = stix_dict["type"]

if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "v20"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"v21",
max(
_detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in registry.STIX2_OBJ_MAPS["v21"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "v21"
else:
# Not a 2.1 SCO; must be a 2.0 object.
v = "v20"

return v


def dict_to_stix2(stix_dict, allow_custom=False, version=None):
"""convert dictionary to full python-stix2 object
Expand Down Expand Up @@ -115,25 +74,19 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
if 'type' not in stix_dict:
raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))

if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
v = _detect_spec_version(stix_dict)
if not version:
version = detect_spec_version(stix_dict)

OBJ_MAP = dict(
registry.STIX2_OBJ_MAPS[v]['objects'],
**registry.STIX2_OBJ_MAPS[v]['observables']
)
obj_type = stix_dict["type"]
obj_class = registry.class_for_type(obj_type, version, "objects") \
or registry.class_for_type(obj_type, version, "observables")

try:
obj_class = OBJ_MAP[stix_dict['type']]
except KeyError:
if not obj_class:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX object, returned as is
return stix_dict
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type'])
raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj_type)

return obj_class(allow_custom=allow_custom, **stix_dict)

Expand Down Expand Up @@ -168,16 +121,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):

obj['_valid_refs'] = _valid_refs or []

if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
v = _detect_spec_version(obj)
if not version:
version = detect_spec_version(obj)

try:
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
obj_type = obj["type"]
obj_class = registry.class_for_type(obj_type, version, "observables")
if not obj_class:
if allow_custom:
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
Expand Down
8 changes: 3 additions & 5 deletions stix2/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,14 @@ def clean(self, value):
possible_prefix = value[:value.index('--')]

if self.valid_types:
ref_valid_types = enumerate_types(self.valid_types, 'v' + self.spec_version.replace(".", ""))
ref_valid_types = enumerate_types(self.valid_types, self.spec_version)

if possible_prefix in ref_valid_types:
required_prefix = possible_prefix
else:
raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix))
elif self.invalid_types:
ref_invalid_types = enumerate_types(self.invalid_types, 'v' + self.spec_version.replace(".", ""))
ref_invalid_types = enumerate_types(self.invalid_types, self.spec_version)

if possible_prefix not in ref_invalid_types:
required_prefix = possible_prefix
Expand Down Expand Up @@ -655,9 +655,7 @@ def clean(self, value):
except ValueError:
raise ValueError("The extensions property must contain a dictionary")

v = 'v' + self.spec_version.replace('.', '')

specific_type_map = STIX2_OBJ_MAPS[v]['observable-extensions'].get(self.enclosing_type, {})
specific_type_map = STIX2_OBJ_MAPS[self.spec_version]['observable-extensions'].get(self.enclosing_type, {})
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]
Expand Down
39 changes: 14 additions & 25 deletions stix2/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,15 @@ def _register_object(new_type, version=DEFAULT_VERSION):

properties = new_type._properties

if not version:
version = DEFAULT_VERSION

if version == "2.1":
for prop_name, prop in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character" % prop_name)

if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')

OBJ_MAP = registry.STIX2_OBJ_MAPS[v]['objects']
OBJ_MAP = registry.STIX2_OBJ_MAPS[version]['objects']
if new_type._type in OBJ_MAP.keys():
raise DuplicateRegistrationError("STIX Object", new_type._type)
OBJ_MAP[new_type._type] = new_type
Expand All @@ -61,20 +58,17 @@ def _register_marking(new_marking, version=DEFAULT_VERSION):
mark_type = new_marking._type
properties = new_marking._properties

if not version:
version = DEFAULT_VERSION

_validate_type(mark_type, version)

if version == "2.1":
for prop_name, prop_value in properties.items():
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)

if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')

OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[v]['markings']
OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[version]['markings']
if mark_type in OBJ_MAP_MARKING.keys():
raise DuplicateRegistrationError("STIX Marking", mark_type)
OBJ_MAP_MARKING[mark_type] = new_marking
Expand All @@ -91,6 +85,9 @@ def _register_observable(new_observable, version=DEFAULT_VERSION):
"""
properties = new_observable._properties

if not version:
version = DEFAULT_VERSION

if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties.items():
Expand Down Expand Up @@ -130,13 +127,7 @@ def _register_observable(new_observable, version=DEFAULT_VERSION):
"is not a ListProperty containing ReferenceProperty." % prop_name,
)

if version:
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + DEFAULT_VERSION.replace('.', '')

OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
if new_observable._type in OBJ_MAP_OBSERVABLE.keys():
raise DuplicateRegistrationError("Cyber Observable", new_observable._type)
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
Expand Down Expand Up @@ -182,8 +173,6 @@ def _register_observable_extension(
if not re.match(PREFIX_21_REGEX, prop_name):
raise ValueError("Property name '%s' must begin with an alpha character." % prop_name)

v = 'v' + version.replace('.', '')

try:
observable_type = observable._type
except AttributeError:
Expand All @@ -192,8 +181,8 @@ def _register_observable_extension(
"created with the @CustomObservable decorator.",
)

OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables']
EXT_MAP = registry.STIX2_OBJ_MAPS[v]['observable-extensions']
OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables']
EXT_MAP = registry.STIX2_OBJ_MAPS[version]['observable-extensions']

try:
if ext_type in EXT_MAP[observable_type].keys():
Expand Down
54 changes: 53 additions & 1 deletion stix2/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@
STIX2_OBJ_MAPS = {}


def _stix_vid_to_version(stix_vid):
"""
Convert a python package name representing a STIX version in the form "vXX"
to the dotted style used in the public APIs of this library, "X.X".
:param stix_vid: A package name in the form "vXX"
:return: A STIX version in dotted style
"""
assert len(stix_vid) >= 3

stix_version = stix_vid[1] + "." + stix_vid[2:]
return stix_version


def _collect_stix2_mappings():
"""Navigate the package once and retrieve all object mapping dicts for each
v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP."""
Expand All @@ -16,13 +30,51 @@ def _collect_stix2_mappings():
prefix = str(top_level_module.__name__) + '.'

for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix):
ver = name.split('.')[1]
stix_vid = name.split('.')[1]
if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg:
ver = _stix_vid_to_version(stix_vid)
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver] = {}
STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP
STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE
STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP
elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False:
ver = _stix_vid_to_version(stix_vid)
mod = importlib.import_module(name, str(top_level_module.__name__))
STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING


def class_for_type(stix_type, stix_version, category=None):
"""
Get the registered class which implements a particular STIX type for a
particular STIX version.
:param stix_type: A STIX type as a string
:param stix_version: A STIX version as a string, e.g. "2.1"
:param category: An optional "category" value, which is just used directly
as a second key after the STIX version, and depends on how the types
are internally categorized. This would be useful if the same STIX type
is used to mean two different things within the same STIX version. So
it's unlikely to be necessary. Pass None to just search all the
categories and return the first class found.
:return: A registered python class which implements the given STIX type, or
None if one is not found.
"""
cls = None

cat_map = STIX2_OBJ_MAPS.get(stix_version)
if cat_map:
if category:
class_map = cat_map.get(category)
if class_map:
cls = class_map.get(stix_type)
else:
cls = cat_map["objects"].get(stix_type) \
or cat_map["observables"].get(stix_type) \
or cat_map["markings"].get(stix_type)

# Left "observable-extensions" out; it has a different
# substructure. A version->category->type lookup would result
# in another map, not a class. So it doesn't fit the pattern.

return cls
Loading

0 comments on commit 5e2d888

Please sign in to comment.