Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Create ApiGwAuth class for session. Implement APIGW.
Browse files Browse the repository at this point in the history
  • Loading branch information
jpkrajewski committed Aug 1, 2024
1 parent 8f81336 commit 4288363
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 106 deletions.
6 changes: 3 additions & 3 deletions catalystwan/api/administration.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ def update(self, user_update_request: UserUpdateRequest):
"""
self._endpoints.update_user(user_update_request.username, user_update_request)

def update_password(self, username: str, new_password: str):
def update_password(self, username: str, new_password: str, current_user_password: str):
"""Updates exisiting user password
Args:
username (str): Name of the user
new_password (str): New password for given user
"""
update_password_request = UserUpdateRequest(
username=username, password=new_password, current_user_password=self.session.password
) # type: ignore
username=username, password=new_password, current_user_password=current_user_password
)
self._endpoints.update_password(username, update_password_request)

def reset(self, username: str):
Expand Down
6 changes: 2 additions & 4 deletions catalystwan/api/tenant_management_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, List

from catalystwan.api.task_status_api import Task
from catalystwan.endpoints.tenant_management import (
Expand Down Expand Up @@ -62,7 +62,7 @@ def update(self, tenant_update_request: TenantUpdateRequest) -> Tenant:
tenant_id=tenant_update_request.tenant_id, tenant_update_request=tenant_update_request
)

def delete(self, tenant_id_list: List[str], password: Optional[str] = None) -> Task:
def delete(self, tenant_id_list: List[str], password: str) -> Task:
"""Deletes tenants on vManage
Args:
Expand All @@ -72,8 +72,6 @@ def delete(self, tenant_id_list: List[str], password: Optional[str] = None) -> T
Returns:
Task: Object representing tenant deletion process
"""
if password is None:
password = self.session.password
delete_request = TenantBulkDeleteRequest(tenant_id_list=tenant_id_list, password=password)
task_id = self._endpoints.delete_tenant_async_bulk(delete_request).id
return Task(self.session, task_id)
Expand Down
66 changes: 66 additions & 0 deletions catalystwan/apigw_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
from typing import TYPE_CHECKING, Literal, Optional

from pydantic import BaseModel, Field, PositiveInt
from requests import PreparedRequest, post
from requests.auth import AuthBase
from requests.cookies import RequestsCookieJar

from catalystwan.exceptions import CatalystwanException
from catalystwan.response import ManagerResponse

if TYPE_CHECKING:
from catalystwan.session import ManagerSession

LoginMode = Literal["machine", "user", "session"]


class ApiGwLogin(BaseModel):
client_id: str
client_secret: str
org_name: str
mode: Optional[LoginMode] = None
username: Optional[str] = None
session: Optional[str] = None
tenant_user: Optional[bool] = None
token_duration: PositiveInt = Field(default=10, description="in minutes")


class ApiGwAuth(AuthBase):
def __init__(self, base_url: str, login: ApiGwLogin, verify: bool = False):
self.base_url = base_url
self.verify = verify
self.login = login
self.token = ""
self.set_cookie = RequestsCookieJar() # It is need for compatibility with ManagerSession::login method
self.logger = logging.getLogger(__name__)

def __call__(self, prepared_request: PreparedRequest) -> PreparedRequest:
if self.token == "":
self.token = self._get_token()
prepared_request.headers.update(self._prepare_header())
return prepared_request

def _get_token(self) -> str:
response = post(
url=f"{self.base_url}/apigw/login",
verify=self.verify,
json=self.login.model_dump(exclude_none=True),
timeout=10,
)
token = response.json().get("token", "")
if not token or not isinstance(token, str):
raise CatalystwanException("Failed to get bearer token")
return token

def _prepare_header(self) -> dict:
return {
"sdwan-org": self.login.org_name,
"Authorization": f"Bearer {self.token}",
}

def __str__(self):
return f"ApiGatewayAuth(base_url={self.base_url}, mode={self.login.mode})"

def logout(self, session: "ManagerSession") -> Optional[ManagerResponse]:
return None
35 changes: 35 additions & 0 deletions catalystwan/endpoints/api_gateway.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pydantic import BaseModel

from catalystwan.endpoints import APIEndpoints, delete, post


class OnBoardClient(BaseModel):
client_id: str
client_secret: str
client_name: str


class ApiGateway(APIEndpoints):
@delete("/certificate/{uuid}")
def delete_configuration(self, uuid: str) -> None:
...

@post("/apigw/config/reload")
def configuration_reload(self) -> None:
"""After launching the API Gateway, SSP can use the API
and bearer authentication header with provision access token obtained
in the step above. It reloads the configuration from S3 bucket, Secrets Manager
and reset the RDS connection pool."""
...

@post("/apigw/client/registration")
def on_board_client(self, payload: OnBoardClient) -> None:
...

@post("/certificate/vedge/list?action={action}")
def send_to_controllers(self, action: str = "push") -> None:
...

@post("/certificate/vsmart/list")
def send_to_vbond(self) -> None:
...
Loading

0 comments on commit 4288363

Please sign in to comment.