diff --git a/src/azure-cli/azure/cli/command_modules/role/custom.py b/src/azure-cli/azure/cli/command_modules/role/custom.py index 367500f489f..2810cbaa99e 100644 --- a/src/azure-cli/azure/cli/command_modules/role/custom.py +++ b/src/azure-cli/azure/cli/command_modules/role/custom.py @@ -147,7 +147,7 @@ def create_role_assignment(cmd, role, assignee=None, assignee_object_id=None, re raise CLIError('usage error: --assignee STRING | --assignee-object-id GUID') if assignee_principal_type and not assignee_object_id: - raise CLIError('usage error: --assignee-object-id GUID [--assignee-principal-type]') + raise CLIError('usage error: --assignee-object-id GUID --assignee-principal-type TYPE') # If condition is set and condition-version is empty, condition-version defaults to "2.0". if condition and not condition_version: @@ -157,8 +157,18 @@ def create_role_assignment(cmd, role, assignee=None, assignee_object_id=None, re if condition_version and not condition: raise CLIError('usage error: When --condition-version is set, --condition must be set as well.') - object_id, principal_type = _resolve_assignee_object(cmd.cli_ctx, assignee, assignee_object_id, - assignee_principal_type) + if assignee: + object_id, principal_type = _resolve_object_id_and_type(cmd.cli_ctx, assignee, fallback_to_object_id=True) + else: + object_id = assignee_object_id + if assignee_principal_type: + # If principal type is provided, nothing to resolve, do not call Graph + principal_type = assignee_principal_type + else: + # Try best to get principal type + logger.warning('RBAC service might reject creating role assignment without --assignee-principal-type ' + 'in the future. Better to specify --assignee-principal-type manually.') + principal_type = _get_principal_type_from_object_id(cmd.cli_ctx, assignee_object_id) try: return _create_role_assignment(cmd.cli_ctx, role, object_id, resource_group_name, scope, resolve_assignee=False, @@ -1766,46 +1776,24 @@ def _encode_custom_key_description(key_description): return key_description.encode('utf-16') -def _resolve_assignee_object(cli_ctx, assignee, assignee_object_id, assignee_principal_type): +def _get_principal_type_from_object_id(cli_ctx, assignee_object_id): client = _graph_client_factory(cli_ctx) - result = None - - # resolve assignee (same as _resolve_object_id) - if assignee: - if assignee.find('@') >= 0: # looks like a user principal name - result = list(client.users.list(filter="userPrincipalName eq '{}'".format(assignee))) - if not result: - result = list(client.service_principals.list( - filter="servicePrincipalNames/any(c:c eq '{}')".format(assignee))) - if not result and is_guid(assignee): # assume an object id, let us verify it - result = _get_object_stubs(client, [assignee]) - - # 2+ matches should never happen, so we only check 'no match' here - if not result: - raise CLIError("Cannot find user or service principal in graph database for '{assignee}'. " - "If the assignee is an appId, make sure the corresponding service principal is created " - "with 'az ad sp create --id {assignee}'.".format(assignee=assignee)) - - return result[0].object_id, result[0].object_type - - # try to resolve assignee object id try: result = _get_object_stubs(client, [assignee_object_id]) if result: - return result[0].object_id, result[0].object_type + return result[0].object_type except CloudError: - pass - - # If failed to verify assignee object id, DO NOT raise exception - # since --assignee-object-id is exposed to bypass Graph API - if not assignee_principal_type: - logger.warning('Failed to query --assignee-principal-type for %s by invoking Graph API.\n' - 'RBAC server might reject creating role assignment without --assignee-principal-type ' - 'in the future. Better to specify --assignee-principal-type manually.', assignee_object_id) - return assignee_object_id, assignee_principal_type + logger.warning('Failed to query --assignee-principal-type for --assignee-object-id %s by invoking Graph API.', + assignee_object_id) + return None def _resolve_object_id(cli_ctx, assignee, fallback_to_object_id=False): + object_id, _ = _resolve_object_id_and_type(cli_ctx, assignee, fallback_to_object_id=fallback_to_object_id) + return object_id + + +def _resolve_object_id_and_type(cli_ctx, assignee, fallback_to_object_id=False): client = _graph_client_factory(cli_ctx) result = None try: @@ -1823,10 +1811,14 @@ def _resolve_object_id(cli_ctx, assignee, fallback_to_object_id=False): "If the assignee is an appId, make sure the corresponding service principal is created " "with 'az ad sp create --id {assignee}'.".format(assignee=assignee)) - return result[0].object_id + return result[0].object_id, result[0].object_type except (CloudError, GraphErrorException): + logger.warning('Failed to query %s by invoking Graph API. ' + 'If you don\'t have permission to query Graph API, please ' + 'specify --assignee-object-id and --assignee-principal-type.', assignee) if fallback_to_object_id and is_guid(assignee): - return assignee + logger.warning('Assuming %s as an object ID.', assignee) + return assignee, None raise diff --git a/src/azure-cli/azure/cli/command_modules/role/tests/latest/test_role.py b/src/azure-cli/azure/cli/command_modules/role/tests/latest/test_role.py index 24295cb5ffe..cf4000ac1ca 100644 --- a/src/azure-cli/azure/cli/command_modules/role/tests/latest/test_role.py +++ b/src/azure-cli/azure/cli/command_modules/role/tests/latest/test_role.py @@ -358,11 +358,20 @@ def _test_role_assignment(assignee_object_id, assignee_principal_type=None): self.kwargs['object_id'] = assignee_object_id self.kwargs['principal_type'] = assignee_principal_type # test role assignment on subscription level - if assignee_principal_type: - self.cmd( - 'role assignment create --assignee-object-id {object_id} --assignee-principal-type {principal_type} --role Reader -g {rg}') - else: - self.cmd('role assignment create --assignee-object-id {object_id} --role Reader -g {rg}') + + with mock.patch('azure.graphrbac.operations.ObjectsOperations.get_objects_by_object_ids') \ + as get_objects_by_object_ids_mock: + if assignee_principal_type: + self.cmd( + 'role assignment create --assignee-object-id {object_id} ' + '--assignee-principal-type {principal_type} --role Reader -g {rg}') + # Verify no graph call + get_objects_by_object_ids_mock.assert_not_called() + else: + self.cmd('role assignment create --assignee-object-id {object_id} --role Reader -g {rg}') + # Verify 1 graph call to resolve principal type + get_objects_by_object_ids_mock.assert_called_once() + self.cmd('role assignment list -g {rg}', checks=self.check("length([])", 1)) self.cmd('role assignment delete -g {rg}') self.cmd('role assignment list -g {rg}', checks=self.check("length([])", 0))