Skip to content

Commit

Permalink
Initial pydantic testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
terjekv committed Jan 29, 2024
1 parent f5ad0ee commit 368a6aa
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 0 deletions.
18 changes: 18 additions & 0 deletions mreg_cli/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""API glue code for the mreg_cli package.
Originally the API code took whatever JSON data it received and returned it as a dictionary.
This led to horrible code that was hard to maintain and debug. This module is an attempt to
fix that by using pydantic models to validate incoming data so the client code has
guarantees about the data it is working with.
"""

from mreg_cli.api.models import HostModel
from mreg_cli.utilities.api import get
from mreg_cli.utilities.host import clean_hostname


def get_host(name: str) -> HostModel:
"""Get a host by name."""
hostname = clean_hostname(name)
data = get(f"/api/v1/hosts/{hostname}")
return HostModel(**data.json())
126 changes: 126 additions & 0 deletions mreg_cli/api/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Pydantic models for the mreg_cli package."""

from typing import Annotated, List, Optional

from pydantic import BaseModel, EmailStr, validator
from pydantic.types import StringConstraints

from mreg_cli.outputmanager import OutputManager
from mreg_cli.utilities.validators import is_valid_ipv4, is_valid_ipv6

MACAddressT = Annotated[
str, StringConstraints(pattern=r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$")
]

HostT = Annotated[str, StringConstraints(min_length=1, max_length=255)]


class IPAddress(BaseModel):
"""Represents an IP address with associated details."""

macaddress: Optional[MACAddressT] = None
ipaddress: str
host: int

@validator("macaddress", pre=True, allow_reuse=True)
def empty_string_to_none(cls, v: str):
"""Convert empty strings to None."""
return v or None

@validator("ipaddress")
def validate_ipaddress(cls, v: str):
"""Validate the IP address format."""
if is_valid_ipv4(v):
return v
elif is_valid_ipv6(v):
return v
raise ValueError("Invalid IP address format")

def __str__(self):
"""Return the IP address as a string."""
return self.ipaddress

def is_ipv4(self):
"""Return True if the IP address is IPv4."""
return is_valid_ipv4(self.ipaddress)

def is_ipv6(self):
"""Return True if the IP address is IPv6."""
return is_valid_ipv6(self.ipaddress)


class CNAME(BaseModel):
"""Represents a CNAME record."""

name: HostT
ttl: Optional[int] = None
zone: int
host: int


class TXT(BaseModel):
"""Represents a TXT record."""

txt: str
host: int


class HostModel(BaseModel):
"""Model for an individual host.
This is the endpoint at /api/v1/hosts/<id>.
"""

name: HostT
ipaddresses: List[IPAddress]
cnames: List[CNAME] = []
mxs: List[str] = []
txts: List[TXT] = []
ptr_overrides: List[str] = []
hinfo: Optional[str] = None
loc: Optional[str] = None
bacnetid: Optional[str] = None
contact: EmailStr
ttl: Optional[int] = None
comment: Optional[str] = None
zone: int

@validator("comment", pre=True, allow_reuse=True)
def empty_string_to_none(cls, v: str):
"""Convert empty strings to None."""
return v or None

def ipv4_addresses(self):
"""Return a list of IPv4 addresses."""
return [ip for ip in self.ipaddresses if ip.is_ipv4()]

def ipv6_addresses(self):
"""Return a list of IPv6 addresses."""
return [ip for ip in self.ipaddresses if ip.is_ipv6()]

def output_host_info(self, names: bool = False):
"""Output host information to the console with padding."""
output_manager = OutputManager()
output_manager.add_line(f"Name: {self.name}")
output_manager.add_line(f"Contact: {self.contact}")

# Calculate padding
len_ip = max(14, max([len(ip.ipaddress) for ip in self.ipaddresses], default=0) + 1)
len_names = (
14
if not names
else max(14, max([len(str(ip.host)) for ip in self.ipaddresses], default=0) + 1)
)

# Separate and output A and AAAA records
for record_type, records in (
("A_Records", self.ipv4_addresses()),
("AAAA_Records", self.ipv6_addresses()),
):
if records:
output_manager.add_line(f"{record_type:<{len_names}}IP{' ' * (len_ip - 2)}MAC")
for record in records:
ip = record.ipaddress
mac = record.macaddress if record.macaddress else "<not set>"
name = str(record.host) if names else ""
output_manager.add_line(f"{name:<{len_names}}{ip:<{len_ip}}{mac}")
23 changes: 23 additions & 0 deletions mreg_cli/commands/host_submodules/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,29 @@ def remove(args: argparse.Namespace) -> None:
cli_info("removed {}".format(info["name"]), print_msg=True)


@command_registry.register_command(
prog="info_pydantic",
description="Print info about one or more hosts.",
short_desc="Print info about one or more hosts.",
flags=[
Flag(
"hosts",
description="One or more hosts given by their name, ip or mac.",
short_desc="One or more names, ips or macs.",
nargs="+",
metavar="NAME/IP/MAC",
)
],
)
def host_info_pydantic(args: argparse.Namespace) -> None:
"""Print information about host."""
import mreg_cli.api as api

host = api.get_host(args.hosts[0])
print(host)
host.output_host_info()


@command_registry.register_command(
prog="info",
description="Print info about one or more hosts.",
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
python-dateutil
prompt_toolkit>=2
requests
pydantic
pydantic[email]

0 comments on commit 368a6aa

Please sign in to comment.