diff --git a/src/sentry/api/endpoints/oauth_userinfo.py b/src/sentry/api/endpoints/oauth_userinfo.py new file mode 100644 index 00000000000000..56d44ba9791096 --- /dev/null +++ b/src/sentry/api/endpoints/oauth_userinfo.py @@ -0,0 +1,50 @@ +from rest_framework import status +from rest_framework.authentication import get_authorization_header +from rest_framework.request import Request +from rest_framework.response import Response + +from sentry.api.base import Endpoint, control_silo_endpoint +from sentry.api.exceptions import ParameterValidationError, ResourceDoesNotExist, SentryAPIException +from sentry.models import ApiToken, UserEmail + + +class InsufficientScopesError(SentryAPIException): + status_code = status.HTTP_403_FORBIDDEN + code = "insufficient-scope" + message = "openid scope is required for userinfo access" + + +@control_silo_endpoint +class OAuthUserInfoEndpoint(Endpoint): + authentication_classes = () + permission_classes = () + + def get(self, request: Request) -> Response: + try: + access_token = get_authorization_header(request).split()[1].decode("utf-8") + except IndexError: + raise ParameterValidationError("Bearer token not found in authorization header") + try: + token_details = ApiToken.objects.get(token=access_token) + except ApiToken.DoesNotExist: + raise ResourceDoesNotExist("Access token not found") + + scopes = token_details.get_scopes() + if "openid" not in scopes: + raise InsufficientScopesError + + user = token_details.user + user_output = {"sub": user.id} + if "profile" in scopes: + profile_details = { + "name": user.name, + "avatar_type": user.avatar_type, + "avatar_url": user.avatar_url, + "date_joined": user.date_joined, + } + user_output.update(profile_details) + if "email" in scopes: + email = UserEmail.objects.get(user=user) + email_details = {"email": email.email, "email_verified": email.is_verified} + user_output.update(email_details) + return Response(user_output) diff --git a/src/sentry/web/urls.py b/src/sentry/web/urls.py index 64ca4d644b3f69..381af0c67a414f 100644 --- a/src/sentry/web/urls.py +++ b/src/sentry/web/urls.py @@ -8,6 +8,7 @@ from django.urls import URLPattern, URLResolver, re_path from django.views.generic import RedirectView +from sentry.api.endpoints.oauth_userinfo import OAuthUserInfoEndpoint from sentry.auth.providers.saml2.provider import SAML2AcceptACSView, SAML2MetadataView, SAML2SLSView from sentry.charts.endpoints import serve_chartcuterie_config from sentry.web import api @@ -164,6 +165,11 @@ r"^token/$", OAuthTokenView.as_view(), ), + re_path( + r"userinfo/$", + OAuthUserInfoEndpoint.as_view(), + name="sentry-api-0-oauth-userinfo", + ), ] ), ), diff --git a/tests/sentry/api/endpoints/test_oauth_userinfo.py b/tests/sentry/api/endpoints/test_oauth_userinfo.py new file mode 100644 index 00000000000000..567a42be5601eb --- /dev/null +++ b/tests/sentry/api/endpoints/test_oauth_userinfo.py @@ -0,0 +1,107 @@ +import datetime + +from django.urls import reverse +from rest_framework.test import APIClient + +from sentry.models import ApiToken +from sentry.testutils import APITestCase +from sentry.testutils.silo import control_silo_test + + +@control_silo_test(stable=True) +class OAuthUserInfoTest(APITestCase): + def setUp(self): + super().setUp() + self.login_as(self.user) + self.path = reverse( + "sentry-api-0-oauth-userinfo", + ) + self.client = APIClient() + + def test_requires_access_token(self): + response = self.client.get(self.path) + + assert response.status_code == 400 + assert response.data["detail"]["code"] == "parameter-validation-error" + assert ( + response.data["detail"]["message"] == "Bearer token not found in authorization header" + ) + + def test_declines_invalid_token(self): + self.client.credentials(HTTP_AUTHORIZATION="Bearer abcd") + response = self.client.get(self.path) + assert response.status_code == 404 + assert response.data["detail"] == "Access token not found" + + def test_declines_if_no_openid_scope(self): + token_without_openid_scope = ApiToken.objects.create(user=self.user, scope_list=[]) + self.client.credentials(HTTP_AUTHORIZATION="Bearer " + token_without_openid_scope.token) + + response = self.client.get(self.path) + + assert response.status_code == 403 + assert response.data["detail"]["code"] == "insufficient-scope" + assert response.data["detail"]["message"] == "openid scope is required for userinfo access" + + def test_gets_sub_with_openid_scope(self): + """ + Ensures we get `sub`, and only `sub`, if the only scope is openid. + """ + openid_only_token = ApiToken.objects.create(user=self.user, scope_list=["openid"]) + + self.client.credentials(HTTP_AUTHORIZATION="Bearer " + openid_only_token.token) + + response = self.client.get(self.path) + + assert response.status_code == 200 + assert response.data == {"sub": self.user.id} + + def test_gets_email_information(self): + email_token = ApiToken.objects.create(user=self.user, scope_list=["openid", "email"]) + self.client.credentials(HTTP_AUTHORIZATION="Bearer " + email_token.token) + + response = self.client.get(self.path) + + assert response.status_code == 200 + assert response.data == { + "sub": self.user.id, + "email": self.user.email, + "email_verified": True, + } + + def test_gets_profile_information(self): + profile_token = ApiToken.objects.create(user=self.user, scope_list=["openid", "profile"]) + self.client.credentials(HTTP_AUTHORIZATION="Bearer " + profile_token.token) + + response = self.client.get(self.path) + + assert response.status_code == 200 + + assert response.data["avatar_type"] == 0 + assert response.data["avatar_url"] is None + assert isinstance(response.data["date_joined"], datetime.datetime) + assert response.data["name"] == "" + assert response.data["sub"] == self.user.id + + def test_gets_multiple_scopes(self): + all_access_token = ApiToken.objects.create( + user=self.user, scope_list=["openid", "profile", "email", "address", "phone"] + ) + self.client.credentials(HTTP_AUTHORIZATION="Bearer " + all_access_token.token) + + response = self.client.get(self.path) + + assert response.status_code == 200 + + # profile information + assert response.data["avatar_type"] == 0 + assert response.data["avatar_url"] is None + assert isinstance(response.data["date_joined"], datetime.datetime) + assert response.data["name"] == "" + + # email information + assert response.data["email"] == self.user.email + assert response.data["email_verified"] + + # openid information + assert response.data["sub"] == self.user.id