Skip to content

Commit

Permalink
Merge pull request #1159 from james-garner-canonical/2024-10/style/ty…
Browse files Browse the repository at this point in the history
…pe-hint-constraints

#1159

#### Description

While looking into #1105, I found it useful to type annotate `juju/constraints.py` while figuring out how things work. 


#### QA Steps

There should be no runtime changes in behaviour. Unit and integration tests should continue to pass, especially `tests/unit/test_constraints.py`.


#### Notes & Discussion

I also broke the longer regexes down across multiple lines and added comments to help me understand them.

I don't mind cutting this PR back to use more minimal type annotations, etc if that's deemed more appropriate in review (e.g. Dict[str, Union[int, float, bool]] instead of the custom TypedDicts).
  • Loading branch information
jujubot authored Oct 16, 2024
2 parents 927dd12 + c49e474 commit 3911d1f
Showing 1 changed file with 90 additions and 21 deletions.
111 changes: 90 additions & 21 deletions juju/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#

import re
from typing import Dict, List, Optional, TypedDict, Union
from typing_extensions import Required, NotRequired


# Matches on a string specifying memory size
MEM = re.compile('^[1-9][0-9]*[MGTP]$')
Expand Down Expand Up @@ -52,15 +55,37 @@
"spaces",
"virt_type",
"zones",
"allocate_public_ip"]
"allocate_public_ip",
]

LIST_KEYS = {'tags', 'spaces', 'zones'}

SNAKE1 = re.compile(r'(.)([A-Z][a-z]+)')
SNAKE2 = re.compile('([a-z0-9])([A-Z])')


def parse(constraints):
ParsedValue = Union[int, bool, str]


class ConstraintsDict(TypedDict, total=False):
allocate_public_ip: ParsedValue
arch: ParsedValue
container: ParsedValue
cores: ParsedValue
cpu_cores: ParsedValue
cpu_power: ParsedValue
instance_role: ParsedValue
instance_type: ParsedValue
mem: ParsedValue
root_disk: ParsedValue
root_dist_source: ParsedValue
spaces: List[ParsedValue]
tags: List[ParsedValue]
virt_type: ParsedValue
zones: List[ParsedValue]


def parse(constraints: Union[str, ConstraintsDict]) -> Optional[ConstraintsDict]:
"""
Constraints must be expressed as a string containing only spaces
and key value pairs joined by an '='.
Expand All @@ -69,23 +94,26 @@ def parse(constraints):
if not constraints:
return None

if type(constraints) is dict:
if isinstance(constraints, dict):
# Fowards compatibilty: already parsed
return constraints

normalized_constraints = {}
normalized_constraints: ConstraintsDict = {}
for s in constraints.split(" "):
if "=" not in s:
raise Exception("malformed constraint %s" % s)
raise ValueError("malformed constraint %s" % s)

k, v = s.split("=")
normalized_constraints[normalize_key(k)] = normalize_list_value(v) if\
k in LIST_KEYS else normalize_value(v)
normalized_constraints[normalize_key(k)] = (
normalize_list_value(v)
if k in LIST_KEYS
else normalize_value(v)
)

return normalized_constraints


def normalize_key(orig_key):
def normalize_key(orig_key: str) -> str:
key = orig_key.strip()

key = key.replace("-", "_") # Our _client lib wants "_" in place of "-"
Expand All @@ -95,11 +123,11 @@ def normalize_key(orig_key):
key = SNAKE2.sub(r'\1_\2', key).lower()

if key not in SUPPORTED_KEYS:
raise Exception("unknown constraint in %s" % orig_key)
raise ValueError("unknown constraint in %s" % orig_key)
return key


def normalize_value(value):
def normalize_value(value: str) -> Union[int, bool, str]:
value = value.strip()

if MEM.match(value):
Expand All @@ -117,22 +145,43 @@ def normalize_value(value):
return value


def normalize_list_value(value):
def normalize_list_value(value: str) -> List[ParsedValue]:
values = value.strip().split(',')
return [normalize_value(value) for value in values]


STORAGE = re.compile(
'(?:(?:^|(?<=,))(?:|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P<count>-?[0-9]+)|(?:(?P<size>-?[0-9]+(?:\\.[0-9]+)?)(?P<size_exp>[MGTPEZY])(?:i?B)?))(?:$|,))')


def parse_storage_constraint(constraint):
storage = {'count': 1}
# original regex:
# '(?:(?:^|(?<=,))(?:|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)|(?P<count>-?[0-9]+)|(?:(?P<size>-?[0-9]+(?:\\.[0-9]+)?)(?P<size_exp>[MGTPEZY])(?:i?B)?))(?:$|,))'
# with formatting and explanation -- note that this regex is used with re.finditer:
'(?:'
'(?:^|(?<=,))' # start of string or previous match ends with ','
'(?:' # match one of the following:
'|(?P<pool>[a-zA-Z]+[-?a-zA-Z0-9]*)' # * pool: a sequence starting with a letter, ending with a letter or number,
# ------- and including letters, numbers and hyphens (no more than one in a row)
'|(?P<count>-?[0-9]+)' # * count: an optional minus sign followed by one or more digits
'|(?:' # * size (number) and size_exp (units):
'(?P<size>-?[0-9]+(?:\\.[0-9]+)?)' # -- * an optional minus sign followed by one or more digits, optionally with decimal point and more digits
'(?P<size_exp>[MGTPEZY])(?:i?B)?)' # -- * one of MGTPEZY, optionally followed by iB or B, for example 1M or 2.0MB or -3.3MiB
')'
'(?:$|,)' # end of string or ','
')'
)


class StorageConstraintDict(TypedDict):
count: Required[int] # >= 1
pool: NotRequired[str]
size: NotRequired[int]


def parse_storage_constraint(constraint: str) -> StorageConstraintDict:
storage: StorageConstraintDict = {'count': 1}
for m in STORAGE.finditer(constraint):
pool = m.group('pool')
if pool:
if 'pool' in storage:
raise Exception("pool already specified")
raise ValueError("pool already specified")
storage['pool'] = pool
count = m.group('count')
if count:
Expand All @@ -145,15 +194,35 @@ def parse_storage_constraint(constraint):


DEVICE = re.compile(
'^(?P<count>[0-9]+)?(?:^|,)(?P<type>[^,]+)(?:$|,(?!$))(?P<attrs>(?:[^=]+=[^;]+)+)*$')
# original regex:
# '^(?P<count>[0-9]+)?(?:^|,)(?P<type>[^,]+)(?:$|,(?!$))(?P<attrs>(?:[^=]+=[^;]+)+)*$'
# with formatting and explanation -- note this regex is used with re.match:
'^' # start of string
'(?P<count>[0-9]+)?' # count is 1+ digits, and is optional
'(?:^|,)' # match start of string or a comma
# -- so type can be at the start or comma separated from count
'(?P<type>[^,]+)' # type is 1+ anything not a comma (including digits), and is required
'(?:$|,(?!$))' # match end of string | or a non-trailing comma
# -- so type can be at the end or followed by attrs
'(?P<attrs>(?:[^=]+=[^;]+)+)*' # attrs is any number of semicolon separated key=value items
# -- value can have spare '=' inside, possible not intended
# -- attrs will be matched with ATTR.finditer afterwards in parse_device_constraint
'$' # end of string
)
ATTR = re.compile(';?(?P<key>[^=]+)=(?P<value>[^;]+)')


def parse_device_constraint(constraint):
class DeviceConstraintDict(TypedDict):
count: Required[int]
type: Required[str]
attributes: NotRequired[Dict[str, str]]


def parse_device_constraint(constraint: str) -> DeviceConstraintDict:
m = DEVICE.match(constraint)
if m is None:
raise Exception("device constraint does not match")
device = {}
raise ValueError("device constraint does not match")
device: DeviceConstraintDict = {}
count = m.group('count')
if count:
count = int(count)
Expand Down

0 comments on commit 3911d1f

Please sign in to comment.