diff --git a/docs/user-guide.rst b/docs/user-guide.rst index e689b11c6..682b58a76 100644 --- a/docs/user-guide.rst +++ b/docs/user-guide.rst @@ -434,8 +434,10 @@ Response format fields summary: - ``version``: The version of the JSON output. Currently only version 1 is supported. - ``success``: The status of the response. - - When true, the response must contain the 3rd party token, token type, and expiration. The executable must also exit with exit code 0. - - When false, the response must contain the error code and message fields and exit with a non-zero value. + - When true, the response must contain the 3rd party token, token type, and + expiration. The executable must also exit with exit code 0. + - When false, the response must contain the error code and message fields + and exit with a non-zero value. - ``token_type``: The 3rd party subject token type. Must be - *urn:ietf:params:oauth:token-type:jwt* - *urn:ietf:params:oauth:token-type:id_token* @@ -450,7 +452,9 @@ Response format fields summary: All response types must include both the ``version`` and ``success`` fields. Successful responses must include the ``token_type``, and one of ``id_token`` or ``saml_response``. -If output file is specified, ``expiration_time`` is mandatory. +``expiration_time`` is optional. If the output file does not contain the +``expiration_time`` field, the response will be considered expired and the +executable will be called. Error responses must include both the ``code`` and ``message`` fields. The library will populate the following environment variables when the diff --git a/google/auth/pluggable.py b/google/auth/pluggable.py index 42f6bcd81..6be8222c1 100644 --- a/google/auth/pluggable.py +++ b/google/auth/pluggable.py @@ -38,6 +38,7 @@ import json import os import subprocess +import sys import time from google.auth import _helpers @@ -47,6 +48,14 @@ # The max supported executable spec version. EXECUTABLE_SUPPORTED_MAX_VERSION = 1 +EXECUTABLE_TIMEOUT_MILLIS_DEFAULT = 30 * 1000 # 30 seconds +EXECUTABLE_TIMEOUT_MILLIS_LOWER_BOUND = 5 * 1000 # 5 seconds +EXECUTABLE_TIMEOUT_MILLIS_UPPER_BOUND = 120 * 1000 # 2 minutes + +EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_DEFAULT = 5 * 60 * 1000 # 5 minutes +EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_LOWER_BOUND = 5 * 60 * 1000 # 5 minutes +EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_UPPER_BOUND = 30 * 60 * 1000 # 30 minutes + class Credentials(external_account.Credentials): """External account credentials sourced from executables.""" @@ -92,6 +101,7 @@ def __init__( :meth:`from_info` are used instead of calling the constructor directly. """ + self.interactive = kwargs.pop("interactive", False) super(Credentials, self).__init__( audience=audience, subject_token_type=subject_token_type, @@ -116,37 +126,51 @@ def __init__( self._credential_source_executable_timeout_millis = self._credential_source_executable.get( "timeout_millis" ) + self._credential_source_executable_interactive_timeout_millis = self._credential_source_executable.get( + "interactive_timeout_millis" + ) self._credential_source_executable_output_file = self._credential_source_executable.get( "output_file" ) + self._tokeninfo_username = kwargs.get("tokeninfo_username", "") # dummy value if not self._credential_source_executable_command: raise ValueError( "Missing command field. Executable command must be provided." ) if not self._credential_source_executable_timeout_millis: - self._credential_source_executable_timeout_millis = 30 * 1000 + self._credential_source_executable_timeout_millis = ( + EXECUTABLE_TIMEOUT_MILLIS_DEFAULT + ) elif ( - self._credential_source_executable_timeout_millis < 5 * 1000 - or self._credential_source_executable_timeout_millis > 120 * 1000 + self._credential_source_executable_timeout_millis + < EXECUTABLE_TIMEOUT_MILLIS_LOWER_BOUND + or self._credential_source_executable_timeout_millis + > EXECUTABLE_TIMEOUT_MILLIS_UPPER_BOUND ): raise ValueError("Timeout must be between 5 and 120 seconds.") + if not self._credential_source_executable_interactive_timeout_millis: + self._credential_source_executable_interactive_timeout_millis = ( + EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_DEFAULT + ) + elif ( + self._credential_source_executable_interactive_timeout_millis + < EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_LOWER_BOUND + or self._credential_source_executable_interactive_timeout_millis + > EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_UPPER_BOUND + ): + raise ValueError("Interactive timeout must be between 5 and 30 minutes.") + @_helpers.copy_docstring(external_account.Credentials) def retrieve_subject_token(self, request): - env_allow_executables = os.environ.get( - "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES" - ) - if env_allow_executables != "1": - raise ValueError( - "Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run." - ) + self._validate_running_mode() # Check output file. if self._credential_source_executable_output_file is not None: try: with open( - self._credential_source_executable_output_file + self._credential_source_executable_output_file, encoding="utf-8" ) as output_file: response = json.load(output_file) except Exception: @@ -155,6 +179,10 @@ def retrieve_subject_token(self, request): try: # If the cached response is expired, _parse_subject_token will raise an error which will be ignored and we will call the executable again. subject_token = self._parse_subject_token(response) + if ( + "expiration_time" not in response + ): # Always treat missing expiration_time as expired and proceed to executable run. + raise exceptions.RefreshError except ValueError: raise except exceptions.RefreshError: @@ -169,46 +197,102 @@ def retrieve_subject_token(self, request): # Inject env vars. env = os.environ.copy() - env["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience - env["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type - env[ - "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE" - ] = "0" # Always set to 0 until interactive mode is implemented. - if self._service_account_impersonation_url is not None: - env[ - "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL" - ] = self.service_account_email - if self._credential_source_executable_output_file is not None: - env[ - "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE" - ] = self._credential_source_executable_output_file + self._inject_env_variables(env) + env["GOOGLE_EXTERNAL_ACCOUNT_REVOKE"] = "0" - try: - result = subprocess.run( - self._credential_source_executable_command.split(), - timeout=self._credential_source_executable_timeout_millis / 1000, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - env=env, - ) - if result.returncode != 0: - raise exceptions.RefreshError( - "Executable exited with non-zero return code {}. Error: {}".format( - result.returncode, result.stdout - ) + # Run executable. + exe_timeout = ( + self._credential_source_executable_interactive_timeout_millis / 1000 + if self.interactive + else self._credential_source_executable_timeout_millis / 1000 + ) + exe_stdin = sys.stdin if self.interactive else None + exe_stdout = sys.stdout if self.interactive else subprocess.PIPE + exe_stderr = sys.stdout if self.interactive else subprocess.STDOUT + + result = subprocess.run( + self._credential_source_executable_command.split(), + timeout=exe_timeout, + stdin=exe_stdin, + stdout=exe_stdout, + stderr=exe_stderr, + env=env, + ) + if result.returncode != 0: + raise exceptions.RefreshError( + "Executable exited with non-zero return code {}. Error: {}".format( + result.returncode, result.stdout ) - except Exception: - raise - else: - try: - data = result.stdout.decode("utf-8") - response = json.loads(data) - subject_token = self._parse_subject_token(response) - except Exception: - raise + ) + + # Handle executable output. + response = json.loads(result.stdout.decode("utf-8")) if result.stdout else None + if not response and self._credential_source_executable_output_file is not None: + response = json.load( + open(self._credential_source_executable_output_file, encoding="utf-8") + ) + subject_token = self._parse_subject_token(response) return subject_token + def revoke(self, request): + """Revokes the subject token using the credential_source object. + + Args: + request (google.auth.transport.Request): A callable used to make + HTTP requests. + Raises: + google.auth.exceptions.RefreshError: If the executable revocation + not properly executed. + + """ + if not self.interactive: + raise ValueError("Revoke is only enabled under interactive mode.") + self._validate_running_mode() + + if not _helpers.is_python_3(): + raise exceptions.RefreshError( + "Pluggable auth is only supported for python 3.6+" + ) + + # Inject variables + env = os.environ.copy() + self._inject_env_variables(env) + env["GOOGLE_EXTERNAL_ACCOUNT_REVOKE"] = "1" + + # Run executable + result = subprocess.run( + self._credential_source_executable_command.split(), + timeout=self._credential_source_executable_interactive_timeout_millis + / 1000, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env, + ) + + if result.returncode != 0: + raise exceptions.RefreshError( + "Auth revoke failed on executable. Exit with non-zero return code {}. Error: {}".format( + result.returncode, result.stdout + ) + ) + + response = json.loads(result.stdout.decode("utf-8")) + self._validate_revoke_response(response) + + @property + def external_account_id(self): + """Returns the external account identifier. + + When service account impersonation is used the identifier is the service + account email. + + Without service account impersonation, this returns None, unless it is + being used by the Google Cloud CLI which populates this field. + """ + + return self.service_account_email or self._tokeninfo_username + @classmethod def from_info(cls, info, **kwargs): """Creates a Pluggable Credentials instance from parsed external account info. @@ -241,17 +325,23 @@ def from_file(cls, filename, **kwargs): """ return super(Credentials, cls).from_file(filename, **kwargs) + def _inject_env_variables(self, env): + env["GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE"] = self._audience + env["GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE"] = self._subject_token_type + env["GOOGLE_EXTERNAL_ACCOUNT_ID"] = self.external_account_id + env["GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE"] = "1" if self.interactive else "0" + + if self._service_account_impersonation_url is not None: + env[ + "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL" + ] = self.service_account_email + if self._credential_source_executable_output_file is not None: + env[ + "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE" + ] = self._credential_source_executable_output_file + def _parse_subject_token(self, response): - if "version" not in response: - raise ValueError("The executable response is missing the version field.") - if response["version"] > EXECUTABLE_SUPPORTED_MAX_VERSION: - raise exceptions.RefreshError( - "Executable returned unsupported version {}.".format( - response["version"] - ) - ) - if "success" not in response: - raise ValueError("The executable response is missing the success field.") + self._validate_response_schema(response) if not response["success"]: if "code" not in response or "message" not in response: raise ValueError( @@ -262,13 +352,6 @@ def _parse_subject_token(self, response): response["code"], response["message"] ) ) - if ( - "expiration_time" not in response - and self._credential_source_executable_output_file - ): - raise ValueError( - "The executable response must contain an expiration_time for successful responses when an output_file has been specified in the configuration." - ) if "expiration_time" in response and response["expiration_time"] < time.time(): raise exceptions.RefreshError( "The token returned by the executable is expired." @@ -284,3 +367,38 @@ def _parse_subject_token(self, response): return response["saml_response"] else: raise exceptions.RefreshError("Executable returned unsupported token type.") + + def _validate_revoke_response(self, response): + self._validate_response_schema(response) + if not response["success"]: + raise exceptions.RefreshError("Revoke failed with unsuccessful response.") + + def _validate_response_schema(self, response): + if "version" not in response: + raise ValueError("The executable response is missing the version field.") + if response["version"] > EXECUTABLE_SUPPORTED_MAX_VERSION: + raise exceptions.RefreshError( + "Executable returned unsupported version {}.".format( + response["version"] + ) + ) + + if "success" not in response: + raise ValueError("The executable response is missing the success field.") + + def _validate_running_mode(self): + env_allow_executables = os.environ.get( + "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES" + ) + if env_allow_executables != "1": + raise ValueError( + "Executables need to be explicitly allowed (set GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES to '1') to run." + ) + + if self.interactive and not self._credential_source_executable_output_file: + raise ValueError( + "An output_file must be specified in the credential configuration for interactive mode." + ) + + if self.interactive and not self.is_workforce_pool: + raise ValueError("Interactive mode is only enabled for workforce pool.") diff --git a/system_tests/secrets.tar.enc b/system_tests/secrets.tar.enc index 61917177f..2bef7d971 100644 Binary files a/system_tests/secrets.tar.enc and b/system_tests/secrets.tar.enc differ diff --git a/tests/test_pluggable.py b/tests/test_pluggable.py index 80cde7972..293d5c6ed 100644 --- a/tests/test_pluggable.py +++ b/tests/test_pluggable.py @@ -26,6 +26,7 @@ # from google.auth import _helpers from google.auth import exceptions from google.auth import pluggable +from tests.test__default import WORKFORCE_AUDIENCE # from google.auth import transport @@ -44,7 +45,6 @@ SUBJECT_TOKEN_FIELD_NAME = "access_token" TOKEN_URL = "https://sts.googleapis.com/v1/token" -SERVICE_ACCOUNT_IMPERSONATION_URL = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/byoid-test@cicpclientproj.iam.gserviceaccount.com:generateAccessToken" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID" @@ -57,6 +57,7 @@ class TestCredentials(object): CREDENTIAL_SOURCE_EXECUTABLE = { "command": CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, "timeout_millis": 30000, + "interactive_timeout_millis": 300000, "output_file": CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, } CREDENTIAL_SOURCE = {"executable": CREDENTIAL_SOURCE_EXECUTABLE} @@ -68,6 +69,12 @@ class TestCredentials(object): "id_token": EXECUTABLE_OIDC_TOKEN, "expiration_time": 9999999999, } + EXECUTABLE_SUCCESSFUL_OIDC_NO_EXPIRATION_TIME_RESPONSE_ID_TOKEN = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:id_token", + "id_token": EXECUTABLE_OIDC_TOKEN, + } EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT = { "version": 1, "success": True, @@ -75,6 +82,12 @@ class TestCredentials(object): "id_token": EXECUTABLE_OIDC_TOKEN, "expiration_time": 9999999999, } + EXECUTABLE_SUCCESSFUL_OIDC_NO_EXPIRATION_TIME_RESPONSE_JWT = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:jwt", + "id_token": EXECUTABLE_OIDC_TOKEN, + } EXECUTABLE_SAML_TOKEN = "FAKE_SAML_RESPONSE" EXECUTABLE_SUCCESSFUL_SAML_RESPONSE = { "version": 1, @@ -83,6 +96,12 @@ class TestCredentials(object): "saml_response": EXECUTABLE_SAML_TOKEN, "expiration_time": 9999999999, } + EXECUTABLE_SUCCESSFUL_SAML_NO_EXPIRATION_TIME_RESPONSE = { + "version": 1, + "success": True, + "token_type": "urn:ietf:params:oauth:token-type:saml2", + "saml_response": EXECUTABLE_SAML_TOKEN, + } EXECUTABLE_FAILED_RESPONSE = { "version": 1, "success": False, @@ -104,6 +123,7 @@ def make_pluggable( service_account_impersonation_url=None, credential_source=None, workforce_pool_user_project=None, + interactive=None, ): return pluggable.Credentials( audience=audience, @@ -117,6 +137,7 @@ def make_pluggable( scopes=scopes, default_scopes=default_scopes, workforce_pool_user_project=workforce_pool_user_project, + interactive=interactive, ) @mock.patch.object(pluggable.Credentials, "__init__", return_value=None) @@ -262,77 +283,134 @@ def test_info_with_credential_source(self): "credential_source": self.CREDENTIAL_SOURCE, } - @mock.patch.dict( - os.environ, - { - "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1", - "GOOGLE_EXTERNAL_ACCOUNT_AUDIENCE": "original_audience", - "GOOGLE_EXTERNAL_ACCOUNT_TOKEN_TYPE": "original_token_type", - "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE": "0", - "GOOGLE_EXTERNAL_ACCOUNT_IMPERSONATED_EMAIL": "original_impersonated_email", - "GOOGLE_EXTERNAL_ACCOUNT_OUTPUT_FILE": "original_output_file", - }, - ) - def test_retrieve_subject_token_oidc_id_token(self): - with mock.patch( - "subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], - stdout=json.dumps( + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_successfully(self, tmpdir): + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = tmpdir.join( + "actual_output_file" + ) + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "interactive_timeout_millis": 300000, + "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + + testData = { + "subject_token_oidc_id_token": { + "stdout": json.dumps( self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN ).encode("UTF-8"), - returncode=0, - ), - ): - credentials = self.make_pluggable( - audience=AUDIENCE, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - credential_source=self.CREDENTIAL_SOURCE, - ) + "impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "file_content": self.EXECUTABLE_SUCCESSFUL_OIDC_NO_EXPIRATION_TIME_RESPONSE_ID_TOKEN, + "expect_token": self.EXECUTABLE_OIDC_TOKEN, + }, + "subject_token_oidc_id_token_interacitve_mode": { + "audience": WORKFORCE_AUDIENCE, + "file_content": self.EXECUTABLE_SUCCESSFUL_OIDC_NO_EXPIRATION_TIME_RESPONSE_ID_TOKEN, + "interactive": True, + "expect_token": self.EXECUTABLE_OIDC_TOKEN, + }, + "subject_token_oidc_jwt": { + "stdout": json.dumps( + self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT + ).encode("UTF-8"), + "impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "file_content": self.EXECUTABLE_SUCCESSFUL_OIDC_NO_EXPIRATION_TIME_RESPONSE_JWT, + "expect_token": self.EXECUTABLE_OIDC_TOKEN, + }, + "subject_token_oidc_jwt_interactive_mode": { + "audience": WORKFORCE_AUDIENCE, + "file_content": self.EXECUTABLE_SUCCESSFUL_OIDC_NO_EXPIRATION_TIME_RESPONSE_JWT, + "interactive": True, + "expect_token": self.EXECUTABLE_OIDC_TOKEN, + }, + "subject_token_saml": { + "stdout": json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE).encode( + "UTF-8" + ), + "impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL, + "file_content": self.EXECUTABLE_SUCCESSFUL_SAML_NO_EXPIRATION_TIME_RESPONSE, + "expect_token": self.EXECUTABLE_SAML_TOKEN, + }, + "subject_token_saml_interactive_mode": { + "audience": WORKFORCE_AUDIENCE, + "file_content": self.EXECUTABLE_SUCCESSFUL_SAML_NO_EXPIRATION_TIME_RESPONSE, + "interactive": True, + "expect_token": self.EXECUTABLE_SAML_TOKEN, + }, + } - subject_token = credentials.retrieve_subject_token(None) + for data in testData.values(): + with open( + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w" + ) as output_file: + json.dump(data.get("file_content"), output_file) - assert subject_token == self.EXECUTABLE_OIDC_TOKEN + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], stdout=data.get("stdout"), returncode=0 + ), + ): + credentials = self.make_pluggable( + audience=data.get("audience", AUDIENCE), + service_account_impersonation_url=data.get("impersonation_url"), + credential_source=ACTUAL_CREDENTIAL_SOURCE, + interactive=data.get("interactive", False), + ) + subject_token = credentials.retrieve_subject_token(None) + assert subject_token == data.get("expect_token") + os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_oidc_jwt(self): + def test_retrieve_subject_token_saml(self): with mock.patch( "subprocess.run", return_value=subprocess.CompletedProcess( args=[], - stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_JWT).encode( + stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE).encode( "UTF-8" ), returncode=0, ), ): - credentials = self.make_pluggable( - audience=AUDIENCE, - service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, - credential_source=self.CREDENTIAL_SOURCE, - ) + credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) subject_token = credentials.retrieve_subject_token(None) - assert subject_token == self.EXECUTABLE_OIDC_TOKEN + assert subject_token == self.EXECUTABLE_SAML_TOKEN @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_saml(self): + def test_retrieve_subject_token_saml_interactive_mode(self, tmpdir): + + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = tmpdir.join( + "actual_output_file" + ) + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "interactive_timeout_millis": 300000, + "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: + json.dump( + self.EXECUTABLE_SUCCESSFUL_SAML_NO_EXPIRATION_TIME_RESPONSE, output_file + ) + with mock.patch( "subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], - stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_SAML_RESPONSE).encode( - "UTF-8" - ), - returncode=0, - ), + return_value=subprocess.CompletedProcess(args=[], returncode=0), ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) + credentials = self.make_pluggable( + audience=WORKFORCE_AUDIENCE, + credential_source=ACTUAL_CREDENTIAL_SOURCE, + interactive=True, + ) subject_token = credentials.retrieve_subject_token(None) assert subject_token == self.EXECUTABLE_SAML_TOKEN + os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_failed(self): @@ -353,6 +431,46 @@ def test_retrieve_subject_token_failed(self): r"Executable returned unsuccessful response: code: 401, message: Permission denied. Caller not authorized." ) + @mock.patch.dict( + os.environ, + { + "GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1", + "GOOGLE_EXTERNAL_ACCOUNT_INTERACTIVE": "1", + }, + ) + def test_retrieve_subject_token_failed_interactive_mode(self, tmpdir): + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = tmpdir.join( + "actual_output_file" + ) + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { + "command": "command", + "interactive_timeout_millis": 300000, + "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} + with open( + ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w", encoding="utf-8" + ) as output_file: + json.dump(self.EXECUTABLE_FAILED_RESPONSE, output_file) + + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess(args=[], returncode=0), + ): + credentials = self.make_pluggable( + audience=WORKFORCE_AUDIENCE, + credential_source=ACTUAL_CREDENTIAL_SOURCE, + interactive=True, + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"Executable returned unsuccessful response: code: 401, message: Permission denied. Caller not authorized." + ) + os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "0"}) def test_retrieve_subject_token_not_allowd(self): with mock.patch( @@ -641,63 +759,6 @@ def test_retrieve_subject_token_missing_error_code_message(self): r"Error code and message fields are required in the response." ) - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_without_expiration_time_should_fail_when_output_file_specified( - self, - ): - EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = { - "version": 1, - "success": True, - "token_type": "urn:ietf:params:oauth:token-type:id_token", - "id_token": self.EXECUTABLE_OIDC_TOKEN, - } - - with mock.patch( - "subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], - stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE).encode("UTF-8"), - returncode=0, - ), - ): - credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"The executable response must contain an expiration_time for successful responses when an output_file has been specified in the configuration." - ) - - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) - def test_retrieve_subject_token_without_expiration_time_should_fail_when_retrieving_from_output_file( - self, tmpdir - ): - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE = tmpdir.join( - "actual_output_file" - ) - ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE = { - "command": "command", - "timeout_millis": 30000, - "output_file": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, - } - ACTUAL_CREDENTIAL_SOURCE = {"executable": ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE} - data = self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN.copy() - data.pop("expiration_time") - - with open(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, "w") as output_file: - json.dump(data, output_file) - - credentials = self.make_pluggable(credential_source=ACTUAL_CREDENTIAL_SOURCE) - - with pytest.raises(ValueError) as excinfo: - _ = credentials.retrieve_subject_token(None) - - assert excinfo.match( - r"The executable response must contain an expiration_time for successful responses when an output_file has been specified in the configuration." - ) - os.remove(ACTUAL_CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE) - @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_without_expiration_time_should_pass_when_output_file_not_specified( self, @@ -767,6 +828,36 @@ def test_credential_source_missing_command(self): r"Missing command field. Executable command must be provided." ) + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_missing_output_interactive_mode(self): + CREDENTIAL_SOURCE = { + "executable": {"command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND} + } + credentials = self.make_pluggable( + credential_source=CREDENTIAL_SOURCE, interactive=True + ) + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"An output_file must be specified in the credential configuration for interactive mode." + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_timeout_missing_will_use_default_timeout_value(self): + CREDENTIAL_SOURCE = { + "executable": { + "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + credentials = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert ( + credentials._credential_source_executable_timeout_millis + == pluggable.EXECUTABLE_TIMEOUT_MILLIS_DEFAULT + ) + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_credential_source_timeout_small(self): with pytest.raises(ValueError) as excinfo: @@ -795,6 +886,51 @@ def test_credential_source_timeout_large(self): assert excinfo.match(r"Timeout must be between 5 and 120 seconds.") + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_interactive_timeout_missing_will_use_default_interactive_timeout_value( + self + ): + CREDENTIAL_SOURCE = { + "executable": { + "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + credentials = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert ( + credentials._credential_source_executable_interactive_timeout_millis + == pluggable.EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_DEFAULT + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_interactive_timeout_small(self): + with pytest.raises(ValueError) as excinfo: + CREDENTIAL_SOURCE = { + "executable": { + "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "interactive_timeout_millis": 30000 - 1, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert excinfo.match(r"Interactive timeout must be between 5 and 30 minutes.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_credential_source_interactive_timeout_large(self): + with pytest.raises(ValueError) as excinfo: + CREDENTIAL_SOURCE = { + "executable": { + "command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND, + "interactive_timeout_millis": 1800000 + 1, + "output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE, + } + } + _ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE) + + assert excinfo.match(r"Interactive timeout must be between 5 and 30 minutes.") + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_executable_fail(self): with mock.patch( @@ -812,6 +948,120 @@ def test_retrieve_subject_token_executable_fail(self): r"Executable exited with non-zero return code 1. Error: None" ) + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_non_workforce_fail_interactive_mode(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE, interactive=True + ) + with pytest.raises(ValueError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match(r"Interactive mode is only enabled for workforce pool.") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_retrieve_subject_token_executable_fail_interactive_mode(self): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], stdout=None, returncode=1 + ), + ): + credentials = self.make_pluggable( + audience=WORKFORCE_AUDIENCE, + credential_source=self.CREDENTIAL_SOURCE, + interactive=True, + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.retrieve_subject_token(None) + + assert excinfo.match( + r"Executable exited with non-zero return code 1. Error: None" + ) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "0"}) + def test_revoke_failed_executable_not_allowed(self): + credentials = self.make_pluggable( + credential_source=self.CREDENTIAL_SOURCE, interactive=True + ) + with pytest.raises(ValueError) as excinfo: + _ = credentials.revoke(None) + + assert excinfo.match(r"Executables need to be explicitly allowed") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_revoke_failed(self): + testData = { + "non_interactive_mode": { + "interactive": False, + "expectErrType": ValueError, + "expectErrPattern": r"Revoke is only enabled under interactive mode.", + }, + "executable_failed": { + "returncode": 1, + "expectErrType": exceptions.RefreshError, + "expectErrPattern": r"Auth revoke failed on executable.", + }, + "response_validation_missing_version": { + "response": {}, + "expectErrType": ValueError, + "expectErrPattern": r"The executable response is missing the version field.", + }, + "response_validation_invalid_version": { + "response": {"version": 2}, + "expectErrType": exceptions.RefreshError, + "expectErrPattern": r"Executable returned unsupported version.", + }, + "response_validation_missing_success": { + "response": {"version": 1}, + "expectErrType": ValueError, + "expectErrPattern": r"The executable response is missing the success field.", + }, + "response_validation_failed_with_success_field_is_false": { + "response": {"version": 1, "success": False}, + "expectErrType": exceptions.RefreshError, + "expectErrPattern": r"Revoke failed with unsuccessful response.", + }, + } + for data in testData.values(): + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(data.get("response")).encode("UTF-8"), + returncode=data.get("returncode", 0), + ), + ): + credentials = self.make_pluggable( + audience=WORKFORCE_AUDIENCE, + service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL, + credential_source=self.CREDENTIAL_SOURCE, + interactive=data.get("interactive", True), + ) + + with pytest.raises(data.get("expectErrType")) as excinfo: + _ = credentials.revoke(None) + + assert excinfo.match(data.get("expectErrPattern")) + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_revoke_successfully(self): + ACTUAL_RESPONSE = {"version": 1, "success": True} + with mock.patch( + "subprocess.run", + return_value=subprocess.CompletedProcess( + args=[], + stdout=json.dumps(ACTUAL_RESPONSE).encode("utf-8"), + returncode=0, + ), + ): + credentials = self.make_pluggable( + audience=WORKFORCE_AUDIENCE, + credential_source=self.CREDENTIAL_SOURCE, + interactive=True, + ) + _ = credentials.revoke(None) + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) def test_retrieve_subject_token_python_2(self): with mock.patch("sys.version_info", (2, 7)): @@ -821,3 +1071,17 @@ def test_retrieve_subject_token_python_2(self): _ = credentials.retrieve_subject_token(None) assert excinfo.match(r"Pluggable auth is only supported for python 3.6+") + + @mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"}) + def test_revoke_subject_token_python_2(self): + with mock.patch("sys.version_info", (2, 7)): + credentials = self.make_pluggable( + audience=WORKFORCE_AUDIENCE, + credential_source=self.CREDENTIAL_SOURCE, + interactive=True, + ) + + with pytest.raises(exceptions.RefreshError) as excinfo: + _ = credentials.revoke(None) + + assert excinfo.match(r"Pluggable auth is only supported for python 3.6+")