Skip to content

Commit

Permalink
fix: Handle integrity fails if groups map to same roles (#1605)
Browse files Browse the repository at this point in the history
If AUTH_USER_REGISTRATION is enabled and AUTH_ROLE_MAPPING is set such
that multiple LDAP groups map to the same role, then upon attempting to
register the user, specifically inserting that user's roles into the
database, then these inserts will fail due to the contraint check.

To ensure this does not occur, the roles that are to be inserted for the
user needs to be de-dupicated.

Fixes issue #1603.

Co-authored-by: Daniel Vaz Gaspar <[email protected]>
  • Loading branch information
fredthomsen and dpgaspar authored Jun 14, 2021
1 parent 46b7050 commit 8c3989c
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 12 deletions.
22 changes: 11 additions & 11 deletions flask_appbuilder/security/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def create_jwt_manager(self, app) -> JWTManager:
def create_builtin_roles(self):
return self.appbuilder.get_app.config.get("FAB_ROLES", {})

def get_roles_from_keys(self, role_keys: List[str]) -> List[role_model]:
def get_roles_from_keys(self, role_keys: List[str]) -> Set[role_model]:
"""
Construct a list of FAB role objects, from a list of keys.
Expand All @@ -314,14 +314,14 @@ def get_roles_from_keys(self, role_keys: List[str]) -> List[role_model]:
:param role_keys: the list of FAB role keys
:return: a list of RoleModelView
"""
_roles = []
_roles = set()
_role_keys = set(role_keys)
for role_key, fab_role_names in self.auth_roles_mapping.items():
if role_key in _role_keys:
for fab_role_name in fab_role_names:
fab_role = self.find_role(fab_role_name)
if fab_role:
_roles.append(fab_role)
_roles.add(fab_role)
else:
log.warning(
"Can't find role specified in AUTH_ROLES_MAPPING: {0}".format(
Expand Down Expand Up @@ -918,14 +918,14 @@ def _search_ldap(self, ldap, con, username):
def _ldap_calculate_user_roles(
self, user_attributes: Dict[str, bytes]
) -> List[str]:
user_role_objects = []
user_role_objects = set()

# apply AUTH_ROLES_MAPPING
if len(self.auth_roles_mapping) > 0:
user_role_keys = self.ldap_extract_list(
user_attributes, self.auth_ldap_group_field
)
user_role_objects += self.get_roles_from_keys(user_role_keys)
user_role_objects.update(self.get_roles_from_keys(user_role_keys))

# apply AUTH_USER_REGISTRATION
if self.auth_user_registration:
Expand All @@ -934,15 +934,15 @@ def _ldap_calculate_user_roles(
# lookup registration role in flask db
fab_role = self.find_role(registration_role_name)
if fab_role:
user_role_objects.append(fab_role)
user_role_objects.add(fab_role)
else:
log.warning(
"Can't find AUTH_USER_REGISTRATION role: {0}".format(
registration_role_name
)
)

return user_role_objects
return list(user_role_objects)

def _ldap_bind_indirect(self, ldap, con) -> None:
"""
Expand Down Expand Up @@ -1243,12 +1243,12 @@ def auth_user_remote_user(self, username):
return user

def _oauth_calculate_user_roles(self, userinfo) -> List[str]:
user_role_objects = []
user_role_objects = set()

# apply AUTH_ROLES_MAPPING
if len(self.auth_roles_mapping) > 0:
user_role_keys = userinfo.get("role_keys", [])
user_role_objects += self.get_roles_from_keys(user_role_keys)
user_role_objects.update(self.get_roles_from_keys(user_role_keys))

# apply AUTH_USER_REGISTRATION_ROLE
if self.auth_user_registration:
Expand All @@ -1266,15 +1266,15 @@ def _oauth_calculate_user_roles(self, userinfo) -> List[str]:
# lookup registration role in flask db
fab_role = self.find_role(registration_role_name)
if fab_role:
user_role_objects.append(fab_role)
user_role_objects.add(fab_role)
else:
log.warning(
"Can't find AUTH_USER_REGISTRATION role: {0}".format(
registration_role_name
)
)

return user_role_objects
return list(user_role_objects)

def auth_user_oauth(self, userinfo):
"""
Expand Down
96 changes: 95 additions & 1 deletion flask_appbuilder/tests/_test_auth_ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def tearDown(self):
"email": [b"[email protected]"],
},
)
user_natalie = (
"uid=natalie,ou=users,o=test",
{
"uid": ["natalie"],
"userPassword": ["natalie_password"],
"memberOf": [
b"cn=staff,ou=groups,o=test",
b"cn=admin,ou=groups,o=test",
b"cn=exec,ou=groups,o=test",
],
"givenName": [b"Natalie"],
"sn": [b"Smith"],
"email": [b"[email protected]"],
},
)
group_admins = (
"cn=admins,ou=groups,o=test",
{"cn": ["admins"], "member": [user_admin[0]]},
Expand All @@ -92,7 +107,16 @@ def tearDown(self):
{"cn": ["staff"], "member": [user_alice[0]]},
)
directory = dict(
[top, ou_users, ou_groups, user_admin, user_alice, group_admins, group_staff]
[
top,
ou_users,
ou_groups,
user_admin,
user_alice,
user_natalie,
group_admins,
group_staff,
]
)

# ----------------
Expand All @@ -111,6 +135,11 @@ def tearDown(self):
tuple(["uid=alice,ou=users,o=test", "alice_password"]),
{},
)
call_bind_natalie = (
"simple_bind_s",
tuple(["uid=natalie,ou=users,o=test", "natalie_password"]),
{},
)
call_search_alice = (
"search_s",
tuple(["ou=users,o=test", 2, "(uid=alice)", ["givenName", "sn", "email"]]),
Expand All @@ -128,6 +157,18 @@ def tearDown(self):
),
{},
)
call_search_natalie_memberof = (
"search_s",
tuple(
[
"ou=users,o=test",
2,
"(uid=natalie)",
["givenName", "sn", "email", "memberOf"],
]
),
{},
)
call_search_alice_filter = (
"search_s",
tuple(
Expand Down Expand Up @@ -320,6 +361,59 @@ def test__inactive_user(self):
# validate - expected LDAP methods were called
self.assertEquals(self.ldapobj.methods_called(with_args=True), [])

def test__multi_group_user_mapping_to_same_role(self):
"""
LDAP: test login flow for - user in multiple groups mapping to same role
"""
self.app.config["AUTH_ROLES_MAPPING"] = {
"cn=staff,ou=groups,o=test": ["Admin"],
"cn=admin,ou=groups,o=test": ["Admin", "User"],
"cn=exec,ou=groups,o=test": ["Public"],
}
self.app.config["AUTH_LDAP_SEARCH"] = "ou=users,o=test"
self.app.config["AUTH_LDAP_USERNAME_FORMAT"] = "uid=%s,ou=users,o=test"
self.app.config["AUTH_USER_REGISTRATION"] = True
self.app.config["AUTH_USER_REGISTRATION_ROLE"] = "Public"
self.appbuilder = AppBuilder(self.app, self.db.session)
sm = self.appbuilder.sm

# add User role
sm.add_role("User")

# validate - no users are registered
self.assertEqual(sm.get_all_users(), [])

# attempt login
user = sm.auth_user_ldap("natalie", "natalie_password")

# validate - user was allowed to log in
self.assertIsInstance(user, sm.user_model)

# validate - user was registered
self.assertEqual(len(sm.get_all_users()), 1)

# validate - user was given the correct roles
self.assertListEqual(
user.roles,
[sm.find_role("Admin"), sm.find_role("Public"), sm.find_role("User")],
)

# validate - user was given the correct attributes (read from LDAP)
self.assertEqual(user.first_name, "Natalie")
self.assertEqual(user.last_name, "Smith")
self.assertEqual(user.email, "[email protected]")

# validate - expected LDAP methods were called
self.assertEqual(
self.ldapobj.methods_called(with_args=True),
[
self.call_initialize,
self.call_set_option,
self.call_bind_natalie,
self.call_search_natalie_memberof,
],
)

def test__direct_bind__unregistered(self):
"""
LDAP: test login flow for - direct bind - unregistered user
Expand Down

0 comments on commit 8c3989c

Please sign in to comment.