Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Review adjustements
Browse files Browse the repository at this point in the history
  • Loading branch information
PrzeG committed Oct 24, 2024
1 parent f4766a3 commit 5228622
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 85 deletions.
81 changes: 45 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,51 @@ with create_apigw_session(
```
</details>

<details>
<summary> <b>Threading</b> <i>(click to expand)</i></summary>

```python
from threading import Thread
from catalystwan.session import ManagerSession
from catalystwan.vmanage_auth import vManageAuth
from copy import copy

def print_devices(manager: ManagerSession):
# using context manager (recommended)
with manager.login() as session:
print(session.api.devices.get())

if __name__ =="__main__":

auth = vManageAuth(username="username", password="password")
manager = ManagerSession(base_url="https://url:port", auth=auth)

t1 = Thread(target=print_devices, args=(manager,))
t2 = Thread(target=print_devices, args=(copy(manager),))
t3 = Thread(target=print_devices, args=(copy(manager),))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print("Done!")
```
Threading can be achieved by using a shared auth object with sessions in each thread. As `ManagerSession` is not guaranteed to be thread-safe, it is recommended to create one session per thread. `ManagerSession` also comes in with a default `RequestLimiter`, which limits the number of concurrent requests to 50. It keeps `ManagerSession` from overloading the server and avoids HTTP 503 and HTTP 429 errors.
If you wish to modify the limit, you can pass a modified `RequestLimiter` to `ManagerSession`:
```python
from catalystwan.session import ManagerSession
from catalystwan.vmanage_auth import vManageAuth
from catalystwan.request_limiter import RequestLimiter

auth = vManageAuth(username="username", password="password")
limiter = RequestLimiter(max_requests=30)
manager = ManagerSession(base_url="https://url:port", auth=auth)
```
</details>

## API usage examples
All examples below assumes `session` variable contains logged-in [Manager Session](#Manager-Session) instance.
Expand Down Expand Up @@ -413,42 +458,6 @@ migrate_task.wait_for_completed()
```
</details>

<details>
<summary> <b>Threading</b> <i>(click to expand)</i></summary>

```python
from concurrent.futures import ThreadPoolExecutor

from catalystwan.request_limiter import RequestLimiter
from catalystwan.session import create_manager_session
from catalystwan.vmanage_auth import create_vmanage_auth


device_ids = [...]
auth = create_vmanage_auth("username", "password")
limiter = RequestLimiter(max_requests=60)


def simple_request(auth, device_id):
with create_manager_session(
url="url",
username="username",
password="password",
auth=auth,
request_lmiter=limiter
) as client:
admin_tech_file = client.api.admin_tech.generate(device_id)
return admin_tech_file

with ThreadPoolExecutor(len(device_ids)) as executor:
results = [executor.submit(simple_request, auth, device_id) for device_id in device_ids]


admin_tech_files = [result.result() for result in results]
```
Threading can be achieved by using a shared auth object with sessions in each thread. As `ManagerSession` is not guaranteed to be thread-safe, it is recommended to create one session per thread.
`RequestLimiter` puts a limit on concurrent requests. It is not required, but highly recommended. Sending too many concurrent requests will result in either HTTP 503 or HTTP 429 errors.
</details>

### Note:
To remove `InsecureRequestWarning`, you can include in your scripts (warning is suppressed when `catalystwan_devel` environment variable is set):
Expand Down
9 changes: 3 additions & 6 deletions catalystwan/abstractions.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,11 @@ class AuthProtocol(Protocol):
def logout(self, client: APIEndpointClient) -> None:
...

def clear(self) -> None:
def clear(self, last_request: Optional[PreparedRequest]) -> None:
...

def clear_sync(self, last_request: Optional[PreparedRequest]) -> None:
...

def register_session(self) -> None:
def increase_session_count(self) -> None:
...

def unregister_session(self) -> None:
def decrease_session_count(self) -> None:
...
10 changes: 5 additions & 5 deletions catalystwan/apigw_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,19 @@ def get_token(
def logout(self, client: APIEndpointClient) -> None:
return None

def clear(self) -> None:
def _clear(self) -> None:
with self.lock:
self.token = ""

def register_session(self) -> None:
def increase_session_count(self) -> None:
with self.lock:
self.registered_sessions += 1

def unregister_session(self) -> None:
def decrease_session_count(self) -> None:
with self.lock:
self.registered_sessions -= 1

def clear_sync(self, last_request: Optional[PreparedRequest]) -> None:
def clear(self, last_request: Optional[PreparedRequest]) -> None:
with self.lock:
# extract previously used jsessionid
if last_request is None:
Expand All @@ -118,7 +118,7 @@ def clear_sync(self, last_request: Optional[PreparedRequest]) -> None:

if self.token == "" or f"Bearer {self.token}" == token:
# used auth was up-to-date, clear state
return self.clear()
return self._clear()
else:
# used auth was out-of-date, repeat the request with a new one
return
25 changes: 14 additions & 11 deletions catalystwan/request_limiter.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from threading import Semaphore
from typing import Callable
from __future__ import annotations

from catalystwan.response import ManagerResponse
from contextlib import AbstractContextManager
from threading import Semaphore


class RequestLimiter:
def __init__(self, max_requests: int = 60):
self.max_requests: int = max_requests
self._semaphore = Semaphore(value=self.max_requests)
class RequestLimiter(AbstractContextManager):
def __init__(self, max_requests: int = 49):
self._max_requests: int = max_requests
self._semaphore: Semaphore = Semaphore(value=self._max_requests)

def __enter__(self) -> RequestLimiter:
self._semaphore.acquire()
return self

def send_request(self, delayed_request: Callable[[], ManagerResponse]) -> ManagerResponse:
with self._semaphore:
result = delayed_request()
return result
def __exit__(self, *exc_info) -> None:
self._semaphore.release()
return
23 changes: 10 additions & 13 deletions catalystwan/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ def create_manager_session(
port: Optional[int] = None,
subdomain: Optional[str] = None,
logger: Optional[logging.Logger] = None,
auth: Optional[AuthProtocol] = None,
request_lmiter: Optional[RequestLimiter] = None,
) -> ManagerSession:
"""Factory method that creates session object and performs login according to parameters
Expand All @@ -175,14 +173,12 @@ def create_manager_session(
Returns:
ManagerSession: logged-in and operative session to perform tasks on SDWAN Manager.
"""
if auth is None:
auth = create_vmanage_auth(username, password, subdomain, logger)
auth = create_vmanage_auth(username, password, subdomain, logger)
session = ManagerSession(
base_url=create_base_url(url, port),
auth=auth,
subdomain=subdomain,
logger=logger,
request_limiter=request_lmiter,
)
session.state = ManagerSessionState.LOGIN
return session
Expand Down Expand Up @@ -250,8 +246,8 @@ def __init__(
super(ManagerSession, self).__init__()
self.verify = False
self.headers.update({"User-Agent": USER_AGENT})
self._added_to_auth = False
self._auth = auth
self._auth.register_session()
self._platform_version: str = ""
self._api_version: Version = NullVersion # type: ignore
self.restart_timeout: int = 1200
Expand All @@ -260,7 +256,7 @@ def __init__(
self._validate_responses = True
self._state: ManagerSessionState = ManagerSessionState.OPERATIVE
self._last_request: Optional[PreparedRequest] = None
self._limiter: Optional[RequestLimiter] = request_limiter
self._limiter: RequestLimiter = request_limiter or RequestLimiter()

@cached_property
def api(self) -> APIContainer:
Expand Down Expand Up @@ -327,7 +323,10 @@ def restart_imminent(self, restart_timeout_override: Optional[int] = None):

def _sync_auth(self) -> None:
self.cookies.clear_session_cookies()
self._auth.clear_sync(self._last_request)
if not self._added_to_auth:
self._auth.increase_session_count()
self._added_to_auth = True
self._auth.clear(self._last_request)
self.auth = self._auth

def _fetch_server_info(self) -> ServerInfo:
Expand Down Expand Up @@ -432,12 +431,8 @@ def request(self, method, url, *args, **kwargs) -> ManagerResponse:
if self.request_timeout is not None: # do not modify user provided kwargs unless property is set
_kwargs.update(timeout=self.request_timeout)
try:
if self._limiter is None:
with self._limiter:
response = super(ManagerSession, self).request(method, full_url, *args, **_kwargs)
else:
delayed_request = lambda: super(ManagerSession, self).request(method, full_url, *args, **_kwargs)
response = self._limiter.send_request(delayed_request)

self.logger.debug(self.response_trace(response, None))
if self.state == ManagerSessionState.RESTART_IMMINENT and response.status_code == 503:
self.state = ManagerSessionState.WAIT_SERVER_READY_AFTER_RESTART
Expand Down Expand Up @@ -532,6 +527,8 @@ def get_tenant_id(self) -> str:
return tenant.tenant_id

def logout(self) -> None:
if self._added_to_auth:
self._auth.decrease_session_count()
self._auth.logout(self)

def close(self) -> None:
Expand Down
31 changes: 17 additions & 14 deletions catalystwan/vmanage_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(self, username: str, password: str, logger: Optional[logging.Logger
self.logger = logger or logging.getLogger(__name__)
self.cookies: RequestsCookieJar = RequestsCookieJar()
self._base_url: str = ""
self.registered_sessions: int = 0
self.session_count: int = 0
self.lock: RLock = RLock()

def __str__(self) -> str:
Expand Down Expand Up @@ -112,7 +112,10 @@ def get_jsessionid(self) -> str:
response: Response = post(url=url, headers=headers, data=security_payload, verify=self.verify)
self.sync_cookies(response.cookies)
self.logger.debug(auth_response_debug(response, str(self)))
if response.status_code != 200:
print("OLABOGA")
if response.text != "" or not isinstance(self.jsessionid, str) or self.jsessionid == "":
print("NEIN!!!")
raise UnauthorizedAccessError(self.username, self.password)
return self.jsessionid

Expand All @@ -128,6 +131,7 @@ def get_xsrftoken(self) -> str:
self.sync_cookies(response.cookies)
self.logger.debug(auth_response_debug(response, str(self)))
if response.status_code != 200 or "<html>" in response.text:
print("MAMMA MIA!")
raise CatalystwanException("Failed to get XSRF token")
return response.text

Expand All @@ -138,9 +142,8 @@ def authenticate(self, request: PreparedRequest):

def logout(self, client: APIEndpointClient) -> None:
with self.lock:
if self.registered_sessions > 1:
if self.session_count > 1:
# Other sessions still use the auth, unregister and return
self.unregister_session()
return

# last session using the auth, logout
Expand All @@ -157,23 +160,23 @@ def logout(self, client: APIEndpointClient) -> None:
self.logger.debug(auth_response_debug(response, str(self)))
if response.status_code != 200:
self.logger.error("Unsuccessfull logout")
self.clear()
self.unregister_session()
self._clear()

def clear(self) -> None:
def _clear(self) -> None:
with self.lock:
self.cookies.clear_session_cookies()
self.xsrftoken = None

def register_session(self) -> None:
def increase_session_count(self) -> None:
with self.lock:
self.registered_sessions += 1
self.session_count += 1

def unregister_session(self) -> None:
def decrease_session_count(self) -> None:
with self.lock:
self.registered_sessions -= 1
self.session_count -= 1
print(f"Remaining: {self.session_count}")

def clear_sync(self, last_request: Optional[PreparedRequest]) -> None:
def clear(self, last_request: Optional[PreparedRequest]) -> None:
with self.lock:
# extract previously used jsessionid
if last_request is None:
Expand All @@ -188,7 +191,7 @@ def clear_sync(self, last_request: Optional[PreparedRequest]) -> None:

if self.jsessionid is None or self.jsessionid == jsessionid:
# used auth was up-to-date, clear state
return self.clear()
return self._clear()
else:
# used auth was out-of-date, repeat the request with a new one
return
Expand Down Expand Up @@ -251,9 +254,9 @@ def get_vsessionid(self, tenantid: str) -> str:
self.logger.debug(auth_response_debug(response, str(self)))
return response.json()["VSessionId"]

def clear(self) -> None:
def _clear(self) -> None:
with self.lock:
super().clear()
super()._clear()
self.vsessionid = None


Expand Down

0 comments on commit 5228622

Please sign in to comment.