-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlambda_function.py
108 lines (90 loc) · 2.89 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import json
import logging
import os
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# The log level
log_level = os.environ['log_level'].upper()
alert_level = os.environ['alert_level']
high_hook_url = os.environ['high_hook_url']
medium_hook_url = os.environ['medium_hook_url']
low_hook_url = os.environ['low_hook_url']
high_channel = os.environ['high_channel']
medium_channel = os.environ['medium_channel']
low_channel = os.environ['low_channel']
logger = logging.getLogger()
logger.setLevel(log_level)
def push_slack(slack_request):
hook_url = slack_request['hook_url']
slack_message =slack_request['msg']
req = Request(hook_url, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info("Message posted to %s", slack_message['channel'])
except HTTPError as e:
logger.error("Request failed: %d %s", e.code, e.reason)
except URLError as e:
logger.error("Server connection failed: %s", e.reason)
def check_event(detail):
slack_request = {}
event_name = detail['eventName']
login_type = detail['userIdentity']['type']
arn = detail['userIdentity']['arn']
parameters = detail['requestParameters']
bucket_name = parameters['bucketName']
if alert_level == "High":
hook_url = high_hook_url
channel = high_channel
color = "#8b0000"
elif alert_level == "Medium":
hook_url = medium_hook_url
channel = medium_channel
color = "#ff8c00"
elif alert_level == "Low":
hook_url = low_hook_url
channel = low_channel
color = "#fafad2"
slack_request = {
"hook_url" : hook_url,
"msg" : {
"channel" : channel,
"username" : "AWS:S3 Bucket alert",
"text" : "*Action: %s*" % (event_name),
"attachments": [
{
"color": color,
"fields": [
{
"title": "User ARN",
"value": arn,
"short": True
},
{
"title": "Bucket name",
"value": bucket_name,
"short": True
},
{
"title": "Detail parameters",
"value": str(parameters)
}
],
}
]
}
}
return slack_request
def lambda_handler(event, context):
logger.info("Event: " + str(event))
detail = event['detail']
logger.info("Detail: " + str(detail))
slack_request = check_event(detail)
if slack_request == None:
pass
else:
push_slack(slack_request)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}