Skip to content

Commit

Permalink
improve accessibility saml2 urls
Browse files Browse the repository at this point in the history
  • Loading branch information
nilupulmanodya committed Oct 29, 2023
1 parent eefc67c commit 23b5446
Showing 1 changed file with 102 additions and 94 deletions.
196 changes: 102 additions & 94 deletions mslib/mscolab/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,14 +738,16 @@ def reset_request():
@APP.route("/metadata/<idp_identity_name>", methods=['GET'])
def metadata(idp_identity_name):
"""Return the SAML metadata XML for the requested IDP"""
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if idp_identity_name == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
metadata_string = create_metadata_string(
None, sp_config.config, 4, None, None, None, None, None
).decode("utf-8")
return Response(metadata_string, mimetype="text/xml")
return render_template('errors/404.html'), 404
if mscolab_settings.USE_SAML2:
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if idp_identity_name == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
metadata_string = create_metadata_string(
None, sp_config.config, 4, None, None, None, None, None
).decode("utf-8")
return Response(metadata_string, mimetype="text/xml")
return render_template('errors/404.html'), 404
return render_template('errors/403.html'), 403


@APP.route('/available_idps/', methods=['GET'])
Expand All @@ -764,28 +766,30 @@ def available_idps():
@APP.route("/idp_login/", methods=['POST'])
def idp_login():
"""Handle the login process for the user by selected IDP"""
selected_idp = request.form.get('selectedIdentityProvider')
sp_config = None
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
break
if mscolab_settings.USE_SAML2:
selected_idp = request.form.get('selectedIdentityProvider')
sp_config = None
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
if selected_idp == idp_config['idp_identity_name']:
sp_config = idp_config['idp_data']['saml2client']
break

try:
_, response_binding = sp_config.config.getattr("endpoints", "sp")[
"assertion_consumer_service"
][0]
entity_id = get_idp_entity_id(selected_idp)
_, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
entityid=entity_id,
response_binding=response_binding,
)
if binding == BINDING_HTTP_REDIRECT:
headers = dict(http_args["headers"])
return redirect(str(headers["Location"]), code=303)
return Response(http_args["data"], headers=http_args["headers"])
except (NameError, AttributeError):
return render_template('errors/403.html'), 403
try:
_, response_binding = sp_config.config.getattr("endpoints", "sp")[
"assertion_consumer_service"
][0]
entity_id = get_idp_entity_id(selected_idp)
_, binding, http_args = sp_config.prepare_for_negotiated_authenticate(
entityid=entity_id,
response_binding=response_binding,
)
if binding == BINDING_HTTP_REDIRECT:
headers = dict(http_args["headers"])
return redirect(str(headers["Location"]), code=303)
return Response(http_args["data"], headers=http_args["headers"])
except (NameError, AttributeError):
return render_template('errors/403.html'), 403
return render_template('errors/403.html'), 403


@APP.route('/<path:url>', methods=['POST'])
Expand All @@ -794,84 +798,88 @@ def acs_post_handler(url):
Function to handle unknown POST requests,
Implemented to Handle the SAML authentication response received via POST request from configured IDPs.
"""
try:
# implementation for handle configured saml assertion consumer endpoints
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
# Check if the requested URL exists in the assertion_consumer_endpoints dictionary
url_with_slash = '/' + url
url_exists_with_slash = url_with_slash in idp_config['idp_data']['assertion_consumer_endpoints']
url_exists_without_slash = url in idp_config['idp_data']['assertion_consumer_endpoints']
if url_exists_without_slash or url_exists_with_slash:
outstanding_queries = {}
binding = BINDING_HTTP_POST
authn_response = idp_config['idp_data']['saml2client'].parse_authn_request_response(
request.form["SAMLResponse"], binding, outstanding=outstanding_queries
)
email = None
username = None

try:
email = authn_response.ava["email"][0]
username = authn_response.ava["givenName"][0]
token = generate_confirmation_token(email)
except (NameError, AttributeError, KeyError):
if mscolab_settings.USE_SAML2:
try:
# implementation for handle configured saml assertion consumer endpoints
for idp_config in setup_saml2_backend.CONFIGURED_IDPS:
# Check if the requested URL exists in the assertion_consumer_endpoints dictionary
url_with_slash = '/' + url
url_exists_with_slash = url_with_slash in idp_config['idp_data']['assertion_consumer_endpoints']
url_exists_without_slash = url in idp_config['idp_data']['assertion_consumer_endpoints']
if url_exists_without_slash or url_exists_with_slash:
outstanding_queries = {}
binding = BINDING_HTTP_POST
authn_response = idp_config['idp_data']['saml2client'].parse_authn_request_response(
request.form["SAMLResponse"], binding, outstanding=outstanding_queries
)
email = None
username = None

try:
# Initialize an empty dictionary to store attribute values
attributes = {}

# Loop through attribute statements
for attribute_statement in authn_response.assertion.attribute_statement:
for attribute in attribute_statement.attribute:
attribute_name = attribute.name
attribute_value = \
attribute.attribute_value[0].text if attribute.attribute_value else None
attributes[attribute_name] = attribute_value

# Extract the email and givenname attributes
email = attributes["email"]
username = attributes["givenName"]
email = authn_response.ava["email"][0]
username = authn_response.ava["givenName"][0]
token = generate_confirmation_token(email)

except (NameError, AttributeError, KeyError):
render_template('errors/403.html'), 403

if email is not None and username is not None:
idp_user_db_state = create_or_update_idp_user(email, username, token,
idp_config['idp_identity_name'])
if idp_user_db_state:
return render_template('idp/idp_login_success.html', token=token), 200
try:
# Initialize an empty dictionary to store attribute values
attributes = {}

# Loop through attribute statements
for attribute_statement in authn_response.assertion.attribute_statement:
for attribute in attribute_statement.attribute:
attribute_name = attribute.name
attribute_value = \
attribute.attribute_value[0].text if attribute.attribute_value else None
attributes[attribute_name] = attribute_value

# Extract the email and givenname attributes
email = attributes["email"]
username = attributes["givenName"]
token = generate_confirmation_token(email)

except (NameError, AttributeError, KeyError):
render_template('errors/403.html'), 403

if email is not None and username is not None:
idp_user_db_state = create_or_update_idp_user(email, username, token,
idp_config['idp_identity_name'])
if idp_user_db_state:
return render_template('idp/idp_login_success.html', token=token), 200
else:
return render_template('errors/500.html'), 500
else:
return render_template('errors/500.html'), 500
else:
return render_template('errors/500.html'), 500
except (NameError, AttributeError, KeyError):
return render_template('errors/403.html'), 403
except (NameError, AttributeError, KeyError):
return render_template('errors/403.html'), 403
return render_template('errors/403.html'), 403


@APP.route('/idp_login_auth/', methods=['POST'])
def idp_login_auth():
"""Handle the SAML authentication validation of client application."""
try:
data = request.get_json()
token = data.get('token')
email = confirm_token(token, expiration=1200)
if email:
user = check_login(email, token)
if user:
random_token = secrets.token_hex(16)
user.hash_password(random_token)
db.session.add(user)
db.session.commit()
return json.dumps({
"success": True,
'token': random_token,
'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid}
})
if mscolab_settings.USE_SAML2:
try:
data = request.get_json()
token = data.get('token')
email = confirm_token(token, expiration=1200)
if email:
user = check_login(email, token)
if user:
random_token = secrets.token_hex(16)
user.hash_password(random_token)
db.session.add(user)
db.session.commit()
return json.dumps({
"success": True,
'token': random_token,
'user': {'username': user.username, 'id': user.id, 'emailid': user.emailid}
})
return jsonify({"success": False}), 401
return jsonify({"success": False}), 401
return jsonify({"success": False}), 401
except TypeError:
return jsonify({"success": False}), 401
except TypeError:
return jsonify({"success": False}), 401
return render_template('errors/403.html'), 403


def start_server(app, sockio, cm, fm, port=8083):
Expand Down

0 comments on commit 23b5446

Please sign in to comment.