From 2b02c1c7791875438130a372df2bff965e033d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20=22decko=22=20de=20Brito?= Date: Mon, 22 Jul 2024 16:29:15 -0300 Subject: [PATCH] Support storing and retrieving a local token. Closes #1013 --- pulpcore/cli/common/generic.py | 38 ++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/pulpcore/cli/common/generic.py b/pulpcore/cli/common/generic.py index fa708fac9..1c5582c8c 100644 --- a/pulpcore/cli/common/generic.py +++ b/pulpcore/cli/common/generic.py @@ -5,6 +5,7 @@ import typing as t from contextlib import closing from functools import lru_cache, wraps +from pathlib import Path import click import requests @@ -245,8 +246,38 @@ def __init__(self, *args, **kwargs): self.flow = kwargs.get("flow") self.token_url = self.flow["flows"]["clientCredentials"]["tokenUrl"] self.scope = [*self.flow["flows"]["clientCredentials"]["scopes"]][0] + self.token = "" def __call__(self, request): + self.retrieve_local_token() + if self.is_token_expired(): + self.retrieve_token() + + request.headers["Authorization"] = f"Bearer {self.token['access_token']}" + return request + + def is_token_expired(self): + if self.token: + issued_at = datetime.datetime.fromisoformat(self.token["issued_at"]) + expires_in = datetime.timedelta(seconds=self.token["expires_in"]) + timedelta = issued_at + expires_in - datetime.timedelta(seconds=5) + if timedelta >= datetime.datetime.now(): + return False + return True + + def store_local_token(self): + TOKEN_LOCATION = (Path(click.utils.get_app_dir("pulp"), "token.json")) + with Path(TOKEN_LOCATION).open("w") as token_file: + token = json.dumps(self.token) + token_file.write(token) + + def retrieve_local_token(self): + TOKEN_LOCATION = (Path(click.utils.get_app_dir("pulp"), "token.json")) + with Path(TOKEN_LOCATION).open("r") as token_file: + token_json = token_file.read() + self.token = json.loads(token_json) + + def retrieve_token(self): data = { "client_id": self.client_id, "client_secret": self.client_secret, @@ -256,10 +287,9 @@ def __call__(self, request): response: requests.Response = requests.post(self.token_url, data=data) - token = response.json() if response.status_code == 200 else None - - request.headers["Authorization"] = f"Bearer {token['access_token']}" - return request + self.token = response.json() if response.status_code == 200 else None + self.token["issued_at"] = datetime.datetime.now().isoformat() + self.store_local_token() return OAuth2AuthBase( username=self.pulp_ctx.username, password=self.pulp_ctx.password, flow=flow