Skip to content

Commit

Permalink
First pass, updated releavant dependencies.
Browse files Browse the repository at this point in the history
  - Signals or something is broken, zone generation fails.
  - Probably other issues as well.
  • Loading branch information
terjekv committed Nov 11, 2024
1 parent 7182664 commit 1390dfe
Show file tree
Hide file tree
Showing 15 changed files with 362 additions and 133 deletions.
4 changes: 2 additions & 2 deletions hostpolicy/api/permissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from rest_framework.permissions import IsAuthenticated, SAFE_METHODS

from mreg.api.permissions import user_is_superuser, user_in_settings_group
from mreg.api.permissions import user_object_is_superuser, user_in_settings_group

from mreg.models.host import Host
from mreg.models.network import NetGroupRegexPermission
Expand All @@ -12,7 +12,7 @@ def user_is_hostpolicy_adminuser(user):


def is_super_or_hostpolicy_admin(user):
return user_is_superuser(user) or user_is_hostpolicy_adminuser(user)
return user_object_is_superuser(user) or user_is_hostpolicy_adminuser(user)


class IsSuperOrHostPolicyAdminOrReadOnly(IsAuthenticated):
Expand Down
5 changes: 0 additions & 5 deletions mreg/api/errors.py

This file was deleted.

32 changes: 32 additions & 0 deletions mreg/api/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from rest_framework import (exceptions, status)

from typing import Any

class CustomAPIExceptionError(exceptions.APIException):
def __init__(self, detail: Any = None):
detail = {"ERROR": detail if detail is not None else self.default_detail}
super().__init__(detail)


class ValidationError400(CustomAPIExceptionError):
status_code = 400
default_detail:str = 'Bad Request'

class ValidationError403(CustomAPIExceptionError):
status_code = status.HTTP_403_FORBIDDEN
default_detail:str = 'Forbidden'

class ValidationError404(CustomAPIExceptionError):
status_code = status.HTTP_404_NOT_FOUND
default_detail:str = 'Not Found'

class ValidationError409(CustomAPIExceptionError):
status_code = status.HTTP_409_CONFLICT
default_detail:str = 'Conflict'

class InternalServerError500(CustomAPIExceptionError):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail:str = 'Internal Server Error'

class NoIpAddressesError404(ValidationError404):
default_detail:str = 'No free ip addresses found in the network.'
61 changes: 36 additions & 25 deletions mreg/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
from rest_framework import exceptions
from rest_framework.permissions import IsAuthenticated, SAFE_METHODS

from typing import cast, TYPE_CHECKING

from mreg.api.v1.serializers import HostSerializer
from mreg.models.host import HostGroup
from mreg.models.network import NetGroupRegexPermission, Network

if TYPE_CHECKING:
from mreg.models.auth import User

NETWORK_ADMIN_GROUP = 'NETWORK_ADMIN_GROUP'
SUPERUSER_GROUP = 'SUPERUSER_GROUP'
ADMINUSER_GROUP = 'ADMINUSER_GROUP'
Expand Down Expand Up @@ -36,24 +41,23 @@ def _list_in_list(list_a, list_b):
return any(i in list_b for i in list_a)


def user_is_superuser(user):
def user_object_is_superuser(user: "User") -> bool:
return user_in_settings_group(user, 'SUPERUSER_GROUP')


def user_is_adminuser(user):
def user_object_is_adminuser(user: "User") -> bool:
return user_in_settings_group(user, 'ADMINUSER_GROUP')


def user_is_group_adminuser(user):
def user_object_is_group_adminuser(user: "User") -> bool:
return user_in_settings_group(user, 'GROUPADMINUSER_GROUP')

def user_object_is_network_adminuser(user: "User") -> bool:
return user_in_settings_group(user, 'NETWORK_ADMIN_GROUP')

def is_super_or_admin(user):
return user_is_superuser(user) or user_is_adminuser(user)

def is_super_or_admin(user: "User") -> bool:
return user_object_is_superuser(user) or user_object_is_adminuser(user)

def is_super_or_group_admin(user):
return user_is_superuser(user) or user_is_group_adminuser(user)
def is_super_or_group_admin(user: "User") -> bool:
return user_object_is_superuser(user) or user_object_is_group_adminuser(user)


class IsAuthenticatedAndReadOnly(IsAuthenticated):
Expand Down Expand Up @@ -84,7 +88,7 @@ def has_permission(self, request, view):
return False
if request.method in SAFE_METHODS:
return True
return is_super_or_admin(request.user)
return is_super_or_admin(cast("User", request.user))


class IsSuperOrNetworkAdminMember(IsAuthenticated):
Expand All @@ -97,7 +101,7 @@ def has_permission(self, request, view):
import mreg.api.v1.views
if not super().has_permission(request, view):
return False
if user_is_superuser(request.user):
if user_object_is_superuser(cast("User", request.user)):
return True
if request_in_settings_group(request, NETWORK_ADMIN_GROUP):
if isinstance(view, mreg.api.v1.views.NetworkDetail):
Expand All @@ -122,7 +126,7 @@ def has_permission(self, request, view):
return False
if request.method in SAFE_METHODS:
return True
return is_super_or_group_admin(request.user)
return is_super_or_group_admin(cast("User", request.user))


def _deny_superuser_only_names(data=None, name=None, view=None, request=None):
Expand All @@ -135,6 +139,9 @@ def _deny_superuser_only_names(data=None, name=None, view=None, request=None):
if 'host' in data:
name = data['host'].name

if name is None:
return False

# Underscore is allowed for non-superuser in SRV records,
# and for members of <DNS_UNDERSCORE_GROUP> in all records.
if '_' in name and not isinstance(view, (mreg.api.v1.views.SrvDetail,
Expand All @@ -150,8 +157,10 @@ def _deny_superuser_only_names(data=None, name=None, view=None, request=None):
return False


def is_reserved_ip(ip):
network = Network.objects.filter(network__net_contains=ip).first()
def is_reserved_ip(ip, network=None):
if network is None:
network = Network.objects.filter(network__net_contains=ip).first()

if network:
return any(ip == str(i) for i in network.get_reserved_ipaddresses())
return False
Expand All @@ -174,17 +183,18 @@ class IsGrantedNetGroupRegexPermission(IsAuthenticated):
"""

def has_permission(self, request, view):
user = cast("User", request.user)
# This method is called before the view is executed, so
# just do some preliminary checks.
if not super().has_permission(request, view):
return False
if request.method in SAFE_METHODS:
return True
if is_super_or_admin(request.user):
if is_super_or_admin(user):
return True
# Will do do more object checks later, but initially refuse any
# unwarranted requests.
if NetGroupRegexPermission.objects.filter(group__in=request.user.group_list
if NetGroupRegexPermission.objects.filter(group__in=user.group_list
).exists():
return True
return False
Expand All @@ -200,7 +210,7 @@ def has_obj_perm(self, user, obj):
def has_create_permission(self, request, view, validated_serializer):
import mreg.api.v1.views

if user_is_superuser(request.user):
if user_object_is_superuser(request.user):
return True

hostname = None
Expand All @@ -211,7 +221,7 @@ def has_create_permission(self, request, view, validated_serializer):
if 'ipaddress' in data:
if _deny_reserved_ipaddress(data['ipaddress'], request):
return False
if user_is_adminuser(request.user):
if user_object_is_adminuser(request.user):
return True
if isinstance(view, (mreg.api.v1.views.IpaddressList,
mreg.api.v1.views.PtrOverrideList)):
Expand Down Expand Up @@ -245,7 +255,7 @@ def has_create_permission(self, request, view, validated_serializer):
def has_destroy_permission(self, request, view, validated_serializer):
import mreg.api.v1.views

if user_is_superuser(request.user):
if user_object_is_superuser(request.user):
return True
obj = view.get_object()
if isinstance(view, mreg.api.v1.views.HostDetail):
Expand All @@ -261,21 +271,21 @@ def has_destroy_permission(self, request, view, validated_serializer):
if hasattr(obj, 'ipaddress'):
if _deny_reserved_ipaddress(obj.ipaddress, request):
return False
if user_is_adminuser(request.user):
if user_object_is_adminuser(request.user):
return True
return self.has_obj_perm(request.user, obj)

def has_update_permission(self, request, view, validated_serializer):
import mreg.api.v1.views
if user_is_superuser(request.user):
if user_object_is_superuser(request.user):
return True
data = validated_serializer.validated_data
if _deny_superuser_only_names(data=data, view=view, request=request):
return False
if 'ipaddress' in data:
if _deny_reserved_ipaddress(data['ipaddress'], request):
return False
if user_is_adminuser(request.user):
if user_object_is_adminuser(request.user):
return True
obj = view.get_object()
if isinstance(view, mreg.api.v1.views.HostDetail):
Expand Down Expand Up @@ -308,17 +318,18 @@ def _get_hostname_and_ips(self, hostobject):
class HostGroupPermission(IsAuthenticated):

def has_permission(self, request, view):
user = cast("User", request.user)
# This method is called before the view is executed, so
# just do some preliminary checks.
if not super().has_permission(request, view):
return False
if request.method in SAFE_METHODS:
return True
if is_super_or_group_admin(request.user):
if is_super_or_group_admin(user):
return True
# Will do do more object checks later, but initially refuse any
# unwarranted requests.
if HostGroup.objects.filter(owners__name__in=request.user.group_list).exists():
if HostGroup.objects.filter(owners__name__in=user.group_list).exists():
return True
return False

Expand Down
70 changes: 63 additions & 7 deletions mreg/api/v1/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.contrib.auth.models import Group
from django.utils import timezone
from django.db import transaction

from rest_framework import serializers

Expand All @@ -14,8 +15,7 @@

from mreg.utils import (nonify, normalize_mac)
from mreg.validators import (validate_keys, validate_normalizeable_mac_address)
from mreg.api.errors import ValidationError409

from mreg.api.exceptions import ValidationError409, ValidationError400

class ValidationMixin:
"""Provides standard validation of data fields"""
Expand Down Expand Up @@ -363,18 +363,74 @@ class Meta:
model = HostGroup
fields = '__all__'


def _validate_ip_not_in_network_excluded_range(ip):
if ip is None:
return
qs = NetworkExcludedRange.objects.filter(start_ip__lte=ip,
end_ip__gte=ip)
qs = NetworkExcludedRange.objects.filter(start_ip__lte=ip, end_ip__gte=ip)
if qs.exists():
raise serializers.ValidationError(
f"IP {ip} in an excluded range: {qs.first()}")
raise ValidationError400(f"IP {ip} in an excluded range: {qs.first()}")


class LabelSerializer(serializers.ModelSerializer):
class Meta:
model = Label
fields = '__all__'

class HostCreateSerializer(serializers.ModelSerializer):
name = serializers.CharField(required=True)
ipaddress = serializers.CharField(write_only=True, required=False)
network = serializers.CharField(write_only=True, required=False)
allocation_method = serializers.CharField(write_only=True, required=False)

class Meta:
model = Host
fields = ['id', 'name', 'ipaddress', 'network', 'allocation_method']
extra_kwargs = {'id': {'read_only': True}}

def validate_name(self, value):
# Check if name is already in use in Host
if Host.objects.filter(name=value).exists():
raise ValidationError409("name already in use")
# Check if name is already in use as a CNAME
if Cname.objects.filter(name=value).exists():
raise ValidationError409("name already in use as a cname")
return value

def validate_ipaddress(self, value):
try:
ipaddress.ip_address(value)
except ValueError as error:
raise ValidationError400(str(error))

return value

def validate(self, data):
# No need to call super().validate(data) since we're handling all validations here
ipaddress = data.get('ipaddress')
network = data.get('network')
allocation_method = data.get('allocation_method')

# 'ipaddress' and 'network' are mutually exclusive
if ipaddress and network:
raise ValidationError400("'ipaddress' and 'network' is mutually exclusive")

# 'allocation_method' is only allowed with 'network'
if allocation_method and not network:
raise ValidationError400("allocation_method is only allowed with 'network'")

return data

def create(self, validated_data):
ipaddress = validated_data.pop('ipaddress', None)

# Start atomic transaction
with transaction.atomic():
host = Host.objects.create(**validated_data)

if ipaddress:
self.validate_ipaddress(ipaddress)
Ipaddress.objects.create(host=host, ipaddress=ipaddress)
else:
pass

return host
4 changes: 3 additions & 1 deletion mreg/api/v1/tests/test_zonefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ def test_excluding_private_addresses(self):
{'name': 'delta', 'ip': '129.240.130.240', 'private': False},
{'name': 'echo', 'ip': '2001:700:100:4003::29', 'private': False}
]
print("Initial:", self.forward)
for h in testhosts:
self._add_host('{}.{}'.format(h['name'], self.forward.name), h['ip'])
print(self._add_host('{}.{}'.format(h['name'], self.forward.name), h['ip']))
# get the forward zone file, verify it contains both private and non-private addresses
data = self._get_zone(self.forward)
print("Zone after:", data)
for h in testhosts:
self.assertIn(h['ip'], data)
# get the forward zone file but with private addresses excluded, verify it contains only public addresses
Expand Down
2 changes: 2 additions & 0 deletions mreg/api/v1/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def _assert_post_and_status(self, path, status_code, data=None, client=None):
if client is None:
client = self.client
response = client.post(self._create_path(path), data)
if response.status_code != status_code:
print(f"Expected {status_code}. got {response.status_code}: {response.data}")
self.assertEqual(response.status_code, status_code)
return response

Expand Down
Loading

0 comments on commit 1390dfe

Please sign in to comment.