From 5228622ab70c18e86ade334df650c340ec90c9e7 Mon Sep 17 00:00:00 2001 From: pgrochal Date: Thu, 24 Oct 2024 11:23:29 +0200 Subject: [PATCH] Review adjustements --- README.md | 81 +++++++++++++++++++--------------- catalystwan/abstractions.py | 9 ++-- catalystwan/apigw_auth.py | 10 ++--- catalystwan/request_limiter.py | 25 ++++++----- catalystwan/session.py | 23 +++++----- catalystwan/vmanage_auth.py | 31 +++++++------ 6 files changed, 94 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index 603eccac..839a818d 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,51 @@ with create_apigw_session( ``` +
+ Threading (click to expand) + +```python +from threading import Thread +from catalystwan.session import ManagerSession +from catalystwan.vmanage_auth import vManageAuth +from copy import copy + +def print_devices(manager: ManagerSession): + # using context manager (recommended) + with manager.login() as session: + print(session.api.devices.get()) + +if __name__ =="__main__": + + auth = vManageAuth(username="username", password="password") + manager = ManagerSession(base_url="https://url:port", auth=auth) + + t1 = Thread(target=print_devices, args=(manager,)) + t2 = Thread(target=print_devices, args=(copy(manager),)) + t3 = Thread(target=print_devices, args=(copy(manager),)) + + t1.start() + t2.start() + t3.start() + + t1.join() + t2.join() + t3.join() + + print("Done!") +``` +Threading can be achieved by using a shared auth object with sessions in each thread. As `ManagerSession` is not guaranteed to be thread-safe, it is recommended to create one session per thread. `ManagerSession` also comes in with a default `RequestLimiter`, which limits the number of concurrent requests to 50. It keeps `ManagerSession` from overloading the server and avoids HTTP 503 and HTTP 429 errors. +If you wish to modify the limit, you can pass a modified `RequestLimiter` to `ManagerSession`: +```python +from catalystwan.session import ManagerSession +from catalystwan.vmanage_auth import vManageAuth +from catalystwan.request_limiter import RequestLimiter + +auth = vManageAuth(username="username", password="password") +limiter = RequestLimiter(max_requests=30) +manager = ManagerSession(base_url="https://url:port", auth=auth) +``` +
## API usage examples All examples below assumes `session` variable contains logged-in [Manager Session](#Manager-Session) instance. @@ -413,42 +458,6 @@ migrate_task.wait_for_completed() ``` -
- Threading (click to expand) - -```python -from concurrent.futures import ThreadPoolExecutor - -from catalystwan.request_limiter import RequestLimiter -from catalystwan.session import create_manager_session -from catalystwan.vmanage_auth import create_vmanage_auth - - -device_ids = [...] -auth = create_vmanage_auth("username", "password") -limiter = RequestLimiter(max_requests=60) - - -def simple_request(auth, device_id): - with create_manager_session( - url="url", - username="username", - password="password", - auth=auth, - request_lmiter=limiter - ) as client: - admin_tech_file = client.api.admin_tech.generate(device_id) - return admin_tech_file - -with ThreadPoolExecutor(len(device_ids)) as executor: - results = [executor.submit(simple_request, auth, device_id) for device_id in device_ids] - - -admin_tech_files = [result.result() for result in results] -``` -Threading can be achieved by using a shared auth object with sessions in each thread. As `ManagerSession` is not guaranteed to be thread-safe, it is recommended to create one session per thread. -`RequestLimiter` puts a limit on concurrent requests. It is not required, but highly recommended. Sending too many concurrent requests will result in either HTTP 503 or HTTP 429 errors. -
### Note: To remove `InsecureRequestWarning`, you can include in your scripts (warning is suppressed when `catalystwan_devel` environment variable is set): diff --git a/catalystwan/abstractions.py b/catalystwan/abstractions.py index 05f2f50d..e17fdf82 100644 --- a/catalystwan/abstractions.py +++ b/catalystwan/abstractions.py @@ -67,14 +67,11 @@ class AuthProtocol(Protocol): def logout(self, client: APIEndpointClient) -> None: ... - def clear(self) -> None: + def clear(self, last_request: Optional[PreparedRequest]) -> None: ... - def clear_sync(self, last_request: Optional[PreparedRequest]) -> None: - ... - - def register_session(self) -> None: + def increase_session_count(self) -> None: ... - def unregister_session(self) -> None: + def decrease_session_count(self) -> None: ... diff --git a/catalystwan/apigw_auth.py b/catalystwan/apigw_auth.py index 6c203a4a..6afb6a41 100644 --- a/catalystwan/apigw_auth.py +++ b/catalystwan/apigw_auth.py @@ -96,19 +96,19 @@ def get_token( def logout(self, client: APIEndpointClient) -> None: return None - def clear(self) -> None: + def _clear(self) -> None: with self.lock: self.token = "" - def register_session(self) -> None: + def increase_session_count(self) -> None: with self.lock: self.registered_sessions += 1 - def unregister_session(self) -> None: + def decrease_session_count(self) -> None: with self.lock: self.registered_sessions -= 1 - def clear_sync(self, last_request: Optional[PreparedRequest]) -> None: + def clear(self, last_request: Optional[PreparedRequest]) -> None: with self.lock: # extract previously used jsessionid if last_request is None: @@ -118,7 +118,7 @@ def clear_sync(self, last_request: Optional[PreparedRequest]) -> None: if self.token == "" or f"Bearer {self.token}" == token: # used auth was up-to-date, clear state - return self.clear() + return self._clear() else: # used auth was out-of-date, repeat the request with a new one return diff --git a/catalystwan/request_limiter.py b/catalystwan/request_limiter.py index 4cf8a8d5..9f1801b2 100644 --- a/catalystwan/request_limiter.py +++ b/catalystwan/request_limiter.py @@ -1,15 +1,18 @@ -from threading import Semaphore -from typing import Callable +from __future__ import annotations -from catalystwan.response import ManagerResponse +from contextlib import AbstractContextManager +from threading import Semaphore -class RequestLimiter: - def __init__(self, max_requests: int = 60): - self.max_requests: int = max_requests - self._semaphore = Semaphore(value=self.max_requests) +class RequestLimiter(AbstractContextManager): + def __init__(self, max_requests: int = 49): + self._max_requests: int = max_requests + self._semaphore: Semaphore = Semaphore(value=self._max_requests) + + def __enter__(self) -> RequestLimiter: + self._semaphore.acquire() + return self - def send_request(self, delayed_request: Callable[[], ManagerResponse]) -> ManagerResponse: - with self._semaphore: - result = delayed_request() - return result + def __exit__(self, *exc_info) -> None: + self._semaphore.release() + return diff --git a/catalystwan/session.py b/catalystwan/session.py index 551759a6..0fdb7608 100644 --- a/catalystwan/session.py +++ b/catalystwan/session.py @@ -158,8 +158,6 @@ def create_manager_session( port: Optional[int] = None, subdomain: Optional[str] = None, logger: Optional[logging.Logger] = None, - auth: Optional[AuthProtocol] = None, - request_lmiter: Optional[RequestLimiter] = None, ) -> ManagerSession: """Factory method that creates session object and performs login according to parameters @@ -175,14 +173,12 @@ def create_manager_session( Returns: ManagerSession: logged-in and operative session to perform tasks on SDWAN Manager. """ - if auth is None: - auth = create_vmanage_auth(username, password, subdomain, logger) + auth = create_vmanage_auth(username, password, subdomain, logger) session = ManagerSession( base_url=create_base_url(url, port), auth=auth, subdomain=subdomain, logger=logger, - request_limiter=request_lmiter, ) session.state = ManagerSessionState.LOGIN return session @@ -250,8 +246,8 @@ def __init__( super(ManagerSession, self).__init__() self.verify = False self.headers.update({"User-Agent": USER_AGENT}) + self._added_to_auth = False self._auth = auth - self._auth.register_session() self._platform_version: str = "" self._api_version: Version = NullVersion # type: ignore self.restart_timeout: int = 1200 @@ -260,7 +256,7 @@ def __init__( self._validate_responses = True self._state: ManagerSessionState = ManagerSessionState.OPERATIVE self._last_request: Optional[PreparedRequest] = None - self._limiter: Optional[RequestLimiter] = request_limiter + self._limiter: RequestLimiter = request_limiter or RequestLimiter() @cached_property def api(self) -> APIContainer: @@ -327,7 +323,10 @@ def restart_imminent(self, restart_timeout_override: Optional[int] = None): def _sync_auth(self) -> None: self.cookies.clear_session_cookies() - self._auth.clear_sync(self._last_request) + if not self._added_to_auth: + self._auth.increase_session_count() + self._added_to_auth = True + self._auth.clear(self._last_request) self.auth = self._auth def _fetch_server_info(self) -> ServerInfo: @@ -432,12 +431,8 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse: if self.request_timeout is not None: # do not modify user provided kwargs unless property is set _kwargs.update(timeout=self.request_timeout) try: - if self._limiter is None: + with self._limiter: response = super(ManagerSession, self).request(method, full_url, *args, **_kwargs) - else: - delayed_request = lambda: super(ManagerSession, self).request(method, full_url, *args, **_kwargs) - response = self._limiter.send_request(delayed_request) - self.logger.debug(self.response_trace(response, None)) if self.state == ManagerSessionState.RESTART_IMMINENT and response.status_code == 503: self.state = ManagerSessionState.WAIT_SERVER_READY_AFTER_RESTART @@ -532,6 +527,8 @@ def get_tenant_id(self) -> str: return tenant.tenant_id def logout(self) -> None: + if self._added_to_auth: + self._auth.decrease_session_count() self._auth.logout(self) def close(self) -> None: diff --git a/catalystwan/vmanage_auth.py b/catalystwan/vmanage_auth.py index 6ffce5a4..ea469cf5 100644 --- a/catalystwan/vmanage_auth.py +++ b/catalystwan/vmanage_auth.py @@ -79,7 +79,7 @@ def __init__(self, username: str, password: str, logger: Optional[logging.Logger self.logger = logger or logging.getLogger(__name__) self.cookies: RequestsCookieJar = RequestsCookieJar() self._base_url: str = "" - self.registered_sessions: int = 0 + self.session_count: int = 0 self.lock: RLock = RLock() def __str__(self) -> str: @@ -112,7 +112,10 @@ def get_jsessionid(self) -> str: response: Response = post(url=url, headers=headers, data=security_payload, verify=self.verify) self.sync_cookies(response.cookies) self.logger.debug(auth_response_debug(response, str(self))) + if response.status_code != 200: + print("OLABOGA") if response.text != "" or not isinstance(self.jsessionid, str) or self.jsessionid == "": + print("NEIN!!!") raise UnauthorizedAccessError(self.username, self.password) return self.jsessionid @@ -128,6 +131,7 @@ def get_xsrftoken(self) -> str: self.sync_cookies(response.cookies) self.logger.debug(auth_response_debug(response, str(self))) if response.status_code != 200 or "" in response.text: + print("MAMMA MIA!") raise CatalystwanException("Failed to get XSRF token") return response.text @@ -138,9 +142,8 @@ def authenticate(self, request: PreparedRequest): def logout(self, client: APIEndpointClient) -> None: with self.lock: - if self.registered_sessions > 1: + if self.session_count > 1: # Other sessions still use the auth, unregister and return - self.unregister_session() return # last session using the auth, logout @@ -157,23 +160,23 @@ def logout(self, client: APIEndpointClient) -> None: self.logger.debug(auth_response_debug(response, str(self))) if response.status_code != 200: self.logger.error("Unsuccessfull logout") - self.clear() - self.unregister_session() + self._clear() - def clear(self) -> None: + def _clear(self) -> None: with self.lock: self.cookies.clear_session_cookies() self.xsrftoken = None - def register_session(self) -> None: + def increase_session_count(self) -> None: with self.lock: - self.registered_sessions += 1 + self.session_count += 1 - def unregister_session(self) -> None: + def decrease_session_count(self) -> None: with self.lock: - self.registered_sessions -= 1 + self.session_count -= 1 + print(f"Remaining: {self.session_count}") - def clear_sync(self, last_request: Optional[PreparedRequest]) -> None: + def clear(self, last_request: Optional[PreparedRequest]) -> None: with self.lock: # extract previously used jsessionid if last_request is None: @@ -188,7 +191,7 @@ def clear_sync(self, last_request: Optional[PreparedRequest]) -> None: if self.jsessionid is None or self.jsessionid == jsessionid: # used auth was up-to-date, clear state - return self.clear() + return self._clear() else: # used auth was out-of-date, repeat the request with a new one return @@ -251,9 +254,9 @@ def get_vsessionid(self, tenantid: str) -> str: self.logger.debug(auth_response_debug(response, str(self))) return response.json()["VSessionId"] - def clear(self) -> None: + def _clear(self) -> None: with self.lock: - super().clear() + super()._clear() self.vsessionid = None