diff --git a/hostpolicy/api/permissions.py b/hostpolicy/api/permissions.py index f81cda89..b620956d 100644 --- a/hostpolicy/api/permissions.py +++ b/hostpolicy/api/permissions.py @@ -1,6 +1,6 @@ from rest_framework.permissions import IsAuthenticated, SAFE_METHODS -from mreg.api.permissions import user_is_superuser, user_in_settings_group +from mreg.api.permissions import user_object_is_superuser, user_in_settings_group from mreg.models.host import Host from mreg.models.network import NetGroupRegexPermission @@ -12,7 +12,7 @@ def user_is_hostpolicy_adminuser(user): def is_super_or_hostpolicy_admin(user): - return user_is_superuser(user) or user_is_hostpolicy_adminuser(user) + return user_object_is_superuser(user) or user_is_hostpolicy_adminuser(user) class IsSuperOrHostPolicyAdminOrReadOnly(IsAuthenticated): diff --git a/mreg/api/errors.py b/mreg/api/errors.py deleted file mode 100644 index 638c068f..00000000 --- a/mreg/api/errors.py +++ /dev/null @@ -1,5 +0,0 @@ -from rest_framework import (exceptions, status) - - -class ValidationError409(exceptions.APIException): - status_code = status.HTTP_409_CONFLICT diff --git a/mreg/api/exceptions.py b/mreg/api/exceptions.py new file mode 100644 index 00000000..bade3cbe --- /dev/null +++ b/mreg/api/exceptions.py @@ -0,0 +1,32 @@ +from rest_framework import (exceptions, status) + +from typing import Any + +class CustomAPIExceptionError(exceptions.APIException): + def __init__(self, detail: Any = None): + detail = {"ERROR": detail if detail is not None else self.default_detail} + super().__init__(detail) + + +class ValidationError400(CustomAPIExceptionError): + status_code = 400 + default_detail:str = 'Bad Request' + +class ValidationError403(CustomAPIExceptionError): + status_code = status.HTTP_403_FORBIDDEN + default_detail:str = 'Forbidden' + +class ValidationError404(CustomAPIExceptionError): + status_code = status.HTTP_404_NOT_FOUND + default_detail:str = 'Not Found' + +class ValidationError409(CustomAPIExceptionError): + status_code = status.HTTP_409_CONFLICT + default_detail:str = 'Conflict' + +class InternalServerError500(CustomAPIExceptionError): + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + default_detail:str = 'Internal Server Error' + +class NoIpAddressesError404(ValidationError404): + default_detail:str = 'No free ip addresses found in the network.' \ No newline at end of file diff --git a/mreg/api/permissions.py b/mreg/api/permissions.py index 22b46f29..0aca1196 100644 --- a/mreg/api/permissions.py +++ b/mreg/api/permissions.py @@ -2,10 +2,15 @@ from rest_framework import exceptions from rest_framework.permissions import IsAuthenticated, SAFE_METHODS +from typing import cast, TYPE_CHECKING + from mreg.api.v1.serializers import HostSerializer from mreg.models.host import HostGroup from mreg.models.network import NetGroupRegexPermission, Network +if TYPE_CHECKING: + from mreg.models.auth import User + NETWORK_ADMIN_GROUP = 'NETWORK_ADMIN_GROUP' SUPERUSER_GROUP = 'SUPERUSER_GROUP' ADMINUSER_GROUP = 'ADMINUSER_GROUP' @@ -36,24 +41,23 @@ def _list_in_list(list_a, list_b): return any(i in list_b for i in list_a) -def user_is_superuser(user): +def user_object_is_superuser(user: "User") -> bool: return user_in_settings_group(user, 'SUPERUSER_GROUP') - -def user_is_adminuser(user): +def user_object_is_adminuser(user: "User") -> bool: return user_in_settings_group(user, 'ADMINUSER_GROUP') - -def user_is_group_adminuser(user): +def user_object_is_group_adminuser(user: "User") -> bool: return user_in_settings_group(user, 'GROUPADMINUSER_GROUP') +def user_object_is_network_adminuser(user: "User") -> bool: + return user_in_settings_group(user, 'NETWORK_ADMIN_GROUP') -def is_super_or_admin(user): - return user_is_superuser(user) or user_is_adminuser(user) - +def is_super_or_admin(user: "User") -> bool: + return user_object_is_superuser(user) or user_object_is_adminuser(user) -def is_super_or_group_admin(user): - return user_is_superuser(user) or user_is_group_adminuser(user) +def is_super_or_group_admin(user: "User") -> bool: + return user_object_is_superuser(user) or user_object_is_group_adminuser(user) class IsAuthenticatedAndReadOnly(IsAuthenticated): @@ -84,7 +88,7 @@ def has_permission(self, request, view): return False if request.method in SAFE_METHODS: return True - return is_super_or_admin(request.user) + return is_super_or_admin(cast("User", request.user)) class IsSuperOrNetworkAdminMember(IsAuthenticated): @@ -97,7 +101,7 @@ def has_permission(self, request, view): import mreg.api.v1.views if not super().has_permission(request, view): return False - if user_is_superuser(request.user): + if user_object_is_superuser(cast("User", request.user)): return True if request_in_settings_group(request, NETWORK_ADMIN_GROUP): if isinstance(view, mreg.api.v1.views.NetworkDetail): @@ -122,7 +126,7 @@ def has_permission(self, request, view): return False if request.method in SAFE_METHODS: return True - return is_super_or_group_admin(request.user) + return is_super_or_group_admin(cast("User", request.user)) def _deny_superuser_only_names(data=None, name=None, view=None, request=None): @@ -135,6 +139,9 @@ def _deny_superuser_only_names(data=None, name=None, view=None, request=None): if 'host' in data: name = data['host'].name + if name is None: + return False + # Underscore is allowed for non-superuser in SRV records, # and for members of in all records. if '_' in name and not isinstance(view, (mreg.api.v1.views.SrvDetail, @@ -150,8 +157,10 @@ def _deny_superuser_only_names(data=None, name=None, view=None, request=None): return False -def is_reserved_ip(ip): - network = Network.objects.filter(network__net_contains=ip).first() +def is_reserved_ip(ip, network=None): + if network is None: + network = Network.objects.filter(network__net_contains=ip).first() + if network: return any(ip == str(i) for i in network.get_reserved_ipaddresses()) return False @@ -174,17 +183,18 @@ class IsGrantedNetGroupRegexPermission(IsAuthenticated): """ def has_permission(self, request, view): + user = cast("User", request.user) # This method is called before the view is executed, so # just do some preliminary checks. if not super().has_permission(request, view): return False if request.method in SAFE_METHODS: return True - if is_super_or_admin(request.user): + if is_super_or_admin(user): return True # Will do do more object checks later, but initially refuse any # unwarranted requests. - if NetGroupRegexPermission.objects.filter(group__in=request.user.group_list + if NetGroupRegexPermission.objects.filter(group__in=user.group_list ).exists(): return True return False @@ -200,7 +210,7 @@ def has_obj_perm(self, user, obj): def has_create_permission(self, request, view, validated_serializer): import mreg.api.v1.views - if user_is_superuser(request.user): + if user_object_is_superuser(request.user): return True hostname = None @@ -211,7 +221,7 @@ def has_create_permission(self, request, view, validated_serializer): if 'ipaddress' in data: if _deny_reserved_ipaddress(data['ipaddress'], request): return False - if user_is_adminuser(request.user): + if user_object_is_adminuser(request.user): return True if isinstance(view, (mreg.api.v1.views.IpaddressList, mreg.api.v1.views.PtrOverrideList)): @@ -245,7 +255,7 @@ def has_create_permission(self, request, view, validated_serializer): def has_destroy_permission(self, request, view, validated_serializer): import mreg.api.v1.views - if user_is_superuser(request.user): + if user_object_is_superuser(request.user): return True obj = view.get_object() if isinstance(view, mreg.api.v1.views.HostDetail): @@ -261,13 +271,13 @@ def has_destroy_permission(self, request, view, validated_serializer): if hasattr(obj, 'ipaddress'): if _deny_reserved_ipaddress(obj.ipaddress, request): return False - if user_is_adminuser(request.user): + if user_object_is_adminuser(request.user): return True return self.has_obj_perm(request.user, obj) def has_update_permission(self, request, view, validated_serializer): import mreg.api.v1.views - if user_is_superuser(request.user): + if user_object_is_superuser(request.user): return True data = validated_serializer.validated_data if _deny_superuser_only_names(data=data, view=view, request=request): @@ -275,7 +285,7 @@ def has_update_permission(self, request, view, validated_serializer): if 'ipaddress' in data: if _deny_reserved_ipaddress(data['ipaddress'], request): return False - if user_is_adminuser(request.user): + if user_object_is_adminuser(request.user): return True obj = view.get_object() if isinstance(view, mreg.api.v1.views.HostDetail): @@ -308,17 +318,18 @@ def _get_hostname_and_ips(self, hostobject): class HostGroupPermission(IsAuthenticated): def has_permission(self, request, view): + user = cast("User", request.user) # This method is called before the view is executed, so # just do some preliminary checks. if not super().has_permission(request, view): return False if request.method in SAFE_METHODS: return True - if is_super_or_group_admin(request.user): + if is_super_or_group_admin(user): return True # Will do do more object checks later, but initially refuse any # unwarranted requests. - if HostGroup.objects.filter(owners__name__in=request.user.group_list).exists(): + if HostGroup.objects.filter(owners__name__in=user.group_list).exists(): return True return False diff --git a/mreg/api/v1/serializers.py b/mreg/api/v1/serializers.py index fb2050a1..dffefc67 100644 --- a/mreg/api/v1/serializers.py +++ b/mreg/api/v1/serializers.py @@ -2,6 +2,7 @@ from django.contrib.auth.models import Group from django.utils import timezone +from django.db import transaction from rest_framework import serializers @@ -14,8 +15,7 @@ from mreg.utils import (nonify, normalize_mac) from mreg.validators import (validate_keys, validate_normalizeable_mac_address) -from mreg.api.errors import ValidationError409 - +from mreg.api.exceptions import ValidationError409, ValidationError400 class ValidationMixin: """Provides standard validation of data fields""" @@ -363,18 +363,74 @@ class Meta: model = HostGroup fields = '__all__' - def _validate_ip_not_in_network_excluded_range(ip): if ip is None: return - qs = NetworkExcludedRange.objects.filter(start_ip__lte=ip, - end_ip__gte=ip) + qs = NetworkExcludedRange.objects.filter(start_ip__lte=ip, end_ip__gte=ip) if qs.exists(): - raise serializers.ValidationError( - f"IP {ip} in an excluded range: {qs.first()}") + raise ValidationError400(f"IP {ip} in an excluded range: {qs.first()}") class LabelSerializer(serializers.ModelSerializer): class Meta: model = Label fields = '__all__' + +class HostCreateSerializer(serializers.ModelSerializer): + name = serializers.CharField(required=True) + ipaddress = serializers.CharField(write_only=True, required=False) + network = serializers.CharField(write_only=True, required=False) + allocation_method = serializers.CharField(write_only=True, required=False) + + class Meta: + model = Host + fields = ['id', 'name', 'ipaddress', 'network', 'allocation_method'] + extra_kwargs = {'id': {'read_only': True}} + + def validate_name(self, value): + # Check if name is already in use in Host + if Host.objects.filter(name=value).exists(): + raise ValidationError409("name already in use") + # Check if name is already in use as a CNAME + if Cname.objects.filter(name=value).exists(): + raise ValidationError409("name already in use as a cname") + return value + + def validate_ipaddress(self, value): + try: + ipaddress.ip_address(value) + except ValueError as error: + raise ValidationError400(str(error)) + + return value + + def validate(self, data): + # No need to call super().validate(data) since we're handling all validations here + ipaddress = data.get('ipaddress') + network = data.get('network') + allocation_method = data.get('allocation_method') + + # 'ipaddress' and 'network' are mutually exclusive + if ipaddress and network: + raise ValidationError400("'ipaddress' and 'network' is mutually exclusive") + + # 'allocation_method' is only allowed with 'network' + if allocation_method and not network: + raise ValidationError400("allocation_method is only allowed with 'network'") + + return data + + def create(self, validated_data): + ipaddress = validated_data.pop('ipaddress', None) + + # Start atomic transaction + with transaction.atomic(): + host = Host.objects.create(**validated_data) + + if ipaddress: + self.validate_ipaddress(ipaddress) + Ipaddress.objects.create(host=host, ipaddress=ipaddress) + else: + pass + + return host \ No newline at end of file diff --git a/mreg/api/v1/tests/test_zonefile.py b/mreg/api/v1/tests/test_zonefile.py index 5db7ced5..2c338c11 100644 --- a/mreg/api/v1/tests/test_zonefile.py +++ b/mreg/api/v1/tests/test_zonefile.py @@ -166,10 +166,12 @@ def test_excluding_private_addresses(self): {'name': 'delta', 'ip': '129.240.130.240', 'private': False}, {'name': 'echo', 'ip': '2001:700:100:4003::29', 'private': False} ] + print("Initial:", self.forward) for h in testhosts: - self._add_host('{}.{}'.format(h['name'], self.forward.name), h['ip']) + print(self._add_host('{}.{}'.format(h['name'], self.forward.name), h['ip'])) # get the forward zone file, verify it contains both private and non-private addresses data = self._get_zone(self.forward) + print("Zone after:", data) for h in testhosts: self.assertIn(h['ip'], data) # get the forward zone file but with private addresses excluded, verify it contains only public addresses diff --git a/mreg/api/v1/tests/tests.py b/mreg/api/v1/tests/tests.py index 60f09d0a..68e94166 100644 --- a/mreg/api/v1/tests/tests.py +++ b/mreg/api/v1/tests/tests.py @@ -92,6 +92,8 @@ def _assert_post_and_status(self, path, status_code, data=None, client=None): if client is None: client = self.client response = client.post(self._create_path(path), data) + if response.status_code != status_code: + print(f"Expected {status_code}. got {response.status_code}: {response.data}") self.assertEqual(response.status_code, status_code) return response diff --git a/mreg/api/v1/views.py b/mreg/api/v1/views.py index 115161d4..a33f8418 100644 --- a/mreg/api/v1/views.py +++ b/mreg/api/v1/views.py @@ -1,6 +1,7 @@ import bisect import ipaddress from collections import Counter, defaultdict +from typing import cast from django.db import transaction from django.db.models import Prefetch @@ -17,8 +18,10 @@ from mreg.models.base import NameServer, History from mreg.models.host import Host, Ipaddress, PtrOverride from mreg.models.network import Network, NetGroupRegexPermission -from mreg.models.resource_records import Cname, Loc, Naptr, Srv, Sshfp, Txt, Hinfo, Mx from mreg.types import IPAllocationMethod +from mreg.models.resource_records import Cname, Loc, Naptr, Srv, Sshfp, Txt, Hinfo, Mx +from mreg.models.auth import User +from mreg.api.exceptions import NoIpAddressesError404, ValidationError400, ValidationError404 from mreg.api.permissions import ( IsAuthenticatedAndReadOnly, @@ -64,6 +67,7 @@ SrvSerializer, SshfpSerializer, TxtSerializer, + HostCreateSerializer, ) from mreg.mixins import LowerCaseLookupMixin @@ -295,99 +299,78 @@ def get_queryset(self): qs = _host_prefetcher(super().get_queryset()) return HostFilterSet(data=self.request.GET, queryset=qs).qs - def post(self, request, *args, **kwargs): - if "name" in request.data: - if self.queryset.filter(name=request.data["name"]).exists(): - content = {"ERROR": "name already in use"} - return Response(content, status=status.HTTP_409_CONFLICT) + def get_serializer_class(self): + if self.request.method == 'POST': + return HostCreateSerializer + return HostSerializer - if "ipaddress" in request.data and "network" in request.data: - content = {"ERROR": "'ipaddress' and 'network' is mutually exclusive"} - return Response(content, status=status.HTTP_400_BAD_REQUEST) + def post(self, request, *args, **kwargs): + ip_address = request.data.get('ipaddress') + network = request.data.get('network') + allocation_method = request.data.get('allocation_method') + name = request.data.get('name') + user = cast(User, request.user) - if "allocation_method" in request.data and "network" not in request.data: - return Response( - {"ERROR": "allocation_method is only allowed with 'network'"}, - status=status.HTTP_400_BAD_REQUEST) + print(f"ip_address: {ip_address} name: {name} network: {network} allocation_method: {allocation_method}") - # request.data is immutable - hostdata = request.data.copy() + if not name: + raise ValidationError400("You must specify a name for the new host.") - # Hostdata *may* be MultiValueDict, which means that pop will return a list, even if get - # would return a single value... + user.is_permitted_to_use_dnsname_or_raise(name) - if "network" in hostdata: - network_key = hostdata.pop("network") - if isinstance(network_key, list): - network_key = network_key[0] + if ip_address and network: + raise ValidationError400("You can't specify both 'ipaddress' and 'network'") + + if allocation_method and not network: + raise ValidationError400("You must specify a network if you want to use 'allocation_method'") + if network: + # Check that we have a valid network try: - ipaddress.ip_network(network_key) + ipaddress.ip_network(network) except ValueError as error: - content = {"ERROR": str(error)} - return Response(content, status=status.HTTP_400_BAD_REQUEST) - - network = Network.objects.filter(network=network_key).first() - if not network: - content = {"ERROR": "no such network"} - return Response(content, status=status.HTTP_404_NOT_FOUND) + raise ValidationError400(str(error)) try: - allocation_key = hostdata.pop("allocation_method", IPAllocationMethod.FIRST.value) - if isinstance(allocation_key, list): - allocation_key = allocation_key[0] - request_ip_allocator = IPAllocationMethod(allocation_key.lower()) - except ValueError: - options = [method.value for method in IPAllocationMethod] - content = {"ERROR": f"allocation_method must be one of {', '.join(options)}"} - return Response(content, status=status.HTTP_400_BAD_REQUEST) - - if request_ip_allocator == IPAllocationMethod.RANDOM: - ip = network.get_random_unused() + net = Network.objects.get(network=network) + except Network.DoesNotExist: + raise ValidationError404(f"Network {network} not found.") + + if allocation_method: + try: + allocation_method_enum = IPAllocationMethod(allocation_method.lower()) + except ValueError: + options = [method.value for method in IPAllocationMethod] + raise ValidationError400(f"allocation_method must be one of {', '.join(options)}") else: - ip = network.get_first_unused() - - if not ip: - content = {"ERROR": "no available IP in network"} - return Response(content, status=status.HTTP_404_NOT_FOUND) - - hostdata["ipaddress"] = ip - - if "ipaddress" in hostdata: - ipkey = hostdata.pop("ipaddress") - if isinstance(ipkey, list): - ipkey = ipkey[0] - - host = Host() - hostserializer = HostSerializer(host, data=hostdata) - - if hostserializer.is_valid(raise_exception=True): - with transaction.atomic(): - # XXX: must fix. perform_creates failes as it has no ipaddress and the - # the permissions fails for most users. Maybe a nested serializer should fix it? - # self.perform_create(hostserializer) - hostserializer.save() - self.save_log_create(hostserializer) - ipdata = {"host": host.pk, "ipaddress": ipkey} - ip = Ipaddress() - ipserializer = IpaddressSerializer(ip, data=ipdata) - ipserializer.is_valid(raise_exception=True) - self.perform_create(ipserializer) - location = request.path + host.name - return Response( - status=status.HTTP_201_CREATED, - headers={"Location": location}, - ) + allocation_method_enum = IPAllocationMethod.FIRST + + if allocation_method_enum == IPAllocationMethod.RANDOM: + ip_address = net.get_random_unused() + else: + ip_address = net.get_first_unused() + + if not ip_address: + raise NoIpAddressesError404("No free ip addresses found in {net}.") + + if not ip_address: + user.is_permitted_to_create_host_without_ipaddress_or_raise() + serializer = self.get_serializer(data={"name": name}) else: - host = Host() - hostserializer = HostSerializer(host, data=hostdata) - if hostserializer.is_valid(raise_exception=True): - self.perform_create(hostserializer) - location = request.path + host.name - return Response( - status=status.HTTP_201_CREATED, headers={"Location": location} - ) + serializer = self.get_serializer(data={"ipaddress": ip_address, "name": name}) + user.is_permitted_to_use_ipaddress_or_raise(ip_address) + + serializer.is_valid(raise_exception=True) + + with transaction.atomic(): + host = serializer.save() + self.save_log_create(serializer) + location = f"{request.path}{host.name}" + return Response( + status=status.HTTP_201_CREATED, + headers={"Location": location}, + ) class HostDetail(HostPermissionsUpdateDestroy, LowerCaseLookupMixin, diff --git a/mreg/models/auth.py b/mreg/models/auth.py index 29e08c83..f653145f 100644 --- a/mreg/models/auth.py +++ b/mreg/models/auth.py @@ -1,5 +1,17 @@ from django.contrib.auth.models import AbstractUser +from mreg.api.exceptions import ValidationError403, ValidationError400 +from mreg.models.network import Network, NetworkExcludedRange, NetGroupRegexPermission +from mreg.api.permissions import ( + user_object_is_adminuser, + user_object_is_group_adminuser, + user_object_is_superuser, + user_object_is_network_adminuser, + user_in_settings_group, + is_reserved_ip, + DNS_UNDERSCORE_GROUP, + DNS_WILDCARD_GROUP, +) class User(AbstractUser): @@ -10,3 +22,136 @@ def group_list(self): if self._group_list is None: self._group_list = list(self.groups.values_list("name", flat=True)) return self._group_list + + @property + def is_mreg_admin(self): + return user_object_is_adminuser(self) + + @property + def is_mreg_group_admin(self): + return user_object_is_group_adminuser(self) + + @property + def is_mreg_superuser(self): + return user_object_is_superuser(self) + + @property + def is_mreg_network_admin(self): + return user_object_is_network_adminuser(self) + + @property + def is_mreg_network_admin_or_admin(self): + return self.is_mreg_network_admin or self.is_mreg_admin + + @property + def is_mreg_admin_or_superuser(self): + return self.is_mreg_admin or self.is_mreg_superuser + + def is_permitted_to_create_host_without_ipaddress_or_raise(self): + """Check if a user is permitted to create a host without an IP address. + + A user is permitted to create a host without an IP address if: + - The user is an admin user. + - The user is a network admin user. + - The user is a superuser. + + :raises: ValidationError403 if the user is not permitted to create a host without an IP address. + + :return: Nothing + """ + if not (self.is_mreg_admin_or_superuser or self.is_mreg_network_admin): + raise ValidationError403("Only admins, superusers, and network admins can create a host without an IP address.") + + return + + def is_permitted_to_use_dnsname_or_raise(self, dnsname: str): + """Check if a user is permitted to use a DNS name. + + A user is permitted to use a DNS name if: + - The name does not contain underscores or asterisks. + - The name contains an underscore and the user is in the DNS_UNDERSCORE_GROUP or is a superuser. + - The name contains an asterisk and the user is in the DNS_WILDCARD_GROUP and the name + has more than than 3 dots, *or* the user is a superuser. + + :param dnsname: The DNS name to check, as a string. + + :raises: ValidationError403 if the user is not permitted to use the DNS name, with a message explaining why. + + :return: Nothing + """ + if '_' not in dnsname and '*' not in dnsname: + return + + if '_' in dnsname and not (user_in_settings_group(self, DNS_UNDERSCORE_GROUP) or self.is_mreg_superuser): + raise ValidationError403("The DNS name contains an underscore, only allowed for DNS_UNDERSCORE_GROUP or superusers.") + + if '*' in dnsname: + if not (user_in_settings_group(self, DNS_WILDCARD_GROUP) or self.is_mreg_superuser): + raise ValidationError403("The DNS name contains an asterisk, only allowd for DNS_WILDCARD_GROUP or superusers.") + + if dnsname.count('.') < 3 and not self.is_mreg_superuser: + raise ValidationError403("The DNS name contains an asterisk, but it must have more than 3 dots.") + + return + + def is_permitted_to_use_ipaddress_or_raise(self, ipaddress: str): + """Check if a user is permitted to use a given IP address. + + A user is permitted to use an IP if: + - The IP is not reserved. + - The IP is not in a frozen network. + - The IP is not in an excluded range. + + For other cases the following rules apply: + - If the network is frozen, noone is permitted to use the IP address. + - If the IP address is reserved, only superusers and network admins are permitted to use it. + - If the IP address is in an excluded range, noone is permitted to use it. + - If the IP address is not in a network, admins, superusers, and network admins are permitted to use it, and + users may use the IP address if they have a permission entry for range containing the IP address. + + :param ip: The IP address to check, as a string. + + :raises: ValidationError403 if the user is not permitted to use the IP address, with a message explaining why. + + :return: Nothing + """ + network = Network.objects.filter(network__net_contains=ipaddress).first() + if network: + if network.frozen: + raise ValidationError403("The network is frozen.") + + if is_reserved_ip(ipaddress, network) and not (self.is_mreg_superuser or self.is_mreg_network_admin): + raise ValidationError403("The IP address is reserved, only superusers and network admins can use it.") + + if NetworkExcludedRange.objects.filter(start_ip__lte=ipaddress, end_ip__gte=ipaddress).exists(): + raise ValidationError400("The IP address is in an excluded range, no-one can use it.") + + return + + # The IP address is not contained in a network known to mreg, but the user may still have a permission + # entry for the range containing the IP address. This is a very weird use case, but it is used in + # test_can_create_change_and_delete_host in test_host_permissions.py # 57. + # This test maintains feature parity with the original code. + if NetGroupRegexPermission.objects.filter(group__in=self.group_list).filter(range__net_contains=ipaddress).exists(): + return + + # We now know that the IP address is not in a network, and that the user does not have a permission mask + # that would allow them to use the IP address. In this case, only admins, superusers, and network admins + # are permitted to use the IP address. + if self.is_mreg_admin_or_superuser or self.is_mreg_network_admin: + return + else: + raise ValidationError403("The IP address is not in a network, only admins, superusers, and network admins can use it.") + + def display(self): + permissions_list = [] + if self.is_mreg_admin: + permissions_list.append("Admin") + if self.is_mreg_superuser: + permissions_list.append("Superuser") + if self.is_mreg_group_admin: + permissions_list.append("Group Admin") + if self.is_mreg_network_admin: + permissions_list.append("Network Admin") + + return f"{self.username} ({', '.join(permissions_list)} from groups: {', '.join(self.group_list)})" diff --git a/mreg/signals.py b/mreg/signals.py index 0a9a9afc..bc855540 100644 --- a/mreg/signals.py +++ b/mreg/signals.py @@ -1,5 +1,5 @@ -import functools import re +import functools from django.conf import settings from django.contrib.auth import get_user_model @@ -117,7 +117,8 @@ def _create_ptr_if_ipaddress_in_use(): def _common_update_zone(signal, sender, instance): @functools.lru_cache() def _get_zone_for_ip(ip): - return ReverseZone.get_zone_by_ip(ip) + zone = ReverseZone.get_zone_by_ip(ip) + return zone zones = set() @@ -133,7 +134,7 @@ def _get_zone_for_ip(ip): oldzone = Host.objects.get(id=instance.host.id).zone zones.add(oldzone) - if sender in (Ipaddress, PtrOverride): + if sender in (Ipaddress, PtrOverride): zone = _get_zone_for_ip(instance.ipaddress) zones.add(zone) diff --git a/mreg/types.py b/mreg/types.py index 80d57585..fe146106 100644 --- a/mreg/types.py +++ b/mreg/types.py @@ -3,3 +3,4 @@ class IPAllocationMethod(Enum): RANDOM = "random" FIRST = "first" + diff --git a/pyproject.toml b/pyproject.toml index e06d4672..62c45a17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ py-modules = ["mreg", "mregsite", "hostpolicy"] [tool.ruff] # https://beta.ruff.rs/docs/rules/ select = ["E", "F"] -line-length = 119 +line-length = 145 exclude = [ "mreg/migrations/", "hostpolicy/migrations/", diff --git a/requirements-dev.txt b/requirements-dev.txt index 9d643313..71f6fa56 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,2 +1,3 @@ coverage coveralls +unittest_parametrize \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt index e4ddd128..b1d4f351 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,12 +1,12 @@ -Django>4 -djangorestframework==3.14.0 -django-auth-ldap==4.8.0 +Django>5 +djangorestframework==3.15.2 +django-auth-ldap==5.1.0 django-logging-json==1.15 django-netfields==1.3.2 django-filter==24.3 structlog==24.4.0 rich==13.7.1 -gunicorn==22.0.0 +gunicorn==23.0.0 idna==3.7 psycopg2-binary==2.9.9 pika==1.3.2 diff --git a/requirements.txt b/requirements.txt index b2d47ca9..362eb0c4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ -Django==5.0.8 -djangorestframework==3.14.0 -django-auth-ldap==4.8.0 +Django==5.1.3 +djangorestframework==3.15.2 +django-auth-ldap==5.1.0 django-netfields==1.3.2 django-filter==24.3 structlog==24.4.0 rich==13.7.1 -gunicorn==22.0.0 +gunicorn==23.0.0 idna==3.7 psycopg2-binary==2.9.9 pika==1.3.2