Skip to content

Commit

Permalink
[ZeroFox] add created_by_ref and opencti_observable_main_type to stix…
Browse files Browse the repository at this point in the history
… objects (#2625)
  • Loading branch information
DNRRomero authored Sep 15, 2024
1 parent 0179c54 commit 0f628eb
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 45 deletions.
6 changes: 4 additions & 2 deletions external-import/zerofox/src/collectors/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ def __init__(self, endpoint, mapper, client: ZeroFox):
self.mapper = mapper
self.client = client

def collect_intelligence(self, now: datetime, last_run_date: datetime, logger):
def collect_intelligence(
self, created_by, now: datetime, last_run_date: datetime, logger
):
stix_objects = []
missed_entries = 0
for entry in self.client.fetch_feed(self.endpoint, last_run_date):
try:
stix_data = self.mapper(now, entry)
stix_data = self.mapper(now, entry, created_by)
stix_objects += stix_data
except Exception as ex:
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from typing import List, Union

from open_cti import domain_object
from stix2 import Infrastructure, Location, Relationship
from stix2.v21.vocab import INFRASTRUCTURE_TYPE_COMMAND_AND_CONTROL
from zerofox.domain.botnet import Botnet


def botnet_to_infrastructure(
now: str, entry: Botnet
created_by, now: str, entry: Botnet
) -> List[Union[Infrastructure, Location, Relationship]]:
"""
Creates a STIX Infrastructure/botnet object from a ZeroFOX Botnet object,
Expand All @@ -18,7 +19,9 @@ def botnet_to_infrastructure(
"""
objects = []

botnet = Infrastructure(
botnet = domain_object(
created_by=created_by,
cls=Infrastructure,
name=f"{entry.bot_name}",
labels=entry.tags,
created=now,
Expand All @@ -27,7 +30,9 @@ def botnet_to_infrastructure(
)
objects.append(botnet)

ip_address = Infrastructure(
ip_address = domain_object(
created_by=created_by,
cls=Infrastructure,
name=f"{entry.ip_address}",
infrastructure_types="botnet",
)
Expand All @@ -42,17 +47,19 @@ def botnet_to_infrastructure(
)

if entry.c2_domain:
objects += get_c2_objects(entry, botnet)
objects += get_c2_objects(created_by, entry, botnet)

if entry.country_code:
objects += get_location_objects(entry, ip_address)
objects += get_location_objects(created_by, entry, ip_address)

# Return all created objects
return objects


def get_location_objects(entry, ip_address):
country = Location(
def get_location_objects(created_by, entry, ip_address):
country = domain_object(
created_by=created_by,
cls=Location,
country=entry.country_code,
postal_code=entry.zip_code if entry.zip_code else "",
)
Expand All @@ -65,12 +72,16 @@ def get_location_objects(entry, ip_address):
return [country, rel]


def get_c2_objects(entry: Botnet, botnet):
c2_domain = Infrastructure(
def get_c2_objects(created_by, entry: Botnet, botnet):
c2_domain = domain_object(
created_by=created_by,
cls=Infrastructure,
name=f"{entry.c2_domain}",
infrastructure_types=INFRASTRUCTURE_TYPE_COMMAND_AND_CONTROL,
)
c2_ip = Infrastructure(
c2_ip = domain_object(
created_by=created_by,
cls=Infrastructure,
name=f"{entry.c2_ip_address}",
infrastructure_types=INFRASTRUCTURE_TYPE_COMMAND_AND_CONTROL,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import ipaddress
from typing import List, Tuple, Union

from open_cti import observable
from open_cti.domain_objects import domain_object
from stix2 import Infrastructure, IPv4Address, IPv6Address, Relationship, Software
from zerofox.domain.c2Domains import C2Domain


def c2_domains_to_infrastructure(
now: str, entry: C2Domain
created_by: str,
now: str,
entry: C2Domain,
) -> List[Union[Infrastructure, Relationship, IPv4Address, IPv6Address]]:
"""
Creates a STIX Infrastructure/command-and-conrtol object from a ZeroFOX C2Domain object, along with
Expand All @@ -15,7 +19,9 @@ def c2_domains_to_infrastructure(
entry_tags = entry.tags
tags, tags_obtained_observables = _parse_tag_observables(entry_tags)

infrastructure = Infrastructure(
infrastructure = domain_object(
created_by=created_by,
cls=Infrastructure,
name=f"{entry.domain}",
labels=tags,
created=now,
Expand All @@ -28,7 +34,7 @@ def c2_domains_to_infrastructure(
entry, infrastructure, tags_obtained_observables
)
ip_addresses = (
[build_ip_stix_object(ip) for ip in entry.ip_addresses]
[build_ip_stix_object(created_by, ip) for ip in entry.ip_addresses]
if entry.ip_addresses
else []
)
Expand Down Expand Up @@ -72,19 +78,19 @@ def _get_observables_relationships(
return [
Relationship(
source_ref=infra.id,
target_ref=observable.id,
target_ref=obs.id,
relationship_type="consists-of",
start_time=entry.created_at,
)
for observable in observables
for obs in observables
]


def build_ip_stix_object(ip):
def build_ip_stix_object(created_by, ip):
version = ipaddress.ip_address(ip).version
if version == 4:
return IPv4Address(value=ip)
return observable(created_by=created_by, cls=IPv4Address, value=ip)
elif version == 6:
return IPv6Address(value=ip)
return observable(created_by=created_by, cls=IPv6Address, value=ip)
else:
raise ValueError(f"Invalid IP address: {ip}")
11 changes: 8 additions & 3 deletions external-import/zerofox/src/collectors/mappers/exploitToTool.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from typing import List, Union

from open_cti import domain_object
from stix2 import ExternalReference, Relationship, Tool, Vulnerability
from zerofox.domain.exploits import Exploit


def exploit_to_tool(
now: str, entry: Exploit
created_by, now: str, entry: Exploit
) -> List[Union[Tool, Vulnerability, Relationship]]:
"""
Creates a STIX Tool/exploitation object from a ZeroFOX Exploit object, along with
- A Vulnerability object for the tagetted CVE
"""
tool = Tool(
tool = domain_object(
created_by=created_by,
cls=Tool,
name=f"{entry.cve}",
description=f"```{entry.exploit}```",
created=now,
Expand All @@ -20,7 +23,9 @@ def exploit_to_tool(
],
tool_types=["exploitation"],
)
vulnerability = Vulnerability(name=entry.cve)
vulnerability = domain_object(
created_by=created_by, cls=Vulnerability, name=entry.cve
)

return [
tool,
Expand Down
24 changes: 19 additions & 5 deletions external-import/zerofox/src/collectors/mappers/malwareToMalware.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Union

from open_cti import domain_object, observable
from stix2 import URL, File, Indicator
from stix2 import Malware as stixMalware
from stix2 import Relationship
Expand All @@ -8,7 +9,7 @@


def malware_to_malware(
now: str, entry: Malware
created_by, _: str, entry: Malware
) -> List[Union[URL, File, Indicator, Relationship, stixMalware]]:
"""
Based on a ZeroFox Malware object, creates the following STIX objects:
Expand All @@ -17,9 +18,18 @@ def malware_to_malware(
- A set of Malware family objects for each of the malware families associated with the malware entry
- An Indicator observable object pointing the file hash to each of the malware families it is based on
"""
urls = [URL(value=c2) for c2 in entry.c2] if entry.c2 else []
urls = (
[observable(created_by=created_by, cls=URL, value=c2) for c2 in entry.c2]
if entry.c2
else []
)
malware_families = (
[stixMalware(name=family, is_family=True) for family in entry.family]
[
domain_object(
created_by=created_by, cls=stixMalware, name=family, is_family=True
)
for family in entry.family
]
if entry.family
else []
)
Expand All @@ -32,7 +42,9 @@ def malware_to_malware(
if entry.md5:
present_hashes["MD5"] = entry.md5

file = File(
file = observable(
created_by=created_by,
cls=File,
name=entry.sha256,
hashes={
"SHA-256": entry.sha256,
Expand All @@ -45,7 +57,9 @@ def malware_to_malware(
pattern_string += f" OR file:hashes.'{k}' = '{v}'"
pattern_string += "]"

indicator = Indicator(
indicator = domain_object(
created_by=created_by,
cls=Indicator,
name=entry.sha256,
pattern_type=PATTERN_TYPE_STIX,
pattern=pattern_string,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Union

from open_cti import domain_object, observable
from stix2 import (
URL,
AutonomousSystem,
Expand All @@ -11,7 +12,7 @@
from zerofox.domain.phishing import Phishing


def phishing_to_infrastructure(now: str, entry: Phishing) -> List[
def phishing_to_infrastructure(created_by, now: str, entry: Phishing) -> List[
Union[
Infrastructure,
Relationship,
Expand All @@ -29,16 +30,20 @@ def phishing_to_infrastructure(now: str, entry: Phishing) -> List[
- a X509Certificate object for the certificate authority and fingerprint, if present.
"""
phishing = Infrastructure(
phishing = domain_object(
created_by=created_by,
cls=Infrastructure,
name=f"{entry.domain}",
created=now,
infrastructure_types=["phishing"],
first_seen=entry.scanned,
external_references=[],
)
certificate_objects = build_certificate_objects(entry, phishing)
certificate_objects = build_certificate_objects(created_by, entry, phishing)

url = URL(
url = observable(
created_by=created_by,
cls=URL,
value=entry.url,
)

Expand All @@ -49,7 +54,9 @@ def phishing_to_infrastructure(now: str, entry: Phishing) -> List[
start_time=entry.scanned,
)

ip = IPv4Address(
ip = observable(
created_by=created_by,
cls=IPv4Address,
value=entry.host.ip,
)

Expand All @@ -60,7 +67,9 @@ def phishing_to_infrastructure(now: str, entry: Phishing) -> List[
start_time=entry.scanned,
)

asn = AutonomousSystem(
asn = observable(
created_by=created_by,
cls=AutonomousSystem,
number=entry.host.asn,
)

Expand All @@ -82,10 +91,12 @@ def phishing_to_infrastructure(now: str, entry: Phishing) -> List[
] + certificate_objects


def build_certificate_objects(entry: Phishing, stix_phishing):
def build_certificate_objects(created_by, entry: Phishing, stix_phishing):
if not entry.cert or not entry.cert.authority:
return []
certificate = X509Certificate(
certificate = observable(
created_by=created_by,
cls=X509Certificate,
issuer=entry.cert.authority,
hashes={"SHA-1": entry.cert.fingerprint} if entry.cert.fingerprint else None,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Union

from open_cti import domain_object, observable
from stix2 import File, Indicator
from stix2 import Malware as stixMalware
from stix2 import Relationship
Expand All @@ -8,7 +9,7 @@


def ransomware_to_malware(
now: str, entry: Ransomware
created_by, now: str, entry: Ransomware
) -> List[Union[Relationship, Indicator, File, stixMalware]]:
"""
Based on a ZeroFox Ransomware object, creates the following STIX objects:
Expand All @@ -25,7 +26,9 @@ def ransomware_to_malware(
else:
ransomware_name = entry.sha256

malware = stixMalware(
malware = domain_object(
created_by=created_by,
cls=stixMalware,
name=f"{ransomware_name}",
description=f"```{entry.ransom_note}```",
labels=[tag for tag in entry.tags if not tag.startswith("family:")],
Expand All @@ -44,7 +47,9 @@ def ransomware_to_malware(
if entry.md5:
present_hashes["MD5"] = entry.md5

file = File(
file = observable(
created_by=created_by,
cls=File,
name=entry.sha256,
hashes={
"SHA-256": entry.sha256,
Expand All @@ -57,7 +62,9 @@ def ransomware_to_malware(
pattern_string += f" OR file:hashes.'{k}' = '{v}'"
pattern_string += "]"

indicator = Indicator(
indicator = domain_object(
created_by=created_by,
cls=Indicator,
name=entry.sha256,
pattern_type=PATTERN_TYPE_STIX,
pattern=pattern_string,
Expand Down
Loading

0 comments on commit 0f628eb

Please sign in to comment.