-
Notifications
You must be signed in to change notification settings - Fork 17
/
update_lambda.py
120 lines (104 loc) · 4.27 KB
/
update_lambda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import json
import os
import urllib.request
import boto3
import cfnresponse
from netaddr import cidr_merge
def lambda_handler(event, context):
try:
# Get current region
session = boto3.session.Session()
current_region = session.region_name
client = boto3.client('iam')
print(f"Current region in {current_region}")
cidr_list = get_region_cidrs(current_region)
# Get the base policy and add IP list as a condition
new_policy = get_base_policy(os.getenv("prefix"))
new_policy["Statement"][0]["Condition"] = {"IpAddress": {"aws:SourceIp": cidr_list}}
# Clear out any pre-existing roles:
RoleName = os.getenv('iam_role_name')
response = client.list_role_policies(RoleName=RoleName)
if 'PolicyNames' in response:
for PolicyName in response['PolicyNames']:
print(f"Removing old Policy {PolicyName} from Role {RoleName}")
response = client.delete_role_policy(RoleName=RoleName, PolicyName=PolicyName)
# Put the new policy
response = client.put_role_policy(RoleName=RoleName, PolicyName=os.getenv('policy_name'),
PolicyDocument=json.dumps(new_policy))
# Check if response is coming from CloudFormation
if 'ResponseURL' in event:
print("Sending success message to callback URL {0}".format(event['ResponseURL']))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': "Good"})
return response
except Exception as e:
error_string = "There was a problem updating policy {0} for Role {1} in region {2}: {3}".format(
os.getenv('policy_name'), os.getenv('iam_role_name'), current_region, e)
print(error_string)
if 'ResponseURL' in event:
print("Sending FAILURE message to callback URL {0}".format(event['ResponseURL']))
cfnresponse.send(event, context, cfnresponse.FAILED, {'Data': error_string})
return False
def get_region_cidrs(current_region):
# Bandit complains with B310 on the line below. We know the URL, this is safe!
output = urllib.request.urlopen('https://ip-ranges.amazonaws.com/ip-ranges.json').read().decode('utf-8') # nosec
ip_ranges = json.loads(output)['prefixes']
in_region_amazon_ips = [item['ip_prefix'] for item in ip_ranges if
item["service"] == "AMAZON" and item["region"] == current_region]
# It's important to filter down the CIDR range as much as possible. Too large can cause the role creation to fail.
in_region_amazon_ips = [str(ip) for ip in cidr_merge(in_region_amazon_ips)]
# Add in Private IP Space
in_region_amazon_ips.append('10.0.0.0/8')
return in_region_amazon_ips
def get_base_policy(prefix):
vpcid = os.getenv('vpcid')
return {
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:GetObject",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
f"arn:aws:s3:::{prefix}*/*",
f"arn:aws:s3:::{prefix}*"
],
"Effect": "Allow"
},
{
"Action": [
"s3:GetObject",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
f"arn:aws:s3:::{prefix}*/*",
f"arn:aws:s3:::{prefix}*"
],
"Effect": "Allow",
"Condition": {
"StringEquals": {
"aws:SourceVpc": f"{vpcid}"
}
}
},
{
"Action": [
"s3:GetObject",
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
f"arn:aws:s3:::{prefix}*/*",
f"arn:aws:s3:::{prefix}*"
],
"Effect": "Allow",
"Condition": {
"StringLike": {
"aws:SourceVpc": "vpc-*"
}
}
}
]
}