Skip to content

Commit

Permalink
Implement host find.
Browse files Browse the repository at this point in the history
  - Also makes get_list do the logic of checking hit limits and exception handling.
  - Only the API module has any idea of endpoint addresses for pydantic calls. The rest of the client code does not care.
  - Implements a IPAddressField for Hosts to reduce the number of str-fields we have. We should ideally have none.
  • Loading branch information
terjekv committed Apr 9, 2024
1 parent 368a6aa commit faeb23c
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 29 deletions.
13 changes: 11 additions & 2 deletions mreg_cli/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
guarantees about the data it is working with.
"""

from mreg_cli.api.models import HostModel
from mreg_cli.utilities.api import get
from typing import Dict, Union

from mreg_cli.api.models import HostList, HostModel
from mreg_cli.utilities.api import get, get_list
from mreg_cli.utilities.host import clean_hostname


Expand All @@ -16,3 +18,10 @@ def get_host(name: str) -> HostModel:
hostname = clean_hostname(name)
data = get(f"/api/v1/hosts/{hostname}")
return HostModel(**data.json())


def get_hosts(params: Dict[str, Union[str, int]]) -> HostList:
"""Get a list of hosts."""
endpoint = "/api/v1/hosts/"
data = get_list(endpoint, params=params)
return HostList(results=[HostModel(**host_data) for host_data in data])
120 changes: 101 additions & 19 deletions mreg_cli/api/models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Pydantic models for the mreg_cli package."""

from typing import Annotated, List, Optional

from pydantic import BaseModel, EmailStr, validator
import ipaddress
from typing import Annotated, Any, Dict, List, Optional, Union

from pydantic import BaseModel, EmailStr, root_validator, validator
from pydantic.types import StringConstraints

from mreg_cli.log import cli_warning
from mreg_cli.outputmanager import OutputManager
from mreg_cli.utilities.validators import is_valid_ipv4, is_valid_ipv6

MACAddressT = Annotated[
str, StringConstraints(pattern=r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$")
Expand All @@ -15,38 +17,63 @@
HostT = Annotated[str, StringConstraints(min_length=1, max_length=255)]


class IPAddressField(BaseModel):
"""Represents an IP address, automatically determines if it's IPv4 or IPv6."""

address: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]

@validator("address", pre=True)
def parse_ip_address(cls, value: str) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Parse and validate the IP address."""
try:
return ipaddress.ip_address(value)
except ValueError as e:
raise ValueError(f"Invalid IP address '{value}'.") from e

def is_ipv4(self) -> bool:
"""Check if the IP address is IPv4."""
return isinstance(self.address, ipaddress.IPv4Address)

def is_ipv6(self) -> bool:
"""Check if the IP address is IPv6."""
return isinstance(self.address, ipaddress.IPv6Address)

def __str__(self) -> str:
"""Return the IP address as a string."""
return str(self.address)


class IPAddress(BaseModel):
"""Represents an IP address with associated details."""

macaddress: Optional[MACAddressT] = None
ipaddress: str
ipaddress: IPAddressField
host: int

@validator("macaddress", pre=True, allow_reuse=True)
def empty_string_to_none(cls, v: str):
"""Convert empty strings to None."""
return v or None

@validator("ipaddress")
def validate_ipaddress(cls, v: str):
"""Validate the IP address format."""
if is_valid_ipv4(v):
return v
elif is_valid_ipv6(v):
return v
raise ValueError("Invalid IP address format")
@root_validator(pre=True)
def convert_ip_address(cls, values: Any):
"""Convert ipaddress string to IPAddressField if necessary."""
ip_address = values.get("ipaddress")
if isinstance(ip_address, str):
values["ipaddress"] = {"address": ip_address}
return values

def __str__(self):
"""Return the IP address as a string."""
return self.ipaddress
return self.ipaddress.__str__()

def is_ipv4(self):
def is_ipv4(self) -> bool:
"""Return True if the IP address is IPv4."""
return is_valid_ipv4(self.ipaddress)
return self.ipaddress.is_ipv4()

def is_ipv6(self):
def is_ipv6(self) -> bool:
"""Return True if the IP address is IPv6."""
return is_valid_ipv6(self.ipaddress)
return self.ipaddress.is_ipv6()


class CNAME(BaseModel):
Expand Down Expand Up @@ -105,7 +132,9 @@ def output_host_info(self, names: bool = False):
output_manager.add_line(f"Contact: {self.contact}")

# Calculate padding
len_ip = max(14, max([len(ip.ipaddress) for ip in self.ipaddresses], default=0) + 1)
len_ip = max(
14, max([len(ip.ipaddress.__str__()) for ip in self.ipaddresses], default=0) + 1
)
len_names = (
14
if not names
Expand All @@ -120,7 +149,60 @@ def output_host_info(self, names: bool = False):
if records:
output_manager.add_line(f"{record_type:<{len_names}}IP{' ' * (len_ip - 2)}MAC")
for record in records:
ip = record.ipaddress
ip = record.ipaddress.__str__()
mac = record.macaddress if record.macaddress else "<not set>"
name = str(record.host) if names else ""
output_manager.add_line(f"{name:<{len_names}}{ip:<{len_ip}}{mac}")


class HostList(BaseModel):
"""Model for a list of hosts.
This is the endpoint at /api/v1/hosts/.
"""

results: List[HostModel]

@validator("results", pre=True)
def check_results(cls, v: List[Dict[str, str]]):
"""Check that the results are valid."""
return v

def __len__(self):
"""Return the number of results."""
return len(self.results)

def __getitem__(self, key: int) -> HostModel:
"""Get a result by index."""
return self.results[key]

def __str__(self):
"""Return a string representation of the results."""
return str(self.results)

def __repr__(self):
"""Return a string representation of the results."""
return repr(self.results)

def count(self):
"""Return the number of results."""
return len(self.results)

def output_host_list(self):
"""Output a list of hosts to the console."""
if not self.results:
cli_warning("No hosts found.")

max_name = max_contact = 20
for i in self.results:
max_name = max(max_name, len(i.name))
max_contact = max(max_contact, len(i.contact))

def _format(name: str, contact: str, comment: str) -> None:
OutputManager().add_line(
"{0:<{1}} {2:<{3}} {4}".format(name, max_name, contact, max_contact, comment)
)

_format("Name", "Contact", "Comment")
for i in self.results:
_format(i.name, i.contact, i.comment or "")
53 changes: 52 additions & 1 deletion mreg_cli/commands/host_submodules/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ def host_info_pydantic(args: argparse.Namespace) -> None:
import mreg_cli.api as api

host = api.get_host(args.hosts[0])
print(host)
host.output_host_info()


Expand Down Expand Up @@ -356,6 +355,58 @@ def _print(name: str, contact: str, comment: str) -> None:
_print(i["name"], i["contact"], i["comment"])


@command_registry.register_command(
prog="find_pydantic",
description="Lists hosts matching search criteria",
short_desc="Lists hosts matching search criteria",
flags=[
Flag(
"-name",
description="Name or part of name",
short_desc="Name or part of name",
metavar="NAME",
),
Flag(
"-comment",
description="Comment or part of comment",
short_desc="Comment or part of comment",
metavar="CONTACT",
),
Flag(
"-contact",
description="Contact or part of contact",
short_desc="Contact or part of contact",
metavar="CONTACT",
),
],
)
def find_pydantic(args: argparse.Namespace) -> None:
"""List hosts maching search criteria.
:param args: argparse.Namespace (name, comment, contact)
"""
import mreg_cli.api as api

def _add_param(param: str, value: str) -> None:
param, value = convert_wildcard_to_regex(param, value, True)
params[param] = value

if not any([args.name, args.comment, args.contact]):
cli_warning("Need at least one search critera")

params: Dict[str, Union[str, int]] = {
"ordering": "name",
}

for param in ("contact", "comment", "name"):
value = getattr(args, param)
if value:
_add_param(param, value)

hostlist = api.get_hosts(params)
hostlist.output_host_list()


@command_registry.register_command(
prog="rename",
description="Rename host. If the old name is an alias then the alias is renamed.",
Expand Down
44 changes: 37 additions & 7 deletions mreg_cli/utilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,27 +221,57 @@ def get(


def get_list(
path: Optional[str], params: Optional[Dict[str, Any]] = None, ok404: bool = False
path: str,
params: Optional[Dict[str, Any]] = None,
ok404: bool = False,
max_hits_to_allow: Optional[int] = 500,
) -> List[Dict[str, Any]]:
"""Make a get request that produces a list.
Will iterate over paginated results and return result as list.
Will iterate over paginated results and return result as list. If the number of hits is
greater than max_hits_to_allow, the function will raise an exception.
Parameters
----------
path : str
The path to the API endpoint.
params : dict, optional
The parameters to pass to the API endpoint.
ok404 : bool, optional
Whether to allow 404 responses.
max_hits_to_allow : int, optional
The maximum number of hits to allow. If the number of hits is greater than this, the
function will raise an exception.
Returns
-------
* A list of dictionaries.
"""
if params is None:
params = {}

ret: List[Dict[str, Any]] = []
while path:

# Get the first page to check the number of hits, and raise an exception if it is too high.
get_params = params.copy()
get_params["page_size"] = 1
resp = get(path, get_params).json()
if "count" in resp and resp["count"] > max_hits_to_allow:
raise cli_warning(f"Too many hits ({resp['count']}), please refine your search criteria.")

while True:
resp = get(path, params=params, ok404=ok404)
if resp is None:
return ret
result = resp.json()

if "next" in result:
ret.extend(result["results"])

if "next" in result and result["next"]:
path = result["next"]
ret.extend(result["results"])
else:
path = None
return ret
return ret


def post(
Expand Down

0 comments on commit faeb23c

Please sign in to comment.