-
Notifications
You must be signed in to change notification settings - Fork 32
/
oauth_client_listener.py
74 lines (63 loc) · 4.47 KB
/
oauth_client_listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python
"""
Utility Python 3 script creating an OAuth2 client callback listener in order to exchange an Authorization Code for an Access/Refresh token.
Useful when assessing APIs based on OAuth2, such as PSD2 APIs.
Usage example:
Open a URL in a browser to initiate an Authorization Code flow, with a URL like:
https://authorization-server/authorize?response_type=code&client_id=xxx&scope=openid&redirect_uri=http://localhost/redirect
The flow ends with a redirect to the URL given by "redirect_uri", which then triggers the "get_tokens()" Flask handler
Dependencies:
pip install requests flask
References:
https://flask.palletsprojects.com/en/2.0.x/quickstart/#a-minimal-application
Run (PowerShell):
PS> $env:FLASK_ENV = "production"
PS> $env:FLASK_APP = "oauth_client_listener"
PS> flask run --host=localhost --port=80 --no-reload --no-debugger --with-threads
"""
import requests
import json
import io
import base64
from flask import Flask, request, Response, send_file
# Constants
# HTTP headers sent with the token request made in get_tokens(); the
# User-Agent value is a Firefox 90 on Windows 10 browser string.
REQ_HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0"}
# Base64-encoded favicon image returned by get_favicon(). The leading
# "iVBORw0KGgo" is the base64 form of the PNG file signature.
FAVICON_B64 = "iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAADV0lEQVRYhd2W70sTcRzHj6SCetIfEGTuNAj6AUVFBD2K6EnPDIIeVEql32VWaARR04h2kyJqBY5iUv7YffO2SpIyyLayWmVuO39ket8zxXS7uX0XVFDZpwfbzc1WO2tq9IX3g7sH9359ft6XYTJwiuw5e5DAhpHAEtSYuz4T39R89lWvmYsE3UcksBDTwxkFyMdMFhJ0IRWguJFtnlEAhmEYvUO3DQmsB9nZVmTPyZtxgFk5xXb2cLGgOzprAEhgh5CgG5lFgKVLSh3Z2TNm2NA/trGejG2ZLE70/6yu4LqMA2A50oUJhUTxUhhMopJK7v8PgJeoERP6NC2AL+DiRMWQcYAoRHhvOgCjGNgxLeb/HEBD3wBUP+fg4K21UHa/EM56+qcXwGAwzMFy5J4KYHFzcOnRHvUPCGUtRbEsBOzTAsBL9Fhi+i+7UBLAoTubExsRZdTcJkc2YEK/JAJY3FVJAOUt+sRe+Gzs9K/KiLlDDi/iSVievANsfYNgcZ+DktsboaylGIweMrkhe6q8owv/GgATejNqGoYa7x2wdjQALwXSLaKoOoPWvzLnZXpAjbi2+zmYnQVgdhaA1dOoDUBUoEr07/qzyKXQCkzoJxWgvrcHzM59YHYWwHXx4QSAzw+GZ41Q/qAEjjbvgLL7hXDCZYazHb0qxAdTdyB3SuZN72FBqt1f3/sW6t744s83ul9CadNWSLiQxqW3L4dTbbUqRPvF5r752qMn9ComFGz9w3CtvQbqejqAl0KxqENQ29MOlhfnk6YgJYRjBZxpfx2DCF7QZM7LkZ1q01ncVfG6m1374cqTI3DZdSD+Lh0AElg4fHc7cL4RMInKd847uv235rZBqsOERjChYO3AE+a/kBYAJLBwvPW0moUg5x1bnDrtXTAPS/RltOPdYHYWZgwACXlQ6W6JTYXixABZKeoeORet+whceVya1nxqACyU3N4EnHcoth+UimRzObwaEzqOCYUab5Mm86kCIIGFk0+s6lR85bzDyyYaj4Qr46PV1ZbUaJkC0DtWQqW7OXFLliekP6RPmnkpBLb+UU0yegbSyzsAnM8/+eq2Ow5Q/erVXF4KcZjQISzTb1iOjGsRT+i4qTOoXT7lGycq70ydSkU+xlkMwzA/AGJ0YgsiILrEAAAAAElFTkSuQmCC"
# Silence urllib3's InsecureRequestWarning: get_tokens() posts with verify=False.
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
# Context configuration
# Client certificate and private key files passed to requests as `cert=`;
# get_tokens() sends no client certificate if either value is an empty string.
client_cert_file = "crt.pem"
client_cert_private_key_file = "crt.key"
# OAuth2 client credentials and redirect URI sent in the token request body.
client_id = "xxx"
client_secret = "xxx"
redirect_uri = "http://localhost/redirect"
# URL the authorization code is POSTed to by get_tokens().
authorization_server_token_endpoint = "https://righettod.eu.auth0.com/oauth/token"
# Web listener
app = Flask(__name__)
@app.route("/favicon.ico")
def get_favicon():
    """Serve the favicon embedded in FAVICON_B64 for /favicon.ico requests.

    Returns:
        The Flask response built by ``send_file`` from the decoded image bytes.
    """
    # FAVICON_B64 begins with "iVBORw0KGgo", the base64 form of the PNG file
    # signature, so the payload is declared as PNG. The previous "image/jpg"
    # value did not match the data and is not a registered MIME type.
    icon_bytes = base64.b64decode(FAVICON_B64)
    return send_file(io.BytesIO(icon_bytes), mimetype="image/png")
@app.route("/redirect")
def get_tokens():
    """Handle the OAuth2 redirect and exchange the authorization code for tokens.

    The query string of the incoming request is handled as follows:

    * ``error`` (optionally with ``error_description``) is echoed back as
      plain text.
    * ``code`` is POSTed to ``authorization_server_token_endpoint`` using the
      ``authorization_code`` grant. An HTTP 200 answer is returned
      pretty-printed as JSON; any other status, a non-JSON 200 body, or a
      transport failure is reported as plain text.
    * Otherwise a fixed "No valid authorization code passed!" message is
      returned.

    Returns:
        A Flask ``Response`` carrying the text or JSON described above.
    """
    content = "No valid authorization code passed!"
    mtype = "text/plain"
    # .get() yields None for a missing parameter, so a truthiness test covers
    # the "absent", "None" and "empty string" cases in one check.
    error = request.args.get("error")
    authorization_code = request.args.get("code")
    if error:
        content = error
        error_description = request.args.get("error_description")
        if error_description:
            content += " => " + error_description
    elif authorization_code:
        print(f">>>> Try exchanging authorization code '{authorization_code}' against tokens...")
        params = {
            "grant_type": "authorization_code",
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
            "code": authorization_code,
        }
        # A client certificate is only presented when both files are configured.
        if client_cert_file and client_cert_private_key_file:
            crt = (client_cert_file, client_cert_private_key_file)
        else:
            crt = None
        try:
            # verify=False is deliberate: this is an assessment tool and the
            # matching urllib3 warning is silenced at module level.
            response = requests.post(
                url=authorization_server_token_endpoint,
                data=params,
                cert=crt,
                verify=False,
                timeout=20,
                headers=REQ_HEADERS,
            )
        except requests.exceptions.RequestException as exc:
            # Timeouts, connection errors and TLS/certificate problems would
            # otherwise surface as an opaque HTTP 500 from Flask.
            print(f"<<<< Token request failed: {exc}")
            content = f"Token request failed before any HTTP response was received:\n{exc}"
        else:
            print(f"<<<< HTTP RC '{response.status_code}' received.")
            if response.status_code == 200:
                try:
                    payload = response.json()
                except ValueError:
                    # Every JSON decode error raised by requests derives from
                    # ValueError; show the raw body instead of crashing.
                    content = "Token endpoint returned HTTP 200 with a non-JSON body:\n"
                    content += response.text
                else:
                    mtype = "application/json"
                    content = json.dumps(payload, indent=2, sort_keys=False)
            else:
                content = f"Token endpoint returned not OK response (HTTP {response.status_code}) with the following content:\n"
                content += response.text
    return Response(content, mimetype=mtype)