customer_firewall_request_parse_csv.py
"""
This playbook parses a .csv file with a list of firewall changes. The .csv file must contain an "action" column (with values equal to either "block_ip" or "unblock_ip") and either a "sourceAddress" column or a "destinationAddress" column. Once the .csv file is parsed this playbook will create one new artifact per row. The artifacts will have the label "customer_request" which can then be used in a subsequent playbook to take appropriate block or unblock actions.
"""
import phantom.rules as phantom
import json
from datetime import datetime, timedelta
def on_start(container):
    """Playbook entry point: hand the container to the vault-id filter block."""
    phantom.debug('on_start() called')
    # 'filter_1' checks for a vault file artifact before any CSV parsing happens.
    filter_1(container=container)
"""
The .csv file must be in the Vault and there must be an artifact with the cef.vaultId field.
"""
def filter_1(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
    """Forward the container only when an artifact carries a non-empty cef.vaultId.

    The keyword arguments follow the standard playbook block signature; only
    ``container`` is read here, the others are passed through to the next block.
    """
    phantom.debug('filter_1() called')

    # Select artifacts whose cef.vaultId is set ('if' condition 1).
    vault_artifacts, vault_results = phantom.condition(
        container=container,
        conditions=[
            ["artifact:*.cef.vaultId", "!=", ""],
        ],
        name="filter_1:condition_1")

    # Nothing matched: there is no vault file to parse, so the chain stops here.
    if not (vault_artifacts or vault_results):
        return

    playbook_local_soc_fork_customer_request_1(
        action=action,
        success=success,
        container=container,
        results=results,
        handle=handle,
        filtered_artifacts=vault_artifacts,
        filtered_results=vault_results,
    )
"""
Use custom code and the "csv" library to parse each row of the .csv file and create an artifact for each valid row.
"""
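# Action values the module docstring documents for the "action" column.
ALLOWED_ACTIONS = frozenset({"block_ip", "unblock_ip"})


def is_valid_request_row(row):
    """Return True when a parsed CSV row describes a usable firewall request.

    A row qualifies when it has an ``action`` column whose value is one of
    ``ALLOWED_ACTIONS`` and at least one of the ``sourceAddress`` /
    ``destinationAddress`` columns.  ``csv.DictReader`` yields ``None`` for
    cells missing from a short row, which is treated as an empty value.

    Args:
        row: mapping of column name to cell value (one ``csv.DictReader`` row).
    """
    if "action" not in row:
        return False
    if "sourceAddress" not in row and "destinationAddress" not in row:
        return False
    return (row.get("action") or "").strip() in ALLOWED_ACTIONS


def playbook_local_soc_fork_customer_request_1(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
    """Parse the vault .csv and add one "customer_request" artifact per valid row.

    Reads the first file attached to the container's vault, creates an artifact
    for every row that passes ``is_valid_request_row`` and, if parsing succeeded,
    starts the "local/soc_fork_customer_request" playbook on the container.  On
    a missing vault file or any exception while reading the CSV an error is
    logged and the downstream playbook is NOT started.

    Args:
        container: the playbook container; its ``id`` locates the vault files.
        The remaining keyword arguments follow the standard playbook block
        signature and are not used here.
    """
    phantom.debug('playbook_local_soc_fork_customer_request_1() called')
    # ----- start of added code -----
    import csv

    container_id = container.get('id', None)
    # Look up the files stored in the vault for this container.
    vault_info = phantom.vault_info(container_id=container_id)
    vault_files = vault_info[2]
    if not vault_files:
        # The original indexed vault_files[0] unconditionally and crashed with
        # IndexError when the vault was empty; fail with a readable message instead.
        phantom.error("No vault file found for container {}".format(container_id))
        return
    # NOTE(review): only the first vault file is parsed, as in the original —
    # confirm that is intended when several files are attached to the container.
    file_path = vault_files[0]["path"]
    phantom.debug('vault file path: {}'.format(file_path))

    # Read the .csv file and add artifacts with the label "customer_request".
    try:
        # newline="" is the documented way to open a file for the csv module so
        # that quoted fields containing line breaks are parsed correctly.
        with open(file_path, 'r', newline='') as f:
            for row in csv.DictReader(f):
                # Rows from the uploaded file are untrusted: only rows with a
                # documented action and an address column become artifacts.
                if not is_valid_request_row(row):
                    phantom.debug('skipping row without a valid action/address: {}'.format(row))
                    continue
                phantom.debug('adding artifact: {}'.format(row))
                added, message, _artifact_id = phantom.add_artifact(
                    container=container,
                    raw_data={},
                    cef_data=row,
                    label='customer_request',
                    name='Parsed CSV Artifact',
                    severity='high',
                    identifier=None,
                    artifact_type='network')
                if not added:
                    phantom.error("Adding Artifact failed: {}".format(message))
    except Exception as e:
        # str(e) works for every exception type; the original e.args[1] raised
        # IndexError for single-argument exceptions (e.g. csv.Error) and hid
        # the real failure.
        phantom.error("Exception Occurred: {}".format(e))
        return
    # ----- end of added code -----

    # Call playbook "local/soc_fork_customer_request" (returns the playbook_run_id,
    # which this block does not need).
    phantom.playbook("local/soc_fork_customer_request", container)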
def on_finish(container, summary):
    """Final block: runs once every action of the playbook has completed."""
    phantom.debug('on_finish() called')
    # No post-processing is needed for this playbook.  Should a summary of the
    # action results ever be required, phantom.get_summary() and
    # phantom.get_action_results(action_run_id=..., result_data=False, flatten=False)
    # are the calls to collect it here.