From 4d9f0e21b89249420706691e78ce67eb9898de69 Mon Sep 17 00:00:00 2001 From: Nikita Manovich Date: Tue, 8 Oct 2019 16:16:03 +0300 Subject: [PATCH] Fix https://github.com/opencv/cvat/issues/750 --- cvat/apps/engine/serializers.py | 20 +++- cvat/apps/engine/tests/test_rest_api.py | 141 +++++++++++------------- cvat/apps/engine/views.py | 34 ++++-- 3 files changed, 109 insertions(+), 86 deletions(-) diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 24a38c1cbf06..53729a0615b7 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -286,6 +286,24 @@ class Meta: read_only_fields = ('created_date', 'updated_date', 'status') ordering = ['-id'] +class BasicUserSerializer(serializers.ModelSerializer): + def validate(self, data): + if hasattr(self, 'initial_data'): + unknown_keys = set(self.initial_data.keys()) - set(self.fields.keys()) + if unknown_keys: + if set(['is_staff', 'is_superuser', 'groups']) & unknown_keys: + message = 'You do not have permissions to access some of' + \ + ' these fields: {}'.format(unknown_keys) + else: + message = 'Got unknown fields: {}'.format(unknown_keys) + raise serializers.ValidationError(message) + return data + + class Meta: + model = User + fields = ('url', 'id', 'username', 'first_name', 'last_name', 'email') + ordering = ['-id'] + class UserSerializer(serializers.ModelSerializer): groups = serializers.SlugRelatedField(many=True, slug_field='name', queryset=Group.objects.all()) @@ -294,7 +312,7 @@ class Meta: model = User fields = ('url', 'id', 'username', 'first_name', 'last_name', 'email', 'groups', 'is_staff', 'is_superuser', 'is_active', 'last_login', - 'date_joined', 'groups') + 'date_joined') read_only_fields = ('last_login', 'date_joined') write_only_fields = ('password', ) ordering = ['-id'] diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index ad6cac4c79ba..63abea42f259 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -462,7 +462,7 @@ def test_api_v1_server_logs_no_auth(self): self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) -class UserListAPITestCase(APITestCase): +class UserAPITestCase(APITestCase): def setUp(self): self.client = APIClient() @@ -470,45 +470,62 @@ def setUp(self): def setUpTestData(cls): create_db_users(cls) + def _check_response(self, user, response, is_full=True): + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["id"], user.id) + self.assertEqual(response.data["username"], user.username) + self.assertEqual(response.data["first_name"], user.first_name) + self.assertEqual(response.data["last_name"], user.last_name) + self.assertEqual(response.data["email"], user.email) + extra_check = self.assertIn if is_full else self.assertNotIn + extra_check("groups", response.data) + extra_check("is_staff", response.data) + extra_check("is_superuser", response.data) + extra_check("is_active", response.data) + extra_check("last_login", response.data) + extra_check("date_joined", response.data) + + +class UserListAPITestCase(UserAPITestCase): def _run_api_v1_users(self, user): with ForceLogin(user, self.client): response = self.client.get('/api/v1/users') return response - def test_api_v1_users_admin(self): - response = self._run_api_v1_users(self.admin) + def _check_response(self, user, response): self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertListEqual( ["admin", "user1", "user2", "user3", "user4", "user5"], [res["username"] for res in response.data["results"]]) + def test_api_v1_users_admin(self): + response = self._run_api_v1_users(self.admin) + self._check_response(self.admin, response) + def test_api_v1_users_user(self): response = self._run_api_v1_users(self.user) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self._check_response(self.user, response) + + def test_api_v1_users_annotator(self): + response = self._run_api_v1_users(self.annotator) + self._check_response(self.annotator, response) + + def test_api_v1_users_observer(self): + response = self._run_api_v1_users(self.observer) + self._check_response(self.observer, response) def test_api_v1_users_no_auth(self): response = self._run_api_v1_users(None) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) -class UserSelfAPITestCase(APITestCase): - def setUp(self): - self.client = APIClient() - - @classmethod - def setUpTestData(cls): - create_db_users(cls) - +class UserSelfAPITestCase(UserAPITestCase): def _run_api_v1_users_self(self, user): with ForceLogin(user, self.client): response = self.client.get('/api/v1/users/self') return response - def _check_response(self, user, response): - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.data["username"], user.username) - def test_api_v1_users_self_admin(self): response = self._run_api_v1_users_self(self.admin) self._check_response(self.admin, response) @@ -521,98 +538,54 @@ def test_api_v1_users_self_annotator(self): response = self._run_api_v1_users_self(self.annotator) self._check_response(self.annotator, response) + def test_api_v1_users_self_observer(self): + response = self._run_api_v1_users_self(self.observer) + self._check_response(self.observer, response) def test_api_v1_users_self_no_auth(self): response = self._run_api_v1_users_self(None) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) -class UserGetAPITestCase(APITestCase): - def setUp(self): - self.client = APIClient() - - @classmethod - def setUpTestData(cls): - create_db_users(cls) - +class UserGetAPITestCase(UserAPITestCase): def _run_api_v1_users_id(self, user, user_id): with ForceLogin(user, self.client): response = self.client.get('/api/v1/users/{}'.format(user_id)) return response - def _check_response(self, user, response): - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.data["id"], user.id) - self.assertEqual(response.data["username"], user.username) - def test_api_v1_users_id_admin(self): response = self._run_api_v1_users_id(self.admin, self.user.id) - self._check_response(self.user, response) + self._check_response(self.user, response, True) response = self._run_api_v1_users_id(self.admin, self.admin.id) - self._check_response(self.admin, response) + self._check_response(self.admin, response, True) response = self._run_api_v1_users_id(self.admin, self.owner.id) - self._check_response(self.owner, response) + self._check_response(self.owner, response, True) def test_api_v1_users_id_user(self): response = self._run_api_v1_users_id(self.user, self.user.id) - self._check_response(self.user, response) + self._check_response(self.user, response, True) response = self._run_api_v1_users_id(self.user, self.owner.id) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self._check_response(self.owner, response, False) def test_api_v1_users_id_annotator(self): response = self._run_api_v1_users_id(self.annotator, self.annotator.id) - self._check_response(self.annotator, response) + self._check_response(self.annotator, response, True) response = self._run_api_v1_users_id(self.annotator, self.user.id) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self._check_response(self.user, response, False) def test_api_v1_users_id_no_auth(self): response = self._run_api_v1_users_id(None, self.user.id) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) -class UserUpdateAPITestCase(APITestCase): +class UserPartialUpdateAPITestCase(APITestCase): def setUp(self): self.client = APIClient() create_db_users(self) - def _run_api_v1_users_id(self, user, user_id, data): - with ForceLogin(user, self.client): - response = self.client.put('/api/v1/users/{}'.format(user_id), data=data) - - return response - - def test_api_v1_users_id_admin(self): - data = {"username": "user09", "groups": ["user", "admin"], - "first_name": "my name"} - response = self._run_api_v1_users_id(self.admin, self.user.id, data) - - self.assertEqual(response.status_code, status.HTTP_200_OK) - user09 = User.objects.get(id=self.user.id) - self.assertEqual(user09.username, data["username"]) - self.assertEqual(user09.first_name, data["first_name"]) - - def test_api_v1_users_id_user(self): - data = {"username": "user10", "groups": ["user", "annotator"], - "first_name": "my name"} - response = self._run_api_v1_users_id(self.user, self.user.id, data) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - - def test_api_v1_users_id_annotator(self): - data = {"username": "user11", "groups": ["annotator"], - "first_name": "my name"} - response = self._run_api_v1_users_id(self.annotator, self.user.id, data) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - - def test_api_v1_users_id_no_auth(self): - data = {"username": "user12", "groups": ["user", "observer"], - "first_name": "my name"} - response = self._run_api_v1_users_id(None, self.user.id, data) - self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) - -class UserPartialUpdateAPITestCase(UserUpdateAPITestCase): def _run_api_v1_users_id(self, user, user_id, data): with ForceLogin(user, self.client): response = self.client.patch('/api/v1/users/{}'.format(user_id), data=data) @@ -624,13 +597,31 @@ def test_api_v1_users_id_admin_partial(self): response = self._run_api_v1_users_id(self.admin, self.user.id, data) self.assertEqual(response.status_code, status.HTTP_200_OK) - user09 = User.objects.get(id=self.user.id) - self.assertEqual(user09.username, data["username"]) - self.assertEqual(user09.last_name, data["last_name"]) + user = User.objects.get(id=self.user.id) + self.assertEqual(user.username, data["username"]) + self.assertEqual(user.last_name, data["last_name"]) def test_api_v1_users_id_user_partial(self): data = {"username": "user10", "first_name": "my name"} response = self._run_api_v1_users_id(self.user, self.user.id, data) + user = User.objects.get(id=self.user.id) + self.assertEqual(user.username, data["username"]) + self.assertEqual(user.first_name, data["first_name"]) + + data = {"is_staff": True} + response = self._run_api_v1_users_id(self.user, self.user.id, data) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + data = {"username": "admin", "is_superuser": True} + response = self._run_api_v1_users_id(self.user, self.user.id, data) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + data = {"username": "non_active", "is_active": False} + response = self._run_api_v1_users_id(self.user, self.user.id, data) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + data = {"username": "annotator01", "first_name": "slave"} + response = self._run_api_v1_users_id(self.user, self.annotator.id, data) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_api_v1_users_id_no_auth_partial(self): diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 2f548c3777e4..b7c8d621e4c1 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -37,7 +37,7 @@ ExceptionSerializer, AboutSerializer, JobSerializer, ImageMetaSerializer, RqStatusSerializer, TaskDataSerializer, LabeledDataSerializer, PluginSerializer, FileInfoSerializer, LogEventSerializer, - ProjectSerializer) + ProjectSerializer, BasicUserSerializer) from cvat.apps.annotation.serializers import AnnotationFileSerializer from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist @@ -503,23 +503,37 @@ def annotations(self, request, pk): return Response(data) class UserViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, - mixins.RetrieveModelMixin, mixins.UpdateModelMixin): + mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin): queryset = User.objects.all().order_by('id') - serializer_class = UserSerializer + http_method_names = ['get', 'post', 'head', 'patch', 'delete'] + + def get_serializer_class(self): + user = self.request.user + if user.is_staff: + return UserSerializer + else: + is_self = int(self.kwargs.get("pk", 0)) == user.id or \ + self.action == "self" + if is_self and self.request.method in SAFE_METHODS: + return UserSerializer + else: + return BasicUserSerializer def get_permissions(self): permissions = [IsAuthenticated] - if not self.action in ["self"]: - user = self.request.user - if self.action != "retrieve" or int(self.kwargs.get("pk", 0)) != user.id: + user = self.request.user + + if not self.request.method in SAFE_METHODS: + is_self = int(self.kwargs.get("pk", 0)) == user.id + if not is_self: permissions.append(auth.AdminRolePermission) return [perm() for perm in permissions] - @staticmethod - @action(detail=False, methods=['GET'], serializer_class=UserSerializer) - def self(request): - serializer = UserSerializer(request.user, context={ "request": request }) + @action(detail=False, methods=['GET']) + def self(self, request): + serializer_class = self.get_serializer_class() + serializer = serializer_class(request.user, context={ "request": request }) return Response(serializer.data) class PluginViewSet(viewsets.ModelViewSet):