Skip to content

Commit

Permalink
Map events to OCSF's Security Finding class (#221)
Browse files Browse the repository at this point in the history
* Map events to OCSF's Security Finding class

* Improve models (inheritance). Add OCSF_CLASS env variable

* Move constants to the models

* Fix validation error
  • Loading branch information
AlexRuiz7 committed Nov 18, 2024
1 parent c4ed091 commit 5449aff
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 30 deletions.
3 changes: 2 additions & 1 deletion integrations/amazon-security-lake/src/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def lambda_handler(event, context):
account_id = os.environ['ACCOUNT_ID']
region = os.environ['AWS_REGION']
ocsf_bucket = os.environ.get('S3_BUCKET_OCSF')
ocsf_class = os.environ.get('OCSF_CLASS', 'SECURITY_FINDING')

# Extract bucket and key from S3 event
src_bucket = event['Records'][0]['s3']['bucket']['name']
Expand All @@ -150,7 +151,7 @@ def lambda_handler(event, context):
raw_events = get_events(src_bucket, key)

# Transform events to OCSF format
ocsf_events = wazuh_ocsf_converter.transform_events(raw_events)
ocsf_events = wazuh_ocsf_converter.transform_events(raw_events, ocsf_class)

# Upload event in OCSF format
ocsf_upload_success = False
Expand Down
62 changes: 50 additions & 12 deletions integrations/amazon-security-lake/src/models/ocsf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import pydantic
import typing
import abc


class AnalyticInfo(pydantic.BaseModel):
category: str
name: str
type_id: int
type_id: int = 1
uid: str


# Deprecated since v1.1.0. Use AnalyticInfo instead.
class Analytic(pydantic.BaseModel):
category: str
name: str
type: str = "Rule"
type_id: int = 1
uid: str


Expand All @@ -17,7 +27,7 @@ class TechniqueInfo(pydantic.BaseModel):
class AttackInfo(pydantic.BaseModel):
tactic: TechniqueInfo
technique: TechniqueInfo
version: str
version: str = "v13.1"


class FindingInfo(pydantic.BaseModel):
Expand All @@ -28,39 +38,67 @@ class FindingInfo(pydantic.BaseModel):
uid: str


# Deprecated since v1.1.0. Use FindingInfo instead.
class Finding(pydantic.BaseModel):
title: str
types: typing.List[str]
uid: str


class ProductInfo(pydantic.BaseModel):
name: str
lang: str
vendor_name: str


class Metadata(pydantic.BaseModel):
log_name: str
log_provider: str
product: ProductInfo
version: str
log_name: str = "Security events"
log_provider: str = "Wazuh"
product: ProductInfo = ProductInfo(
name="Wazuh",
lang="en",
vendor_name="Wazuh, Inc,."
)
version: str = "1.1.0"


class Resource(pydantic.BaseModel):
name: str
uid: str


class DetectionFinding(pydantic.BaseModel):
class FindingABC(pydantic.BaseModel, abc.ABC):
activity_id: int = 1
category_name: str = "Findings"
category_uid: int = 2
class_name: str = "Detection Finding"
class_uid: int = 2004
class_name: str
class_uid: int
count: int
message: str
finding_info: FindingInfo
metadata: Metadata
metadata: Metadata = Metadata()
raw_data: str
resources: typing.List[Resource]
risk_score: int
severity_id: int
status_id: int = 99
time: int
type_uid: int = 200401
type_uid: int
unmapped: typing.Dict[str, typing.List[str]] = pydantic.Field()


class DetectionFinding(FindingABC):
class_name: str = "Detection Finding"
class_uid: int = 2004
finding_info: FindingInfo
type_uid: int = 200401


# Deprecated since v1.1.0. Use DetectionFinding instead.
class SecurityFinding(FindingABC):
analytic: Analytic
attacks: typing.List[AttackInfo]
class_name: str = "Security Finding"
class_uid: int = 2001
finding: Finding
state_id: int = 1
type_uid: int = 200101
88 changes: 71 additions & 17 deletions integrations/amazon-security-lake/src/wazuh_ocsf_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import logging
from datetime import datetime


timestamp_pattern = "%Y-%m-%dT%H:%M:%S.%f%z"


def normalize(level: int) -> int:
"""
Normalizes rule level into the 0-6 range, required by OCSF.
Expand Down Expand Up @@ -39,7 +41,6 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
analytic=models.ocsf.AnalyticInfo(
category=", ".join(event.rule.groups),
name=event.decoder.name,
type_id=1,
uid=event.rule.id
),
attacks=[
Expand All @@ -51,26 +52,14 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
technique=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.technique),
uid=", ".join(event.rule.mitre.id)
),
version="v13.1"
)
)
],
title=event.rule.description,
types=[event.input.type],
uid=event.id
)

metadata = models.ocsf.Metadata(
log_name="Security events",
log_provider="Wazuh",
product=models.ocsf.ProductInfo(
name="Wazuh",
lang="en",
vendor_name="Wazuh, Inc,."
),
version="1.1.0"
)

resources = [models.ocsf.Resource(
name=event.agent.name, uid=event.agent.id)]

Expand All @@ -88,7 +77,6 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
count=event.rule.firedtimes,
message=event.rule.description,
finding_info=finding_info,
metadata=metadata,
raw_data=event.full_log,
resources=resources,
risk_score=event.rule.level,
Expand All @@ -100,6 +88,69 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
logging.error(f"Error transforming event: {e}")
return {}


def to_security_finding(event: models.wazuh.Event) -> models.ocsf.SecurityFinding:
"""
Convert Wazuh security event to OCSF's Security Finding class.
"""
try:

analytic = models.ocsf.Analytic(
category=", ".join(event.rule.groups),
name=event.decoder.name,
uid=event.rule.id
)

attacks = [
models.ocsf.AttackInfo(
tactic=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.tactic),
uid=", ".join(event.rule.mitre.id)
),
technique=models.ocsf.TechniqueInfo(
name=", ".join(event.rule.mitre.technique),
uid=", ".join(event.rule.mitre.id)
)
)
]

finding = models.ocsf.Finding(
title=event.rule.description,
types=[event.input.type],
uid=event.id
)

resources = [models.ocsf.Resource(
name=event.agent.name, uid=event.agent.id)]

severity_id = normalize(event.rule.level)

unmapped = {
"data_sources": [
event.location,
event.manager.name
],
"nist": event.rule.nist_800_53 # Array
}

return models.ocsf.SecurityFinding(
analytic=analytic,
attacks=attacks,
count=event.rule.firedtimes,
message=event.rule.description,
finding=finding,
raw_data=event.full_log,
resources=resources,
risk_score=event.rule.level,
severity_id=severity_id,
time=to_epoch(event.timestamp),
unmapped=unmapped
)
except AttributeError as e:
logging.error(f"Error transforming event: {e}")
return {}


def to_epoch(timestamp: str) -> int:
return int(datetime.strptime(timestamp, timestamp_pattern).timestamp())

Expand All @@ -115,7 +166,7 @@ def from_json(json_line: str) -> models.wazuh.Event:
print(e)


def transform_events(events: list) -> list:
def transform_events(events: list, ocsf_class: str) -> list:
"""
Transform a list of Wazuh security events (json string) to OCSF format.
"""
Expand All @@ -124,7 +175,10 @@ def transform_events(events: list) -> list:
for event in events:
try:
wazuh_event = from_json(event)
ocsf_event = to_detection_finding(wazuh_event).model_dump()
if ocsf_class == 'DETECTION_FINDING':
ocsf_event = to_detection_finding(wazuh_event).model_dump()
else:
ocsf_event = to_security_finding(wazuh_event).model_dump()
ocsf_events.append(ocsf_event)
except Exception as e:
logging.error(f"Error transforming line to OCSF: {e}")
Expand Down
1 change: 1 addition & 0 deletions integrations/docker/amazon-security-lake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ services:
SOURCE_LOCATION: "wazuh"
ACCOUNT_ID: "111111111111"
IS_DEV: true
OCSF_CLASS: SECURITY_FINDING
volumes:
- ../amazon-security-lake/src:/var/task
ports:
Expand Down

0 comments on commit 5449aff

Please sign in to comment.