Skip to content

Commit

Permalink
Support storing and retrieving a local token.
Browse files Browse the repository at this point in the history
Closes pulp#926
  • Loading branch information
decko committed Aug 9, 2024
1 parent ee1e4a6 commit ed6e4bf
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions pulpcore/cli/common/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing as t
from contextlib import closing
from functools import lru_cache, wraps
from pathlib import Path

import click
import requests
Expand Down Expand Up @@ -245,8 +246,38 @@ def __init__(self, *args, **kwargs):
self.flow = kwargs.get("flow")
self.token_url = self.flow["flows"]["clientCredentials"]["tokenUrl"]
self.scope = [*self.flow["flows"]["clientCredentials"]["scopes"]][0]
self.token = ""

def __call__(self, request):
self.retrieve_local_token()
if self.is_token_expired():
self.retrieve_token()

request.headers["Authorization"] = f"Bearer {self.token['access_token']}"
return request

def is_token_expired(self):
if self.token:
issued_at = datetime.datetime.fromisoformat(self.token["issued_at"])
expires_in = datetime.timedelta(seconds=self.token["expires_in"])
timedelta = issued_at + expires_in - datetime.timedelta(seconds=5)
if timedelta >= datetime.datetime.now():
return False
return True

def store_local_token(self):
TOKEN_LOCATION = (Path(click.utils.get_app_dir("pulp"), "token.json"))
with Path(TOKEN_LOCATION).open("w") as token_file:
token = json.dumps(self.token)
token_file.write(token)

def retrieve_local_token(self):
TOKEN_LOCATION = (Path(click.utils.get_app_dir("pulp"), "token.json"))
with Path(TOKEN_LOCATION).open("r") as token_file:
token_json = token_file.read()
self.token = json.loads(token_json)

def retrieve_token(self):
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
Expand All @@ -256,10 +287,9 @@ def __call__(self, request):

response: requests.Response = requests.post(self.token_url, data=data)

token = response.json() if response.status_code == 200 else None

request.headers["Authorization"] = f"Bearer {token['access_token']}"
return request
self.token = response.json() if response.status_code == 200 else None
self.token["issued_at"] = datetime.datetime.now().isoformat()
self.store_local_token()

return OAuth2AuthBase(
username=self.pulp_ctx.username, password=self.pulp_ctx.password, flow=flow
Expand Down

0 comments on commit ed6e4bf

Please sign in to comment.