Skip to content

Commit

Permalink
Merge pull request #402 from chisholm/fix_deterministic_ids
Browse files Browse the repository at this point in the history
Fix deterministic ids
  • Loading branch information
clenk authored Jun 8, 2020
2 parents 41525f9 + 5a5484d commit 6faf6b9
Show file tree
Hide file tree
Showing 3 changed files with 481 additions and 201 deletions.
215 changes: 144 additions & 71 deletions stix2/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,23 +334,20 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])

self._allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
super(_Observable, self).__init__(**kwargs)

try:
# Since `spec_version` is optional, this is how we check for a 2.1 SCO
self._id_contributing_properties
if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable):
# Specific to 2.1+ observables: generate a deterministic ID
id_ = self._generate_id()

if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id
except AttributeError:
# End up here if handling a 2.0 SCO, and don't need to do anything further
pass

super(_Observable, self).__init__(**kwargs)
# Spec says fall back to UUIDv4 if no contributing properties were
# given. That's what already happened (the following is actually
# overwriting the default uuidv4), so nothing to do here.
if id_ is not None:
# Can't assign to self (we're immutable), so slip the ID in
# more sneakily.
self._inner["id"] = id_

def _check_ref(self, ref, prop, prop_name):
"""
Expand Down Expand Up @@ -396,42 +393,53 @@ def _check_property(self, prop_name, prop, kwargs):
for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name)

def _generate_id(self, kwargs):
required_prefix = self._type + "--"

properties_to_use = self._id_contributing_properties
if properties_to_use:
streamlined_object = {}
if "hashes" in kwargs and "hashes" in properties_to_use:
possible_hash = _choose_one_hash(kwargs["hashes"])
if possible_hash:
streamlined_object["hashes"] = possible_hash
for key in properties_to_use:
if key != "hashes" and key in kwargs:
if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase):
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
elif isinstance(kwargs[key], list):
temp_deep_copy = copy.deepcopy(kwargs[key])
_recursive_stix_list_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
else:
streamlined_object[key] = kwargs[key]
if streamlined_object:
data = canonicalize(streamlined_object, utf8=False)

# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data))
def _generate_id(self):
"""
Generate a UUIDv5 for this observable, using its "ID contributing
properties".
:return: The ID, or None if no ID contributing properties are set
"""

id_ = None
json_serializable_object = {}

for key in self._id_contributing_properties:

if key in self:
obj_value = self[key]

if key == "hashes":
serializable_value = _choose_one_hash(obj_value)

if serializable_value is None:
raise InvalidValueError(
self, key, "No hashes given",
)

else:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data.encode("utf-8")))
serializable_value = _make_json_serializable(obj_value)

# We return None if there are no values specified for any of the id-contributing-properties
return None
json_serializable_object[key] = serializable_value

if json_serializable_object:

data = canonicalize(json_serializable_object, utf8=False)

# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8"),
)

id_ = "{}--{}".format(self._type, six.text_type(uuid_))

return id_


class _Extension(_STIXBase):
Expand All @@ -455,35 +463,100 @@ def _choose_one_hash(hash_dict):
if k is not None:
return {k: hash_dict[k]}

return None


def _cls_init(cls, obj, kwargs):
if getattr(cls, '__init__', object.__init__) is not object.__init__:
cls.__init__(obj, **kwargs)


def _recursive_stix_to_dict(input_dict):
for key in input_dict:
if isinstance(input_dict[key], dict):
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], _STIXBase):
input_dict[key] = dict(input_dict[key])
def _make_json_serializable(value):
"""
Make the given value JSON-serializable; required for the JSON canonicalizer
to work. This recurses into lists/dicts, converts stix objects to dicts,
etc. "Convenience" types this library uses as property values are
JSON-serialized to produce a JSON-serializable value. (So you will always
get strings for those.)
The conversion will not affect the passed in value.
:param value: The value to make JSON-serializable.
:return: The JSON-serializable value.
:raises ValueError: If value is None (since nulls are not allowed in STIX
objects).
"""
if value is None:
raise ValueError("Illegal null value found in a STIX object")

json_value = value # default assumption

if isinstance(value, Mapping):
json_value = {
k: _make_json_serializable(v)
for k, v in value.items()
}

elif isinstance(value, list):
json_value = [
_make_json_serializable(v)
for v in value
]

elif not isinstance(value, (int, float, six.string_types, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp
# properties), and could apply to any other "convenience" types this
# library uses for property values in the future.
json_value = json.dumps(value, ensure_ascii=False, cls=STIXJSONEncoder)

# If it looks like a string literal was output, strip off the quotes.
# Otherwise, a second pair will be added when it's canonicalized. Also
# to be extra safe, we need to unescape.
if len(json_value) >= 2 and \
json_value[0] == '"' and json_value[-1] == '"':
json_value = _un_json_escape(json_value[1:-1])

return json_value


_JSON_ESCAPE_RE = re.compile(r"\\.")
# I don't think I should need to worry about the unicode escapes (\uXXXX)
# since I use ensure_ascii=False when generating it. I will just fix all
# the other escapes, e.g. \n, \r, etc.
#
# This list is taken from RFC8259 section 7:
# https://tools.ietf.org/html/rfc8259#section-7
# Maps the second char of a "\X" style escape, to a replacement char
_JSON_ESCAPE_MAP = {
'"': '"',
"\\": "\\",
"/": "/",
"b": "\b",
"f": "\f",
"n": "\n",
"r": "\r",
"t": "\t"
}
def _un_json_escape(json_string):
"""
Removes JSON string literal escapes. We should undo these things Python's
serializer does, so we can ensure they're done canonically. The
canonicalizer should be in charge of everything, as much as is feasible.
# There may stil be nested _STIXBase objects
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], list):
_recursive_stix_list_to_dict(input_dict[key])
else:
pass
:param json_string: String literal output of Python's JSON serializer,
minus the surrounding quotes.
:return: The unescaped string
"""

def replace(m):
replacement = _JSON_ESCAPE_MAP.get(m.group(0)[1])
if replacement is None:
raise ValueError("Unrecognized JSON escape: " + m.group(0))

def _recursive_stix_list_to_dict(input_list):
for i in range(len(input_list)):
if isinstance(input_list[i], _STIXBase):
input_list[i] = dict(input_list[i])
elif isinstance(input_list[i], dict):
pass
elif isinstance(input_list[i], list):
_recursive_stix_list_to_dict(input_list[i])
else:
continue
_recursive_stix_to_dict(input_list[i])
return replacement

result = _JSON_ESCAPE_RE.sub(replace, json_string)

return result
Loading

0 comments on commit 6faf6b9

Please sign in to comment.