-
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds minimal infrastructure for http, httpApiKey, and oauth2 security…
… schemes.
- Loading branch information
1 parent
1646805
commit 5e44d44
Showing
10 changed files
with
656 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from typing import Any, Mapping | ||
|
||
JSONMappingValue = Any | ||
JSONMapping = Mapping[str, JSONMappingValue] | ||
JSONSchema = JSONMapping |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from importlib import import_module | ||
from typing import Callable | ||
|
||
|
||
def load_handler(handler_id: str) -> Callable: | ||
*module_path_elements, object_name = handler_id.split(".") | ||
module = import_module(".".join(module_path_elements)) | ||
|
||
return getattr(module, object_name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from typing import Sequence | ||
|
||
from asynction.common_types import JSONMapping | ||
|
||
from .exceptions import UnregisteredSecurityScheme | ||
from .exceptions import UnsupportedSecurityScheme | ||
from .types import SecurityRequirement | ||
from .types import SecurityScheme | ||
from .types import SecuritySchemesType | ||
from .validation import security_handler_factory | ||
|
||
|
||
def _resolve_security_scheme( | ||
security: Sequence[JSONMapping], schemes: JSONMapping | ||
) -> Sequence[JSONMapping]: | ||
new_security = [] | ||
for item in security: | ||
for scheme_name, scopes in item.items(): | ||
if scheme_name not in schemes: | ||
raise UnregisteredSecurityScheme | ||
scheme = schemes[scheme_name] | ||
new_security.append(dict(name=scheme_name, scopes=scopes, scheme=scheme)) | ||
|
||
return new_security | ||
|
||
|
||
def _resolve_server_security_schemes( | ||
raw_spec: JSONMapping, schemes: JSONMapping | ||
) -> JSONMapping: | ||
for name, server in raw_spec.get("servers", {}).items(): | ||
if "security" in server: | ||
server["security"] = ( | ||
_resolve_security_scheme(server["security"], schemes) or None | ||
) | ||
|
||
return raw_spec | ||
|
||
|
||
def resolve_security_schemes(raw_spec: JSONMapping) -> JSONMapping: | ||
schemes = raw_spec.get("components", {}).get("securitySchemes", {}) | ||
if not schemes: | ||
return raw_spec | ||
raw_spec = _resolve_server_security_schemes(raw_spec, schemes) | ||
|
||
return raw_spec | ||
|
||
|
||
__all__ = [ | ||
"SecurityRequirement", | ||
"SecurityScheme", | ||
"SecuritySchemesType", | ||
"security_handler_factory", | ||
"resolve_security_schemes", | ||
"UnregisteredSecurityScheme", | ||
"UnsupportedSecurityScheme", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from asynction.exceptions import AsynctionException | ||
|
||
|
||
class SecurityException(AsynctionException): | ||
""" | ||
Base Security Exception type. | ||
""" | ||
pass | ||
|
||
|
||
class UnregisteredSecurityScheme(SecurityException): | ||
""" | ||
Raised when a security scheme not listed in the securitySchemes section of the | ||
spec is used in a ``security`` or ``x-security`` specification | ||
""" | ||
pass | ||
|
||
|
||
class UnsupportedSecurityScheme(SecurityException): | ||
""" | ||
Raised when a specified security scheme is not supported by asynction | ||
""" | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
from dataclasses import dataclass | ||
from enum import Enum | ||
from typing import Mapping, Optional, Sequence, Type | ||
|
||
from svarog import register_forge | ||
from svarog.types import Forge | ||
|
||
from asynction.common_types import JSONMapping | ||
from .exceptions import UnsupportedSecurityScheme | ||
|
||
|
||
class HTTPSecuritySchemeType(Enum): | ||
BASIC = "basic" | ||
DIGEST = "digest" | ||
BEARER = "bearer" | ||
|
||
|
||
class OAuth2FlowType(Enum): | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#oauthFlowsObject | ||
""" | ||
IMPLICIT = "implicit" | ||
PASSWORD = "password" | ||
CLIENT_CREDENTIALS = "clientCredentials " | ||
AUTHORIZATION_CODE = "authorizationCode" | ||
|
||
|
||
@dataclass | ||
class OAuth2Flow: | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#oauthFlowObject | ||
""" | ||
scopes: Sequence[str] | ||
authorization_url: Optional[str] = None | ||
token_url: Optional[str] = None | ||
refresh_url: Optional[str] = None | ||
|
||
@staticmethod | ||
def forge( | ||
type_: Type["OAuth2Flow"], | ||
data: JSONMapping, | ||
forge: Forge | ||
) -> "OAuth2Flow": | ||
return type_( | ||
scopes=forge( | ||
type_.__annotations__["scopes"], | ||
data.get("scopes") | ||
), | ||
authorization_url=forge( | ||
type_.__annotations__["authorization_url"], | ||
data.get("authorizationUrl") | ||
), | ||
token_url=forge( | ||
type_.__annotations__["token_url"], | ||
data.get("tokenUrl") | ||
), | ||
refresh_url=forge( | ||
type_.__annotations__["refresh_url"], | ||
data.get("refreshUrl") | ||
) | ||
) | ||
|
||
|
||
register_forge(OAuth2Flow, OAuth2Flow.forge) | ||
|
||
|
||
class SecuritySchemesType(Enum): | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#securitySchemeObject | ||
""" | ||
USER_PASSWORD = "userPassword" | ||
API_KEY = "apiKey" | ||
X509 = "X509" | ||
SYMMETRIC_ENCRYPTION = "symmetricEncryption" | ||
ASYMMETRIC_ENCRYPTION = "asymmetricEncryption" | ||
HTTP_API_KEY = "httpApiKey" | ||
HTTP = "http" | ||
OAUTH2 = "oauth2" | ||
OPENID_CONNECT = "openIdConnect" | ||
PLAIN = "plain" | ||
SCRAM_SHA256 = "scramSha256" | ||
SCRAM_SHA512 = "scramSha512" | ||
GSSAPI = "gssapi" | ||
|
||
|
||
@dataclass | ||
class SecurityScheme: | ||
""" | ||
https://www.asyncapi.com/docs/specifications/v2.2.0#securitySchemeObject | ||
""" | ||
type: SecuritySchemesType | ||
description: Optional[str] = None | ||
name: Optional[str] = None # Required for httpApiKey | ||
in_: Optional[str] = None # Required for httpApiKey | apiKey | ||
scheme: Optional[HTTPSecuritySchemeType] = None # Required for http | ||
bearer_format: Optional[str] = None # Optional for http ("bearer") | ||
flows: Optional[Mapping[OAuth2FlowType, OAuth2Flow]] = None # Required for oauth2 | ||
open_id_connect_url: Optional[str] = None # Required for openIdConnect | ||
|
||
x_basic_info_func: Optional[str] = None # Required for http(basic) | ||
x_token_info_func: Optional[str] = None # Required for oauth2 | ||
x_api_key_info_func: Optional[str] = None # Required for apiKey | ||
|
||
@staticmethod | ||
def forge( | ||
type_: Type["SecurityScheme"], | ||
data: JSONMapping, | ||
forge: Forge | ||
) -> "SecurityScheme": | ||
|
||
scheme_type_raw = data.get("type") | ||
if not scheme_type_raw: | ||
raise UnsupportedSecurityScheme | ||
try: | ||
SecuritySchemesType(scheme_type_raw) | ||
except ValueError: | ||
raise UnsupportedSecurityScheme(scheme_type_raw) | ||
|
||
return type_( | ||
type=forge( | ||
type_.__annotations__["type"], | ||
data.get("type") | ||
), | ||
description=forge( | ||
type_.__annotations__["description"], | ||
data.get("description") | ||
), | ||
name=forge( | ||
type_.__annotations__["name"], | ||
data.get("name") | ||
), | ||
in_=forge( | ||
type_.__annotations__["in_"], | ||
data.get("in") | ||
), | ||
scheme=forge( | ||
type_.__annotations__["scheme"], | ||
data.get("scheme") | ||
), | ||
bearer_format=forge( | ||
type_.__annotations__["bearer_format"], | ||
data.get("bearerFormat") | ||
), | ||
flows=forge( | ||
type_.__annotations__["flows"], | ||
data.get("flows") | ||
), | ||
open_id_connect_url=forge( | ||
type_.__annotations__["open_id_connect_url"], | ||
data.get("openIdConnectUrl") | ||
), | ||
x_basic_info_func=forge( | ||
type_.__annotations__["x_basic_info_func"], | ||
data.get("x-basicInfoFunc") | ||
), | ||
x_token_info_func=forge( | ||
type_.__annotations__["x_token_info_func"], | ||
data.get("x-tokenInfoFunc") | ||
), | ||
x_api_key_info_func=forge( | ||
type_.__annotations__["x_api_key_info_func"], | ||
data.get("x-apiKeyInfoFunc") | ||
) | ||
) | ||
|
||
|
||
register_forge(SecurityScheme, SecurityScheme.forge) | ||
|
||
|
||
@dataclass | ||
class SecurityRequirement: | ||
# https://www.asyncapi.com/docs/specifications/v2.2.0#securityRequirementObject | ||
name: str | ||
scopes: Sequence[str] | ||
scheme: SecurityScheme | ||
|
||
@staticmethod | ||
def forge( | ||
type_: Type["SecurityRequirement"], | ||
data: JSONMapping, | ||
forge: Forge | ||
) -> "SecurityRequirement": | ||
return type_( | ||
name=forge(type_.__annotations__["name"], data.get("name")), | ||
scopes=forge(type_.__annotations__["scopes"], data.get("scopes")), | ||
scheme=forge(type_.__annotations__["scheme"], data.get("scheme")) | ||
) | ||
|
||
|
||
register_forge(SecurityRequirement, SecurityRequirement.forge) |
Oops, something went wrong.