diff --git a/stix2/parsing.py b/stix2/parsing.py index d9b65164..e13ff678 100644 --- a/stix2/parsing.py +++ b/stix2/parsing.py @@ -4,7 +4,7 @@ from . import registry from .exceptions import ParseError -from .utils import _get_dict +from .utils import _get_dict, detect_spec_version def parse(data, allow_custom=False, version=None): @@ -42,47 +42,6 @@ def parse(data, allow_custom=False, version=None): return obj -def _detect_spec_version(stix_dict): - """ - Given a dict representing a STIX object, try to detect what spec version - it is likely to comply with. - - :param stix_dict: A dict with some STIX content. Must at least have a - "type" property. - :return: A string in "vXX" format, where "XX" indicates the spec version, - e.g. "v20", "v21", etc. - """ - - obj_type = stix_dict["type"] - - if 'spec_version' in stix_dict: - # For STIX 2.0, applies to bundles only. - # For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only. - v = 'v' + stix_dict['spec_version'].replace('.', '') - elif "id" not in stix_dict: - # Only 2.0 SCOs don't have ID properties - v = "v20" - elif obj_type == 'bundle': - # Bundle without a spec_version property: must be 2.1. But to - # future-proof, use max version over all contained SCOs, with 2.1 - # minimum. - v = max( - "v21", - max( - _detect_spec_version(obj) for obj in stix_dict["objects"] - ), - ) - elif obj_type in registry.STIX2_OBJ_MAPS["v21"]["observables"]: - # Non-bundle object with an ID and without spec_version. Could be a - # 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO... - v = "v21" - else: - # Not a 2.1 SCO; must be a 2.0 object. - v = "v20" - - return v - - def dict_to_stix2(stix_dict, allow_custom=False, version=None): """convert dictionary to full python-stix2 object @@ -115,25 +74,19 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): if 'type' not in stix_dict: raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict)) - if version: - # If the version argument was passed, override other approaches. - v = 'v' + version.replace('.', '') - else: - v = _detect_spec_version(stix_dict) + if not version: + version = detect_spec_version(stix_dict) - OBJ_MAP = dict( - registry.STIX2_OBJ_MAPS[v]['objects'], - **registry.STIX2_OBJ_MAPS[v]['observables'] - ) + obj_type = stix_dict["type"] + obj_class = registry.class_for_type(obj_type, version, "objects") \ + or registry.class_for_type(obj_type, version, "observables") - try: - obj_class = OBJ_MAP[stix_dict['type']] - except KeyError: + if not obj_class: if allow_custom: # flag allows for unknown custom objects too, but will not # be parsed into STIX object, returned as is return stix_dict - raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type']) + raise ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj_type) return obj_class(allow_custom=allow_custom, **stix_dict) @@ -168,16 +121,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): obj['_valid_refs'] = _valid_refs or [] - if version: - # If the version argument was passed, override other approaches. - v = 'v' + version.replace('.', '') - else: - v = _detect_spec_version(obj) + if not version: + version = detect_spec_version(obj) - try: - OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables'] - obj_class = OBJ_MAP_OBSERVABLE[obj['type']] - except KeyError: + obj_type = obj["type"] + obj_class = registry.class_for_type(obj_type, version, "observables") + if not obj_class: if allow_custom: # flag allows for unknown custom objects too, but will not # be parsed into STIX observable object, just returned as is diff --git a/stix2/properties.py b/stix2/properties.py index ba31d784..bf7fc8c0 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -503,14 +503,14 @@ def clean(self, value): possible_prefix = value[:value.index('--')] if self.valid_types: - ref_valid_types = enumerate_types(self.valid_types, 'v' + self.spec_version.replace(".", "")) + ref_valid_types = enumerate_types(self.valid_types, self.spec_version) if possible_prefix in ref_valid_types: required_prefix = possible_prefix else: raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix)) elif self.invalid_types: - ref_invalid_types = enumerate_types(self.invalid_types, 'v' + self.spec_version.replace(".", "")) + ref_invalid_types = enumerate_types(self.invalid_types, self.spec_version) if possible_prefix not in ref_invalid_types: required_prefix = possible_prefix @@ -655,9 +655,7 @@ def clean(self, value): except ValueError: raise ValueError("The extensions property must contain a dictionary") - v = 'v' + self.spec_version.replace('.', '') - - specific_type_map = STIX2_OBJ_MAPS[v]['observable-extensions'].get(self.enclosing_type, {}) + specific_type_map = STIX2_OBJ_MAPS[self.spec_version]['observable-extensions'].get(self.enclosing_type, {}) for key, subvalue in dictified.items(): if key in specific_type_map: cls = specific_type_map[key] diff --git a/stix2/registration.py b/stix2/registration.py index bc079954..4ec019a2 100644 --- a/stix2/registration.py +++ b/stix2/registration.py @@ -31,18 +31,15 @@ def _register_object(new_type, version=DEFAULT_VERSION): properties = new_type._properties + if not version: + version = DEFAULT_VERSION + if version == "2.1": for prop_name, prop in properties.items(): if not re.match(PREFIX_21_REGEX, prop_name): raise ValueError("Property name '%s' must begin with an alpha character" % prop_name) - if version: - v = 'v' + version.replace('.', '') - else: - # Use default version (latest) if no version was provided. - v = 'v' + DEFAULT_VERSION.replace('.', '') - - OBJ_MAP = registry.STIX2_OBJ_MAPS[v]['objects'] + OBJ_MAP = registry.STIX2_OBJ_MAPS[version]['objects'] if new_type._type in OBJ_MAP.keys(): raise DuplicateRegistrationError("STIX Object", new_type._type) OBJ_MAP[new_type._type] = new_type @@ -61,6 +58,9 @@ def _register_marking(new_marking, version=DEFAULT_VERSION): mark_type = new_marking._type properties = new_marking._properties + if not version: + version = DEFAULT_VERSION + _validate_type(mark_type, version) if version == "2.1": @@ -68,13 +68,7 @@ def _register_marking(new_marking, version=DEFAULT_VERSION): if not re.match(PREFIX_21_REGEX, prop_name): raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) - if version: - v = 'v' + version.replace('.', '') - else: - # Use default version (latest) if no version was provided. - v = 'v' + DEFAULT_VERSION.replace('.', '') - - OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[v]['markings'] + OBJ_MAP_MARKING = registry.STIX2_OBJ_MAPS[version]['markings'] if mark_type in OBJ_MAP_MARKING.keys(): raise DuplicateRegistrationError("STIX Marking", mark_type) OBJ_MAP_MARKING[mark_type] = new_marking @@ -91,6 +85,9 @@ def _register_observable(new_observable, version=DEFAULT_VERSION): """ properties = new_observable._properties + if not version: + version = DEFAULT_VERSION + if version == "2.0": # If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties for prop_name, prop in properties.items(): @@ -130,13 +127,7 @@ def _register_observable(new_observable, version=DEFAULT_VERSION): "is not a ListProperty containing ReferenceProperty." % prop_name, ) - if version: - v = 'v' + version.replace('.', '') - else: - # Use default version (latest) if no version was provided. - v = 'v' + DEFAULT_VERSION.replace('.', '') - - OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables'] + OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables'] if new_observable._type in OBJ_MAP_OBSERVABLE.keys(): raise DuplicateRegistrationError("Cyber Observable", new_observable._type) OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable @@ -182,8 +173,6 @@ def _register_observable_extension( if not re.match(PREFIX_21_REGEX, prop_name): raise ValueError("Property name '%s' must begin with an alpha character." % prop_name) - v = 'v' + version.replace('.', '') - try: observable_type = observable._type except AttributeError: @@ -192,8 +181,8 @@ def _register_observable_extension( "created with the @CustomObservable decorator.", ) - OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[v]['observables'] - EXT_MAP = registry.STIX2_OBJ_MAPS[v]['observable-extensions'] + OBJ_MAP_OBSERVABLE = registry.STIX2_OBJ_MAPS[version]['observables'] + EXT_MAP = registry.STIX2_OBJ_MAPS[version]['observable-extensions'] try: if ext_type in EXT_MAP[observable_type].keys(): diff --git a/stix2/registry.py b/stix2/registry.py index 6cb6cd8f..3dcc3a5d 100644 --- a/stix2/registry.py +++ b/stix2/registry.py @@ -7,6 +7,20 @@ STIX2_OBJ_MAPS = {} +def _stix_vid_to_version(stix_vid): + """ + Convert a python package name representing a STIX version in the form "vXX" + to the dotted style used in the public APIs of this library, "X.X". + + :param stix_vid: A package name in the form "vXX" + :return: A STIX version in dotted style + """ + assert len(stix_vid) >= 3 + + stix_version = stix_vid[1] + "." + stix_vid[2:] + return stix_version + + def _collect_stix2_mappings(): """Navigate the package once and retrieve all object mapping dicts for each v2X package. Includes OBJ_MAP, OBJ_MAP_OBSERVABLE, EXT_MAP.""" @@ -16,13 +30,51 @@ def _collect_stix2_mappings(): prefix = str(top_level_module.__name__) + '.' for module_loader, name, is_pkg in pkgutil.walk_packages(path=path, prefix=prefix): - ver = name.split('.')[1] + stix_vid = name.split('.')[1] if re.match(r'^stix2\.v2[0-9]$', name) and is_pkg: + ver = _stix_vid_to_version(stix_vid) mod = importlib.import_module(name, str(top_level_module.__name__)) STIX2_OBJ_MAPS[ver] = {} STIX2_OBJ_MAPS[ver]['objects'] = mod.OBJ_MAP STIX2_OBJ_MAPS[ver]['observables'] = mod.OBJ_MAP_OBSERVABLE STIX2_OBJ_MAPS[ver]['observable-extensions'] = mod.EXT_MAP elif re.match(r'^stix2\.v2[0-9]\.common$', name) and is_pkg is False: + ver = _stix_vid_to_version(stix_vid) mod = importlib.import_module(name, str(top_level_module.__name__)) STIX2_OBJ_MAPS[ver]['markings'] = mod.OBJ_MAP_MARKING + + +def class_for_type(stix_type, stix_version, category=None): + """ + Get the registered class which implements a particular STIX type for a + particular STIX version. + + :param stix_type: A STIX type as a string + :param stix_version: A STIX version as a string, e.g. "2.1" + :param category: An optional "category" value, which is just used directly + as a second key after the STIX version, and depends on how the types + are internally categorized. This would be useful if the same STIX type + is used to mean two different things within the same STIX version. So + it's unlikely to be necessary. Pass None to just search all the + categories and return the first class found. + :return: A registered python class which implements the given STIX type, or + None if one is not found. + """ + cls = None + + cat_map = STIX2_OBJ_MAPS.get(stix_version) + if cat_map: + if category: + class_map = cat_map.get(category) + if class_map: + cls = class_map.get(stix_type) + else: + cls = cat_map["objects"].get(stix_type) \ + or cat_map["observables"].get(stix_type) \ + or cat_map["markings"].get(stix_type) + + # Left "observable-extensions" out; it has a different + # substructure. A version->category->type lookup would result + # in another map, not a class. So it doesn't fit the pattern. + + return cls diff --git a/stix2/test/test_spec_version_detect.py b/stix2/test/test_spec_version_detect.py index 70390246..570cc8e7 100644 --- a/stix2/test/test_spec_version_detect.py +++ b/stix2/test/test_spec_version_detect.py @@ -2,7 +2,7 @@ import pytest -from stix2.parsing import _detect_spec_version +from stix2.utils import detect_spec_version @pytest.mark.parametrize( @@ -17,7 +17,7 @@ "name": "alice", "identity_class": "individual", }, - "v20", + "2.0", ), ( { @@ -29,14 +29,14 @@ "target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f", "relationship_type": "targets", }, - "v20", + "2.0", ), ( { "type": "file", "name": "notes.txt", }, - "v20", + "2.0", ), ( { @@ -48,7 +48,7 @@ "statement": "Copyright (c) ACME Corp.", }, }, - "v20", + "2.0", ), ( { @@ -75,7 +75,7 @@ }, ], }, - "v20", + "2.0", ), # STIX 2.1 examples ( @@ -87,7 +87,7 @@ "modified": "2001-07-01T09:33:17.000Z", "name": "alice", }, - "v21", + "2.1", ), ( { @@ -100,7 +100,7 @@ "target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f", "relationship_type": "targets", }, - "v21", + "2.1", ), ( { @@ -109,7 +109,7 @@ "spec_version": "2.1", "name": "notes.txt", }, - "v21", + "2.1", ), ( { @@ -117,7 +117,7 @@ "id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe", "name": "notes.txt", }, - "v21", + "2.1", ), ( { @@ -131,7 +131,7 @@ "tlp": "green", }, }, - "v21", + "2.1", ), ( { @@ -153,7 +153,7 @@ }, ], }, - "v21", + "2.1", ), # Mixed spec examples ( @@ -180,7 +180,7 @@ }, ], }, - "v21", + "2.1", ), ( { @@ -202,11 +202,11 @@ }, ], }, - "v21", + "2.1", ), ], ) def test_spec_version_detect(obj_dict, expected_ver): - detected_ver = _detect_spec_version(obj_dict) + detected_ver = detect_spec_version(obj_dict) assert detected_ver == expected_ver diff --git a/stix2/test/test_utils_type_checks.py b/stix2/test/test_utils_type_checks.py new file mode 100644 index 00000000..2144653a --- /dev/null +++ b/stix2/test/test_utils_type_checks.py @@ -0,0 +1,262 @@ +import pytest + +import stix2.utils + +### +# Tests using types/behaviors common to STIX 2.0 and 2.1. +### + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "attack-pattern", + "campaign", + "course-of-action", + "identity", + "indicator", + "intrusion-set", + "malware", + "observed-data", + "report", + "threat-actor", + "tool", + "vulnerability", + ], +) +def test_is_sdo(type_, stix_version): + assert stix2.utils.is_sdo(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert stix2.utils.is_sdo(id_, stix_version) + + assert stix2.utils.is_stix_type( + type_, stix_version, stix2.utils.STIXTypeClass.SDO, + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "relationship", + "sighting", + "marking-definition", + "bundle", + "language-content", + "ipv4-addr", + "foo", + ], +) +def test_is_not_sdo(type_, stix_version): + assert not stix2.utils.is_sdo(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert not stix2.utils.is_sdo(id_, stix_version) + + d = { + "type": type_, + } + assert not stix2.utils.is_sdo(d, stix_version) + + assert not stix2.utils.is_stix_type( + type_, stix_version, stix2.utils.STIXTypeClass.SDO, + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "artifact", + "autonomous-system", + "directory", + "domain-name", + "email-addr", + "email-message", + "file", + "ipv4-addr", + "ipv6-addr", + "mac-addr", + "mutex", + "network-traffic", + "process", + "software", + "url", + "user-account", + "windows-registry-key", + "x509-certificate", + ], +) +def test_is_sco(type_, stix_version): + assert stix2.utils.is_sco(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert stix2.utils.is_sco(id_, stix_version) + + assert stix2.utils.is_stix_type( + type_, stix_version, stix2.utils.STIXTypeClass.SCO, + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "identity", + "sighting", + "marking-definition", + "bundle", + "language-content", + "foo", + ], +) +def test_is_not_sco(type_, stix_version): + assert not stix2.utils.is_sco(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert not stix2.utils.is_sco(id_, stix_version) + + d = { + "type": type_, + } + assert not stix2.utils.is_sco(d, stix_version) + + assert not stix2.utils.is_stix_type( + type_, stix_version, stix2.utils.STIXTypeClass.SCO, + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "relationship", + "sighting", + ], +) +def test_is_sro(type_, stix_version): + assert stix2.utils.is_sro(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert stix2.utils.is_sro(id_, stix_version) + + assert stix2.utils.is_stix_type( + type_, stix_version, stix2.utils.STIXTypeClass.SRO, + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "identity", + "marking-definition", + "bundle", + "language-content", + "ipv4-addr", + "foo", + ], +) +def test_is_not_sro(type_, stix_version): + assert not stix2.utils.is_sro(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert not stix2.utils.is_sro(id_, stix_version) + + d = { + "type": type_, + } + assert not stix2.utils.is_sro(d, stix_version) + + assert not stix2.utils.is_stix_type( + type_, stix_version, stix2.utils.STIXTypeClass.SRO, + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +def test_is_marking(stix_version): + assert stix2.utils.is_marking("marking-definition", stix_version) + + id_ = "marking-definition--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert stix2.utils.is_marking(id_, stix_version) + + assert stix2.utils.is_stix_type( + "marking-definition", stix_version, "marking-definition", + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "identity", + "bundle", + "language-content", + "ipv4-addr", + "foo", + ], +) +def test_is_not_marking(type_, stix_version): + assert not stix2.utils.is_marking(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert not stix2.utils.is_marking(id_, stix_version) + + d = { + "type": type_, + } + assert not stix2.utils.is_marking(d, stix_version) + + assert not stix2.utils.is_stix_type( + type_, stix_version, "marking-definition", + ) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +@pytest.mark.parametrize( + "type_", [ + "identity", + "relationship", + "sighting", + "marking-definition", + "bundle", + "ipv4-addr", + ], +) +def test_is_object(type_, stix_version): + assert stix2.utils.is_object(type_, stix_version) + + id_ = type_ + "--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert stix2.utils.is_object(id_, stix_version) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +def test_is_not_object(stix_version): + assert not stix2.utils.is_object("foo", stix_version) + + id_ = "foo--a12fa04c-6586-4128-8d1a-cfe0d1c081f5" + assert not stix2.utils.is_object(id_, stix_version) + + d = { + "type": "foo", + } + assert not stix2.utils.is_object(d, stix_version) + + +@pytest.mark.parametrize("stix_version", ["2.0", "2.1"]) +def test_is_stix_type(stix_version): + + assert not stix2.utils.is_stix_type( + "foo", stix_version, stix2.utils.STIXTypeClass.SDO, "foo", + ) + + assert stix2.utils.is_stix_type( + "bundle", stix_version, "foo", "bundle", + ) + + assert stix2.utils.is_stix_type( + "identity", stix_version, + stix2.utils.STIXTypeClass.SDO, + stix2.utils.STIXTypeClass.SRO, + ) + + assert stix2.utils.is_stix_type( + "software", stix_version, + stix2.utils.STIXTypeClass.SDO, + stix2.utils.STIXTypeClass.SCO, + ) diff --git a/stix2/test/v20/test_custom.py b/stix2/test/v20/test_custom.py index 6ce4a62c..a83bf246 100644 --- a/stix2/test/v20/test_custom.py +++ b/stix2/test/v20/test_custom.py @@ -1044,9 +1044,8 @@ def test_register_custom_object_with_version(): } cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.0') - v = 'v20' - assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects'] + assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS['2.0']['objects'] # spec_version is not in STIX 2.0, and is required in 2.1, so this # suffices as a test for a STIX 2.0 object. assert "spec_version" not in cust_obj_1 @@ -1076,9 +1075,8 @@ class NewObservable2(object): def test_register_observable_with_version(): custom_obs = NewObservable2(property1="Test Observable") - v = 'v20' - assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.0']['observables'] def test_register_duplicate_observable_with_version(): @@ -1101,10 +1099,9 @@ def test_register_marking_with_version(): ) class NewObj2(): pass - v = 'v20' no = NewObj2(property1='something') - assert no._type in stix2.registry.STIX2_OBJ_MAPS[v]['markings'] + assert no._type in stix2.registry.STIX2_OBJ_MAPS['2.0']['markings'] def test_register_observable_extension_with_version(): @@ -1116,10 +1113,9 @@ def test_register_observable_extension_with_version(): class SomeCustomExtension2: pass - v = 'v20' example = SomeCustomExtension2(keys='test123') - assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['user-account'] + assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.0']['observable-extensions']['user-account'] def test_register_duplicate_observable_extension(): diff --git a/stix2/test/v20/test_parsing.py b/stix2/test/v20/test_parsing.py index 01c66073..6317e5a0 100644 --- a/stix2/test/v20/test_parsing.py +++ b/stix2/test/v20/test_parsing.py @@ -73,7 +73,6 @@ class NewMarking1: _properties = OrderedDict() registration._register_marking(NewMarking1, version='2.0') - v = 'v20' - assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings'] - assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) + assert NewMarking1._type in registry.STIX2_OBJ_MAPS['2.0']['markings'] + assert 'v20' in str(registry.STIX2_OBJ_MAPS['2.0']['markings'][NewMarking1._type]) diff --git a/stix2/test/v20/test_utils.py b/stix2/test/v20/test_utils.py index de66332c..04439337 100644 --- a/stix2/test/v20/test_utils.py +++ b/stix2/test/v20/test_utils.py @@ -237,3 +237,146 @@ def test_find_property_index(object, tuple_to_find, expected_index): ) def test_iterate_over_values(dict_value, tuple_to_find, expected_index): assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index + + +@pytest.mark.parametrize( + "type_", [ + "attack-pattern", + "campaign", + "course-of-action", + "identity", + "indicator", + "intrusion-set", + "malware", + "observed-data", + "report", + "threat-actor", + "tool", + "vulnerability", + ], +) +def test_is_sdo_dict(type_): + d = { + "type": type_, + } + assert stix2.utils.is_sdo(d, "2.0") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "software", "spec_version": "2.1"}, + {"type": "software"}, + {"type": "identity", "spec_version": "2.1"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "marking-definition"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "bundle"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "language-content"}, + {"type": "relationship", "spec_version": "2.1"}, + {"type": "relationship"}, + {"type": "foo", "spec_version": "2.1"}, + {"type": "foo"}, + ], +) +def test_is_not_sdo_dict(dict_): + assert not stix2.utils.is_sdo(dict_, "2.0") + + +def test_is_sco_dict(): + d = { + "type": "file", + } + + assert stix2.utils.is_sco(d, "2.0") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity"}, + {"type": "identity", "spec_version": "2.1"}, + {"type": "software", "spec_version": "2.1"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "marking-definition"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "bundle"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "language-content"}, + {"type": "relationship", "spec_version": "2.1"}, + {"type": "relationship"}, + {"type": "foo", "spec_version": "2.1"}, + {"type": "foo"}, + ], +) +def test_is_not_sco_dict(dict_): + assert not stix2.utils.is_sco(dict_, "2.0") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "relationship"}, + {"type": "sighting"}, + ], +) +def test_is_sro_dict(dict_): + assert stix2.utils.is_sro(dict_, "2.0") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity", "spec_version": "2.1"}, + {"type": "identity"}, + {"type": "software", "spec_version": "2.1"}, + {"type": "software"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "marking-definition"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "bundle"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "language-content"}, + {"type": "relationship", "spec_version": "2.1"}, + {"type": "sighting", "spec_version": "2.1"}, + {"type": "foo", "spec_version": "2.1"}, + {"type": "foo"}, + ], +) +def test_is_not_sro_dict(dict_): + assert not stix2.utils.is_sro(dict_, "2.0") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity"}, + {"type": "software"}, + {"type": "marking-definition"}, + { + "type": "bundle", + "id": "bundle--8f431680-6278-4767-ba43-5edb682d7086", + "spec_version": "2.0", + "objects": [ + {"type": "identity"}, + {"type": "software"}, + {"type": "marking-definition"}, + ], + }, + ], +) +def test_is_object_dict(dict_): + assert stix2.utils.is_object(dict_, "2.0") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity", "spec_version": "2.1"}, + {"type": "software", "spec_version": "2.1"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "relationship", "spec_version": "2.1"}, + {"type": "sighting", "spec_version": "2.1"}, + {"type": "foo", "spec_version": "2.1"}, + {"type": "foo"}, + ], +) +def test_is_not_object_dict(dict_): + assert not stix2.utils.is_object(dict_, "2.0") diff --git a/stix2/test/v21/test_custom.py b/stix2/test/v21/test_custom.py index f9cb574d..36e35487 100644 --- a/stix2/test/v21/test_custom.py +++ b/stix2/test/v21/test_custom.py @@ -1265,9 +1265,8 @@ def test_register_custom_object_with_version(): } cust_obj_1 = stix2.parsing.dict_to_stix2(custom_obj_1, version='2.1') - v = 'v21' - assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS[v]['objects'] + assert cust_obj_1.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['objects'] assert cust_obj_1.spec_version == "2.1" @@ -1295,9 +1294,8 @@ class NewObservable3(object): def test_register_observable(): custom_obs = NewObservable3(property1="Test Observable") - v = 'v21' - assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS[v]['observables'] + assert custom_obs.type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables'] def test_register_duplicate_observable(): @@ -1323,10 +1321,9 @@ class NewExtension2(): pass example = NewExtension2(property1="Hi there") - v = 'v21' - assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS[v]['observables'] - assert example._type in stix2.registry.STIX2_OBJ_MAPS[v]['observable-extensions']['domain-name'] + assert 'domain-name' in stix2.registry.STIX2_OBJ_MAPS['2.1']['observables'] + assert example._type in stix2.registry.STIX2_OBJ_MAPS['2.1']['observable-extensions']['domain-name'] def test_register_duplicate_observable_extension(): diff --git a/stix2/test/v21/test_parsing.py b/stix2/test/v21/test_parsing.py index a68d9fea..f23eb7d0 100644 --- a/stix2/test/v21/test_parsing.py +++ b/stix2/test/v21/test_parsing.py @@ -78,10 +78,9 @@ class NewMarking1: _properties = OrderedDict() registration._register_marking(NewMarking1, version='2.1') - v = 'v21' - assert NewMarking1._type in registry.STIX2_OBJ_MAPS[v]['markings'] - assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking1._type]) + assert NewMarking1._type in registry.STIX2_OBJ_MAPS['2.1']['markings'] + assert 'v21' in str(registry.STIX2_OBJ_MAPS['2.1']['markings'][NewMarking1._type]) @pytest.mark.xfail(reason="The default version is not 2.1", condition=DEFAULT_VERSION != "2.1") @@ -92,7 +91,6 @@ class NewMarking2: _properties = OrderedDict() registration._register_marking(NewMarking2) - v = 'v21' - assert NewMarking2._type in registry.STIX2_OBJ_MAPS[v]['markings'] - assert v in str(registry.STIX2_OBJ_MAPS[v]['markings'][NewMarking2._type]) + assert NewMarking2._type in registry.STIX2_OBJ_MAPS['2.1']['markings'] + assert 'v21' in str(registry.STIX2_OBJ_MAPS['2.1']['markings'][NewMarking2._type]) diff --git a/stix2/test/v21/test_utils.py b/stix2/test/v21/test_utils.py index 41bc087e..6d108d48 100644 --- a/stix2/test/v21/test_utils.py +++ b/stix2/test/v21/test_utils.py @@ -241,3 +241,153 @@ def test_find_property_index(object, tuple_to_find, expected_index): ) def test_iterate_over_values(dict_value, tuple_to_find, expected_index): assert stix2.serialization._find_property_in_seq(dict_value.values(), *tuple_to_find) == expected_index + + +@pytest.mark.parametrize( + "type_", [ + "attack-pattern", + "campaign", + "course-of-action", + "identity", + "indicator", + "intrusion-set", + "malware", + "observed-data", + "report", + "threat-actor", + "tool", + "vulnerability", + + # New in 2.1 + "grouping", + "infrastructure", + "location", + "malware-analysis", + "note", + "opinion", + ], +) +def test_is_sdo_dict(type_): + d = { + "type": type_, + "spec_version": "2.1", + } + assert stix2.utils.is_sdo(d, "2.1") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "software", "spec_version": "2.1"}, + {"type": "software"}, + {"type": "identity"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "marking-definition"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "bundle"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "language-content"}, + {"type": "relationship", "spec_version": "2.1"}, + {"type": "relationship"}, + {"type": "foo", "spec_version": "2.1"}, + ], +) +def test_is_not_sdo_dict(dict_): + assert not stix2.utils.is_sdo(dict_, "2.1") + + +def test_is_sco_dict(): + d = { + "type": "file", + "spec_version": "2.1", + } + + assert stix2.utils.is_sco(d, "2.1") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity"}, + {"type": "identity", "spec_version": "2.1"}, + {"type": "software"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "marking-definition"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "bundle"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "language-content"}, + {"type": "relationship", "spec_version": "2.1"}, + {"type": "relationship"}, + {"type": "foo", "spec_version": "2.1"}, + ], +) +def test_is_not_sco_dict(dict_): + assert not stix2.utils.is_sco(dict_, "2.1") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "relationship", "spec_version": "2.1"}, + {"type": "sighting", "spec_version": "2.1"}, + ], +) +def test_is_sro_dict(dict_): + assert stix2.utils.is_sro(dict_, "2.1") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity", "spec_version": "2.1"}, + {"type": "identity"}, + {"type": "software", "spec_version": "2.1"}, + {"type": "software"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "marking-definition"}, + {"type": "bundle", "spec_version": "2.1"}, + {"type": "bundle"}, + {"type": "language-content", "spec_version": "2.1"}, + {"type": "language-content"}, + {"type": "relationship"}, + {"type": "sighting"}, + {"type": "foo", "spec_version": "2.1"}, + ], +) +def test_is_not_sro_dict(dict_): + assert not stix2.utils.is_sro(dict_, "2.1") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity", "spec_version": "2.1"}, + {"type": "software", "spec_version": "2.1"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "language-content", "spec_version": "2.1"}, + { + "type": "bundle", + "id": "bundle--8f431680-6278-4767-ba43-5edb682d7086", + "objects": [ + {"type": "identity", "spec_version": "2.1"}, + {"type": "software", "spec_version": "2.1"}, + {"type": "marking-definition", "spec_version": "2.1"}, + {"type": "language-content", "spec_version": "2.1"}, + ], + }, + ], +) +def test_is_object_dict(dict_): + assert stix2.utils.is_object(dict_, "2.1") + + +@pytest.mark.parametrize( + "dict_", [ + {"type": "identity"}, + {"type": "software"}, + {"type": "marking-definition"}, + {"type": "bundle"}, + {"type": "language-content"}, + {"type": "relationship"}, + {"type": "sighting"}, + {"type": "foo"}, + ], +) +def test_is_not_object_dict(dict_): + assert not stix2.utils.is_object(dict_, "2.1") diff --git a/stix2/test/v21/test_versioning.py b/stix2/test/v21/test_versioning.py index f10877ff..051fb2e4 100644 --- a/stix2/test/v21/test_versioning.py +++ b/stix2/test/v21/test_versioning.py @@ -346,6 +346,38 @@ def test_version_sco_with_custom(): assert revoked_obj.revoked +def test_version_sco_id_contributing_properties(): + file_sco_obj = stix2.v21.File( + name="data.txt", + created="1973-11-23T02:31:37Z", + modified="1991-05-13T19:24:57Z", + revoked=False, + allow_custom=True, + ) + + with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as e: + stix2.versioning.new_version(file_sco_obj, name="foo.dat") + + assert e.value.unchangable_properties == {"name"} + + +def test_version_sco_id_contributing_properties_dict(): + file_sco_dict = { + "type": "file", + "id": "file--c27c572c-2e17-5ce1-817e-67bb97629a56", + "spec_version": "2.1", + "name": "data.txt", + "created": "1973-11-23T02:31:37Z", + "modified": "1991-05-13T19:24:57Z", + "revoked": False, + } + + with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as e: + stix2.versioning.new_version(file_sco_dict, name="foo.dat") + + assert e.value.unchangable_properties == {"name"} + + def test_version_disable_custom(): m = stix2.v21.Malware( name="foo", description="Steals your identity!", is_family=False, diff --git a/stix2/utils.py b/stix2/utils.py index 22efcc24..3e272f8e 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -1,5 +1,6 @@ """Utility functions and classes for the STIX2 library.""" +import collections.abc import datetime as dt import enum import json @@ -8,7 +9,8 @@ import pytz import six -import stix2 +import stix2.registry as mappings +import stix2.version # Sentinel value for properties that should be set to the current time. # We can't use the standard 'default' approach, since if there are multiple @@ -313,18 +315,262 @@ def get_type_from_id(stix_id): return stix_id.split('--', 1)[0] -def is_marking(obj_or_id): - """Determines whether the given object or object ID is/is for a marking - definition. +def detect_spec_version(stix_dict): + """ + Given a dict representing a STIX object, try to detect what spec version + it is likely to comply with. + + :param stix_dict: A dict with some STIX content. Must at least have a + "type" property. + :return: A STIX version in "X.Y" format + """ + + obj_type = stix_dict["type"] + + if 'spec_version' in stix_dict: + # For STIX 2.0, applies to bundles only. + # For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only. + v = stix_dict['spec_version'] + elif "id" not in stix_dict: + # Only 2.0 SCOs don't have ID properties + v = "2.0" + elif obj_type == 'bundle': + # Bundle without a spec_version property: must be 2.1. But to + # future-proof, use max version over all contained SCOs, with 2.1 + # minimum. + v = max( + "2.1", + max( + detect_spec_version(obj) for obj in stix_dict["objects"] + ), + ) + elif obj_type in mappings.STIX2_OBJ_MAPS["2.1"]["observables"]: + # Non-bundle object with an ID and without spec_version. Could be a + # 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO... + v = "2.1" + else: + # Not a 2.1 SCO; must be a 2.0 object. + v = "2.0" + + return v + + +def _stix_type_of(value): + """ + Get a STIX type from the given value: if a STIX ID is passed, the type + prefix is extracted; if string which is not a STIX ID is passed, it is + assumed to be a STIX type and is returned; otherwise it is assumed to be a + mapping with a "type" property, and the value of that property is returned. + + :param value: A mapping with a "type" property, or a STIX ID or type + as a string + :return: A STIX type + """ + if isinstance(value, str): + if "--" in value: + type_ = get_type_from_id(value) + else: + type_ = value + else: + type_ = value["type"] + + return type_ + + +def is_sdo(value, stix_version=stix2.version.DEFAULT_VERSION): + """ + Determine whether the given object, type, or ID is/is for an SDO of the + given STIX version. If value is a type or ID, this just checks whether + the type was registered as an SDO in the given STIX version. If a mapping, + *simple* STIX version inference is additionally done on the value, and the + result is checked against stix_version. It does not attempt to fully + validate the value. + + :param value: A mapping with a "type" property, or a STIX ID or type + as a string + :param stix_version: A STIX version as a string + :return: True if the type of the given value is an SDO type of the given + version; False if not + """ + + result = True + if isinstance(value, collections.abc.Mapping): + value_stix_version = detect_spec_version(value) + if value_stix_version != stix_version: + result = False + + if result: + cls_maps = mappings.STIX2_OBJ_MAPS[stix_version] + type_ = _stix_type_of(value) + result = type_ in cls_maps["objects"] and type_ not in { + "relationship", "sighting", "marking-definition", "bundle", + "language-content", + } + + return result + + +def is_sco(value, stix_version=stix2.version.DEFAULT_VERSION): + """ + Determine whether the given object, type, or ID is/is for an SCO of the + given STIX version. If value is a type or ID, this just checks whether + the type was registered as an SCO in the given STIX version. If a mapping, + *simple* STIX version inference is additionally done on the value, and the + result is checked against stix_version. It does not attempt to fully + validate the value. + + :param value: A mapping with a "type" property, or a STIX ID or type + as a string + :param stix_version: A STIX version as a string + :return: True if the type of the given value is an SCO type of the given + version; False if not + """ + + result = True + if isinstance(value, collections.abc.Mapping): + value_stix_version = detect_spec_version(value) + if value_stix_version != stix_version: + result = False + + if result: + cls_maps = mappings.STIX2_OBJ_MAPS[stix_version] + type_ = _stix_type_of(value) + result = type_ in cls_maps["observables"] - :param obj_or_id: A STIX object or object ID as a string. - :return: True if a marking definition, False otherwise. + return result + + +def is_sro(value, stix_version=stix2.version.DEFAULT_VERSION): + """ + Determine whether the given object, type, or ID is/is for an SRO of the + given STIX version. If value is a type or ID, this just checks whether + the type is "sighting" or "relationship". If a mapping, *simple* STIX + version inference is additionally done on the value, and the result is + checked against stix_version. It does not attempt to fully validate the + value. + + :param value: A mapping with a "type" property, or a STIX ID or type + as a string + :param stix_version: A STIX version as a string + :return: True if the type of the given value is an SRO type of the given + version; False if not """ - if isinstance(obj_or_id, (stix2.base._STIXBase, dict)): - result = obj_or_id["type"] == "marking-definition" + result = True + if isinstance(value, collections.abc.Mapping): + value_stix_version = detect_spec_version(value) + if value_stix_version != stix_version: + result = False + + if result: + # No need to check registration in this case + type_ = _stix_type_of(value) + result = type_ in ("sighting", "relationship") + + return result + + +def is_object(value, stix_version=stix2.version.DEFAULT_VERSION): + """ + Determine whether an object, type, or ID is/is for any STIX object. This + includes all SDOs, SCOs, meta-objects, and bundle. If value is a type or + ID, this just checks whether the type was registered in the given STIX + version. If a mapping, *simple* STIX version inference is additionally + done on the value, and the result is checked against stix_version. It does + not attempt to fully validate the value. + + :param value: A mapping with a "type" property, or a STIX ID or type + as a string + :param stix_version: A STIX version as a string + :return: True if the type of the given value is a valid STIX type with + respect to the given STIX version; False if not + """ + + result = True + if isinstance(value, collections.abc.Mapping): + value_stix_version = detect_spec_version(value) + if value_stix_version != stix_version: + result = False + + if result: + cls_maps = mappings.STIX2_OBJ_MAPS[stix_version] + type_ = _stix_type_of(value) + result = type_ in cls_maps["observables"] \ + or type_ in cls_maps["objects"] + + return result + + +def is_marking(value, stix_version=stix2.version.DEFAULT_VERSION): + """ + Determine whether the given object, type, or ID is/is for an marking + definition of the given STIX version. If value is a type or ID, this just + checks whether the type is "marking-definition". If a mapping, *simple* + STIX version inference is additionally done on the value, and the result + is checked against stix_version. It does not attempt to fully validate the + value. + + :param value: A STIX object, object ID, or type as a string. + :param stix_version: A STIX version as a string + :return: True if the value is/is for a marking definition, False otherwise. + """ + + result = True + if isinstance(value, collections.abc.Mapping): + value_stix_version = detect_spec_version(value) + if value_stix_version != stix_version: + result = False + + if result: + # No need to check registration in this case + type_ = _stix_type_of(value) + result = type_ == "marking-definition" + + return result + + +class STIXTypeClass(enum.Enum): + """ + Represents different classes of STIX type. + """ + SDO = 0 + SCO = 1 + SRO = 2 + + +def is_stix_type(value, stix_version=stix2.version.DEFAULT_VERSION, *types): + """ + Determine whether the type of the given value satisfies the given + constraints. 'types' must contain STIX types as strings, and/or the + STIXTypeClass enum values. STIX types imply an exact match constraint; + STIXTypeClass enum values imply a more general constraint, that the object + or type be in that class of STIX type. These constraints are implicitly + OR'd together. + + :param value: A mapping with a "type" property, or a STIX ID or type + as a string + :param stix_version: A STIX version as a string + :param types: A sequence of STIX type strings or STIXTypeClass enum values + :return: True if the object or type satisfies the constraints; False if not + """ + + for type_ in types: + if type_ is STIXTypeClass.SDO: + result = is_sdo(value, stix_version) + elif type_ is STIXTypeClass.SCO: + result = is_sco(value, stix_version) + elif type_ is STIXTypeClass.SRO: + result = is_sro(value, stix_version) + else: + # Assume a string STIX type is given instead of a class enum, + # and just check for exact match. + obj_type = _stix_type_of(value) + result = obj_type == type_ and is_object(value, stix_version) + + if result: + break + else: - # it's a string ID - result = obj_or_id.startswith("marking-definition--") + result = False return result diff --git a/stix2/versioning.py b/stix2/versioning.py index e66f3945..01affe9c 100644 --- a/stix2/versioning.py +++ b/stix2/versioning.py @@ -1,16 +1,17 @@ """STIX2 core versioning methods.""" +from collections.abc import Mapping import copy import datetime as dt import itertools import uuid -import six -from six.moves.collections_abc import Mapping - import stix2.base import stix2.registry -from stix2.utils import get_timestamp, parse_into_datetime +from stix2.utils import ( + detect_spec_version, get_timestamp, is_sco, is_sdo, is_sro, + parse_into_datetime, +) import stix2.v20 from .exceptions import ( @@ -74,58 +75,47 @@ def _is_versionable(data): """ is_versionable = False - is_21 = False - stix_vid = None + stix_version = None if isinstance(data, Mapping): # First, determine spec version. It's easy for our stix2 objects; more # work for dicts. - is_21 = False - if isinstance(data, stix2.base._STIXBase) and \ - not isinstance(data, stix2.v20._STIXBase20): - # (is_21 means 2.1 or later; try not to be 2.1-specific) - is_21 = True + if isinstance(data, stix2.v20._STIXBase20): + stix_version = "2.0" + elif isinstance(data, stix2.v21._STIXBase21): + stix_version = "2.1" elif isinstance(data, dict): - stix_vid = stix2.parsing._detect_spec_version(data) - is_21 = stix_vid != "v20" + stix_version = detect_spec_version(data) # Then, determine versionability. - if six.PY2: - # dumb python2 compatibility: map.keys() returns a list, not a set! - # six.viewkeys() compatibility function uses dict.viewkeys() on - # python2, which is not a Mapping mixin method, so that doesn't - # work either (for our stix2 objects). - keys = set(data) - else: - keys = data.keys() - # This should be sufficient for STIX objects; maybe we get lucky with # dicts here but probably not. - if keys >= _VERSIONING_PROPERTIES: + if data.keys() >= _VERSIONING_PROPERTIES: is_versionable = True # Tougher to handle dicts. We need to consider STIX version, map to a # registered class, and from that get a more complete picture of its # properties. elif isinstance(data, dict): - class_maps = stix2.registry.STIX2_OBJ_MAPS[stix_vid] obj_type = data["type"] - if obj_type in class_maps["objects"]: + if is_sdo(obj_type, stix_version) or is_sro(obj_type, stix_version): # Should we bother checking properties for SDOs/SROs? # They were designed to be versionable. is_versionable = True - elif obj_type in class_maps["observables"]: + elif is_sco(obj_type, stix_version): # but do check SCOs - cls = class_maps["observables"][obj_type] + cls = stix2.registry.class_for_type( + obj_type, stix_version, "observables", + ) is_versionable = _VERSIONING_PROPERTIES.issubset( cls._properties, ) - return is_versionable, is_21 + return is_versionable, stix_version def new_version(data, allow_custom=None, **kwargs): @@ -144,7 +134,7 @@ def new_version(data, allow_custom=None, **kwargs): :return: The new object. """ - is_versionable, is_21 = _is_versionable(data) + is_versionable, stix_version = _is_versionable(data) if not is_versionable: raise ValueError( @@ -165,10 +155,17 @@ def new_version(data, allow_custom=None, **kwargs): # probably were). That would imply an ID change, which is not allowed # across versions. sco_locked_props = [] - if is_21 and isinstance(data, stix2.base._Observable): + if is_sco(data, "2.1"): uuid_ = uuid.UUID(data["id"][-36:]) if uuid_.variant == uuid.RFC_4122 and uuid_.version == 5: - sco_locked_props = data._id_contributing_properties + if isinstance(data, stix2.base._Observable): + cls = data.__class__ + else: + cls = stix2.registry.class_for_type( + data["type"], stix_version, "observables", + ) + + sco_locked_props = cls._id_contributing_properties unchangable_properties = set() for prop in itertools.chain(STIX_UNMOD_PROPERTIES, sco_locked_props): @@ -179,7 +176,7 @@ def new_version(data, allow_custom=None, **kwargs): # Different versioning precision rules in STIX 2.0 vs 2.1, so we need # to know which rules to apply. - precision_constraint = "min" if is_21 else "exact" + precision_constraint = "min" if stix_version == "2.1" else "exact" cls = type(data) if 'modified' not in kwargs: @@ -189,7 +186,9 @@ def new_version(data, allow_custom=None, **kwargs): ) new_modified = get_timestamp() - new_modified = _fudge_modified(old_modified, new_modified, is_21) + new_modified = _fudge_modified( + old_modified, new_modified, stix_version == "2.1", + ) kwargs['modified'] = new_modified