Skip to content

Commit

Permalink
Improve error handling in export-credentials cmd
Browse files Browse the repository at this point in the history
* Use consistant RCs for config/credential fetching errors
* Have separate error messages for cycles vs max recursion
  • Loading branch information
jamesls committed Nov 16, 2022
1 parent b705644 commit 59ebc9c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 44 deletions.
41 changes: 20 additions & 21 deletions awscli/customizations/configure/exportcreds.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections import namedtuple

from awscli.customizations.commands import BasicCommand
from awscli.customizations.exceptions import ConfigurationError


# Takes botocore's ReadOnlyCredentials and exposes an expiry_time.
Expand Down Expand Up @@ -204,15 +205,23 @@ def __init__(self, session, out_stream=None, error_stream=None, env=None):
self._error_stream = error_stream
self._env = env

def _recursion_barrier_detected(self):
def _detect_recursion_barrier(self):
profile = self._get_current_profile()
seen_profiles = self._parse_profile_chain(
self._env.get(self._RECURSION_VAR, ''))
if len(seen_profiles) >= self._MAX_RECURSION:
return True
return profile in seen_profiles
raise ConfigurationError(
f"Maximum recursive credential process resolution reached "
f"({self._MAX_RECURSION}).\n"
f"Profiles seen: {' -> '.join(seen_profiles)}"
)
if profile in seen_profiles:
raise ConfigurationError(
f"Credential process resolution detected an infinite loop, "
f"profile cycle: {' -> '.join(seen_profiles + [profile])}\n"
)

def _set_recursion_barrier(self):
def _update_recursion_barrier(self):
profile = self._get_current_profile()
seen_profiles = self._parse_profile_chain(
self._env.get(self._RECURSION_VAR, ''))
Expand All @@ -238,27 +247,17 @@ def _parse_profile_chain(self, value):
return result

def _run_main(self, parsed_args, parsed_globals):
if self._recursion_barrier_detected():
self._error_stream.write(
"\n\nRecursive credential resolution process detected.\n"
"Try setting an explicit '--profile' value in the "
"'credential_process' configuration and ensure there "
"are no cycles:\n\n"
"credential_process = aws configure export-credentials "
"--profile other-profile\n"
)
return 2
self._set_recursion_barrier()
self._detect_recursion_barrier()
self._update_recursion_barrier()
try:
creds = self._session.get_credentials()
except Exception as e:
self._error_stream.write(
"Unable to retrieve credentials: %s\n" % e)
return 1
original_msg = str(e).strip()
raise ConfigurationError(
f"Unable to retrieve credentials: {original_msg}\n")
if creds is None:
self._error_stream.write(
"Unable to retrieve credentials: no credentials found\n")
return 1
raise ConfigurationError(
"Unable to retrieve credentials: no credentials found")
creds_with_expiry = convert_botocore_credentials(creds)
formatter = SUPPORTED_FORMATS[parsed_args.format](self._out_stream)
formatter.display_credentials(creds_with_expiry)
46 changes: 23 additions & 23 deletions tests/unit/customizations/configure/test_exportcreds.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from botocore.session import Session

from awscli.testutils import unittest
from awscli.customizations.exceptions import ConfigurationError
from awscli.customizations.configure.exportcreds import (
Credentials,
convert_botocore_credentials,
Expand Down Expand Up @@ -199,29 +200,28 @@ def test_can_export_creds_explicit_format(self):

def test_show_error_when_no_cred(self):
self.session.get_credentials.return_value = None
rc = self.export_creds_cmd(args=[], parsed_globals=self.global_args)
with pytest.raises(ConfigurationError) as excinfo:
self.export_creds_cmd(args=[], parsed_globals=self.global_args)
self.assertIn(
'Unable to retrieve credentials', self.err_stream.getvalue()
)
self.assertEqual(rc, 1)
'Unable to retrieve credentials', str(excinfo))

def test_show_error_when_cred_resolution_errors(self):
self.session.get_credentials.side_effect = Exception(
"resolution failed")
rc = self.export_creds_cmd(args=[], parsed_globals=self.global_args)
with pytest.raises(ConfigurationError) as excinfo:
self.export_creds_cmd(args=[], parsed_globals=self.global_args)
self.assertIn(
'resolution failed', self.err_stream.getvalue()
'resolution failed', str(excinfo)
)
self.assertEqual(rc, 1)

def test_can_detect_recursive_resolution(self):
self.os_env['_AWS_CLI_PROFILE_CHAIN'] = 'default'
rc = self.export_creds_cmd(args=[], parsed_globals=self.global_args)
with pytest.raises(ConfigurationError) as excinfo:
self.export_creds_cmd(args=[], parsed_globals=self.global_args)
self.assertIn(
'Recursive credential resolution process detected',
self.err_stream.getvalue()
'Credential process resolution detected an infinite loop',
str(excinfo),
)
self.assertEqual(rc, 2)

def test_nested_calls_not_recursive(self):
self.session.get_credentials.return_value = self.creds
Expand All @@ -243,12 +243,12 @@ def test_nested_calls_with_cycle(self):
self.session.get_credentials.return_value = self.creds
self.os_env['_AWS_CLI_PROFILE_CHAIN'] = 'foo,bar,baz'
self.session.get_config_variable.return_value = 'bar'
rc = self.export_creds_cmd(args=[], parsed_globals=self.global_args)
with pytest.raises(ConfigurationError) as excinfo:
self.export_creds_cmd(args=[], parsed_globals=self.global_args)
self.assertIn(
'Recursive credential resolution process detected',
self.err_stream.getvalue()
'Credential process resolution detected an infinite loop',
str(excinfo),
)
self.assertEqual(rc, 2)

def test_handles_comma_char_in_profile_name_no_cycle(self):
self.session.get_credentials.return_value = self.creds
Expand Down Expand Up @@ -277,20 +277,20 @@ def test_detects_comma_char_with_cycle(self):
# Second time, it detects the cycle.
second_invoke = ConfigureExportCredentialsCommand(
self.session, self.out_stream, self.err_stream, env=self.os_env)
rc = second_invoke(args=[], parsed_globals=self.global_args)
with pytest.raises(ConfigurationError) as excinfo:
second_invoke(args=[], parsed_globals=self.global_args)
self.assertIn(
'Recursive credential resolution process detected',
self.err_stream.getvalue()
'Credential process resolution detected an infinite loop',
str(excinfo),
)
self.assertEqual(rc, 2)

def test_max_recursion_limit(self):
self.session.get_credentials.return_value = self.creds
self.os_env['_AWS_CLI_PROFILE_CHAIN'] = ','.join(
['a', 'b', 'c', 'd', 'e', 'f', 'g'])
rc = self.export_creds_cmd(args=[], parsed_globals=self.global_args)
with pytest.raises(ConfigurationError) as excinfo:
self.export_creds_cmd(args=[], parsed_globals=self.global_args)
self.assertIn(
'Recursive credential resolution process detected',
self.err_stream.getvalue()
'Maximum recursive credential process resolution reached',
str(excinfo),
)
self.assertEqual(rc, 2)

0 comments on commit 59ebc9c

Please sign in to comment.