Skip to content

Commit

Permalink
[Sekoia/Crowdstrike/Mandiant/AlienVault/RecordedFuture/CisaKEV] Modif…
Browse files Browse the repository at this point in the history
…ication on connector to use the new pycti connector helper scheduler (#2459)
  • Loading branch information
helene-nguyen authored Sep 10, 2024
1 parent 787ba95 commit 78abfef
Show file tree
Hide file tree
Showing 39 changed files with 791 additions and 779 deletions.
60 changes: 40 additions & 20 deletions external-import/alienvault/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,43 @@ integrated to the global `docker-compose.yml` file of OpenCTI.
If you are using it independently, remember that the connector will try to connect to
the RabbitMQ on the port configured in the OpenCTI platform.

## Configuration

The connector can be configured with the following variables:

| Config Parameter | Docker env var | Default | Description |
|------------------------------------|-----------------------------------------------|------------------------------|---------------------------------------------------------------------------------------------------------|
| `base_url` | `ALIENVAULT_BASE_URL` | `https://otx.alienvault.com` | The base URL for the OTX DirectConnect API. |
| `api_key` | `ALIENVAULT_API_KEY` | `ChangeMe` | The OTX Key. |
| `tlp` | `ALIENVAULT_TLP` | `White` | The default TLP marking used if the Pulse does not define TLP. |
| `create_observables` | `ALIENVAULT_CREATE_OBSERVABLES` | `true` | If true then observables will be created from Pulse indicators and added to the report. |
| `create_indicators` | `ALIENVAULT_CREATE_INDICATORS` | `true` | If true then indicators will be created from Pulse indicators and added to the report. |
| `pulse_start_timestamp` | `ALIENVAULT_PULSE_START_TIMESTAMP` | `2020-05-01T00:00:00` | The Pulses modified after this timestamp will be imported. Timestamp in ISO 8601 format, UTC. |
| `report_status` | `ALIENVAULT_REPORT_STATUS` | `New` | The status of imported reports in the OpenCTI. |
| `report_type` | `ALIENVAULT_REPORT_TYPE` | `threat-report` | The type of imported reports in the OpenCTI. |
| `guess_malware` | `ALIENVAULT_GUESS_MALWARE` | `false` | The Pulse tags are used to guess (queries malwares in the OpenCTI) malwares related to the given Pulse. |
| `guess_cve` | `ALIENVAULT_GUESS_CVE` | `false` | The Pulse tags are used to guess (checks whether tag matches (CVE-\d{4}-\d{4,7})) vulnerabilities. |
| `excluded_pulse_indicator_types` | `ALIENVAULT_EXCLUDED_PULSE_INDICATOR_TYPES` | `FileHash-MD5,FileHash-SHA1` | The Pulse indicator types that will be excluded from the import. |
| `enable_relationships` | `ALIENVAULT_ENABLE_RELATIONSHIPS` | `true` | If true then the relationships will be created between SDOs. |
| `enable_attack_patterns_indicates` | `ALIENVAULT_ENABLE_ATTACK_PATTERNS_INDICATES` | `true` | If true then the relationships `indicates` will be created between indicators and attack patterns. |
| `interval_sec` | `ALIENVAULT_INTERVAL_SEC` | `1800` | The import interval in seconds. |
### Configuration variables

Below are the parameters you'll need to set for OpenCTI:

| Parameter `OpenCTI` | config.yml | Docker environment variable | Mandatory | Description |
|---------------------|------------|-----------------------------|-----------|------------------------------------------------------|
| URL | url | `OPENCTI_URL` | Yes | The URL of the OpenCTI platform. |
| Token | token | `OPENCTI_TOKEN` | Yes | The default admin token set in the OpenCTI platform. |

Below are the parameters you'll need to set for running the connector properly:

| Parameter `Connector` | config.yml | Docker environment variable | Default | Mandatory | Description |
|-----------------------|---------------------|-------------------------------|--------------|-----------|--------------------------------------------------------------------------------------------------|
| ID | `id` | `CONNECTOR_ID` | / | Yes | A unique `UUIDv4` identifier for this connector instance. |
| Name | `name` | `CONNECTOR_NAME` | `AlienVault` | Yes | Full name of the connector : `AlienVault`. |
| Scope | `scope` | `CONNECTOR_SCOPE` | `alienvault` | Yes | Must be `alienvault`, not used in this connector. |
| Run and Terminate | `run_and_terminate` | `CONNECTOR_RUN_AND_TERMINATE` | `False` | No | Launch the connector once if set to True. Takes 2 available values: `True` or `False`. |
| Duration Period | `duration_period` | `CONNECTOR_DURATION_PERIOD` | / | Yes | Determines the time interval between each launch of the connector in ISO 8601, ex: `PT30M`. |
| Queue Threshold | `queue_threshold` | `CONNECTOR_QUEUE_THRESHOLD` | `500` | No | Used to determine the limit (RabbitMQ) in MB at which the connector must go into buffering mode. |
| Log Level | `log_level` | `CONNECTOR_LOG_LEVEL` | / | Yes | Determines the verbosity of the logs. Options are `debug`, `info`, `warn`, or `error`. |

Below are the parameters you'll need to set for AlienVault connector:

| Parameter `AlienVault` | config.yml | Docker environment variable | Default | Mandatory | Description |
|----------------------------------|------------------------------------|-----------------------------------------------|-------------------------------|-----------|--------------------------------------------------------------------------------------------------------------------------------|
| Base Url | `base_url` | `ALIENVAULT_BASE_URL` | `https://otx.alienvault.com` | Yes | The base URL for the OTX DirectConnect API. |
| Api Key | `api_key` | `ALIENVAULT_API_KEY` | `ChangeMe` | No | The OTX Key. |
| TLP | `tlp` | `ALIENVAULT_TLP` | `White` | Yes | The default TLP marking used if the Pulse does not define TLP. |
| Create Observables | `create_observables` | `ALIENVAULT_CREATE_OBSERVABLES` | `True` | No | If true then observables will be created from Pulse indicators and added to the report. |
| Create Indicators | `create_indicators` | `ALIENVAULT_CREATE_INDICATORS` | `True` | No | If true then indicators will be created from Pulse indicators and added to the report. |
| Pulse Start Timestamp | `pulse_start_timestamp` | `ALIENVAULT_PULSE_START_TIMESTAMP` | `2020-05-01T00:00:00` | Yes | The Pulses modified after this timestamp will be imported. Timestamp in ISO 8601 format, UTC. |
| Report Status | `report_status` | `ALIENVAULT_REPORT_STATUS` | `New` | Yes | The status of imported reports in the OpenCTI. |
| Report Type | `report_type` | `ALIENVAULT_REPORT_TYPE` | `threat-report` | No | The type of imported reports in the OpenCTI. |
| Guess Malware | `guess_malware` | `ALIENVAULT_GUESS_MALWARE` | `False` | Yes | The Pulse tags are used to guess (queries malwares in the OpenCTI) malwares related to the given Pulse. |
| Guess CVE | `guess_cve` | `ALIENVAULT_GUESS_CVE` | `False` | Yes | The Pulse tags are used to guess (checks whether tag matches (CVE-\d{4}-\d{4,7})) vulnerabilities. |
| Excluded Pulse Indicator Types | `excluded_pulse_indicator_types` | `ALIENVAULT_EXCLUDED_PULSE_INDICATOR_TYPES` | `FileHash-MD5,FileHash-SHA1` | Yes | The Pulse indicator types that will be excluded from the import. |
| Enable Relationships | `enable_relationships` | `ALIENVAULT_ENABLE_RELATIONSHIPS` | `True` | No | If true then the relationships will be created between SDOs. |
| Enable Attack Patterns Indicates | `enable_attack_patterns_indicates` | `ALIENVAULT_ENABLE_ATTACK_PATTERNS_INDICATES` | `True` | No | If true then the relationships `indicates` will be created between indicators and attack patterns. |
| Filter Indicators | `filter_indicators` | `ALIENVAULT_FILTER_INDICATORS` | `True` | No | This boolean filters out indicators created before the latest pulse datetime, ensuring only recent indicators are processed. |

2 changes: 1 addition & 1 deletion external-import/alienvault/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ services:
- CONNECTOR_NAME=AlienVault
- CONNECTOR_SCOPE=alienvault
- CONNECTOR_LOG_LEVEL=error
- CONNECTOR_DURATION_PERIOD=PT30M # In ISO8601 Format starting with "P" for Period ex: "PT30M" = Period time of 30 minutes
- ALIENVAULT_BASE_URL=https://otx.alienvault.com
- ALIENVAULT_API_KEY=ChangeMe
- ALIENVAULT_TLP=White
Expand All @@ -22,5 +23,4 @@ services:
- ALIENVAULT_EXCLUDED_PULSE_INDICATOR_TYPES=FileHash-MD5,FileHash-SHA1 # Excluded Pulse indicator types.
- ALIENVAULT_ENABLE_RELATIONSHIPS=true # Enable/Disable relationship creation between SDOs.
- ALIENVAULT_ENABLE_ATTACK_PATTERNS_INDICATES=false # Enable/Disable "indicates" relationships between indicators and attack patterns
- ALIENVAULT_INTERVAL_SEC=1800
restart: always
143 changes: 62 additions & 81 deletions external-import/alienvault/src/alienvault/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
class AlienVault:
"""AlienVault connector."""

_CONFIG_CONNECTOR_NAMESPACE = "connector"
_CONFIG_NAMESPACE = "alienvault"

_CONFIG_DURATION_PERIOD = f"{_CONFIG_CONNECTOR_NAMESPACE}.duration_period"
_CONFIG_INTERVAL_SEC = f"{_CONFIG_NAMESPACE}.interval_sec"
_CONFIG_BASE_URL = f"{_CONFIG_NAMESPACE}.base_url"
_CONFIG_API_KEY = f"{_CONFIG_NAMESPACE}.api_key"
_CONFIG_TLP = f"{_CONFIG_NAMESPACE}.tlp"
Expand All @@ -45,9 +48,6 @@ class AlienVault:
f"{_CONFIG_NAMESPACE}.enable_attack_patterns_indicates"
)
_CONFIG_FILTER_INDICATORS = f"{_CONFIG_NAMESPACE}.filter_indicators"
_CONFIG_INTERVAL_SEC = f"{_CONFIG_NAMESPACE}.interval_sec"

_CONFIG_UPDATE_EXISTING_DATA = "connector.update_existing_data"

_CONFIG_REPORT_STATUS_MAPPING = {
"new": 0,
Expand All @@ -62,8 +62,7 @@ class AlienVault:
_DEFAULT_REPORT_TYPE = "threat-report"
_DEFAULT_ENABLE_RELATIONSHIPS = True
_DEFAULT_ENABLE_ATTACK_PATTERNS_INDICATES = True

_CONNECTOR_RUN_INTERVAL_SEC = 60
_DEFAULT_INTERVAL_SEC = 1800

_STATE_LAST_RUN = "last_run"

Expand Down Expand Up @@ -149,17 +148,21 @@ def __init__(self) -> None:
else:
enable_attack_patterns_indicates = bool(enable_attack_patterns_indicates)

self.interval_sec = self._get_configuration(
config, self._CONFIG_INTERVAL_SEC, is_number=True
)

update_existing_data = bool(
self._get_configuration(config, self._CONFIG_UPDATE_EXISTING_DATA)
)

# Create OpenCTI connector helper
self.helper = OpenCTIConnectorHelper(config)

# Get OpenCTI connector duration period
self.duration_period = self._get_configuration(
config, self._CONFIG_DURATION_PERIOD
)
self.interval_sec = self._get_configuration(
config,
self._CONFIG_INTERVAL_SEC,
is_number=True,
)
if not self.interval_sec:
self.interval_sec = self._DEFAULT_INTERVAL_SEC

# Create AlienVault author
author = self._create_author()

Expand All @@ -174,7 +177,6 @@ def __init__(self) -> None:
tlp_marking=tlp_marking,
create_observables=create_observables,
create_indicators=create_indicators,
update_existing_data=update_existing_data,
default_latest_timestamp=default_latest_pulse_timestamp,
report_status=report_status,
report_type=report_type,
Expand Down Expand Up @@ -231,66 +233,55 @@ def _convert_report_status_str_to_report_status_int(cls, report_status: str) ->
return cls._CONFIG_REPORT_STATUS_MAPPING[report_status.lower()]

def run(self):

if self.duration_period:
self.helper.schedule_iso(
message_callback=self.process_message,
duration_period=self.duration_period,
)
else:
self.helper.schedule_unit(
message_callback=self.process_message,
duration_period=self.interval_sec,
time_unit=self.helper.TimeUnit.SECONDS,
)

def process_message(self):
"""Run AlienVault connector."""
self._info("Starting AlienVault connector...")
while True:
self._info("Running AlienVault connector...")
run_interval = self._CONNECTOR_RUN_INTERVAL_SEC

try:
timestamp = self._current_unix_timestamp()
current_state = self._load_state()

self._info("Loaded state: {0}", current_state)

last_run = self._get_state_value(current_state, self._STATE_LAST_RUN)
if self._is_scheduled(last_run, timestamp):
now = datetime.datetime.utcfromtimestamp(timestamp)
friendly_name = "AlienVault run @ " + now.strftime(
"%Y-%m-%d %H:%M:%S"
)
work_id = self.helper.api.work.initiate_work(
self.helper.connect_id, friendly_name
)
pulse_import_state = self.pulse_importer.run(current_state, work_id)
new_state = current_state.copy()
new_state.update(pulse_import_state)
new_state[self._STATE_LAST_RUN] = self._current_unix_timestamp()

self._info("Storing new state: {0}", new_state)
self.helper.set_state(new_state)
message = (
"State stored, next run in: "
+ str(self._get_interval())
+ " seconds"
)
self.helper.api.work.to_processed(work_id, message)
self._info(message)
else:
next_run = self._get_interval() - (timestamp - last_run)
run_interval = min(run_interval, next_run)

self._info(
"Connector will not run, next run in: {0} seconds", next_run
)

except (KeyboardInterrupt, SystemExit):
self._info("Connector stop")
sys.exit(0)

if self.helper.connect_run_and_terminate:
self.helper.log_info("Connector stop")
self.helper.force_ping()
sys.exit(0)

self._sleep(delay_sec=run_interval)

@classmethod
def _sleep(cls, delay_sec: Optional[int] = None) -> None:
sleep_delay = (
delay_sec if delay_sec is not None else cls._CONNECTOR_RUN_INTERVAL_SEC
)
time.sleep(sleep_delay)
try:
timestamp = self._current_unix_timestamp()
current_state = self._load_state()

self._info("Loaded state: {0}", current_state)

now = datetime.datetime.utcfromtimestamp(timestamp)
friendly_name = "AlienVault run @ " + now.strftime("%Y-%m-%d %H:%M:%S")
work_id = self.helper.api.work.initiate_work(
self.helper.connect_id, friendly_name
)
pulse_import_state = self.pulse_importer.run(current_state, work_id)
new_state = current_state.copy()
new_state.update(pulse_import_state)
new_state[self._STATE_LAST_RUN] = self._current_unix_timestamp()

self._info("Storing new state: {0}", new_state)
self.helper.set_state(new_state)

message = (
f"{self.helper.connect_name} connector successfully run, storing last_run as "
+ str(timestamp)
)
self.helper.api.work.to_processed(work_id, message)
self._info(message)

except (KeyboardInterrupt, SystemExit):
self._info("Connector stopping...")
sys.exit(0)

except Exception as e: # noqa: B902
self._error("AlienVault connector internal error: {0}", str(e))

@staticmethod
def _current_unix_timestamp() -> int:
Expand All @@ -310,16 +301,6 @@ def _get_state_value(
return state.get(key, default)
return default

def _is_scheduled(self, last_run: Optional[int], current_time: int) -> bool:
if last_run is None:
self._info("Connector first run")
return True
time_diff = current_time - last_run
return time_diff >= self._get_interval()

def _get_interval(self) -> int:
return int(self.interval_sec)

def _info(self, msg: str, *args: Any) -> None:
fmt_msg = msg.format(*args)
self.helper.log_info(fmt_msg)
Expand Down
Loading

0 comments on commit 78abfef

Please sign in to comment.