Skip to content

Commit

Permalink
Merge pull request #11009 from rtibbles/facility_id
Browse files Browse the repository at this point in the history
Adds error handling for badly formed UUIDs sent to API endpoints
  • Loading branch information
rtibbles authored Aug 4, 2023
2 parents 234c3de + 398d200 commit d9602d3
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 17 deletions.
7 changes: 7 additions & 0 deletions kolibri/core/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from django.db.backends.signals import connection_created
from django.db.models.query import F
from django.db.utils import DatabaseError
from django_filters.filters import UUIDFilter
from django_filters.rest_framework.filterset import FilterSet

from kolibri.core.sqlite.pragmas import CONNECTION_PRAGMAS
from kolibri.core.sqlite.pragmas import START_PRAGMAS
Expand Down Expand Up @@ -40,6 +42,11 @@ def ready(self):
)
)
self.check_redis_settings()
# Do this to add an automapping from the Morango UUIDField to the UUIDFilter so that it automatically
# maps to this filter when using the UUIDField in a filter.
from morango.models import UUIDField

FilterSet.FILTER_DEFAULTS.update({UUIDField: {"filter_class": UUIDFilter}})
# Register any django apps that may have kolibri plugin
# modules inside them
registered_plugins.register_non_plugins(settings.INSTALLED_APPS)
Expand Down
37 changes: 26 additions & 11 deletions kolibri/core/auth/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime
from datetime import timedelta
from itertools import groupby
from uuid import UUID
from uuid import uuid4

from django.contrib.auth import authenticate
Expand Down Expand Up @@ -33,6 +34,7 @@
from django_filters.rest_framework import DjangoFilterBackend
from django_filters.rest_framework import FilterSet
from django_filters.rest_framework import ModelChoiceFilter
from django_filters.rest_framework import UUIDFilter
from morango.api.permissions import BasicMultiArgumentAuthentication
from morango.constants import transfer_stages
from morango.constants import transfer_statuses
Expand Down Expand Up @@ -163,9 +165,22 @@ def has_object_permission(self, request, view, obj):
return self.has_permission(request, view)


class FacilityDatasetFilter(FilterSet):

facility_id = UUIDFilter(field_name="collection")

class Meta:
model = FacilityDataset
fields = ["facility_id"]


class FacilityDatasetViewSet(ValuesViewset):
permission_classes = (KolibriAuthPermissions,)
filter_backends = (KolibriAuthPermissionsFilter,)
filter_backends = (
KolibriAuthPermissionsFilter,
DjangoFilterBackend,
)
filter_class = FacilityDatasetFilter
serializer_class = FacilityDatasetSerializer

values = (
Expand All @@ -187,13 +202,9 @@ class FacilityDatasetViewSet(ValuesViewset):
field_map = {"allow_guest_access": lambda x: allow_guest_access()}

def get_queryset(self):
queryset = FacilityDataset.objects.filter(
return FacilityDataset.objects.filter(
collection__kind=collection_kinds.FACILITY
)
facility_id = self.request.query_params.get("facility_id", None)
if facility_id is not None:
queryset = queryset.filter(collection__id=facility_id)
return queryset

@decorators.action(methods=["post"], detail=True)
def resetsettings(self, request, pk):
Expand Down Expand Up @@ -327,9 +338,13 @@ class PublicFacilityUserViewSet(ReadOnlyValuesViewset):
}

def get_queryset(self):
facility_id = self.request.query_params.get("facility_id", None)
if facility_id is None:
facility_id = self.request.user.facility_id
facility_id = self.request.query_params.get(
"facility_id", self.request.user.facility_id
)
try:
facility_id = UUID(facility_id).hex
except ValueError:
return self.queryset.none()

# if user has admin rights for the facility returns the list of users
queryset = self.queryset.filter(facility_id=facility_id)
Expand Down Expand Up @@ -809,7 +824,7 @@ def post(self, request):

try:
user = FacilityUser.objects.get(username=username, facility=facility_id)
except ObjectDoesNotExist:
except (ValueError, ObjectDoesNotExist):
raise Http404(error_message)

if user.password != NOT_SPECIFIED:
Expand Down Expand Up @@ -844,7 +859,7 @@ def create(self, request):
unauthenticated_user = FacilityUser.objects.get(
username__iexact=username, facility=facility_id
)
except ObjectDoesNotExist:
except (ValueError, ObjectDoesNotExist):
return Response(
[
{
Expand Down
16 changes: 16 additions & 0 deletions kolibri/core/auth/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,11 @@ def test_public_facilityuser_endpoint(self):
models.FacilityUser.objects.filter(facility_id=self.facility1.id).count(),
len(response.data),
)
for item in response.data:
self.assertEqual(
self.facility1.id,
item["facility"],
)


class UserCreationTestCase(APITestCase):
Expand Down Expand Up @@ -1278,6 +1283,17 @@ def test_return_all_datasets_for_an_admin(self):
response = self.client.get(reverse("kolibri:core:facilitydataset-list"))
self.assertEqual(len(response.data), len(models.FacilityDataset.objects.all()))

def test_filter_facility_id_for_an_admin(self):
self.client.login(username=self.admin.username, password=DUMMY_PASSWORD)
response = self.client.get(
reverse("kolibri:core:facilitydataset-list"),
{"facility_id": self.facility.id},
)
self.assertEqual(
len(response.data),
len(models.FacilityDataset.objects.filter(collection=self.facility.id)),
)

def test_admin_can_edit_dataset_for_which_they_are_admin(self):
self.client.login(username=self.admin.username, password=DUMMY_PASSWORD)
response = self.client.patch(
Expand Down
16 changes: 11 additions & 5 deletions kolibri/core/content/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,11 +980,17 @@ def get_child_ids(self, parent_id, next__gt):
def get_tree_queryset(self, request, pk):
# Get the model for the parent node here - we do this so that we trigger a 404 immediately if the node
# does not exist (or exists but is not available, or is filtered).
parent_id = (
pk
if pk and self.filter_queryset(self.get_queryset()).filter(id=pk).exists()
else None
)
try:
parent_id = (
pk
if pk
and self.filter_queryset(self.get_queryset()).filter(id=pk).exists()
else None
)
except ValueError:
# If the pk is a badly formed uuid, we will get a ValueError here, so we catch it and set to None
# so that it raises a 404 below.
parent_id = None

if parent_id is None:
raise Http404
Expand Down
26 changes: 26 additions & 0 deletions kolibri/core/content/test/test_content_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,15 @@ def test_contentnode_tree_filtered_queryset_node(self):
)
self.assertEqual(response.status_code, 404)

def test_contentnode_tree_bad_pk(self):
response = self.client.get(
reverse(
"kolibri:core:contentnode_tree-detail",
kwargs={"pk": "this is not a UUID"},
)
)
self.assertEqual(response.status_code, 404)

@unittest.skipIf(
getattr(settings, "DATABASES")["default"]["ENGINE"]
== "django.db.backends.postgresql",
Expand Down Expand Up @@ -1113,6 +1122,23 @@ def test_contentnode_ids(self):
for i in range(len(titles)):
self.assertEqual(response.data[i]["title"], titles[i])

def test_contentnode_content_id(self):
node = content.ContentNode.objects.get(title="c2c2")
response = self.client.get(
reverse("kolibri:core:contentnode-list"),
data={"content_id": node.content_id},
)
self.assertEqual(len(response.data), 1)
self.assertEqual(response.data[0]["title"], node.title)

def test_contentnode_bad_content_id(self):
response = self.client.get(
reverse("kolibri:core:contentnode-list"),
data={"content_id": "this is not a uuid"},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)

def test_contentnode_parent(self):
parent = content.ContentNode.objects.get(title="c2")
children = parent.get_children()
Expand Down
2 changes: 1 addition & 1 deletion kolibri/core/logger/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ def _get_session_log(self, session_id, user):
return ContentSessionLog.objects.get(id=session_id, user__isnull=True)
else:
return ContentSessionLog.objects.get(id=session_id, user=user)
except ContentSessionLog.DoesNotExist:
except (ValueError, ContentSessionLog.DoesNotExist):
raise Http404(
"ContentSessionLog with id {} does not exist".format(session_id)
)
Expand Down

0 comments on commit d9602d3

Please sign in to comment.