This repository has been archived by the owner on Sep 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 94
/
website_sample.py
141 lines (124 loc) · 5.29 KB
/
website_sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
try:
from http import server as httpserver
from http import cookies as Cookie
except ImportError:
import SimpleHTTPServer as httpserver
import Cookie as Cookie
try:
import socketserver
except ImportError:
import SocketServer as socketserver
try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
import json
import os
import random
import string
import sys
from adal import AuthenticationContext
# You can provide account information by using a JSON file. Either
# through a command line argument, 'python sample.py parameters.json', or
# specifying in an environment variable of ADAL_SAMPLE_PARAMETERS_FILE.
#
# The information inside such file can be obtained via app registration.
# See https://github.com/AzureAD/azure-activedirectory-library-for-python/wiki/Register-your-application-with-Azure-Active-Directory
#
# {
# "resource": "your_resource",
# "tenant" : "rrandallaad1.onmicrosoft.com",
# "authorityHostUrl" : "https://login.microsoftonline.com",
# "clientId" : "624ac9bd-4c1c-4687-aec8-b56a8991cfb3",
# "clientSecret" : "verySecret=""
# }
parameters_file = (sys.argv[1] if len(sys.argv) == 2 else
os.environ.get('ADAL_SAMPLE_PARAMETERS_FILE'))
if parameters_file:
with open(parameters_file, 'r') as f:
parameters = f.read()
sample_parameters = json.loads(parameters)
else:
raise ValueError('Please provide parameter file with account information.')
PORT = 8088
TEMPLATE_AUTHZ_URL = ('https://login.microsoftonline.com/{}/oauth2/authorize?'+
'response_type=code&client_id={}&redirect_uri={}&'+
'state={}&resource={}')
GRAPH_RESOURCE = '00000002-0000-0000-c000-000000000000'
RESOURCE = sample_parameters.get('resource', GRAPH_RESOURCE)
REDIRECT_URI = 'http://localhost:{}/getAToken'.format(PORT)
authority_url = (sample_parameters['authorityHostUrl'] + '/' +
sample_parameters['tenant'])
class OAuth2RequestHandler(httpserver.SimpleHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(307)
login_url = 'http://localhost:{}/login'.format(PORT)
self.send_header('Location', login_url)
self.end_headers()
elif self.path == '/login':
auth_state = (''.join(random.SystemRandom()
.choice(string.ascii_uppercase + string.digits)
for _ in range(48)))
cookie = Cookie.SimpleCookie()
cookie['auth_state'] = auth_state
authorization_url = TEMPLATE_AUTHZ_URL.format(
sample_parameters['tenant'],
sample_parameters['clientId'],
REDIRECT_URI,
auth_state,
RESOURCE)
self.send_response(307)
self.send_header('Set-Cookie', cookie.output(header=''))
self.send_header('Location', authorization_url)
self.end_headers()
elif self.path.startswith('/getAToken'):
is_ok = True
try:
token_response = self._acquire_token()
message = 'response: ' + json.dumps(token_response)
#Later, if the access token is expired it can be refreshed.
auth_context = AuthenticationContext(authority_url)
token_response = auth_context.acquire_token_with_refresh_token(
token_response['refreshToken'],
sample_parameters['clientId'],
RESOURCE,
sample_parameters['clientSecret'])
message = (message + '*** And here is the refresh response:' +
json.dumps(token_response))
except ValueError as exp:
message = str(exp)
is_ok = False
self._send_response(message, is_ok)
def _acquire_token(self):
parsed = urlparse(self.path)
code = parse_qs(parsed.query)['code'][0]
state = parse_qs(parsed.query)['state'][0]
cookie = Cookie.SimpleCookie(self.headers["Cookie"])
if state != cookie['auth_state'].value:
raise ValueError('state does not match')
### Main logic begins
auth_context = AuthenticationContext(authority_url)
return auth_context.acquire_token_with_authorization_code(
code,
REDIRECT_URI,
RESOURCE,
sample_parameters['clientId'],
sample_parameters['clientSecret'])
### Main logic ends
def _send_response(self, message, is_ok=True):
self.send_response(200 if is_ok else 400)
self.send_header('Content-type', 'text/html')
self.end_headers()
if is_ok:
#todo, pretty format token response in json
message_template = ('<html><head><title>Succeeded</title></head>'
'<body><p>{}</p></body></html>')
else:
message_template = ('<html><head><title>Failed</title></head>'
'<body><p>{}</p></body></html>')
output = message_template.format(message)
self.wfile.write(output.encode())
httpd = socketserver.TCPServer(('', PORT), OAuth2RequestHandler)
print('serving at port', PORT)
httpd.serve_forever()