diff --git a/google/auth/aws.py b/google/auth/aws.py index d3038b281..f651433f0 100644 --- a/google/auth/aws.py +++ b/google/auth/aws.py @@ -466,8 +466,12 @@ def retrieve_subject_token(self, request): Returns: str: The retrieved subject token. """ - # Fetch the session token required to make meta data endpoint calls to aws - if request is not None and self._imdsv2_session_token_url is not None: + # Fetch the session token required to make meta data endpoint calls to aws. + if ( + request is not None + and self._imdsv2_session_token_url is not None + and self._should_use_metadata_server() + ): headers = {"X-aws-ec2-metadata-token-ttl-seconds": "300"} imdsv2_session_token_response = request( @@ -738,6 +742,25 @@ def _get_metadata_role_name(self, request, imdsv2_session_token): return response_body + def _should_use_metadata_server(self): + # The AWS region can be provided through AWS_REGION or AWS_DEFAULT_REGION. + # The metadata server should be used if it cannot be retrieved from one of + # these environment variables. + if not os.environ.get(environment_vars.AWS_REGION) and not os.environ.get( + environment_vars.AWS_DEFAULT_REGION + ): + return True + + # AWS security credentials can be retrieved from the AWS_ACCESS_KEY_ID + # and AWS_SECRET_ACCESS_KEY environment variables. The metadata server + # should be used if either of these are not available. + if not os.environ.get(environment_vars.AWS_ACCESS_KEY_ID) or not os.environ.get( + environment_vars.AWS_SECRET_ACCESS_KEY + ): + return True + + return False + @classmethod def from_info(cls, info, **kwargs): """Creates an AWS Credentials instance from parsed external account info. diff --git a/tests/test_aws.py b/tests/test_aws.py index d059487f4..400412660 100644 --- a/tests/test_aws.py +++ b/tests/test_aws.py @@ -14,6 +14,7 @@ import datetime import json +import os import mock import pytest # type: ignore @@ -1224,6 +1225,7 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars( ) @mock.patch("google.auth._helpers.utcnow") + @mock.patch.dict(os.environ, {}) def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2( self, utcnow ): @@ -1300,7 +1302,7 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2( # Only 3 requests should be sent as the region is cached. assert len(new_request.call_args_list) == 3 - # Assert session token request + # Assert session token request. self.assert_aws_metadata_request_kwargs( request.call_args_list[0][1], IMDSV2_SESSION_TOKEN_URL, @@ -1323,6 +1325,210 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2( }, ) + @mock.patch("google.auth._helpers.utcnow") + @mock.patch.dict( + os.environ, + { + environment_vars.AWS_REGION: AWS_REGION, + environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID, + }, + ) + def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_secret_access_key_idmsv2( + self, utcnow + ): + utcnow.return_value = datetime.datetime.strptime( + self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" + ) + request = self.make_mock_request( + role_status=http_client.OK, + role_name=self.AWS_ROLE, + security_credentials_status=http_client.OK, + security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, + imdsv2_session_token_status=http_client.OK, + imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, + ) + credential_source_token_url = self.CREDENTIAL_SOURCE.copy() + credential_source_token_url[ + "imdsv2_session_token_url" + ] = IMDSV2_SESSION_TOKEN_URL + credentials = self.make_credentials( + credential_source=credential_source_token_url + ) + + subject_token = credentials.retrieve_subject_token(request) + assert subject_token == self.make_serialized_aws_signed_request( + { + "access_key_id": ACCESS_KEY_ID, + "secret_access_key": SECRET_ACCESS_KEY, + "security_token": TOKEN, + } + ) + # Assert session token request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[0][1], + IMDSV2_SESSION_TOKEN_URL, + {"X-aws-ec2-metadata-token-ttl-seconds": "300"}, + "PUT", + ) + # Assert role request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[1][1], + SECURITY_CREDS_URL, + {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN}, + ) + # Assert security credentials request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[2][1], + "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), + { + "Content-Type": "application/json", + "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN, + }, + ) + + @mock.patch("google.auth._helpers.utcnow") + @mock.patch.dict( + os.environ, + { + environment_vars.AWS_REGION: AWS_REGION, + environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY, + }, + ) + def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_access_key_id_idmsv2( + self, utcnow + ): + utcnow.return_value = datetime.datetime.strptime( + self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" + ) + request = self.make_mock_request( + role_status=http_client.OK, + role_name=self.AWS_ROLE, + security_credentials_status=http_client.OK, + security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, + imdsv2_session_token_status=http_client.OK, + imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, + ) + credential_source_token_url = self.CREDENTIAL_SOURCE.copy() + credential_source_token_url[ + "imdsv2_session_token_url" + ] = IMDSV2_SESSION_TOKEN_URL + credentials = self.make_credentials( + credential_source=credential_source_token_url + ) + + subject_token = credentials.retrieve_subject_token(request) + assert subject_token == self.make_serialized_aws_signed_request( + { + "access_key_id": ACCESS_KEY_ID, + "secret_access_key": SECRET_ACCESS_KEY, + "security_token": TOKEN, + } + ) + # Assert session token request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[0][1], + IMDSV2_SESSION_TOKEN_URL, + {"X-aws-ec2-metadata-token-ttl-seconds": "300"}, + "PUT", + ) + # Assert role request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[1][1], + SECURITY_CREDS_URL, + {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN}, + ) + # Assert security credentials request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[2][1], + "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), + { + "Content-Type": "application/json", + "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN, + }, + ) + + @mock.patch("google.auth._helpers.utcnow") + @mock.patch.dict(os.environ, {environment_vars.AWS_REGION: AWS_REGION}) + def test_retrieve_subject_token_success_temp_creds_environment_vars_missing_creds_idmsv2( + self, utcnow + ): + utcnow.return_value = datetime.datetime.strptime( + self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" + ) + request = self.make_mock_request( + role_status=http_client.OK, + role_name=self.AWS_ROLE, + security_credentials_status=http_client.OK, + security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE, + imdsv2_session_token_status=http_client.OK, + imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN, + ) + credential_source_token_url = self.CREDENTIAL_SOURCE.copy() + credential_source_token_url[ + "imdsv2_session_token_url" + ] = IMDSV2_SESSION_TOKEN_URL + credentials = self.make_credentials( + credential_source=credential_source_token_url + ) + + subject_token = credentials.retrieve_subject_token(request) + assert subject_token == self.make_serialized_aws_signed_request( + { + "access_key_id": ACCESS_KEY_ID, + "secret_access_key": SECRET_ACCESS_KEY, + "security_token": TOKEN, + } + ) + # Assert session token request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[0][1], + IMDSV2_SESSION_TOKEN_URL, + {"X-aws-ec2-metadata-token-ttl-seconds": "300"}, + "PUT", + ) + # Assert role request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[1][1], + SECURITY_CREDS_URL, + {"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN}, + ) + # Assert security credentials request. + self.assert_aws_metadata_request_kwargs( + request.call_args_list[2][1], + "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE), + { + "Content-Type": "application/json", + "X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN, + }, + ) + + @mock.patch("google.auth._helpers.utcnow") + @mock.patch.dict( + os.environ, + { + environment_vars.AWS_REGION: AWS_REGION, + environment_vars.AWS_ACCESS_KEY_ID: ACCESS_KEY_ID, + environment_vars.AWS_SECRET_ACCESS_KEY: SECRET_ACCESS_KEY, + }, + ) + def test_retrieve_subject_token_success_temp_creds_idmsv2(self, utcnow): + utcnow.return_value = datetime.datetime.strptime( + self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ" + ) + request = self.make_mock_request( + role_status=http_client.OK, role_name=self.AWS_ROLE + ) + credential_source_token_url = self.CREDENTIAL_SOURCE.copy() + credential_source_token_url[ + "imdsv2_session_token_url" + ] = IMDSV2_SESSION_TOKEN_URL + credentials = self.make_credentials( + credential_source=credential_source_token_url + ) + + credentials.retrieve_subject_token(request) + assert not request.called + def test_validate_metadata_server_url_if_any(self): aws.Credentials.validate_metadata_server_url_if_any( "http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone", "url"