-
-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(saml): Secure SP initiated SSO, disable IdP initiated SSO
- Loading branch information
Showing
9 changed files
with
174 additions
and
88 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from unittest.mock import Mock, patch | ||
from urllib.parse import parse_qs, urlparse | ||
|
||
from django.conf import settings | ||
from django.urls import reverse, reverse_lazy | ||
from django.utils.http import urlencode | ||
|
||
|
@@ -11,43 +12,66 @@ | |
from allauth.socialaccount.adapter import get_adapter | ||
from allauth.socialaccount.internal import statekit | ||
from allauth.socialaccount.models import SocialAccount | ||
from allauth.socialaccount.providers.base.constants import AuthProcess | ||
from allauth.socialaccount.providers.saml.utils import build_saml_config | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"is_connect,state_kwargs,relay_state, expected_url", | ||
"idp_initiated,adv_settings,state_kwargs,relay_state, expected_url", | ||
[ | ||
(False, None, None, "/accounts/profile/"), | ||
(False, None, "/foo", "/foo"), | ||
(True, {"process": "connect"}, None, reverse_lazy("socialaccount_connections")), | ||
(True, {"process": "connect", "next_url": "/conn"}, None, "/conn"), | ||
(False, {}, {}, "/not/here", settings.LOGIN_REDIRECT_URL), | ||
(False, {}, {"next": "/here"}, "/not/here", "/here"), | ||
( | ||
False, | ||
{}, | ||
{"process": "connect"}, | ||
"/not/here", | ||
reverse_lazy("socialaccount_connections"), | ||
), | ||
(False, {}, {"process": "connect", "next": "/here"}, "/not/here", "/here"), | ||
(True, {"reject_idp_initiated_sso": False}, {}, "/set-by-idp", "/set-by-idp"), | ||
( | ||
True, | ||
{"reject_idp_initiated_sso": False}, | ||
{}, | ||
"not-a-url", | ||
settings.LOGIN_REDIRECT_URL, | ||
), | ||
(True, {}, {}, "/set-by-idp", "/set-by-idp"), | ||
], | ||
) | ||
def test_acs( | ||
request, | ||
is_connect, | ||
idp_initiated, | ||
db, | ||
saml_settings, | ||
acs_saml_response, | ||
acs_saml_response_factory, | ||
mocked_signature_validation, | ||
expected_url, | ||
relay_state, | ||
state_kwargs, | ||
sociallogin_setup_state, | ||
adv_settings, | ||
settings, | ||
): | ||
provider_settings = settings.SOCIALACCOUNT_PROVIDERS["saml"]["APPS"][0]["settings"] | ||
advanced = dict(provider_settings["advanced"]) | ||
advanced.update(adv_settings) | ||
provider_settings["advanced"] = advanced | ||
process = state_kwargs.setdefault("process", AuthProcess.LOGIN) | ||
is_connect = process == AuthProcess.CONNECT | ||
if is_connect: | ||
client = request.getfixturevalue("auth_client") | ||
user = request.getfixturevalue("user") | ||
else: | ||
client = request.getfixturevalue("client") | ||
user = None | ||
|
||
if state_kwargs: | ||
assert not relay_state | ||
state_id = None | ||
if not idp_initiated: | ||
state_id = sociallogin_setup_state(client, **state_kwargs) | ||
relay_state = urlencode({"state": state_id}) | ||
|
||
data = {"SAMLResponse": acs_saml_response} | ||
data = {"SAMLResponse": acs_saml_response_factory(in_response_to=state_id)} | ||
if relay_state is not None: | ||
data["RelayState"] = relay_state | ||
resp = client.post( | ||
|
@@ -57,28 +81,37 @@ def test_acs( | |
assert resp.status_code == 302 | ||
assert resp["location"] == finish_url | ||
resp = client.get(finish_url) | ||
assert resp["location"] == expected_url | ||
account = SocialAccount.objects.get( | ||
provider="urn:dev-123.us.auth0.com", uid="dummysamluid" | ||
) | ||
assert account.extra_data["Role"] == ["view-profile", "manage-account-links"] | ||
email = EmailAddress.objects.get(user=account.user) | ||
assert email.email == (user.email if is_connect else "[email protected]") | ||
if idp_initiated and advanced.get("reject_idp_initiated_sso", True): | ||
assert "socialaccount/authentication_error.html" in ( | ||
t.name for t in resp.templates | ||
) | ||
else: | ||
assert resp["location"] == expected_url | ||
account = SocialAccount.objects.get( | ||
provider="urn:dev-123.us.auth0.com", uid="dummysamluid" | ||
) | ||
assert account.extra_data["Role"] == ["view-profile", "manage-account-links"] | ||
email = EmailAddress.objects.get(user=account.user) | ||
assert email.email == (user.email if is_connect else "[email protected]") | ||
|
||
|
||
def test_acs_error(client, db, saml_settings): | ||
data = {"SAMLResponse": "bad-response"} | ||
resp = client.post( | ||
reverse("saml_acs", kwargs={"organization_slug": "org"}), data=data | ||
) | ||
assert resp.status_code == 200 | ||
assert resp.status_code == 302 | ||
resp = client.get(resp["location"]) | ||
assert "socialaccount/authentication_error.html" in (t.name for t in resp.templates) | ||
|
||
|
||
def test_acs_get(client, db, saml_settings): | ||
"""ACS expects POST""" | ||
"""WHile ACS expects POST, it always redirects and handles the request in | ||
the FinishACSView. | ||
""" | ||
resp = client.get(reverse("saml_acs", kwargs={"organization_slug": "org"})) | ||
assert resp.status_code == 200 | ||
assert resp.status_code == 302 | ||
resp = client.get(resp["location"]) | ||
assert "socialaccount/authentication_error.html" in (t.name for t in resp.templates) | ||
|
||
|
||
|
@@ -103,8 +136,11 @@ def test_login(client, db, saml_settings): | |
location = resp["location"] | ||
assert location.startswith("https://dev-123.us.auth0.com/samlp/456?SAMLRequest=") | ||
resp_query = parse_qs(urlparse(location).query) | ||
relay_state = resp_query.get("RelayState")[0] | ||
state_id = parse_qs(relay_state)["state"][0] | ||
# We're not using RelayState | ||
assert resp_query.get("RelayState") is None | ||
# We're using the request ID / InResponseTo for tracking state. | ||
state_id = list(client.session[statekit.STATES_SESSION_KEY].keys())[0] | ||
assert state_id.startswith("ONELOGIN_") | ||
state = client.session[statekit.STATES_SESSION_KEY][state_id][0] | ||
assert state == {"process": "connect", "data": None, "next": "/foo"} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.