Skip to content

Commit

Permalink
Fix external references for data components and techniques
Browse files Browse the repository at this point in the history
  • Loading branch information
reuteras committed May 17, 2024
1 parent 034e65e commit 77e2e80
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/markdown_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ def create_data_source_notes(self):

for technique in related_data_source['techniques_used']:
detects = fix_description(technique['description'])
detects = description.replace('\n', '<br />')
detects = detects.replace('\n', '<br />')
content += f"| {technique['domain'][0]} | [[{technique['technique_name']} - {technique['technique_id']}\\|{technique['technique_id']}]] | [[{technique['technique_name']} - {technique['technique_id']}\\|{technique['technique_name']}]] | {detects} |\n"

content = convert_to_local_links(content)
Expand Down
35 changes: 25 additions & 10 deletions src/stix_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,20 +991,20 @@ def _get_data_sources(self):
data_source_obj.platforms = data_source.get('x_mitre_platforms', [])
data_source_obj.collection_layers = data_source.get('x_mitre_collection_layers', [])

# Get external references
# Get external references for the data source
ext_refs = data_source.get('external_references', [])

for ext_ref in ext_refs:
if ext_ref['source_name'] == 'mitre-attack':
data_source_obj.id = ext_ref['external_id']
data_source_obj.url = ext_ref['url']
elif 'url' in ext_ref:
elif 'url' in ext_ref and 'description' in ext_ref:
item = {'name': ext_ref['source_name'], 'url': ext_ref['url'], 'description': ext_ref['description']}
if ext_ref['source_name'] not in ext_ref_added:
data_source_obj.external_references = item
ext_ref_added.append(ext_ref['source_name'])

# Get techniques used by data source
# Get data components used by data source
data_source_relationships_enterprise = self.enterprise_attack.query([ Filter('x_mitre_data_source_ref', '=', data_source_obj.internal_id)])
data_source_relationships_mobile = self.mobile_attack.query([ Filter('x_mitre_data_source_ref', '=', data_source_obj.internal_id)])
data_source_relationships_ics = self.ics_attack.query([ Filter('x_mitre_data_source_ref', '=', data_source_obj.internal_id)])
Expand All @@ -1018,6 +1018,16 @@ def _get_data_sources(self):
data_component_description = relationship.get('description', '')
data_component_parent = data_source_obj.name

# Get external references for the data component
ext_refs = relationship.get('external_references', [])

for ext_ref in ext_refs:
if 'url' in ext_ref and 'description' in ext_ref:
item = {'name': ext_ref['source_name'], 'url': ext_ref['url'], 'description': ext_ref['description']}
if ext_ref['source_name'] not in ext_ref_added:
data_source_obj.external_references = item
ext_ref_added.append(ext_ref['source_name'])

# Get techniques used by data source
enterprise_technique_stix= self.enterprise_attack.query([Filter('type', '=', 'relationship'), Filter('relationship_type', '=', 'detects'), Filter('source_ref', '=', relationship['id'])])
mobile_technique_stix = self.mobile_attack.query([Filter('type', '=', 'relationship'), Filter('relationship_type', '=', 'detects'), Filter('source_ref', '=', relationship['id'])])
Expand All @@ -1028,6 +1038,17 @@ def _get_data_sources(self):
for techniques_relationship in techniques_used_stix:
technique_description = techniques_relationship.get('description', '')

# Get external references for the technique
ext_refs = techniques_relationship.get('external_references', [])

for ext_ref in ext_refs:
if 'url' in ext_ref and 'description' in ext_ref:
item = {'name': ext_ref['source_name'], 'url': ext_ref['url'], 'description': ext_ref['description']}
if ext_ref['source_name'] not in ext_ref_added:
data_source_obj.external_references = item
ext_ref_added.append(ext_ref['source_name'])

# Get technique name and id
if 'enterprise-attack' in techniques_relationship['x_mitre_domains']:
technique_stix = self.enterprise_attack.query([Filter('id', '=', techniques_relationship['target_ref'])])
elif 'mobile-attack' in techniques_relationship['x_mitre_domains']:
Expand All @@ -1037,17 +1058,11 @@ def _get_data_sources(self):

if technique_stix:
technique = technique_stix[0]
ext_refs = technique.get('external_references', [])
technique_name = technique['name']
ext_refs = technique.get('external_references', [])
for ext_ref in ext_refs:
if ext_ref['source_name'] == 'mitre-attack':
technique_id = ext_ref['external_id']
ext_refs = techniques_relationship.get('external_references', [])
if 'url' in ext_ref and 'description' in ext_ref:
item = {'name': ext_ref['source_name'].replace("/", "/"), 'url': ext_ref['url'], 'description': ext_ref['description']}
if ext_ref['source_name'] not in ext_ref_added:
data_source_obj.external_references = item
ext_ref_added.append(ext_ref['source_name'])

item = {'technique_name': technique_name.replace("/", "/"), 'technique_id': technique_id, 'description': technique_description, 'domain': techniques_relationship.get('x_mitre_domains', '')}
techniques_used.append(item)
Expand Down

0 comments on commit 77e2e80

Please sign in to comment.