Skip to content

Commit

Permalink
Add the initial support for issuing an access_token and use it as
Browse files Browse the repository at this point in the history
authentication method.

Closes pulp#926
  • Loading branch information
decko committed Jul 11, 2024
1 parent 0b73cda commit 58b63c3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
40 changes: 33 additions & 7 deletions pulp-glue/pulp_glue/common/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
SAFE_METHODS = ["GET", "HEAD", "OPTIONS"]
ISO_DATE_FORMAT = "%Y-%m-%d"
ISO_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
AUTH_PRIORITY = ("oauth2", "basic")


class OpenAPIError(Exception):
Expand Down Expand Up @@ -57,18 +58,31 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth
"""Implement this to provide means of http basic auth."""
return None

def auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
return

def __call__(
self,
security: t.List[t.Dict[str, t.List[str]]],
security_schemes: t.Dict[str, t.Dict[str, t.Any]],
) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:

authorized_schemes = []
authorized_schemes_types = set()

for proposal in security:
if [security_schemes[name] for name in proposal] == [
{"type": "http", "scheme": "basic"}
]:
result = self.basic_auth()
if result:
return result
for name in proposal:
authorized_schemes.append(security_schemes[name])
authorized_schemes_types.add(security_schemes[name]['type'])

if "oauth2" in authorized_schemes_types:
oauth_flow = [flow for flow in authorized_schemes if flow['type'] == "oauth2"][0]
result = self.auth(oauth_flow)
elif "http" in authorized_schemes_types:
result = self.basic_auth()

if result:
return result
raise OpenAPIError(_("No suitable auth scheme found."))


Expand All @@ -85,6 +99,18 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth
return (self.username, self.password)


class OAuth2AuthProvider(AuthProviderBase):

def __init__(self, username: str, password: str):
self.client_id = username
self.client_secret = password

def auth(self, oauth_payload) -> requests.auth.AuthBase:
__import__('ipdb').set_trace()
response: requests.Response = requests.get()
pass


class OpenAPI:
"""
The abstraction Layer to interact with a server providing an openapi v3 specification.
Expand Down Expand Up @@ -608,7 +634,7 @@ def render_request(
# Bad idea, but you wanted it that way.
auth = None
else:
auth = self.auth_provider(security, self.api_spec["components"]["securitySchemes"])
auth: AuthProviderBase = self.auth_provider(security, self.api_spec["components"]["securitySchemes"])
else:
# No auth required? Don't provide it.
# No auth_provider available? Hope for the best (should do the trick for cert auth).
Expand Down
31 changes: 31 additions & 0 deletions pulpcore/cli/common/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,37 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth
self.pulp_ctx.password = click.prompt("Password", hide_input=True)
return (self.pulp_ctx.username, self.pulp_ctx.password)

def auth(self, flow):
if self.pulp_ctx.username is None:
self.pulp_ctx.username = click.prompt("Username/ClientID")
if self.pulp_ctx.password is None:
self.pulp_ctx.password = click.prompt("Password/ClientSecret")

class OAuth2AuthBase(requests.auth.AuthBase):

def __init__(self, *args, **kwargs):
self.client_id = kwargs.get('username')
self.client_secret = kwargs.get('password')
self.flow = kwargs.get('flow')
self.token_url = self.flow['flows']['clientCredentials']['tokenUrl']
self.scope = [*self.flow['flows']['clientCredentials']['scopes']][0]

def __call__(self, request):
data = {'client_id': self.client_id,
'client_secret': self.client_secret,
'scope': self.scope,
'grant_type': 'client_credentials'
}

response: requests.Response = requests.post(self.token_url, data=data)

token = response.json() if response.status_code == 200 else None

request.headers['Authorization'] = f"Bearer {token['access_token']}"
return request

return OAuth2AuthBase(username=self.pulp_ctx.username, password=self.pulp_ctx.password, flow=flow)


##############################################################################
# Decorator to access certain contexts
Expand Down

0 comments on commit 58b63c3

Please sign in to comment.