Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WD 05 SCO #285

Merged
merged 19 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions stix2/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import collections
import copy
import datetime as dt
import uuid

import simplejson as json
import six

from stix2.canonicalization.Canonicalize import canonicalize

from .exceptions import (
AtLeastOnePropertyError, CustomContentError, DependentPropertiesError,
ExtraPropertiesError, ImmutableError, InvalidObjRefError,
Expand Down Expand Up @@ -125,6 +128,11 @@ def _check_at_least_one_property(self, list_of_properties=None):
list_of_properties.remove('type')
current_properties = self.properties_populated()
list_of_properties_populated = set(list_of_properties).intersection(current_properties)

if list_of_properties_populated == set(['id']) and isinstance(self, _Observable):
# Do not count the auto-generated id as a user-specified property
list_of_properties_populated = None

khdesai marked this conversation as resolved.
Show resolved Hide resolved
if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(['extensions'])):
raise AtLeastOnePropertyError(self.__class__, list_of_properties)

Expand Down Expand Up @@ -309,6 +317,11 @@ def __init__(self, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)

if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id

super(_Observable, self).__init__(**kwargs)

def _check_ref(self, ref, prop, prop_name):
Expand Down Expand Up @@ -347,6 +360,61 @@ def _check_property(self, prop_name, prop, kwargs):
for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name)

def _generate_id(self, kwargs):
required_prefix = self._type + "--"
namespace = uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7")

try:
properties_to_use = self._id_contributing_properties
if properties_to_use:
streamlined_object = {}
if "hashes" in kwargs and "hashes" in properties_to_use:
possible_hash = self._choose_one_hash(kwargs["hashes"])
if possible_hash:
streamlined_object["hashes"] = possible_hash
for key in kwargs.keys():
khdesai marked this conversation as resolved.
Show resolved Hide resolved
if key in properties_to_use and key != "hashes":
if type(kwargs[key]) is dict:
khdesai marked this conversation as resolved.
Show resolved Hide resolved
for otherKey in kwargs[key]:
if isinstance(kwargs[key][otherKey], _STIXBase):
streamlined_object[key] = self._embed_obj_to_json(kwargs[key][otherKey])
else:
streamlined_object[key] = kwargs[key]
else:
if isinstance(kwargs[key], _STIXBase):
streamlined_object[key] = self._embed_obj_to_json(kwargs[key])
else:
streamlined_object[key] = kwargs[key]

if streamlined_object:
data = canonicalize(str(streamlined_object), utf8=False)
khdesai marked this conversation as resolved.
Show resolved Hide resolved
return required_prefix + str(uuid.uuid5(namespace, str(data)))
khdesai marked this conversation as resolved.
Show resolved Hide resolved
return None
except AttributeError:
khdesai marked this conversation as resolved.
Show resolved Hide resolved
# We ideally end up here if handling a 2.0 SCO
return None

def _choose_one_hash(self, hash_dict):
if "MD5" in hash_dict:
return {"MD5": hash_dict["MD5"]}
elif "SHA-1" in hash_dict:
return {"SHA-1": hash_dict["SHA-1"]}
elif "SHA-256" in hash_dict:
return {"SHA-256": hash_dict["SHA-256"]}
elif "SHA-512" in hash_dict:
return {"SHA-512": hash_dict["SHA-512"]}
else:
# Cheesy way to pick the first item in the dictionary, since its not indexable
for (k, v) in hash_dict.items():
break
return {k: v}
khdesai marked this conversation as resolved.
Show resolved Hide resolved

def _embed_obj_to_json(self, obj):
khdesai marked this conversation as resolved.
Show resolved Hide resolved
tmp_obj = dict(copy.deepcopy(obj))
for prop_name in obj._defaulted_optional_properties:
del tmp_obj[prop_name]
return tmp_obj


class _Extension(_STIXBase):

Expand Down
Loading