Skip to content

Commit

Permalink
Add Endpoint enum, migrate add_user, refactor MacAddress.
Browse files Browse the repository at this point in the history
  - Also differentiates between get_list and get_list_unique where one expects many or one hit.
  - Further improve models with better cleaning of macaddress.
  - Avoid string literals for endpoints, use an Enum with optional identifier application.
  • Loading branch information
terjekv committed Apr 9, 2024
1 parent 683379a commit 5457b3d
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 40 deletions.
13 changes: 9 additions & 4 deletions mreg_cli/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@

from typing import Dict, Union

from mreg_cli.api.endpoints import Endpoint
from mreg_cli.api.models import HostList, HostModel
from mreg_cli.utilities.api import get, get_list
from mreg_cli.utilities.api import get, get_list, post
from mreg_cli.utilities.host import clean_hostname


def get_host(name: str) -> HostModel:
"""Get a host by name."""
hostname = clean_hostname(name)
data = get(f"/api/v1/hosts/{hostname}")
data = get(Endpoint.Hosts.with_id(hostname))
return HostModel(**data.json())


def get_hosts(params: Dict[str, Union[str, int]]) -> HostList:
"""Get a list of hosts."""
endpoint = "/api/v1/hosts/"
data = get_list(endpoint, params=params)
data = get_list(Endpoint.Hosts, params=params)
return HostList(results=[HostModel(**host_data) for host_data in data])


def add_host(data: Dict[str, Union[str, None]]) -> None:
"""Add a host."""
post(Endpoint.Hosts, params=None, **data)
15 changes: 15 additions & 0 deletions mreg_cli/api/endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""API endpoints for mreg."""

from enum import Enum
from typing import Union


class Endpoint(str, Enum):
"""API endpoints."""

Hosts = "/api/v1/hosts/"
Ipaddresses = "/api/v1/ipaddresses/"

def with_id(self, identity: Union[str, int]) -> str:
"""Return the endpoint with an ID."""
return f"{self.value}{identity}"
99 changes: 89 additions & 10 deletions mreg_cli/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,56 @@


import ipaddress
import re
from typing import Annotated, Any, Dict, List, Optional, Union

from pydantic import BaseModel, EmailStr, root_validator, validator
from pydantic.types import StringConstraints

from mreg_cli.api.endpoints import Endpoint
from mreg_cli.log import cli_warning
from mreg_cli.outputmanager import OutputManager
from mreg_cli.utilities.api import get_list, patch

MACAddressT = Annotated[
str, StringConstraints(pattern=r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$")
]

IPAddressT = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
HostT = Annotated[str, StringConstraints(min_length=1, max_length=255)]

_mac_regex = re.compile(r"^([0-9A-Fa-f]{2}[.:-]){5}([0-9A-Fa-f]{2})$")


class MACAddressField(BaseModel):
"""Represents a MAC address."""

address: str

@validator("address", pre=True)
def validate_and_format_mac(cls, v: str) -> str:
"""Validate and normalize MAC address to 'aa:bb:cc:dd:ee:ff' format.
:param v: The input MAC address string.
:raises ValueError: If the input does not match the expected MAC address pattern.
:returns: The normalized MAC address.
"""
# Validate input format
if not _mac_regex.match(v):
raise ValueError("Invalid MAC address format")

# Normalize MAC address
v = re.sub(r"[.:-]", "", v).lower()
return ":".join(v[i : i + 2] for i in range(0, 12, 2))

def __str__(self) -> str:
"""Return the MAC address as a string."""
return self.address


class IPAddressField(BaseModel):
"""Represents an IP address, automatically determines if it's IPv4 or IPv6."""

address: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
address: IPAddressT

@validator("address", pre=True)
def parse_ip_address(cls, value: str) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
def parse_ip_address(cls, value: str) -> IPAddressT:
"""Parse and validate the IP address."""
try:
return ipaddress.ip_address(value)
Expand All @@ -46,14 +74,18 @@ def __str__(self) -> str:
class IPAddress(BaseModel):
"""Represents an IP address with associated details."""

macaddress: Optional[MACAddressT] = None
id: int # noqa: A003
macaddress: Optional[MACAddressField] = None
ipaddress: IPAddressField
host: int

@validator("macaddress", pre=True, allow_reuse=True)
def empty_string_to_none(cls, v: str):
"""Convert empty strings to None."""
return v or None
def create_valid_macadress_or_none(cls, v: str):
"""Create macaddress or convert empty strings to None."""
if v:
return MACAddressField(address=v)

return None

@root_validator(pre=True)
def convert_ip_address(cls, values: Any):
Expand All @@ -75,6 +107,21 @@ def is_ipv6(self) -> bool:
"""Return True if the IP address is IPv6."""
return self.ipaddress.is_ipv6()

def ip(self) -> IPAddressT:
"""Return the IP address."""
return self.ipaddress.address

def associate_mac(self, mac: Union[MACAddressField, str], force: bool = False):
"""Associate a MAC address with the IP address."""
if isinstance(mac, str):
mac = MACAddressField(address=mac)

if self.macaddress and not force:
cli_warning(f"IP address {self.ipaddress} already has MAC address {self.macaddress}.")

self.macaddress = mac
patch(Endpoint.Ipaddresses.with_id(self.id), macaddress=mac.address)


class CNAME(BaseModel):
"""Represents a CNAME record."""
Expand All @@ -98,6 +145,7 @@ class HostModel(BaseModel):
This is the endpoint at /api/v1/hosts/<id>.
"""

id: int # noqa: A003
name: HostT
ipaddresses: List[IPAddress]
cnames: List[CNAME] = []
Expand Down Expand Up @@ -125,6 +173,37 @@ def ipv6_addresses(self):
"""Return a list of IPv6 addresses."""
return [ip for ip in self.ipaddresses if ip.is_ipv6()]

def associate_mac_to_ip(
self, mac: Union[MACAddressField, str], ip: IPAddressField, force: bool = False
):
"""Associate a MAC address to an IP address."""
if isinstance(mac, str):
mac = MACAddressField(address=mac)

params = {
"macaddress": mac.address,
"ordering": "ipaddress",
}

data = get_list(Endpoint.Ipaddresses, params=params)
ipadresses = [IPAddress(**ip) for ip in data]

if ip in [ip.ipaddress for ip in ipadresses]:
cli_warning(f"IP address {ip} already has MAC address {mac} associated.")

if len(ipadresses) and not force:
cli_warning(
"mac {} already in use by: {}. Use force to add {} -> {} as well.".format(
mac, ipadresses, ip.address, mac
)
)

for myip in self.ipaddresses:
if myip.ipaddress.address == ip.address:
myip.associate_mac(mac, force=force)

cli_warning(f"IP address {ip} not found in host {self.name}.")

def output_host_info(self, names: bool = False):
"""Output host information to the console with padding."""
output_manager = OutputManager()
Expand Down
18 changes: 10 additions & 8 deletions mreg_cli/commands/host_submodules/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from mreg_cli.log import cli_info, cli_warning
from mreg_cli.outputmanager import OutputManager
from mreg_cli.types import Flag
from mreg_cli.utilities.api import delete, get, get_list, patch, post
from mreg_cli.utilities.api import delete, get, get_list, patch
from mreg_cli.utilities.history import format_history_items, get_history_items
from mreg_cli.utilities.host import (
assoc_mac_to_ip,
clean_hostname,
cname_exists,
get_host_by_name,
Expand Down Expand Up @@ -109,20 +108,23 @@ def add(args: argparse.Namespace) -> None:
)

# Create the new host with an ip address
path = "/api/v1/hosts/"
data = {
data: Dict[str, Union[str, None]] = {
"name": name,
"contact": args.contact or None,
"comment": args.comment or None,
}

if args.ip and ip:
data["ipaddress"] = ip

post(path, params=None, **data)
from mreg_cli.api import add_host, get_host

add_host(data)

if args.macaddress is not None:
# It can only be one, as it was just created.
ipdata = get(f"{path}{name}").json()["ipaddresses"][0]
assoc_mac_to_ip(args.macaddress, ipdata, force=args.force)
# There can only be one, as it was just created.
host = get_host(name)
host.associate_mac_to_ip(args.macaddress, host.ipaddresses[0].ipaddress)
msg = f"created host {name}"
if args.ip:
msg += f" with IP {ip}"
Expand Down
96 changes: 78 additions & 18 deletions mreg_cli/utilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,47 +272,107 @@ def get_list(
Will iterate over paginated results and return result as list. If the number of hits is
greater than max_hits_to_allow, the function will raise an exception.
Parameters
----------
path : str
The path to the API endpoint.
params : dict, optional
The parameters to pass to the API endpoint.
ok404 : bool, optional
Whether to allow 404 responses.
max_hits_to_allow : int, optional
The maximum number of hits to allow. If the number of hits is greater than this, the
function will raise an exception.
Returns
-------
* A list of dictionaries.
:param path: The path to the API endpoint.
:param params: The parameters to pass to the API endpoint.
:param ok404: Whether to allow 404 responses.
:param max_hits_to_allow: The maximum number of hits to allow. If the number of hits is
greater than this, the function will raise an exception.
:returns: A list of dictionaries.
"""
ret = get_list_generic(path, params, ok404, max_hits_to_allow, expect_one_result=False)

if not isinstance(ret, list):
raise CliError(f"Expected a list of results, got {type(ret)}.")

return ret


def get_list_unique(
path: str,
params: Optional[Dict[str, Any]] = None,
ok404: bool = False,
) -> Dict[str, Any]:
"""Do a get request that returns a single result from a search.
:param path: The path to the API endpoint.
:param params: The parameters to pass to the API endpoint.
:param ok404: Whether to allow 404 responses.
:returns: A single dictionary.
"""
ret = get_list_generic(path, params, ok404, expect_one_result=True)

if not isinstance(ret, dict):
raise CliError(f"Expected a single result, got {type(ret)}.")

return ret


def get_list_generic(
path: str,
params: Optional[Dict[str, Any]] = None,
ok404: bool = False,
max_hits_to_allow: Optional[int] = 500,
expect_one_result: Optional[bool] = False,
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""Make a get request that produces a list.
Will iterate over paginated results and return result as list. If the number of hits is
greater than max_hits_to_allow, the function will raise an exception.
:param path: The path to the API endpoint.
:param params: The parameters to pass to the API endpoint.
:param ok404: Whether to allow 404 responses.
:param max_hits_to_allow: The maximum number of hits to allow. If the number of hits is
greater than this, the function will raise an exception.
:param expect_one_result: If True, expect exactly one result and return it as a list.
:returns: A list of dictionaries or a dictionary if expect_one_result is True.
"""

def _check_expect_one_result(
ret: List[Dict[str, Any]]
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
if expect_one_result:
if len(ret) != 1:
raise CliError(f"Expected exactly one result, got {len(ret)}.")
if "results" not in ret[0]:
raise CliError("Expected 'results' in response, got none.")

return ret[0]["results"]

return ret

if params is None:
params = {}

ret: List[Dict[str, Any]] = []

# Get the first page to check the number of hits, and raise an exception if it is too high.
get_params = params.copy()
get_params["page_size"] = 1
# get_params["page_size"] = 1
resp = get(path, get_params).json()
if "count" in resp and resp["count"] > max_hits_to_allow:
raise cli_warning(f"Too many hits ({resp['count']}), please refine your search criteria.")

# Short circuit if there are no more pages. This means that there are no more results to
# be had so we can return the results we already have.
if "next" in resp and not resp["next"]:
return _check_expect_one_result(resp["results"])

while True:
resp = get(path, params=params, ok404=ok404)
if resp is None:
return ret
return _check_expect_one_result(ret)
result = resp.json()

ret.extend(result["results"])

if "next" in result and result["next"]:
path = result["next"]
else:
return ret
return _check_expect_one_result(ret)


def post(
Expand Down

0 comments on commit 5457b3d

Please sign in to comment.