From 97449ef54fb38c516a6fc4f4767bf8befd9b4e45 Mon Sep 17 00:00:00 2001 From: Deborah Kaplan Date: Mon, 28 Oct 2024 16:34:15 -0400 Subject: [PATCH] feat: add more authentication information to swagger (#35674) * feat: add more authentication information to swagger * updates the `docs-settings` to make the generated swagger `securityDefinitions` include both JWT and CSRF methods, as well as basic. A few linter fixes happened as a side effect. * Put in wordier descriptions for all three, since we don't have great shared documentation about authn/authz. * Added CSRF to `login_session`, which also serves as a proof of concept for other endpoits * Also regenerated the swagger doc, which picked up some extra changes. Generated swagger now has help and allows extra auth methods so some preveiously unusable endpoints can be hit. FIXES: APER-3554 --- docs/docs_settings.py | 75 +++- docs/lms-openapi.yaml | 244 +++++++++-- .../core/djangoapps/user_authn/views/login.py | 393 ++++++++++-------- 3 files changed, 489 insertions(+), 223 deletions(-) diff --git a/docs/docs_settings.py b/docs/docs_settings.py index f12848876e8a..f791b2faafb9 100644 --- a/docs/docs_settings.py +++ b/docs/docs_settings.py @@ -4,7 +4,7 @@ import all the Studio code. """ - +from textwrap import dedent import os from openedx.core.lib.derived import derive_settings @@ -27,18 +27,71 @@ FEATURES[key] = True # Settings that will fail if we enable them, and we don't need them for docs anyway. -FEATURES['RUN_AS_ANALYTICS_SERVER_ENABLED'] = False -FEATURES['ENABLE_SOFTWARE_SECURE_FAKE'] = False -FEATURES['ENABLE_MKTG_SITE'] = False +FEATURES["RUN_AS_ANALYTICS_SERVER_ENABLED"] = False +FEATURES["ENABLE_SOFTWARE_SECURE_FAKE"] = False +FEATURES["ENABLE_MKTG_SITE"] = False + +INSTALLED_APPS.extend( + [ + "cms.djangoapps.contentstore.apps.ContentstoreConfig", + "cms.djangoapps.course_creators", + "cms.djangoapps.xblock_config.apps.XBlockConfig", + "lms.djangoapps.lti_provider", + ] +) + +# Swagger generation details +openapi_security_info_basic = ( + "Obtain with a `POST` request to `/user/v1/account/login_session/`. " + "If needed, copy the cookies from the response to your new call." +) +openapi_security_info_jwt = dedent( + """ + Obtain by making a `POST` request to `/oauth2/v1/access_token`. + + You will need to be logged in and have a client ID and secret already created. -INSTALLED_APPS.extend([ - 'cms.djangoapps.contentstore.apps.ContentstoreConfig', - 'cms.djangoapps.course_creators', - 'cms.djangoapps.xblock_config.apps.XBlockConfig', - 'lms.djangoapps.lti_provider', -]) + Your request should have the headers + + ``` + 'Content-Type': 'application/x-www-form-urlencoded' + ``` + + Your request should have the data payload + + ``` + 'grant_type': 'client_credentials' + 'client_id': [your client ID] + 'client_secret': [your client secret] + 'token_type': 'jwt' + ``` + + Your JWT will be returned in the response as `access_token`. Prefix with `JWT ` in your header. + """ +) +openapi_security_info_csrf = ( + "Obtain by making a `GET` request to `/csrf/api/v1/token`. The token will be in the response cookie `csrftoken`." +) +SWAGGER_SETTINGS["SECURITY_DEFINITIONS"] = { + "Basic": { + "type": "basic", + "description": openapi_security_info_basic, + }, + "jwt": { + "type": "apiKey", + "name": "Authorization", + "in": "header", + "description": openapi_security_info_jwt, + }, + "csrf": { + "type": "apiKey", + "name": "X-CSRFToken", + "in": "header", + "description": openapi_security_info_csrf, + }, +} -COMMON_TEST_DATA_ROOT = '' +COMMON_TEST_DATA_ROOT = "" derive_settings(__name__) diff --git a/docs/lms-openapi.yaml b/docs/lms-openapi.yaml index 5e9afcc6d370..8011f84b4573 100644 --- a/docs/lms-openapi.yaml +++ b/docs/lms-openapi.yaml @@ -13,8 +13,44 @@ produces: securityDefinitions: Basic: type: basic + description: Obtain with a `POST` request to `/user/v1/account/login_session/`. If + needed, copy the cookies from the response to your new call. + jwt: + type: apiKey + name: Authorization + in: header + description: |2 + + Obtain by making a `POST` request to `/oauth2/v1/access_token`. + + You will need to be logged in and have a client ID and secret already created. + + Your request should have the headers + + ``` + 'Content-Type': 'application/x-www-form-urlencoded' + ``` + + Your request should have the data payload + + ``` + 'grant_type': 'client_credentials' + 'client_id': [your client ID] + 'client_secret': [your client secret] + 'token_type': 'jwt' + ``` + + Your JWT will be returned in the response as `access_token`. Prefix with `JWT ` in your header. + csrf: + type: apiKey + name: X-CSRFToken + in: header + description: Obtain by making a `GET` request to `/csrf/api/v1/token`. The token + will be in the response cookie `csrftoken`. security: - Basic: [] +- csrf: [] +- jwt: [] paths: /agreements/v1/integrity_signature/{course_id}: get: @@ -3975,6 +4011,7 @@ paths: "profile_name": "Jon Doe" "verification_attempt_id": (Optional) "proctored_exam_attempt_id": (Optional) + "platform_verification_attempt_id": (Optional) "status": (Optional) } parameters: @@ -4130,6 +4167,7 @@ paths: "profile_name": "Jon Doe" "verification_attempt_id": (Optional) "proctored_exam_attempt_id": (Optional) + "platform_verification_attempt_id": (Optional) "status": (Optional) } parameters: @@ -6788,6 +6826,59 @@ paths: in: path required: true type: string + /mobile/{api_version}/notifications/create-token/: + post: + operationId: mobile_notifications_create-token_create + summary: |- + **Use Case** + This endpoint allows clients to register a device for push notifications. + description: |- + If the device is already registered, the existing registration will be updated. + If setting PUSH_NOTIFICATIONS_SETTINGS is not configured, the endpoint will return a 501 error. + + **Example Request** + POST /api/mobile/{version}/notifications/create-token/ + **POST Parameters** + The body of the POST request can include the following parameters. + * name (optional) - A name of the device. + * registration_id (required) - The device token of the device. + * device_id (optional) - ANDROID_ID / TelephonyManager.getDeviceId() (always as hex) + * active (optional) - Whether the device is active, default is True. + If False, the device will not receive notifications. + * cloud_message_type (required) - You should choose FCM or GCM. Currently, only FCM is supported. + * application_id (optional) - Opaque application identity, should be filled in for multiple + key/certificate access. Should be equal settings.FCM_APP_NAME. + **Example Response** + ```json + { + "id": 1, + "name": "My Device", + "registration_id": "fj3j4", + "device_id": 1234, + "active": true, + "date_created": "2024-04-18T07:39:37.132787Z", + "cloud_message_type": "FCM", + "application_id": "my_app_id" + } + ``` + parameters: + - name: data + in: body + required: true + schema: + $ref: '#/definitions/GCMDevice' + responses: + '201': + description: '' + schema: + $ref: '#/definitions/GCMDevice' + tags: + - mobile + parameters: + - name: api_version + in: path + required: true + type: string /mobile/{api_version}/users/{username}: get: operationId: mobile_users_read @@ -8849,22 +8940,6 @@ paths: tags: - user parameters: [] - /user/v1/accounts/verifications/{attempt_id}/: - get: - operationId: user_v1_accounts_verifications_read - description: Get IDV attempt details by attempt_id. Only accessible by global - staff. - parameters: [] - responses: - '200': - description: '' - tags: - - user - parameters: - - name: attempt_id - in: path - required: true - type: string /user/v1/accounts/{username}: get: operationId: user_v1_accounts_read @@ -9423,22 +9498,57 @@ paths: - user post: operationId: user_account_login_session_create - summary: Log in a user. - description: |- - See `login_user` for details. - - Example Usage: - - POST /api/user/v1/login_session - with POST params `email`, `password`. - - 200 {'success': true} - parameters: [] + summary: POST /user/{api_version}/account/login_session/ + description: Returns 200 on success, and a detailed error message otherwise. + parameters: + - name: data + in: body + required: true + schema: + type: object + properties: + email: + type: string + password: + type: string responses: - '201': + '200': + description: '' + schema: + type: object + properties: + success: + type: boolean + value: + type: string + error_code: + type: string + '400': + description: '' + schema: + type: object + properties: + success: + type: boolean + value: + type: string + error_code: + type: string + '403': description: '' + schema: + type: object + properties: + success: + type: boolean + value: + type: string + error_code: + type: string tags: - user + security: + - csrf: [] parameters: - name: api_version in: path @@ -10047,6 +10157,7 @@ definitions: required: - celebrations - course_access + - studio_access - course_id - is_enrolled - is_self_paced @@ -10084,6 +10195,9 @@ definitions: additionalProperties: type: string x-nullable: true + studio_access: + title: Studio access + type: boolean course_id: title: Course id type: string @@ -11237,10 +11351,24 @@ definitions: title: Verification attempt id type: integer x-nullable: true + verification_attempt_status: + title: Verification attempt status + type: string + minLength: 1 + x-nullable: true proctored_exam_attempt_id: title: Proctored exam attempt id type: integer x-nullable: true + platform_verification_attempt_id: + title: Platform verification attempt id + type: integer + x-nullable: true + platform_verification_attempt_status: + title: Platform verification attempt status + type: string + minLength: 1 + x-nullable: true status: title: Status type: string @@ -11277,10 +11405,24 @@ definitions: title: Verification attempt id type: integer x-nullable: true + verification_attempt_status: + title: Verification attempt status + type: string + minLength: 1 + x-nullable: true proctored_exam_attempt_id: title: Proctored exam attempt id type: integer x-nullable: true + platform_verification_attempt_id: + title: Platform verification attempt id + type: integer + x-nullable: true + platform_verification_attempt_status: + title: Platform verification attempt status + type: string + minLength: 1 + x-nullable: true status: title: Status type: string @@ -11710,6 +11852,52 @@ definitions: title: Enddatetime type: string format: date-time + GCMDevice: + required: + - registration_id + type: object + properties: + id: + title: ID + type: integer + name: + title: Name + type: string + maxLength: 255 + x-nullable: true + registration_id: + title: Registration ID + type: string + minLength: 1 + device_id: + title: Device id + description: 'ANDROID_ID / TelephonyManager.getDeviceId() (e.g: 0x01)' + type: integer + x-nullable: true + active: + title: Is active + description: Inactive devices will not be sent notifications + type: boolean + date_created: + title: Creation date + type: string + format: date-time + readOnly: true + x-nullable: true + cloud_message_type: + title: Cloud Message Type + description: You should choose FCM, GCM is deprecated + type: string + enum: + - FCM + - GCM + application_id: + title: Application ID + description: Opaque application identity, should be filled in for multiple + key/certificate access + type: string + maxLength: 64 + x-nullable: true mobile_api.User: required: - username diff --git a/openedx/core/djangoapps/user_authn/views/login.py b/openedx/core/djangoapps/user_authn/views/login.py index 2ad3c0ff18a0..6c7390406be1 100644 --- a/openedx/core/djangoapps/user_authn/views/login.py +++ b/openedx/core/djangoapps/user_authn/views/login.py @@ -23,11 +23,14 @@ from django.views.decorators.debug import sensitive_post_parameters from django.views.decorators.http import require_http_methods from django_ratelimit.decorators import ratelimit +from drf_yasg import openapi +from drf_yasg.utils import swagger_auto_schema from edx_django_utils.monitoring import set_custom_attribute from eventtracking import tracker from openedx_events.learning.data import UserData, UserPersonalData from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED from openedx_filters.learning.filters import StudentLoginRequested +from rest_framework import status from rest_framework.views import APIView from common.djangoapps import third_party_auth @@ -49,7 +52,7 @@ from openedx.core.djangoapps.user_authn.tasks import check_pwned_password_and_send_track_event from openedx.core.djangoapps.user_authn.toggles import ( is_require_third_party_auth_enabled, - should_redirect_to_authn_microfrontend + should_redirect_to_authn_microfrontend, ) from openedx.core.djangoapps.user_authn.views.login_form import get_login_session_form from openedx.core.djangoapps.user_authn.views.password_reset import send_password_reset_email_for_user @@ -62,7 +65,7 @@ log = logging.getLogger("edx.student") AUDIT_LOG = logging.getLogger("audit") USER_MODEL = get_user_model() -PASSWORD_RESET_INITIATED = 'edx.user.passwordreset.initiated' +PASSWORD_RESET_INITIATED = "edx.user.passwordreset.initiated" def _do_third_party_auth(request): @@ -70,9 +73,9 @@ def _do_third_party_auth(request): User is already authenticated via 3rd party, now try to find and return their associated Django user. """ running_pipeline = pipeline.get(request) - username = running_pipeline['kwargs'].get('username') - backend_name = running_pipeline['backend'] - third_party_uid = running_pipeline['kwargs']['uid'] + username = running_pipeline["kwargs"].get("username") + backend_name = running_pipeline["backend"] + third_party_uid = running_pipeline["kwargs"]["uid"] requested_provider = provider.Registry.get_from_pipeline(running_pipeline) platform_name = configuration_helpers.get_value("platform_name", settings.PLATFORM_NAME) @@ -81,26 +84,25 @@ def _do_third_party_auth(request): except USER_MODEL.DoesNotExist: AUDIT_LOG.info( "Login failed - user with username {username} has no social auth " - "with backend_name {backend_name}".format( - username=username, backend_name=backend_name) + "with backend_name {backend_name}".format(username=username, backend_name=backend_name) ) - message = Text(_( - "You've successfully signed in to your {provider_name} account, " - "but this account isn't linked with your {platform_name} account yet. {blank_lines}" - "Use your {platform_name} username and password to sign in to {platform_name} below, " - "and then link your {platform_name} account with {provider_name} from your dashboard. {blank_lines}" - "If you don't have an account on {platform_name} yet, " - "click {register_label_strong} at the top of the page." - )).format( - blank_lines=HTML('

'), + message = Text( + _( + "You've successfully signed in to your {provider_name} account, " + "but this account isn't linked with your {platform_name} account yet. {blank_lines}" + "Use your {platform_name} username and password to sign in to {platform_name} below, " + "and then link your {platform_name} account with {provider_name} from your dashboard. {blank_lines}" + "If you don't have an account on {platform_name} yet, " + "click {register_label_strong} at the top of the page." + ) + ).format( + blank_lines=HTML("

"), platform_name=platform_name, provider_name=requested_provider.name, - register_label_strong=HTML('{register_text}').format( - register_text=_('Register') - ) + register_label_strong=HTML("{register_text}").format(register_text=_("Register")), ) - raise AuthFailedError(message, error_code='third-party-auth-with-no-linked-account') # lint-amnesty, pylint: disable=raise-missing-from + raise AuthFailedError(message, error_code="third-party-auth-with-no-linked-account") # lint-amnesty, pylint: disable=raise-missing-from def _get_user_by_email(email): @@ -128,14 +130,14 @@ def _get_user_by_email_or_username(request, api_version): Finds a user object in the database based on the given request, ignores all fields except for email and username. """ is_api_v2 = api_version != API_V1 - login_fields = ['email', 'password'] + login_fields = ["email", "password"] if is_api_v2: - login_fields = ['email_or_username', 'password'] + login_fields = ["email_or_username", "password"] if any(f not in request.POST.keys() for f in login_fields): - raise AuthFailedError(_('There was an error receiving your login information. Please email us.')) + raise AuthFailedError(_("There was an error receiving your login information. Please email us.")) - email_or_username = request.POST.get('email', None) or request.POST.get('email_or_username', None) + email_or_username = request.POST.get("email", None) or request.POST.get("email_or_username", None) user = _get_user_by_email(email_or_username) if not user and is_api_v2: @@ -143,7 +145,7 @@ def _get_user_by_email_or_username(request, api_version): user = _get_user_by_username(email_or_username) if not user: - digest = hashlib.shake_128(email_or_username.encode('utf-8')).hexdigest(16) + digest = hashlib.shake_128(email_or_username.encode("utf-8")).hexdigest(16) AUDIT_LOG.warning(f"Login failed - Unknown user email or username {digest}") return user @@ -165,27 +167,30 @@ def _generate_locked_out_error_message(): """ locked_out_period_in_sec = settings.MAX_FAILED_LOGIN_ATTEMPTS_LOCKOUT_PERIOD_SECS - error_message = Text(_('To protect your account, it’s been temporarily ' - 'locked. Try again in {locked_out_period} minutes.' - '{li_start}To be on the safe side, you can reset your ' - 'password {link_start}here{link_end} before you try again.')).format( - link_start=HTML(''), - link_end=HTML(''), - li_start=HTML('
  • '), - li_end=HTML('
  • '), - locked_out_period=int(locked_out_period_in_sec / 60)) + error_message = Text( + _( + "To protect your account, it’s been temporarily " + "locked. Try again in {locked_out_period} minutes." + "{li_start}To be on the safe side, you can reset your " + "password {link_start}here{link_end} before you try again." + ) + ).format( + link_start=HTML(''), + link_end=HTML(""), + li_start=HTML("
  • "), + li_end=HTML("
  • "), + locked_out_period=int(locked_out_period_in_sec / 60), + ) raise AuthFailedError( error_message, - error_code='account-locked-out', - context={ - 'locked_out_period': int(locked_out_period_in_sec / 60) - } + error_code="account-locked-out", + context={"locked_out_period": int(locked_out_period_in_sec / 60)}, ) def _enforce_password_policy_compliance(request, user): # lint-amnesty, pylint: disable=missing-function-docstring try: - password_policy_compliance.enforce_compliance_on_login(user, request.POST.get('password')) + password_policy_compliance.enforce_compliance_on_login(user, request.POST.get("password")) except password_policy_compliance.NonCompliantPasswordWarning as e: # Allow login, but warn the user that they will be required to reset their password soon. PageLevelMessages.register_warning_message(request, HTML(str(e))) @@ -201,7 +206,7 @@ def _enforce_password_policy_compliance(request, user): # lint-amnesty, pylint: { "user_id": user.id, "source": "Policy Compliance", - } + }, ) send_password_reset_email_for_user(user, request) @@ -214,19 +219,17 @@ def _log_and_raise_inactive_user_auth_error(unauthenticated_user): Depending on Django version we can get here a couple of ways, but this takes care of logging an auth attempt by an inactive user, re-sending the activation email, and raising an error with the correct message. """ - AUDIT_LOG.warning( - f"Login failed - Account not active for user.id: {unauthenticated_user.id}, resending activation" - ) + AUDIT_LOG.warning(f"Login failed - Account not active for user.id: {unauthenticated_user.id}, resending activation") profile = UserProfile.objects.get(user=unauthenticated_user) compose_and_send_activation_email(unauthenticated_user, profile) raise AuthFailedError( - error_code='inactive-user', + error_code="inactive-user", context={ - 'platformName': configuration_helpers.get_value('PLATFORM_NAME', settings.PLATFORM_NAME), - 'supportLink': configuration_helpers.get_value('SUPPORT_SITE_LINK', settings.SUPPORT_SITE_LINK) - } + "platformName": configuration_helpers.get_value("PLATFORM_NAME", settings.PLATFORM_NAME), + "supportLink": configuration_helpers.get_value("SUPPORT_SITE_LINK", settings.SUPPORT_SITE_LINK), + }, ) @@ -234,9 +237,11 @@ def _authenticate_first_party(request, unauthenticated_user, third_party_auth_re """ Use Django authentication on the given request, using rate limiting if configured """ - should_be_rate_limited = getattr(request, 'limited', False) + should_be_rate_limited = getattr(request, "limited", False) if should_be_rate_limited: - raise AuthFailedError(_('Too many failed login attempts. Try again later.')) # lint-amnesty, pylint: disable=raise-missing-from + raise AuthFailedError( + _("Too many failed login attempts. Try again later.") + ) # lint-amnesty, pylint: disable=raise-missing-from # If the user doesn't exist, we want to set the username to an invalid username so that authentication is guaranteed # to fail and we can take advantage of the ratelimited backend @@ -248,12 +253,8 @@ def _authenticate_first_party(request, unauthenticated_user, third_party_auth_re if not third_party_auth_requested: _check_user_auth_flow(request.site, unauthenticated_user) - password = normalize_password(request.POST['password']) - return authenticate( - username=username, - password=password, - request=request - ) + password = normalize_password(request.POST["password"]) + return authenticate(username=username, password=password, request=request) def _handle_failed_authentication(user, authenticated_user): @@ -279,34 +280,37 @@ def _handle_failed_authentication(user, authenticated_user): if not LoginFailures.is_user_locked_out(user): max_failures_allowed = settings.MAX_FAILED_LOGIN_ATTEMPTS_ALLOWED remaining_attempts = max_failures_allowed - failure_count - error_message = Text(_('Email or password is incorrect.' - '{li_start}You have {remaining_attempts} more sign-in ' - 'attempts before your account is temporarily locked.{li_end}' - '{li_start}If you\'ve forgotten your password, click ' - '{link_start}here{link_end} to reset.{li_end}')).format( - link_start=HTML( - '' - ), - link_end=HTML(''), - li_start=HTML('
  • '), - li_end=HTML('
  • '), - remaining_attempts=remaining_attempts) + error_message = Text( + _( + "Email or password is incorrect." + "{li_start}You have {remaining_attempts} more sign-in " + "attempts before your account is temporarily locked.{li_end}" + "{li_start}If you've forgotten your password, click " + "{link_start}here{link_end} to reset.{li_end}" + ) + ).format( + link_start=HTML(''), + link_end=HTML(""), + li_start=HTML("
  • "), + li_end=HTML("
  • "), + remaining_attempts=remaining_attempts, + ) raise AuthFailedError( error_message, - error_code='failed-login-attempt', + error_code="failed-login-attempt", context={ - 'remaining_attempts': remaining_attempts, - 'allowed_failure_attempts': max_failures_allowed, - 'failure_count': failure_count, - } + "remaining_attempts": remaining_attempts, + "allowed_failure_attempts": max_failures_allowed, + "failure_count": failure_count, + }, ) _generate_locked_out_error_message() raise AuthFailedError( - _('Email or password is incorrect.'), - error_code='incorrect-email-or-password', - context={'failure_count': failure_count}, + _("Email or password is incorrect."), + error_code="incorrect-email-or-password", + context={"failure_count": failure_count}, ) @@ -352,25 +356,18 @@ def _track_user_login(user, request): # .. pii_retirement: third_party segment.identify( user.id, - { - 'email': user.email, - 'username': user.username - }, + {"email": user.email, "username": user.username}, { # Disable MailChimp because we don't want to update the user's email # and username in MailChimp on every page load. We only need to capture # this data on registration/activation. - 'MailChimp': False - } + "MailChimp": False + }, ) segment.track( user.id, "edx.bi.user.account.authenticated", - { - 'category': "conversion", - 'label': request.POST.get('course_id'), - 'provider': None - }, + {"category": "conversion", "label": request.POST.get("course_id"), "provider": None}, ) @@ -380,20 +377,22 @@ def _create_message(site, root_url, allowed_domain): to an allowed domain and not whitelisted then ask such users to login through allowed domain SSO provider. """ - msg = Text(_( - 'As {allowed_domain} user, You must login with your {allowed_domain} ' - '{link_start}{provider} account{link_end}.' - )).format( + msg = Text( + _( + "As {allowed_domain} user, You must login with your {allowed_domain} " + "{link_start}{provider} account{link_end}." + ) + ).format( allowed_domain=allowed_domain, link_start=HTML("").format( - root_url=root_url if root_url else '', - tpa_provider_link='{dashboard_url}?tpa_hint={tpa_hint}'.format( - dashboard_url=reverse('dashboard'), - tpa_hint=site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_HINT'), - ) + root_url=root_url if root_url else "", + tpa_provider_link="{dashboard_url}?tpa_hint={tpa_hint}".format( + dashboard_url=reverse("dashboard"), + tpa_hint=site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_HINT"), + ), ), - provider=site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_PROVIDER'), - link_end=HTML("") + provider=site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_PROVIDER"), + link_end=HTML(""), ) return msg @@ -404,13 +403,13 @@ def _check_user_auth_flow(site, user): then ask user to login through allowed domain SSO provider. """ if user and ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY.is_enabled(): - allowed_domain = site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_DOMAIN', '').lower() - email_parts = user.email.split('@') + allowed_domain = site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_DOMAIN", "").lower() + email_parts = user.email.split("@") if len(email_parts) != 2: # User has a nonstandard email so we record their id. # we don't record their e-mail in case there is sensitive info accidentally # in there. - set_custom_attribute('login_tpa_domain_shortcircuit_user_id', user.id) + set_custom_attribute("login_tpa_domain_shortcircuit_user_id", user.id) log.warning("User %s has nonstandard e-mail. Shortcircuiting THIRD_PART_AUTH_ONLY_DOMAIN check.", user.id) return user_domain = email_parts[1].strip().lower() @@ -422,19 +421,19 @@ def _check_user_auth_flow(site, user): raise AuthFailedError(msg) raise AuthFailedError( - error_code='allowed-domain-login-error', + error_code="allowed-domain-login-error", context={ - 'allowed_domain': allowed_domain, - 'provider': site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_PROVIDER'), - 'tpa_hint': site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_HINT'), - } + "allowed_domain": allowed_domain, + "provider": site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_PROVIDER"), + "tpa_hint": site.configuration.get_value("THIRD_PARTY_AUTH_ONLY_HINT"), + }, ) @login_required -@require_http_methods(['GET']) +@require_http_methods(["GET"]) def finish_auth(request): - """ Following logistration (1st or 3rd party), handle any special query string params. + """Following logistration (1st or 3rd party), handle any special query string params. See FinishAuthView.js for details on the query string params. @@ -459,10 +458,13 @@ def finish_auth(request): GET /account/finish_auth/?course_id=course-v1:blah&enrollment_action=enroll """ - return render_to_response('student_account/finish_auth.html', { - 'disable_courseware_js': True, - 'disable_footer': True, - }) + return render_to_response( + "student_account/finish_auth.html", + { + "disable_courseware_js": True, + "disable_footer": True, + }, + ) def enterprise_selection_page(request, user, next_url): @@ -478,14 +480,14 @@ def enterprise_selection_page(request, user, next_url): response = get_enterprise_learner_data_from_api(user) if response and len(response) > 1: - redirect_url = reverse('enterprise_select_active') + '/?success_url=' + urllib.parse.quote(next_url) + redirect_url = reverse("enterprise_select_active") + "/?success_url=" + urllib.parse.quote(next_url) # Check to see if next url has an enterprise in it. In this case if user is associated with # that enterprise, activate that enterprise and bypass the selection page. if re.match(ENTERPRISE_ENROLLMENT_URL_REGEX, urllib.parse.unquote(next_url)): enterprise_in_url = re.search(UUID4_REGEX, next_url).group(0) for enterprise in response: - if enterprise_in_url == str(enterprise['enterprise_customer']['uuid']): + if enterprise_in_url == str(enterprise["enterprise_customer"]["uuid"]): is_activated_successfully = activate_learner_enterprise(request, user, enterprise_in_url) if is_activated_successfully: redirect_url = next_url @@ -495,20 +497,20 @@ def enterprise_selection_page(request, user, next_url): @ensure_csrf_cookie -@require_http_methods(['POST']) +@require_http_methods(["POST"]) @ratelimit( - key='openedx.core.djangoapps.util.ratelimit.request_post_email_or_username', + key="openedx.core.djangoapps.util.ratelimit.request_post_email_or_username", rate=settings.LOGISTRATION_PER_EMAIL_RATELIMIT_RATE, - method='POST', + method="POST", block=False, ) # lint-amnesty, pylint: disable=too-many-statements @ratelimit( - key='openedx.core.djangoapps.util.ratelimit.real_ip', + key="openedx.core.djangoapps.util.ratelimit.real_ip", rate=settings.LOGISTRATION_RATELIMIT_RATE, - method='POST', + method="POST", block=False, ) # lint-amnesty, pylint: disable=too-many-statements -def login_user(request, api_version='v1'): # pylint: disable=too-many-statements +def login_user(request, api_version="v1"): # pylint: disable=too-many-statements """ AJAX request to log in the user. @@ -542,10 +544,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement _parse_analytics_param_for_course_id(request) third_party_auth_requested = third_party_auth.is_enabled() and pipeline.running(request) - first_party_auth_requested = any(bool(request.POST.get(p)) for p in ['email', 'email_or_username', 'password']) + first_party_auth_requested = any(bool(request.POST.get(p)) for p in ["email", "email_or_username", "password"]) is_user_third_party_authenticated = False - set_custom_attribute('login_user_course_id', request.POST.get('course_id')) + set_custom_attribute("login_user_course_id", request.POST.get("course_id")) if is_require_third_party_auth_enabled() and not third_party_auth_requested: return HttpResponseForbidden( @@ -564,12 +566,12 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement try: user = _do_third_party_auth(request) is_user_third_party_authenticated = True - set_custom_attribute('login_user_tpa_success', True) + set_custom_attribute("login_user_tpa_success", True) except AuthFailedError as e: - set_custom_attribute('login_user_tpa_success', False) - set_custom_attribute('login_user_tpa_failure_msg', e.value) + set_custom_attribute("login_user_tpa_success", False) + set_custom_attribute("login_user_tpa_failure_msg", e.value) if e.error_code: - set_custom_attribute('login_error_code', e.error_code) + set_custom_attribute("login_error_code", e.error_code) # user successfully authenticated with a third party provider, but has no linked Open edX account response_content = e.get_response() @@ -585,7 +587,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement possibly_authenticated_user = StudentLoginRequested.run_filter(user=possibly_authenticated_user) except StudentLoginRequested.PreventLogin as exc: raise AuthFailedError( - str(exc), redirect_url=exc.redirect_to, error_code=exc.error_code, context=exc.context, + str(exc), + redirect_url=exc.redirect_to, + error_code=exc.error_code, + context=exc.context, ) from exc if not is_user_third_party_authenticated: @@ -599,82 +604,82 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement ): _handle_failed_authentication(user, possibly_authenticated_user) - pwned_properties = check_pwned_password_and_send_track_event( - user_id=user.id, - password=request.POST.get('password'), - internal_user=user.is_staff, - request_page='login' - ) if not is_user_third_party_authenticated else {} + pwned_properties = ( + check_pwned_password_and_send_track_event( + user_id=user.id, + password=request.POST.get("password"), + internal_user=user.is_staff, + request_page="login", + ) + if not is_user_third_party_authenticated + else {} + ) # Set default for third party login - password_frequency = pwned_properties.get('frequency', -1) + password_frequency = pwned_properties.get("frequency", -1) if ( - settings.ENABLE_AUTHN_LOGIN_BLOCK_HIBP_POLICY and - password_frequency >= settings.HIBP_LOGIN_BLOCK_PASSWORD_FREQUENCY_THRESHOLD + settings.ENABLE_AUTHN_LOGIN_BLOCK_HIBP_POLICY + and password_frequency >= settings.HIBP_LOGIN_BLOCK_PASSWORD_FREQUENCY_THRESHOLD ): - raise VulnerablePasswordError( - accounts.AUTHN_LOGIN_BLOCK_HIBP_POLICY_MSG, - 'require-password-change' - ) + raise VulnerablePasswordError(accounts.AUTHN_LOGIN_BLOCK_HIBP_POLICY_MSG, "require-password-change") _handle_successful_authentication_and_login(possibly_authenticated_user, request) # The AJAX method calling should know the default destination upon success - redirect_url, finish_auth_url = None, '' + redirect_url, finish_auth_url = None, "" if third_party_auth_requested: running_pipeline = pipeline.get(request) - finish_auth_url = pipeline.get_complete_url(backend_name=running_pipeline['backend']) + finish_auth_url = pipeline.get_complete_url(backend_name=running_pipeline["backend"]) if is_user_third_party_authenticated: redirect_url = finish_auth_url elif should_redirect_to_authn_microfrontend(): next_url, root_url = get_next_url_for_login_page(request, include_host=True) redirect_url = get_redirect_url_with_host( - root_url, - enterprise_selection_page(request, possibly_authenticated_user, finish_auth_url or next_url) + root_url, enterprise_selection_page(request, possibly_authenticated_user, finish_auth_url or next_url) ) if ( - settings.ENABLE_AUTHN_LOGIN_NUDGE_HIBP_POLICY and - 0 <= password_frequency <= settings.HIBP_LOGIN_NUDGE_PASSWORD_FREQUENCY_THRESHOLD + settings.ENABLE_AUTHN_LOGIN_NUDGE_HIBP_POLICY + and 0 <= password_frequency <= settings.HIBP_LOGIN_NUDGE_PASSWORD_FREQUENCY_THRESHOLD ): raise VulnerablePasswordError( - accounts.AUTHN_LOGIN_NUDGE_HIBP_POLICY_MSG, - 'nudge-password-change', - redirect_url + accounts.AUTHN_LOGIN_NUDGE_HIBP_POLICY_MSG, "nudge-password-change", redirect_url ) - response = JsonResponse({ - 'success': True, - 'redirect_url': redirect_url, - }) + response = JsonResponse( + { + "success": True, + "redirect_url": redirect_url, + } + ) # Ensure that the external marketing site can # detect that the user is logged in. response = set_logged_in_cookies(request, response, possibly_authenticated_user) - set_custom_attribute('login_user_auth_failed_error', False) - set_custom_attribute('login_user_response_status', response.status_code) - set_custom_attribute('login_user_redirect_url', redirect_url) + set_custom_attribute("login_user_auth_failed_error", False) + set_custom_attribute("login_user_response_status", response.status_code) + set_custom_attribute("login_user_redirect_url", redirect_url) mark_user_change_as_expected(user.id) return response except AuthFailedError as error: response_content = error.get_response() log.exception(response_content) - error_code = response_content.get('error_code') + error_code = response_content.get("error_code") if error_code: - set_custom_attribute('login_error_code', error_code) - email_or_username_key = 'email' if api_version == API_V1 else 'email_or_username' + set_custom_attribute("login_error_code", error_code) + email_or_username_key = "email" if api_version == API_V1 else "email_or_username" email_or_username = request.POST.get(email_or_username_key, None) email_or_username = possibly_authenticated_user.email if possibly_authenticated_user else email_or_username - response_content['email'] = email_or_username + response_content["email"] = email_or_username except VulnerablePasswordError as error: response_content = error.get_response() log.exception(response_content) response = JsonResponse(response_content, status=400) - set_custom_attribute('login_user_auth_failed_error', True) - set_custom_attribute('login_user_response_status', response.status_code) + set_custom_attribute("login_user_auth_failed_error", True) + set_custom_attribute("login_user_response_status", response.status_code) return response @@ -683,10 +688,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement # to get a CSRF token before we need to refresh adds too much # complexity. @csrf_exempt -@require_http_methods(['POST']) +@require_http_methods(["POST"]) def login_refresh(request): # lint-amnesty, pylint: disable=missing-function-docstring if not request.user.is_authenticated or request.user.is_anonymous: - return JsonResponse('Unauthorized', status=401) + return JsonResponse("Unauthorized", status=401) try: return get_response_with_refreshed_jwt_cookies(request, request.user) @@ -700,33 +705,57 @@ def redirect_to_lms_login(request): This view redirect the admin/login url to the site's login page if waffle switch is on otherwise returns the admin site's login view. """ - return redirect('/login?next=/admin') + return redirect("/login?next=/admin") + + +login_user_schema = openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + "email": openapi.Schema(type=openapi.TYPE_STRING), + "password": openapi.Schema(type=openapi.TYPE_STRING), + }, +) + +login_user_return_schema = openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + "success": openapi.Schema(type=openapi.TYPE_BOOLEAN), + "value": openapi.Schema(type=openapi.TYPE_STRING), + "error_code": openapi.Schema(type=openapi.TYPE_STRING), + }, +) class LoginSessionView(APIView): - """HTTP end-points for logging in users. """ + """HTTP end-points for logging in users.""" # This end-point is available to anonymous users, # so do not require authentication. authentication_classes = [] + login_user_responses = { + status.HTTP_200_OK: login_user_return_schema, + status.HTTP_400_BAD_REQUEST: login_user_return_schema, + status.HTTP_403_FORBIDDEN: login_user_return_schema, + } + @method_decorator(ensure_csrf_cookie) def get(self, request, *args, **kwargs): return HttpResponse(get_login_session_form(request).to_json(), content_type="application/json") # lint-amnesty, pylint: disable=http-response-with-content-type-json + @swagger_auto_schema( + request_body=login_user_schema, + responses=login_user_responses, + security=[ + {"csrf": []}, + ], + ) @method_decorator(csrf_protect) def post(self, request, api_version): - """Log in a user. - - See `login_user` for details. - - Example Usage: - - POST /api/user/v1/login_session - with POST params `email`, `password`. - - 200 {'success': true} + """ + POST /user/{api_version}/account/login_session/ + Returns 200 on success, and a detailed error message otherwise. """ return login_user(request, api_version) @@ -736,19 +765,19 @@ def dispatch(self, request, *args, **kwargs): def _parse_analytics_param_for_course_id(request): - """ If analytics request param is found, parse and add course id as a new request param. """ + """If analytics request param is found, parse and add course id as a new request param.""" # Make a copy of the current POST request to modify. modified_request = request.POST.copy() if isinstance(request, HttpRequest): # Works for an HttpRequest but not a rest_framework.request.Request. # Note: This case seems to be used for tests only. request.POST = modified_request - set_custom_attribute('login_user_request_type', 'django') + set_custom_attribute("login_user_request_type", "django") else: # The request must be a rest_framework.request.Request. # Note: Only DRF seems to be used in Production. request._data = modified_request # pylint: disable=protected-access - set_custom_attribute('login_user_request_type', 'drf') + set_custom_attribute("login_user_request_type", "drf") # Include the course ID if it's specified in the analytics info # so it can be included in analytics events. @@ -758,9 +787,5 @@ def _parse_analytics_param_for_course_id(request): if "enroll_course_id" in analytics: modified_request["course_id"] = analytics.get("enroll_course_id") except (ValueError, TypeError): - set_custom_attribute('shim_analytics_course_id', 'parse-error') - log.error( - "Could not parse analytics object sent to user API: {analytics}".format( - analytics=analytics - ) - ) + set_custom_attribute("shim_analytics_course_id", "parse-error") + log.error("Could not parse analytics object sent to user API: {analytics}".format(analytics=analytics))