-
Notifications
You must be signed in to change notification settings - Fork 38
/
provider_configuration.py
207 lines (165 loc) · 8.08 KB
/
provider_configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import collections.abc
import logging
import requests
from oic.oic import Client
from oic.utils.settings import ClientSettings
# Module-level logger; used below to report discovery and client registration responses.
logger = logging.getLogger(__name__)
class OIDCData(collections.abc.MutableMapping):
    """
    Basic OIDC data representation: a mutable mapping backed by a plain dict.

    The only special handling is that the value stored under ``client_secret``
    is masked whenever the object is rendered as text (``str()`` and
    ``repr()``), so that the secret does not end up in logs or tracebacks.
    The real value is still available through normal item access and
    ``to_dict()``.
    """

    def __init__(self, *args, **kwargs):
        """
        Args:
            args (List[Tuple[String, String]]): key-value pairs to store
            kwargs (Dict[string, string]): key-value pairs to store
        """
        self.store = {}
        self.update(dict(*args, **kwargs))

    def __getitem__(self, key):
        return self.store[key]

    def __setitem__(self, key, value):
        self.store[key] = value

    def __delitem__(self, key):
        del self.store[key]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

    def _masked_text(self):
        """Render the stored data as a dict string with ``client_secret`` masked."""
        data = self.store.copy()
        if 'client_secret' in data:
            data['client_secret'] = '<masked>'
        return str(data)

    def __str__(self):
        return self._masked_text()

    def __repr__(self):
        # repr() is what %r formatting, tracebacks and container printing use,
        # so it must mask the secret exactly like __str__ does.
        return self._masked_text()

    def __bool__(self):
        # Always truthy, even when no keys are stored, so that `if not obj`
        # style checks only trigger for a missing object (e.g. None).
        return True

    def copy(self, **kwargs):
        """Return a new instance of the same class with `kwargs` overriding stored values."""
        values = self.to_dict()
        values.update(kwargs)
        return self.__class__(**values)

    def to_dict(self):
        """Return a shallow copy of the stored data (unmasked) as a plain dict."""
        return self.store.copy()
class ProviderMetadata(OIDCData):
    def __init__(self,
                 issuer=None,
                 authorization_endpoint=None,
                 jwks_uri=None,
                 token_endpoint=None,
                 userinfo_endpoint=None,
                 introspection_endpoint=None,
                 registration_endpoint=None,
                 **kwargs):
        """Store the metadata an OpenID Provider publishes about its configuration.

        Every named parameter is always stored under its own name, with the
        value `None` when it is not supplied.

        Parameters
        ----------
        issuer: str, Optional
            OP Issuer Identifier.
        authorization_endpoint: str, Optional
            URL of the OP's OAuth 2.0 Authorization Endpoint.
        jwks_uri: str, Optional
            URL of the OP's JSON Web Key Set [JWK] document.
        token_endpoint: str, Optional
            URL of the OP's OAuth 2.0 Token Endpoint.
        userinfo_endpoint: str, Optional
            URL of the OP's UserInfo Endpoint.
        introspection_endpoint: str, Optional
            URL of the OP's token introspection endpoint.
        registration_endpoint: str, Optional
            URL of the OP's Dynamic Client Registration Endpoint.
        **kwargs : dict, Optional
            Extra arguments to [OpenID Provider Metadata](https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata)
        """
        # Well-known fields first, then any extra metadata.
        named_fields = {
            'issuer': issuer,
            'authorization_endpoint': authorization_endpoint,
            'token_endpoint': token_endpoint,
            'userinfo_endpoint': userinfo_endpoint,
            'jwks_uri': jwks_uri,
            'introspection_endpoint': introspection_endpoint,
            'registration_endpoint': registration_endpoint,
        }
        super().__init__(**named_fields, **kwargs)
class ClientRegistrationInfo(OIDCData):
    """
    Key-value pairs describing the client for dynamic registration.

    Adds nothing to `OIDCData`; `ProviderConfiguration.register_client` sends
    the stored pairs as the registration request.
    """
class ClientMetadata(OIDCData):
    def __init__(self, client_id=None, client_secret=None, **kwargs):
        """
        Store the metadata of a registered client.

        Both `client_id` and `client_secret` are always stored, with the value
        `None` when not supplied.

        Args:
            client_id (str): client identifier representing the client
            client_secret (str): client secret to authenticate the client with
                the OP
            kwargs (dict): additional key-value pairs to store
        """
        credentials = {'client_id': client_id, 'client_secret': client_secret}
        super().__init__(**credentials, **kwargs)
class ProviderConfiguration:
    """
    Metadata for communicating with an OpenID Connect Provider (OP).

    Attributes:
        auth_request_params (dict): Extra parameters, as key-value pairs, to include in the query parameters
            of the authentication request.
        registered_client_metadata (ClientMetadata): The client metadata registered with the provider.
        client_settings (ClientSettings): Settings built from the request timeout and the requests session
            to use when communicating with the provider.
        session_refresh_interval_seconds (int): Number of seconds between updates of user data (tokens, user data, etc.)
            fetched from the provider. If `None` is specified, no silent updates of user data will be made.
        userinfo_endpoint_method (str): HTTP method ("GET" or "POST") to use when making the UserInfo Request. If
            `None` is specified, no UserInfo Request will be made.
    """

    # Timeout handed to ClientSettings (presumably seconds -- confirm against pyoidc).
    DEFAULT_REQUEST_TIMEOUT = 5

    def __init__(self,
                 issuer=None,
                 provider_metadata=None,
                 userinfo_http_method='GET',
                 client_registration_info=None,
                 client_metadata=None,
                 auth_request_params=None,
                 session_refresh_interval_seconds=None,
                 requests_session=None):
        """
        Args:
            issuer (str): OP Issuer Identifier. If this is specified discovery will be used to fetch the provider
                metadata, otherwise `provider_metadata` must be specified.
            provider_metadata (ProviderMetadata): OP metadata.
            userinfo_http_method (Optional[str]): HTTP method (GET or POST) to use when sending the UserInfo Request.
                If `None` is specified, no userinfo request will be sent.
            client_registration_info (ClientRegistrationInfo): Client metadata to register your app
                dynamically with the provider. Either this or `client_metadata` must be specified.
            client_metadata (ClientMetadata): Client metadata if your app is statically
                registered with the provider. Either this or `client_registration_info` must be specified.
            auth_request_params (dict): Extra parameters that should be included in the authentication request.
            session_refresh_interval_seconds (int): Length of interval (in seconds) between attempted user data
                refreshes.
            requests_session (requests.Session): custom requests object to allow for example retry handling, etc.

        Raises:
            ValueError: If neither `issuer` nor `provider_metadata` is given, or if neither
                `client_registration_info` nor `client_metadata` is given.
        """
        if not issuer and not provider_metadata:
            raise ValueError("Specify either 'issuer' or 'provider_metadata'.")

        if not client_registration_info and not client_metadata:
            raise ValueError("Specify either 'client_registration_info' or 'client_metadata'.")

        self._issuer = issuer
        self._provider_metadata = provider_metadata

        self._client_registration_info = client_registration_info
        self._client_metadata = client_metadata

        self.userinfo_endpoint_method = userinfo_http_method
        self.auth_request_params = auth_request_params or {}
        self.session_refresh_interval_seconds = session_refresh_interval_seconds
        # For session persistence
        self.client_settings = ClientSettings(timeout=self.DEFAULT_REQUEST_TIMEOUT,
                                              requests_session=requests_session or requests.Session())

    def ensure_provider_metadata(self, client: Client):
        """
        Return the provider metadata, fetching it through discovery on first use.

        If no metadata is stored yet, `client.provider_config` is called with the
        configured issuer and the response is stored as `ProviderMetadata`.

        Args:
            client (Client): client used to perform the discovery request.

        Returns:
            ProviderMetadata: the stored (possibly just discovered) provider metadata.
        """
        if not self._provider_metadata:
            resp = client.provider_config(self._issuer)
            discovered = resp.to_dict()
            logger.debug('Received discovery response: %s', discovered)

            self._provider_metadata = ProviderMetadata(**discovered)

        return self._provider_metadata

    @property
    def registered_client_metadata(self):
        return self._client_metadata

    def register_client(self, client: Client):
        """
        Return the client metadata, registering the client dynamically if needed.

        When no client metadata is stored, the stored client registration info is
        sent to the provider's registration endpoint and the response is stored as
        `ClientMetadata`.

        Args:
            client (Client): client used to send the registration request (and the
                discovery request, if the provider metadata has not been fetched yet).

        Returns:
            ClientMetadata: the stored (possibly just registered) client metadata.

        Raises:
            ValueError: If the provider metadata has no 'registration_endpoint'.
        """
        if self._client_metadata:
            return self._client_metadata

        # With an issuer-only configuration the provider metadata is still None
        # until discovery has run, so make sure it is available before reading it.
        provider_metadata = self.ensure_provider_metadata(client)
        registration_endpoint = provider_metadata.get('registration_endpoint')
        if not registration_endpoint:
            raise ValueError("Can't use dynamic client registration, provider metadata is missing "
                             "'registration_endpoint'.")

        registration_request = self._client_registration_info.to_dict()

        # Send request to register the client dynamically.
        registration_response = client.register(
            url=registration_endpoint,
            **registration_request)
        # The response itself is deliberately not logged here.
        logger.info('Received registration response.')
        self._client_metadata = ClientMetadata(
            **registration_response.to_dict())

        return self._client_metadata