Skip to content

Commit

Permalink
fix: updated the lower bound of interactive timeout and fix the kwarg… (
Browse files Browse the repository at this point in the history
#1182)

* fix: udpated the lower bound of interactive timeout and fix the kwargs invalid syntax

* update token

* update token

* prohibit the access from constructor and only allow the injection and test

* fix lint

* adding interactive timeout validation test

* update token
  • Loading branch information
BigTailWolf authored Nov 10, 2022
1 parent bf95dfc commit 50c0fd2
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 33 deletions.
36 changes: 22 additions & 14 deletions google/auth/pluggable.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
EXECUTABLE_TIMEOUT_MILLIS_LOWER_BOUND = 5 * 1000 # 5 seconds
EXECUTABLE_TIMEOUT_MILLIS_UPPER_BOUND = 120 * 1000 # 2 minutes

EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_DEFAULT = 5 * 60 * 1000 # 5 minutes
EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_LOWER_BOUND = 5 * 60 * 1000 # 5 minutes
EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_LOWER_BOUND = 30 * 1000 # 30 seconds
EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_UPPER_BOUND = 30 * 60 * 1000 # 30 minutes


Expand Down Expand Up @@ -132,7 +131,9 @@ def __init__(
self._credential_source_executable_output_file = self._credential_source_executable.get(
"output_file"
)
self._tokeninfo_username = kwargs.get("tokeninfo_username", "") # dummy value

# Dummy value. This variable is only used via injection, not exposed to ctor
self._tokeninfo_username = ""

if not self._credential_source_executable_command:
raise ValueError(
Expand All @@ -150,17 +151,16 @@ def __init__(
):
raise ValueError("Timeout must be between 5 and 120 seconds.")

if not self._credential_source_executable_interactive_timeout_millis:
self._credential_source_executable_interactive_timeout_millis = (
EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_DEFAULT
)
elif (
self._credential_source_executable_interactive_timeout_millis
< EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_LOWER_BOUND
or self._credential_source_executable_interactive_timeout_millis
> EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_UPPER_BOUND
):
raise ValueError("Interactive timeout must be between 5 and 30 minutes.")
if self._credential_source_executable_interactive_timeout_millis:
if (
self._credential_source_executable_interactive_timeout_millis
< EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_LOWER_BOUND
or self._credential_source_executable_interactive_timeout_millis
> EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_UPPER_BOUND
):
raise ValueError(
"Interactive timeout must be between 30 seconds and 30 minutes."
)

@_helpers.copy_docstring(external_account.Credentials)
def retrieve_subject_token(self, request):
Expand Down Expand Up @@ -400,5 +400,13 @@ def _validate_running_mode(self):
"An output_file must be specified in the credential configuration for interactive mode."
)

if (
self.interactive
and not self._credential_source_executable_interactive_timeout_millis
):
raise ValueError(
"Interactive mode cannot run without an interactive timeout."
)

if self.interactive and not self.is_workforce_pool:
raise ValueError("Interactive mode is only enabled for workforce pool.")
Binary file modified system_tests/secrets.tar.enc
Binary file not shown.
59 changes: 40 additions & 19 deletions tests/test_pluggable.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,21 @@ def make_pluggable(
interactive=interactive,
)

def test_from_constructor_and_injection(self):
credentials = pluggable.Credentials(
audience=AUDIENCE,
subject_token_type=SUBJECT_TOKEN_TYPE,
token_url=TOKEN_URL,
token_info_url=TOKEN_INFO_URL,
credential_source=self.CREDENTIAL_SOURCE,
interactive=True,
)
setattr(credentials, "_tokeninfo_username", "mock_external_account_id")

assert isinstance(credentials, pluggable.Credentials)
assert credentials.interactive
assert credentials.external_account_id == "mock_external_account_id"

@mock.patch.object(pluggable.Credentials, "__init__", return_value=None)
def test_from_info_full_options(self, mock_init):
credentials = pluggable.Credentials.from_info(
Expand Down Expand Up @@ -1064,23 +1079,6 @@ def test_credential_source_timeout_large(self):

assert excinfo.match(r"Timeout must be between 5 and 120 seconds.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_credential_source_interactive_timeout_missing_will_use_default_interactive_timeout_value(
self
):
CREDENTIAL_SOURCE = {
"executable": {
"command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND,
"output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE,
}
}
credentials = self.make_pluggable(credential_source=CREDENTIAL_SOURCE)

assert (
credentials._credential_source_executable_interactive_timeout_millis
== pluggable.EXECUTABLE_INTERACTIVE_TIMEOUT_MILLIS_DEFAULT
)

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_credential_source_interactive_timeout_small(self):
with pytest.raises(ValueError) as excinfo:
Expand All @@ -1093,7 +1091,9 @@ def test_credential_source_interactive_timeout_small(self):
}
_ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE)

assert excinfo.match(r"Interactive timeout must be between 5 and 30 minutes.")
assert excinfo.match(
r"Interactive timeout must be between 30 seconds and 30 minutes."
)

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_credential_source_interactive_timeout_large(self):
Expand All @@ -1107,7 +1107,9 @@ def test_credential_source_interactive_timeout_large(self):
}
_ = self.make_pluggable(credential_source=CREDENTIAL_SOURCE)

assert excinfo.match(r"Interactive timeout must be between 5 and 30 minutes.")
assert excinfo.match(
r"Interactive timeout must be between 30 seconds and 30 minutes."
)

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_executable_fail(self):
Expand Down Expand Up @@ -1136,6 +1138,25 @@ def test_retrieve_subject_token_non_workforce_fail_interactive_mode(self):

assert excinfo.match(r"Interactive mode is only enabled for workforce pool.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_fail_on_validation_missing_interactive_timeout(
self
):
CREDENTIAL_SOURCE_EXECUTABLE = {
"command": self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND,
"output_file": self.CREDENTIAL_SOURCE_EXECUTABLE_OUTPUT_FILE,
}
CREDENTIAL_SOURCE = {"executable": CREDENTIAL_SOURCE_EXECUTABLE}
credentials = self.make_pluggable(
credential_source=CREDENTIAL_SOURCE, interactive=True
)
with pytest.raises(ValueError) as excinfo:
_ = credentials.retrieve_subject_token(None)

assert excinfo.match(
r"Interactive mode cannot run without an interactive timeout."
)

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_executable_fail_interactive_mode(self):
with mock.patch(
Expand Down

0 comments on commit 50c0fd2

Please sign in to comment.