-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
cwe.py
150 lines (122 loc) · 5.24 KB
/
cwe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
from c7n.utils import jmespath_search, jmespath_compile
class CloudWatchEvents:
    """A mapping of events to resource types.

    ``trail_events`` is a table of shortcuts for common CloudTrail API
    calls. Each entry names the service that emits the call (``source``)
    and a jmespath expression (``ids``), relative to the event's
    ``detail`` element, that extracts the affected resource ids. An entry
    may also carry an ``event`` value when the name found in the event's
    ``detail.eventName`` differs from the shortcut's key.
    """

    # **These are just shortcuts**, you can use the policy definition to
    # subscribe to any arbitrary cloud trail event that corresponds to
    # a custodian resource.
    #
    # For common events that we want to match, just keep a short mapping.
    # Users can specify arbitrary cloud watch events by specifying these
    # values in their config, but keep the common case simple.
    trail_events = {
        # Shortcut name as key, mapping to the event source and the
        # jmespath expression used to pull resource ids out of the event.
        'ConsoleLogin': {
            'ids': 'userIdentity.arn',
            'source': 'signin.amazonaws.com'},
        'CreateAutoScalingGroup': {
            'ids': 'requestParameters.autoScalingGroupName',
            'source': 'autoscaling.amazonaws.com'},
        'UpdateAutoScalingGroup': {
            'ids': 'requestParameters.autoScalingGroupName',
            'source': 'autoscaling.amazonaws.com'},
        'CreateBucket': {
            'ids': 'requestParameters.bucketName',
            'source': 's3.amazonaws.com'},
        'CreateCluster': {
            'ids': 'requestParameters.clusterIdentifier',
            'source': 'redshift.amazonaws.com'},
        'CreateLoadBalancer': {
            'ids': 'requestParameters.loadBalancerName',
            'source': 'elasticloadbalancing.amazonaws.com'},
        'CreateLoadBalancerPolicy': {
            'ids': 'requestParameters.loadBalancerName',
            'source': 'elasticloadbalancing.amazonaws.com'},
        'CreateDBInstance': {
            'ids': 'requestParameters.dBInstanceIdentifier',
            'source': 'rds.amazonaws.com'},
        'CreateVolume': {
            'ids': 'responseElements.volumeId',
            'source': 'ec2.amazonaws.com'},
        'SetLoadBalancerPoliciesOfListener': {
            'ids': 'requestParameters.loadBalancerName',
            'source': 'elasticloadbalancing.amazonaws.com'},
        'CreateElasticsearchDomain': {
            'ids': 'requestParameters.domainName',
            'source': 'es.amazonaws.com'},
        'CreateTable': {
            'ids': 'requestParameters.tableName',
            'source': 'dynamodb.amazonaws.com'},
        'CreateFunction': {
            'event': 'CreateFunction20150331',
            'source': 'lambda.amazonaws.com',
            'ids': 'requestParameters.functionName'},
        'RunInstances': {
            'ids': 'responseElements.instancesSet.items[].instanceId',
            'source': 'ec2.amazonaws.com'},
    }

    @classmethod
    def get(cls, event_name):
        """Return the shortcut entry stored under ``event_name``, or None.

        The returned dict is the table's own entry, not a copy.
        """
        return cls.trail_events.get(event_name)

    @classmethod
    def _shortcut_key(cls, event_name):
        """Return the ``trail_events`` key describing ``event_name``, or None.

        A direct key hit wins; otherwise entries whose ``event`` override
        equals ``event_name`` are considered, so that a shortcut such as
        ``CreateFunction`` is found for ``CreateFunction20150331``.
        """
        if event_name in cls.trail_events:
            return event_name
        for key, info in cls.trail_events.items():
            if info.get('event') == event_name:
                return key
        return None

    @classmethod
    def match(cls, event):
        """Match a given cwe event as cloudtrail with an api call.

        :param event: event dict; its ``detail.eventName`` is looked up
            in ``trail_events``.
        :returns: a copy of the matching shortcut entry whose ``ids``
            value is a compiled jmespath expression rooted at the event
            (i.e. prefixed with ``detail.``), or ``False`` when the event
            has no ``detail``/``eventName`` or no shortcut describes it.
        """
        detail = event.get('detail') or {}
        event_name = detail.get('eventName')
        if event_name is None:
            return False

        key = cls._shortcut_key(event_name)
        if key is None:
            return False

        # We want callers to use a compiled expression, but want to avoid
        # initialization cost of doing it without cause. Not thread safe,
        # but usage context is lambda entry.
        entry = cls.trail_events[key]
        if isinstance(entry['ids'], str):
            # Compile once and keep the result in the table for next time.
            entry['ids'] = jmespath_compile('detail.%s' % entry['ids'])
        return dict(entry)

    @classmethod
    def get_trail_ids(cls, event, mode):
        """Extract resource ids from a cloud trail event.

        :param event: event dict carrying ``detail.eventName`` and
            ``detail.eventSource``.
        :param mode: policy mode dict; its ``events`` list holds either
            shortcut names (strings) or dicts with ``event``, ``source``
            and ``ids`` keys.
        :returns: whatever the id expression yields for the first entry
            that applies, or an empty tuple when nothing applies or the
            event carries no ``eventName``.
        :raises ValueError: if an explicit entry matches the event but
            has no ``ids`` expression.
        """
        resource_ids = ()
        detail = event.get('detail') or {}
        event_name = detail.get('eventName')
        if event_name is None:
            # Malformed / non-cloudtrail event: nothing to extract.
            return resource_ids
        event_source = detail.get('eventSource')

        for e in mode.get('events', []):
            if not isinstance(e, dict):
                # Check if we have a short cut / alias
                info = cls.match(event)
                if info:
                    return info['ids'].search(event)
                continue

            # Explicit entries must agree on both the api call and the
            # emitting service; an event without a source matches none.
            if (event_source is None
                    or e.get('event') != event_name
                    or e.get('source') != event_source):
                continue

            id_query = e.get('ids')
            if not id_query:
                raise ValueError("No id query configured")

            # be forgiving for users specifying with details or without
            if id_query.startswith('detail.'):
                evt = event
            else:
                evt = detail
            resource_ids = jmespath_search(id_query, evt)
            if resource_ids:
                break
        return resource_ids

    @classmethod
    def get_ids(cls, event, mode):
        """Return the list of resource ids an event refers to.

        :param event: event dict.
        :param mode: policy mode dict; ``type`` selects the extraction
            strategy (``ec2-instance-state``, ``asg-instance-state`` or
            ``cloudtrail``).
        :returns: a list of the truthy ids found (possibly empty), or
            ``None`` when the mode type is not one handled here.
        """
        mode_type = mode.get('type')
        if mode_type == 'ec2-instance-state':
            resource_ids = [event.get('detail', {}).get('instance-id')]
        elif mode_type == 'asg-instance-state':
            resource_ids = [
                event.get('detail', {}).get('AutoScalingGroupName')]
        elif mode_type == 'cloudtrail':
            resource_ids = cls.get_trail_ids(event, mode)
        else:
            return None

        # A query may yield a single scalar; normalise to a sequence.
        if not isinstance(resource_ids, (tuple, list)):
            resource_ids = [resource_ids]
        return [rid for rid in resource_ids if rid]