Skip to content

Commit

Permalink
Add Pyhton module to implement Amazon Security Lake integration (#186)
Browse files Browse the repository at this point in the history
* Migrate from #147

* Update amazon-security-lake integration

- Improved documentation.
- Python code has been moved to `wazuh-indexer/integrations/amazon-security-lake/src`.
- Development environment now uses OpenSearch 2.12.0.
- The `wazuh.integration.security.lake` container now displays logs, by watching logstash's log file.
- [**NEEDS FIX**] As a temporary solution, the `INDEXER_USERNAME` and `INDEXER_PASSWORD` values have been added as an environment variable to the `wazuh.integration.security.lake` container. These values should be set at Dockerfile level, but isn't working, probably due to permission denied on invocation of the `setup.sh` script.
- [**NEEDS FIX**] As a temporary solution, the output file of the `indexer-to-file` pipeline as been moved to `/var/log/logstash/indexer-to-file`. Previous path `/usr/share/logstash/pipeline/indexer-to-file.json` results in permission denied.
- [**NEEDS FIX**] As a temporary solution, the input.opensearch.query has been replaced with `match_all`, as the previous one does not return any data, probably to the use of time filters `gt: now-1m`.
- Standard output enable for `/usr/share/logstash/pipeline/indexer-to-file.json`.
- [**NEEDS FIX**] ECS compatibility disabled: `echo "pipeline.ecs_compatibility: disabled" >> /etc/logstash/logstash.yml` -- to be included automatically
- Python3 environment path added to the `indexer-to-integrator` pipeline.

* Disable ECS compatibility (auto)

-  Adds pipeline.ecs_compatibility: disabled at Dockerfile level.
- Removes `INDEXER_USERNAME` and `INDEXER_PASSWORD` as environment variables on the `wazuh.integration.security.lake` container.

* Add @timestamp field to sample alerts

* Fix Logstash pipelines

* Add working indexer-to-s3 pipeline

* Add working Python script up to S3 upload

* Add latest changes

* Remove duplicated line
  • Loading branch information
AlexRuiz7 committed Jun 28, 2024
1 parent 2f9b727 commit 726f9b8
Show file tree
Hide file tree
Showing 20 changed files with 1,225 additions and 1,146 deletions.
27 changes: 24 additions & 3 deletions integrations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,32 @@ docker compose -f ./docker/amazon-security-lake.yml up -d

This docker compose project will bring a *wazuh-indexer* node, a *wazuh-dashboard* node,
a *logstash* node and our event generator. On the one hand, the event generator will push events
constantly to the indexer. On the other hand, logstash will constantly query for new data and
deliver it to the integration Python program, also present in that node. Finally, the integration
module will prepare and send the data to the Amazon Security Lake's S3 bucket.
constantly to the indexer, on the `wazuh-alerts-4.x-sample` index by default (refer to the [events
generator](./tools/events-generator/README.md) documentation for customization options).
On the other hand, logstash will constantly query for new data and deliver it to the integration
Python program, also present in that node. Finally, the integration module will prepare and send the
data to the Amazon Security Lake's S3 bucket.
<!-- TODO continue with S3 credentials setup -->

Attach a terminal to the container and start the integration by starting logstash, as follows:

```console
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash
```

Unprocessed data can be sent to a file or to an S3 bucket.
```console
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-file.conf --path.settings /etc/logstash
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-s3.conf --path.settings /etc/logstash
```

All three pipelines are configured to fetch the latest data from the *wazuh-indexer* every minute. In
the case of `indexer-to-file`, the data is written at the same pace, whereas `indexer-to-s3`, data
is uploaded every 5 minutes.

For development or debugging purposes, you may want to enable hot-reload, test or debug on these files,
by using the `--config.reload.automatic`, `--config.test_and_exit` or `--debug` flags, respectively.

For production usage, follow the instructions in our documentation page about this matter.
(_when-its-done_)

Expand Down
9 changes: 7 additions & 2 deletions integrations/amazon-security-lake/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ RUN pip install -r /app/requirements.txt
FROM python:3.9
ENV LOGSTASH_KEYSTORE_PASS="SecretPassword"
# Add the application source code.
COPY --chown=logstash:logstash . /home/app
COPY --chown=logstash:logstash ./src /home/app
# Add execution persmissions.
RUN chmod a+x /home/app/run.py
# Copy the application's dependencies.
Expand All @@ -38,4 +38,9 @@ RUN chown --recursive logstash:logstash /usr/share/logstash /etc/logstash /var/l
USER logstash
# Copy and run the setup.sh script to create and configure a keystore for Logstash.
COPY --chown=logstash:logstash logstash/setup.sh /usr/share/logstash/bin/setup.sh
RUN bash /usr/share/logstash/bin/setup.sh
RUN bash /usr/share/logstash/bin/setup.sh

# Disable ECS compatibility
RUN `echo "pipeline.ecs_compatibility: disabled" >> /etc/logstash/logstash.yml`

WORKDIR /home/app
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ input {
}
}
}'
target => "_source"
schedule => "* * * * *"
}
}


output {
stdout {
id => "output.stdout"
codec => json_lines
}
file {
path => "/usr/share/logstash/pipeline/indexer-to-file.json"
id => "output.file"
path => "/var/log/logstash/indexer-to-file-%{+YYYY-MM-dd-HH}.log"
file_mode => 0644
codec => json_lines
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@ input {
}
}
}'
target => "_source"
schedule => "* * * * *"
}
}

output {
pipe {
id => "securityLake"
message_format => "%{_source}"
ttl => "10"
command => "/usr/bin/env python3 /usr/local/bin/run.py -d"
}
stdout {
id => "output.stdout"
codec => json_lines
}
pipe {
id => "output.integrator"
ttl => "10"
command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py"
# command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@ input {
}
}
}'
target => "_source"
schedule => "* * * * *"
schedule => "5/* * * * *"
}
}

output {
stdout { codec => rubydebug }
stdout {
id => "output.stdout"
codec => json_lines
}
s3 {
access_key_id => "<aws-access-key>"
secret_access_key => "<aws-secret-key>"
region => "<your-region>"
id => "output.s3"
access_key_id => "${AWS_KEY}"
secret_access_key => "${AWS_SECRET}"
region => "${AWS_REGION}"
endpoint => "http://s3.ninja:9000"
bucket => "${AWS_BUCKET}"
codec => "json"
retry_count => 0
validate_credentials_on_root_bucket => false
prefix => "%{+YYYY}/%{+MM}/%{+dd}"
server_side_encryption => true
server_side_encryption_algorithm => "AES256"
bucket => "wazuh-indexer-amazon-security-lake-bucket"
canned_acl => "bucket-owner-full-control"
codec => "json"
additional_settings => {
"force_path_style" => true
}
time_file => 5
}
}
3 changes: 2 additions & 1 deletion integrations/amazon-security-lake/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pyarrow>=10.0.1
parquet-tools>=0.2.15
pydantic==2.6.1
pydantic==2.6.1
boto3==1.34.46
26 changes: 0 additions & 26 deletions integrations/amazon-security-lake/run.py

This file was deleted.

1 change: 1 addition & 0 deletions integrations/amazon-security-lake/src/parquet/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import parquet.parquet
122 changes: 122 additions & 0 deletions integrations/amazon-security-lake/src/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/env/bin/python3
# vim: bkc=yes bk wb

import sys
import os
import datetime
import transform
import pyarrow as pa
import pyarrow.parquet as pq
import logging
import boto3
from botocore.exceptions import ClientError

# NOTE work in progress
def upload_file(table, file_name, bucket, object_name=None):
"""Upload a file to an S3 bucket
:param table: PyArrow table with events data
:param file_name: File to upload
:param bucket: Bucket to upload to
:param object_name: S3 object name. If not specified then file_name is used
:return: True if file was uploaded, else False
"""

client = boto3.client(
service_name='s3',
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
region_name=os.environ['AWS_REGION'],
endpoint_url='http://s3.ninja:9000',
)

# If S3 object_name was not specified, use file_name
if object_name is None:
object_name = os.path.basename(file_name)

# Upload the file
try:
client.put_object(Bucket=bucket, Key=file_name, Body=open(file_name, 'rb'))
except ClientError as e:
logging.error(e)
return False
return True


def main():
'''Main function'''
# Get current timestamp
timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

# Generate filenames
filename_raw = f"/tmp/integrator-raw-{timestamp}.json"
filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json"
filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet"

# 1. Extract data
# ================
raw_data = []
for line in sys.stdin:
raw_data.append(line)

# Echo piped data
with open(filename_raw, "a") as fd:
fd.write(line)

# 2. Transform data
# ================
# a. Transform to OCSF
ocsf_data = []
for line in raw_data:
try:
event = transform.converter.from_json(line)
ocsf_event = transform.converter.to_detection_finding(event)
ocsf_data.append(ocsf_event.model_dump())

# Temporal disk storage
with open(filename_ocsf, "a") as fd:
fd.write(str(ocsf_event) + "\n")
except AttributeError as e:
print("Error transforming line to OCSF")
print(event)
print(e)

# b. Encode as Parquet
try:
table = pa.Table.from_pylist(ocsf_data)
pq.write_table(table, filename_parquet)
except AttributeError as e:
print("Error encoding data to parquet")
print(e)

# 3. Load data (upload to S3)
# ================
if upload_file(table, filename_parquet, os.environ['AWS_BUCKET']):
# Remove /tmp files
pass


def _test():
ocsf_event = {}
with open("./wazuh-event.sample.json", "r") as fd:
# Load from file descriptor
for raw_event in fd:
try:
event = transform.converter.from_json(raw_event)
print("")
print("-- Wazuh event Pydantic model")
print("")
print(event.model_dump())
ocsf_event = transform.converter.to_detection_finding(event)
print("")
print("-- Converted to OCSF")
print("")
print(ocsf_event.model_dump())

except KeyError as e:
raise (e)


if __name__ == '__main__':
main()
# _test()
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,16 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
)


def from_json(event: dict) -> models.wazuh.Event:
# def from_json(event: dict) -> models.wazuh.Event:
# # Needs to a string, bytes or bytearray
# try:
# return models.wazuh.Event.model_validate_json(json.dumps(event))
# except pydantic.ValidationError as e:
# print(e)

def from_json(event: str) -> models.wazuh.Event:
# Needs to a string, bytes or bytearray
try:
return models.wazuh.Event.model_validate_json(json.dumps(event))
return models.wazuh.Event.model_validate_json(event)
except pydantic.ValidationError as e:
print(e)
print(e)
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,27 @@


class Mitre(pydantic.BaseModel):
technique: typing.List[str] = []
id: typing.List[str] = ""
tactic: typing.List[str] = []
technique: typing.List[str] = ["N/A"]
id: typing.List[str] = ["N/A"]
tactic: typing.List[str] = ["N/A"]


class Rule(pydantic.BaseModel):
firedtimes: int = 0
description: str = ""
description: str = "N/A"
groups: typing.List[str] = []
id: str = ""
id: str = "N/A"
mitre: Mitre = Mitre()
level: int = 0
nist_800_53: typing.List[str] = []


class Decoder(pydantic.BaseModel):
name: str
name: str = "N/A"


class Input(pydantic.BaseModel):
type: str
type: str = "N/A"


class Agent(pydantic.BaseModel):
Expand All @@ -39,9 +39,9 @@ class Manager(pydantic.BaseModel):


class Event(pydantic.BaseModel):
rule: Rule = {}
decoder: Decoder = {}
input: Input = {}
rule: Rule = Rule()
decoder: Decoder = Decoder()
input: Input = Input()
id: str = ""
full_log: str = ""
agent: Agent = {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"cluster":{"name":"wazuh-cluster","node":"wazuh-manager"},"agent":{"id":"003","ip":"10.0.0.180","name":"ip-10-0-0-180.us-west-1.compute.internal"},"@timestamp":"2024-03-14T12:57:05.730Z","data":{"audit":{"exe":"/usr/sbin/sshd","type":"NORMAL","cwd":"/home/wazuh","file":{"name":"/var/sample"},"success":"yes","command":"ssh"}},"@version":"1","manager":{"name":"wazuh-manager"},"location":"","decoder":{},"id":"1580123327.49031","predecoder":{},"timestamp":"2024-03-14T12:57:05.730+0000","rule":{"description":"Audit: Command: /usr/sbin/ssh","firedtimes":3,"level":3,"id":"80791","mail":false,"groups":["audit","audit_command"],"gdpr":["IV_30.1.g"]}}
Loading

0 comments on commit 726f9b8

Please sign in to comment.