Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AwsCredentials should not call metadata server if security creds and region are retrievable through the environment variables #1195

Merged
merged 8 commits into from
Dec 7, 2022
27 changes: 25 additions & 2 deletions google/auth/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,12 @@ def retrieve_subject_token(self, request):
Returns:
str: The retrieved subject token.
"""
# Fetch the session token required to make meta data endpoint calls to aws
if request is not None and self._imdsv2_session_token_url is not None:
# Fetch the session token required to make meta data endpoint calls to aws.
if (
request is not None
and self._imdsv2_session_token_url is not None
and self._should_use_metadata_server()
):
headers = {"X-aws-ec2-metadata-token-ttl-seconds": "300"}

imdsv2_session_token_response = request(
Expand Down Expand Up @@ -738,6 +742,25 @@ def _get_metadata_role_name(self, request, imdsv2_session_token):

return response_body

def _should_use_metadata_server(self):
# The AWS region can be provided through AWS_REGION or AWS_DEFAULT_REGION.
# The metadata server should be used if it cannot be retrieved from one of
# these environment variables.
if not os.environ.get(environment_vars.AWS_REGION) and not os.environ.get(
environment_vars.AWS_DEFAULT_REGION
):
return True

# AWS security credentials can be retrieved from the AWS_ACCESS_KEY_ID
# and AWS_SECRET_ACCESS_KEY environment variables. The metadata server
# should be used if either of these are not available.
if not os.environ.get(environment_vars.AWS_ACCESS_KEY_ID) or not os.environ.get(
environment_vars.AWS_SECRET_ACCESS_KEY
):
return True

return False

@classmethod
def from_info(cls, info, **kwargs):
"""Creates an AWS Credentials instance from parsed external account info.
Expand Down
208 changes: 207 additions & 1 deletion tests/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import datetime
import json
import os

import mock
import pytest # type: ignore
Expand Down Expand Up @@ -1224,6 +1225,7 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(os.environ, {})
def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
self, utcnow
):
Expand Down Expand Up @@ -1300,7 +1302,7 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(

# Only 3 requests should be sent as the region is cached.
assert len(new_request.call_args_list) == 3
# Assert session token request
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
Expand All @@ -1323,6 +1325,210 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ,
{
environment_vars.AWS_REGION: AWS_REGION,
environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
},
)
def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_secret_access_key_idmsv2(
self, utcnow
):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ,
{
environment_vars.AWS_REGION: AWS_REGION,
environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
},
)
def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_access_key_id_idmsv2(
self, utcnow
):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(os.environ, {environment_vars.AWS_REGION: AWS_REGION})
def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_creds_idmsv2(
lsirac marked this conversation as resolved.
Show resolved Hide resolved
self, utcnow
):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)
assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ,
{
environment_vars.AWS_REGION: AWS_REGION,
environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID,
environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY,
},
)
def test_retrieve_subject_token_success_temp_creds_idmsv2(self, utcnow):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
BigTailWolf marked this conversation as resolved.
Show resolved Hide resolved
role_status=http_client.OK, role_name=self.AWS_ROLE
)
credential_source_token_url = self.CREDENTIAL_SOURCE.copy()
credential_source_token_url[
"imdsv2_session_token_url"
] = IMDSV2_SESSION_TOKEN_URL
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

credentials.retrieve_subject_token(request)
assert not request.called

BigTailWolf marked this conversation as resolved.
Show resolved Hide resolved
def test_validate_metadata_server_url_if_any(self):
aws.Credentials.validate_metadata_server_url_if_any(
"http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone", "url"
Expand Down