Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Style deserializing reimplementation #694

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion openapi_core/deserializing/styles/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
from openapi_core.deserializing.styles.datatypes import StyleDeserializersDict
from openapi_core.deserializing.styles.factories import (
StyleDeserializersFactory,
)
from openapi_core.deserializing.styles.util import deep_object_loads
from openapi_core.deserializing.styles.util import form_loads
from openapi_core.deserializing.styles.util import label_loads
from openapi_core.deserializing.styles.util import matrix_loads
from openapi_core.deserializing.styles.util import pipe_delimited_loads
from openapi_core.deserializing.styles.util import simple_loads
from openapi_core.deserializing.styles.util import space_delimited_loads

__all__ = ["style_deserializers_factory"]

style_deserializers_factory = StyleDeserializersFactory()
style_deserializers: StyleDeserializersDict = {
"matrix": matrix_loads,
"label": label_loads,
"form": form_loads,
"simple": simple_loads,
"spaceDelimited": space_delimited_loads,
"pipeDelimited": pipe_delimited_loads,
"deepObject": deep_object_loads,
}

style_deserializers_factory = StyleDeserializersFactory(
style_deserializers=style_deserializers,
)
6 changes: 5 additions & 1 deletion openapi_core/deserializing/styles/datatypes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Mapping

DeserializerCallable = Callable[[str], List[str]]
DeserializerCallable = Callable[[bool, str, str, Mapping[str, Any]], Any]
StyleDeserializersDict = Dict[str, DeserializerCallable]
42 changes: 14 additions & 28 deletions openapi_core/deserializing/styles/deserializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any
from typing import Callable
from typing import List
from typing import Mapping
from typing import Optional

from jsonschema_path import SchemaPath
Expand All @@ -11,46 +12,31 @@
from openapi_core.deserializing.styles.exceptions import (
EmptyQueryParameterValue,
)
from openapi_core.schema.parameters import get_aslist
from openapi_core.schema.parameters import get_explode


class CallableStyleDeserializer:
class StyleDeserializer:
def __init__(
self,
param_or_header: SchemaPath,
style: str,
explode: bool,
name: str,
schema_type: str,
deserializer_callable: Optional[DeserializerCallable] = None,
):
self.param_or_header = param_or_header
self.style = style
self.explode = explode
self.name = name
self.schema_type = schema_type
self.deserializer_callable = deserializer_callable

self.aslist = get_aslist(self.param_or_header)
self.explode = get_explode(self.param_or_header)

def deserialize(self, value: Any) -> Any:
def deserialize(self, location: Mapping[str, Any]) -> Any:
if self.deserializer_callable is None:
warnings.warn(f"Unsupported {self.style} style")
return value

# if "in" not defined then it's a Header
if "allowEmptyValue" in self.param_or_header:
warnings.warn(
"Use of allowEmptyValue property is deprecated",
DeprecationWarning,
)
allow_empty_values = self.param_or_header.getkey(
"allowEmptyValue", False
)
location_name = self.param_or_header.getkey("in", "header")
if location_name == "query" and value == "" and not allow_empty_values:
name = self.param_or_header["name"]
raise EmptyQueryParameterValue(name)
return location[self.name]

if not self.aslist or self.explode:
return value
try:
return self.deserializer_callable(value)
return self.deserializer_callable(
self.explode, self.name, self.schema_type, location
)
except (ValueError, TypeError, AttributeError):
raise DeserializeError(location_name, self.style, value)
raise DeserializeError(self.style, self.name)
37 changes: 23 additions & 14 deletions openapi_core/deserializing/styles/factories.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
import re
from functools import partial
from typing import Any
from typing import Dict
from typing import Mapping
from typing import Optional

from jsonschema_path import SchemaPath

from openapi_core.deserializing.styles.datatypes import DeserializerCallable
from openapi_core.deserializing.styles.deserializers import (
CallableStyleDeserializer,
)
from openapi_core.deserializing.styles.datatypes import StyleDeserializersDict
from openapi_core.deserializing.styles.deserializers import StyleDeserializer
from openapi_core.deserializing.styles.util import split
from openapi_core.schema.parameters import get_explode
from openapi_core.schema.parameters import get_style


class StyleDeserializersFactory:
STYLE_DESERIALIZERS: Dict[str, DeserializerCallable] = {
"form": partial(split, separator=","),
"simple": partial(split, separator=","),
"spaceDelimited": partial(split, separator=" "),
"pipeDelimited": partial(split, separator="|"),
"deepObject": partial(re.split, pattern=r"\[|\]"),
}
def __init__(
self,
style_deserializers: Optional[StyleDeserializersDict] = None,
):
if style_deserializers is None:
style_deserializers = {}
self.style_deserializers = style_deserializers

def create(self, param_or_header: SchemaPath) -> CallableStyleDeserializer:
def create(
self, param_or_header: SchemaPath, name: Optional[str] = None
) -> StyleDeserializer:
name = name or param_or_header["name"]
style = get_style(param_or_header)
explode = get_explode(param_or_header)
schema = param_or_header / "schema"
schema_type = schema.getkey("type", "")

deserialize_callable = self.STYLE_DESERIALIZERS.get(style)
return CallableStyleDeserializer(
param_or_header, style, deserialize_callable
deserialize_callable = self.style_deserializers.get(style)
return StyleDeserializer(
style, explode, name, schema_type, deserialize_callable
)
201 changes: 199 additions & 2 deletions openapi_core/deserializing/styles/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,202 @@
import re
from functools import partial
from typing import Any
from typing import List
from typing import Mapping
from typing import Optional

from openapi_core.schema.protocols import SuportsGetAll
from openapi_core.schema.protocols import SuportsGetList

def split(value: str, separator: str = ",") -> List[str]:
return value.split(separator)

def split(value: str, separator: str = ",", step: int = 1) -> List[str]:
parts = value.split(separator)

if step == 1:
return parts

result = []
for i in range(len(parts)):
if i % step == 0:
if i + 1 < len(parts):
result.append(parts[i] + separator + parts[i + 1])
return result


def delimited_loads(
explode: bool,
name: str,
schema_type: str,
location: Mapping[str, Any],
delimiter: str,
) -> Any:
value = location[name]

explode_type = (explode, schema_type)
if explode_type == (False, "array"):
return split(value, separator=delimiter)
if explode_type == (False, "object"):
return dict(
map(
partial(split, separator=delimiter),
split(value, separator=delimiter, step=2),
)
)

raise ValueError("not available")


def matrix_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
if explode == False:
m = re.match(rf"^;{name}=(.*)$", location[f";{name}"])
if m is None:
raise KeyError(name)
value = m.group(1)
# ;color=blue,black,brown
if schema_type == "array":
return split(value)
# ;color=R,100,G,200,B,150
if schema_type == "object":
return dict(map(split, split(value, step=2)))
# .;color=blue
return value
else:
# ;color=blue;color=black;color=brown
if schema_type == "array":
return re.findall(rf";{name}=([^;]*)", location[f";{name}*"])
# ;R=100;G=200;B=150
if schema_type == "object":
value = location[f";{name}*"]
return dict(
map(
partial(split, separator="="),
split(value[1:], separator=";"),
)
)
# ;color=blue
m = re.match(rf"^;{name}=(.*)$", location[f";{name}*"])
if m is None:
raise KeyError(name)
value = m.group(1)
return value


def label_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
if explode == False:
value = location[f".{name}"]
# .blue,black,brown
if schema_type == "array":
return split(value[1:])
# .R,100,G,200,B,150
if schema_type == "object":
return dict(map(split, split(value[1:], separator=",", step=2)))
# .blue
return value[1:]
else:
value = location[f".{name}*"]
# .blue.black.brown
if schema_type == "array":
return split(value[1:], separator=".")
# .R=100.G=200.B=150
if schema_type == "object":
return dict(
map(
partial(split, separator="="),
split(value[1:], separator="."),
)
)
# .blue
return value[1:]


def form_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
explode_type = (explode, schema_type)
# color=blue,black,brown
if explode_type == (False, "array"):
return split(location[name], separator=",")
# color=blue&color=black&color=brown
elif explode_type == (True, "array"):
if name not in location:
raise KeyError(name)
if isinstance(location, SuportsGetAll):
return location.getall(name)
if isinstance(location, SuportsGetList):
return location.getlist(name)
return location[name]

value = location[name]
# color=R,100,G,200,B,150
if explode_type == (False, "object"):
return dict(map(split, split(value, separator=",", step=2)))
# R=100&G=200&B=150
elif explode_type == (True, "object"):
return dict(
map(partial(split, separator="="), split(value, separator="&"))
)

# color=blue
return value


def simple_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
value = location[name]

# blue,black,brown
if schema_type == "array":
return split(value, separator=",")

explode_type = (explode, schema_type)
# R,100,G,200,B,150
if explode_type == (False, "object"):
return dict(map(split, split(value, separator=",", step=2)))
# R=100,G=200,B=150
elif explode_type == (True, "object"):
return dict(
map(partial(split, separator="="), split(value, separator=","))
)

# blue
return value


def space_delimited_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
return delimited_loads(
explode, name, schema_type, location, delimiter="%20"
)


def pipe_delimited_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
return delimited_loads(explode, name, schema_type, location, delimiter="|")


def deep_object_loads(
explode: bool, name: str, schema_type: str, location: Mapping[str, Any]
) -> Any:
explode_type = (explode, schema_type)

if explode_type != (True, "object"):
raise ValueError("not available")

keys_str = " ".join(location.keys())
if not re.search(rf"{name}\[\w+\]", keys_str):
raise KeyError(name)

values = {}
for key, value in location.items():
# Split the key from the brackets.
key_split = re.split(pattern=r"\[|\]", string=key)
if key_split[0] == name:
values[key_split[1]] = value
return values
Loading