Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WD 05 SCO #285

Merged
merged 19 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 99 additions & 6 deletions stix2/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import collections
import copy
import datetime as dt
import uuid

import simplejson as json
import six

from stix2.canonicalization.Canonicalize import canonicalize

from .exceptions import (
AtLeastOnePropertyError, DependentPropertiesError, ExtraPropertiesError,
ImmutableError, InvalidObjRefError, InvalidValueError,
Expand Down Expand Up @@ -112,10 +115,15 @@ def _check_mutually_exclusive_properties(self, list_of_properties, at_least_one=
def _check_at_least_one_property(self, list_of_properties=None):
if not list_of_properties:
list_of_properties = sorted(list(self.__class__._properties.keys()))
if 'type' in list_of_properties:
list_of_properties.remove('type')
if isinstance(self, _Observable):
props_to_remove = ["type", "id", "defanged", "spec_version"]
else:
props_to_remove = ["type"]

list_of_properties = [prop for prop in list_of_properties if prop not in props_to_remove]
current_properties = self.properties_populated()
list_of_properties_populated = set(list_of_properties).intersection(current_properties)

if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(['extensions'])):
raise AtLeastOnePropertyError(self.__class__, list_of_properties)

Expand Down Expand Up @@ -300,9 +308,26 @@ def __init__(self, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)

try:
# Since `spec_version` is optional, this is how we check for a 2.1 SCO
self._id_contributing_properties

if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id
except AttributeError:
# End up here if handling a 2.0 SCO, and don't need to do anything further
pass

super(_Observable, self).__init__(**kwargs)

def _check_ref(self, ref, prop, prop_name):
"""
Only for checking `*_ref` or `*_refs` properties in spec_version 2.0
STIX Cyber Observables (SCOs)
"""

if '*' in self._STIXBase__valid_refs:
return # don't check if refs are valid

Expand Down Expand Up @@ -331,12 +356,52 @@ def _check_property(self, prop_name, prop, kwargs):
if prop_name not in kwargs:
return

from .properties import ObjectReferenceProperty
if prop_name.endswith('_ref'):
ref = kwargs[prop_name]
self._check_ref(ref, prop, prop_name)
elif prop_name.endswith('_refs'):
for ref in kwargs[prop_name]:
if isinstance(prop, ObjectReferenceProperty):
ref = kwargs[prop_name]
self._check_ref(ref, prop, prop_name)
elif prop_name.endswith('_refs'):
if isinstance(prop.contained, ObjectReferenceProperty):
for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name)

def _generate_id(self, kwargs):
required_prefix = self._type + "--"
namespace = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")

properties_to_use = self._id_contributing_properties
if properties_to_use:
streamlined_obj_vals = []
if "hashes" in kwargs and "hashes" in properties_to_use:
possible_hash = _choose_one_hash(kwargs["hashes"])
if possible_hash:
streamlined_obj_vals.append(possible_hash)
for key in properties_to_use:
if key != "hashes" and key in kwargs:
if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase):
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_obj_vals.append(temp_deep_copy)
elif isinstance(kwargs[key], list) and isinstance(kwargs[key][0], _STIXBase):
for obj in kwargs[key]:
temp_deep_copy = copy.deepcopy(dict(obj))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_obj_vals.append(temp_deep_copy)
else:
streamlined_obj_vals.append(kwargs[key])

if streamlined_obj_vals:
data = canonicalize(streamlined_obj_vals, utf8=False)
# print (str(type(data)))
try:
return required_prefix + six.text_type(uuid.uuid5(namespace, data))
except UnicodeDecodeError:
return required_prefix + six.text_type(uuid.uuid5(namespace, six.binary_type(data)))
khdesai marked this conversation as resolved.
Show resolved Hide resolved
# return required_prefix + six.text_type(uuid.uuid5(namespace, data))

# We return None if there are no values specified for any of the id-contributing-properties
return None


class _Extension(_STIXBase):
Expand All @@ -346,6 +411,34 @@ def _check_object_constraints(self):
self._check_at_least_one_property()


def _choose_one_hash(hash_dict):
if "MD5" in hash_dict:
return {"MD5": hash_dict["MD5"]}
elif "SHA-1" in hash_dict:
return {"SHA-1": hash_dict["SHA-1"]}
elif "SHA-256" in hash_dict:
return {"SHA-256": hash_dict["SHA-256"]}
elif "SHA-512" in hash_dict:
return {"SHA-512": hash_dict["SHA-512"]}
else:
k = next(iter(hash_dict), None)
if k is not None:
return {k: hash_dict[k]}


def _cls_init(cls, obj, kwargs):
if getattr(cls, '__init__', object.__init__) is not object.__init__:
cls.__init__(obj, **kwargs)


def _recursive_stix_to_dict(input_dict):
for key in input_dict:
if isinstance(input_dict[key], dict):
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], _STIXBase):
input_dict[key] = dict(input_dict[key])

# There may stil be nested _STIXBase objects
_recursive_stix_to_dict(input_dict[key])
else:
return
Loading