Skip to content

Commit

Permalink
lots of tweaks for tabe creation, input_creation intial success
Browse files Browse the repository at this point in the history
  • Loading branch information
rpiazza committed Mar 29, 2024
1 parent 5f23033 commit cb2aacb
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 142 deletions.
282 changes: 154 additions & 128 deletions stix2/datastore/relational_db/input_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,126 +7,153 @@
SCO_COMMON_PROPERTIES, SDO_COMMON_PROPERTIES, canonicalize_table_name,
)
from stix2.properties import (
DictionaryProperty, EmbeddedObjectProperty, EnumProperty,
ExtensionsProperty, FloatProperty, IntegerProperty, ListProperty,
ReferenceProperty, StringProperty,
BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, EnumProperty,
ExtensionsProperty, FloatProperty, HashesProperty, HexProperty, IDProperty, IntegerProperty, ListProperty,
Property, ReferenceProperty, StringProperty, TimestampProperty,
)


def single_value(p):
return not isinstance(
p, (
EmbeddedObjectProperty,
ListProperty,
DictionaryProperty,
),
)
@add_method(Property)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
pass


def table_property(prop, name, core_properties):
if isinstance(prop, ListProperty) and name not in core_properties:
contained_property = prop.contained
return not isinstance(contained_property, (StringProperty, IntegerProperty, FloatProperty))
elif isinstance(prop, DictionaryProperty) and name not in core_properties:
return True
else:
return False
@add_method(BinaryProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


def embedded_object_list_property(prop, name, core_properties):
if isinstance(prop, ListProperty) and name not in core_properties:
contained_property = prop.contained
return isinstance(contained_property, EmbeddedObjectProperty)
else:
return False
@add_method(BooleanProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


def array_property(prop, name, core_properties):
if isinstance(prop, ListProperty) and name not in core_properties:
contained_property = prop.contained
return isinstance(contained_property, (StringProperty, IntegerProperty, FloatProperty, EnumProperty))
else:
return False
@add_method(DictionaryProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
bindings = {"id": stix_object["id"]}
table = data_sink.tables_dictionary[canonicalize_table_name(table_name + "_" + name,
schema_name)]
for idx, (name, value) in enumerate(stix_object.items()):
name_binding = f"name{idx}"
if len(self.value_types) == 1:
value_binding = f"value{idx}"
elif isinstance(value, int):
value_binding = f"integer_value{idx}"
else:
value_binding = f"string_value{idx}"

bindings[name_binding] = name
bindings[value_binding] = value

def derive_column_name(prop):
contained_property = prop.contained
if isinstance(contained_property, ReferenceProperty):
return "ref_id"
elif isinstance(contained_property, StringProperty):
return "value"
return [insert(table).values(bindings)]


def generate_insert_for_array_in_table(table, values, foreign_key_value):
@add_method(EmbeddedObjectProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return generate_insert_for_embedded_object(data_sink, stix_object[name], name, schema_name)

bindings = {
"id": foreign_key_value,
}

for idx, item in enumerate(values):
item_binding_name = f"item{idx}"
@add_method(EnumProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}

bindings[item_binding_name] = item

return [insert(table).values(bindings)]
@add_method(FloatProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


def generate_single_values(stix_object, properties, core_properties=[]):
bindings = OrderedDict()
for name, prop in properties.items():
if (
single_value(prop) and (name == 'id' or name not in core_properties) or
array_property(prop, name, core_properties)
):
if name in stix_object and name != "type":
bindings[name] = stix_object[name] if not array_property(prop, name, core_properties) else "{" + ",".join(
['"' + x + '"' for x in stix_object[name]],
) + "}"
return bindings
@add_method(HexProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


def generate_insert_for_embedded_object(type_name, item, foreign_key_value):
bindings = generate_single_values(item, item._properties)
bindings["id"] = foreign_key_value
return bindings
def generate_insert_for_hashes(data_sink, name, stix_object, table_name, schema_name):
bindings = {"id": stix_object["id"]}
table = data_sink.tables_dictionary[canonicalize_table_name(table_name + "_" + name,
schema_name)]

for idx, (hash_name, hash_value) in enumerate(stix_object["hashes".items()]):
hash_name_binding_name = "hash_name" + str(idx)
hash_value_binding_name = "hash_value" + str(idx)

def generate_insert_for_dictionary(item, dictionary_table, foreign_key_value, value_types):
bindings = {"id": foreign_key_value}
bindings[hash_name_binding_name] = hash_name
bindings[hash_value_binding_name] = hash_value

for idx, (name, value) in enumerate(item.items()):
name_binding = f"name{idx}"
if len(value_types) == 1:
value_binding = f"value{idx}"
elif isinstance(value, int):
value_binding = f"integer_value{idx}"
else:
value_binding = f"string_value{idx}"
return [insert(table).values(bindings)]

bindings[name_binding] = name
bindings[value_binding] = value

return [insert(dictionary_table).values(bindings)]
@add_method(HashesProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return generate_insert_for_hashes(data_sink, name, stix_object, table_name, schema_name)

@add_method(IDProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


@add_method(IntegerProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


@add_method(ListProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
if isinstance(self.contained, ReferenceProperty):
insert_statements = list()

table = data_sink.tables_dictionary[canonicalize_table_name(table_name + "_" + name)]
for idx, item in enumerate(stix_object[name]):
bindings = {
"id": stix_object["id"],
"ref_id": item
}
insert_statements.append(insert(table).values(bindings))
return insert_statements
elif isinstance(self.contained, EmbeddedObjectProperty):
insert_statements = list()
for value in stix_object[name]:
insert_statements.extend(generate_insert_for_embedded_object(data_sink, value, table_name + "_" + name, None))
table = data_sink.tables_dictionary[canonicalize_table_name(table_name + "_" + name, None)]
insert_statements.append(insert(table).values({"id": stix_object["id"], "ref_id": 1}))
return insert_statements
else:
return {name: stix_object[name]}


def generate_insert_for_embedded_objects(table, type_name, values, foreign_key_value):
bindings = dict()
for item in values:
bindings.extend(generate_insert_for_embedded_object(type_name, item, foreign_key_value))
return [insert(table).values(bindings)]
@add_method(ReferenceProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}


def generate_insert_for_hashes(hashes, hashes_table, foreign_key_value):
bindings = {"id": foreign_key_value}
@add_method(StringProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}

for idx, (hash_name, hash_value) in enumerate(hashes.items()):
hash_name_binding_name = "hash_name" + str(idx)
hash_value_binding_name = "hash_value" + str(idx)

bindings[hash_name_binding_name] = hash_name
bindings[hash_value_binding_name] = hash_value
@add_method(TimestampProperty)
def generate_insert_information(self, data_sink, name, stix_object, table_name, schema_name):
return {name: stix_object[name]}

return [insert(hashes_table).values(bindings)]

def derive_column_name(prop):
contained_property = prop.contained
if isinstance(contained_property, ReferenceProperty):
return "ref_id"
elif isinstance(contained_property, StringProperty):
return "value"


def generate_insert_for_array_in_table(table, values, foreign_key_value):
insert_statements = list()
for idx, item in enumerate(values):
bindings = {
"id": foreign_key_value,
"ref_id": item
}
insert_statements.append(insert(table).values(bindings))

return insert_statements


def generate_insert_for_external_references(data_sink, stix_object):
Expand All @@ -141,14 +168,8 @@ def generate_insert_for_external_references(data_sink, stix_object):
insert_statements.append(er_insert_statement)

if "hashes" in er:
hashes_table = data_sink.tables_dictionary[canonicalize_table_name("external_references_hashes", "sdo")]
insert_statements.extend(
generate_insert_for_hashes(
er["hashes"],
hashes_table,
stix_object["id"],
),
)
generate_insert_for_hashes(data_sink, "hashes", er["hashes"], "external_references_hashes", "sdo"))

return insert_statements

Expand Down Expand Up @@ -196,7 +217,7 @@ def generate_insert_for_core(data_sink, stix_object, core_properties, schema_nam
for prop_name, value in stix_object.items():

if prop_name in core_properties:
# stored in separate tables, skip here
# stored in separate tables below, skip here
if prop_name not in {"object_marking_refs", "granular_markings", "external_references", "type"}:
core_bindings[prop_name] = value

Expand Down Expand Up @@ -231,8 +252,30 @@ def generate_insert_for_core(data_sink, stix_object, core_properties, schema_nam
return insert_statements


def generate_insert_for_object(data_sink, stix_object, schema_name, foreign_key_value=None):
def generate_insert_for_embedded_object(data_sink, stix_object, type_name, schema_name):
insert_statements = list()
bindings = dict()
table_name = canonicalize_table_name(type_name, schema_name)
object_table = data_sink.tables_dictionary[table_name]
sub_insert_statements = list()
for name, prop in stix_object._properties.items():
if name in stix_object:
result = prop.generate_insert_information(data_sink, name, stix_object, table_name, schema_name)
if isinstance(result,dict):
bindings.update(result)
elif isinstance(result,list):
sub_insert_statements.extend(result)
else:
raise(ValueError(result))

insert_statements.append(insert(object_table).values(bindings))
insert_statements.extend(sub_insert_statements)
return insert_statements


def generate_insert_for_object(data_sink, stix_object, schema_name):
insert_statements = list()
bindings = dict()
stix_id = stix_object["id"]
if schema_name == "sco":
core_properties = SCO_COMMON_PROPERTIES
Expand All @@ -241,45 +284,28 @@ def generate_insert_for_object(data_sink, stix_object, schema_name, foreign_key_
type_name = stix_object["type"]
table_name = canonicalize_table_name(type_name, schema_name)
object_table = data_sink.tables_dictionary[table_name]
properties = stix_object._properties
insert_statements.extend(generate_insert_for_core(data_sink, stix_object, core_properties, schema_name))

bindings = generate_single_values(stix_object, properties, core_properties)
object_insert_statement = insert(object_table).values(bindings)
insert_statements.append(object_insert_statement)

sub_insert_statements = list()
for name, prop in stix_object._properties.items():
if isinstance(prop, DictionaryProperty) and not name == "extensions":
dictionary_table_name = canonicalize_table_name(type_name + "_" + name, schema_name)
dictionary_table = data_sink.tables_dictionary[dictionary_table_name]
insert_statements.extend(generate_insert_for_dictionary(stix_object[name], dictionary_table, stix_id))
if name == 'id' or name not in core_properties:
if name in stix_object:
result = prop.generate_insert_information(data_sink, name, stix_object, table_name, schema_name)
if isinstance(result,dict):
bindings.update(result)
elif isinstance(result,list):
sub_insert_statements.extend(result)
else:
raise(ValueError(result))

insert_statements.append(insert(object_table).values(bindings))
insert_statements.extend(sub_insert_statements)

if "external_references" in stix_object:
insert_statements.extend(generate_insert_for_external_references(data_sink, stix_object, "sdo"))
insert_statements.extend(generate_insert_for_external_references(data_sink, stix_object))

if "extensions" in stix_object:
for ex in stix_object["extensions"]:
insert_statements.extend(generate_insert_for_object(data_sink, ex, schema_name, stix_id))
for name, prop in properties.items():
if table_property(prop, name, core_properties):
if name in stix_object:
if embedded_object_list_property(prop, name, core_properties):
insert_statements.extend(
generate_insert_for_embedded_objects(
name,
stix_object[name],
stix_object["id"],
),
)
elif isinstance(prop, ExtensionsProperty):
pass
else:
insert_statements.extend(
generate_insert_for_array_in_table(
stix_object["type"],
name,
stix_object[name],
properties[name],
stix_object["id"], ),
)

return insert_statements
Loading

0 comments on commit cb2aacb

Please sign in to comment.