Skip to content

Commit

Permalink
Fix #750
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Manovich committed Oct 8, 2019
1 parent ff1f6cf commit 4d9f0e2
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 86 deletions.
20 changes: 19 additions & 1 deletion cvat/apps/engine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,24 @@ class Meta:
read_only_fields = ('created_date', 'updated_date', 'status')
ordering = ['-id']

class BasicUserSerializer(serializers.ModelSerializer):
def validate(self, data):
if hasattr(self, 'initial_data'):
unknown_keys = set(self.initial_data.keys()) - set(self.fields.keys())
if unknown_keys:
if set(['is_staff', 'is_superuser', 'groups']) & unknown_keys:
message = 'You do not have permissions to access some of' + \
' these fields: {}'.format(unknown_keys)
else:
message = 'Got unknown fields: {}'.format(unknown_keys)
raise serializers.ValidationError(message)
return data

class Meta:
model = User
fields = ('url', 'id', 'username', 'first_name', 'last_name', 'email')
ordering = ['-id']

class UserSerializer(serializers.ModelSerializer):
groups = serializers.SlugRelatedField(many=True,
slug_field='name', queryset=Group.objects.all())
Expand All @@ -294,7 +312,7 @@ class Meta:
model = User
fields = ('url', 'id', 'username', 'first_name', 'last_name', 'email',
'groups', 'is_staff', 'is_superuser', 'is_active', 'last_login',
'date_joined', 'groups')
'date_joined')
read_only_fields = ('last_login', 'date_joined')
write_only_fields = ('password', )
ordering = ['-id']
Expand Down
141 changes: 66 additions & 75 deletions cvat/apps/engine/tests/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,53 +462,70 @@ def test_api_v1_server_logs_no_auth(self):
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)


class UserListAPITestCase(APITestCase):
class UserAPITestCase(APITestCase):
def setUp(self):
self.client = APIClient()

@classmethod
def setUpTestData(cls):
create_db_users(cls)

def _check_response(self, user, response, is_full=True):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["id"], user.id)
self.assertEqual(response.data["username"], user.username)
self.assertEqual(response.data["first_name"], user.first_name)
self.assertEqual(response.data["last_name"], user.last_name)
self.assertEqual(response.data["email"], user.email)
extra_check = self.assertIn if is_full else self.assertNotIn
extra_check("groups", response.data)
extra_check("is_staff", response.data)
extra_check("is_superuser", response.data)
extra_check("is_active", response.data)
extra_check("last_login", response.data)
extra_check("date_joined", response.data)


class UserListAPITestCase(UserAPITestCase):
def _run_api_v1_users(self, user):
with ForceLogin(user, self.client):
response = self.client.get('/api/v1/users')

return response

def test_api_v1_users_admin(self):
response = self._run_api_v1_users(self.admin)
def _check_response(self, user, response):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertListEqual(
["admin", "user1", "user2", "user3", "user4", "user5"],
[res["username"] for res in response.data["results"]])

def test_api_v1_users_admin(self):
response = self._run_api_v1_users(self.admin)
self._check_response(self.admin, response)

def test_api_v1_users_user(self):
response = self._run_api_v1_users(self.user)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self._check_response(self.user, response)

def test_api_v1_users_annotator(self):
response = self._run_api_v1_users(self.annotator)
self._check_response(self.annotator, response)

def test_api_v1_users_observer(self):
response = self._run_api_v1_users(self.observer)
self._check_response(self.observer, response)

def test_api_v1_users_no_auth(self):
response = self._run_api_v1_users(None)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

class UserSelfAPITestCase(APITestCase):
def setUp(self):
self.client = APIClient()

@classmethod
def setUpTestData(cls):
create_db_users(cls)

class UserSelfAPITestCase(UserAPITestCase):
def _run_api_v1_users_self(self, user):
with ForceLogin(user, self.client):
response = self.client.get('/api/v1/users/self')

return response

def _check_response(self, user, response):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["username"], user.username)

def test_api_v1_users_self_admin(self):
response = self._run_api_v1_users_self(self.admin)
self._check_response(self.admin, response)
Expand All @@ -521,98 +538,54 @@ def test_api_v1_users_self_annotator(self):
response = self._run_api_v1_users_self(self.annotator)
self._check_response(self.annotator, response)

def test_api_v1_users_self_observer(self):
response = self._run_api_v1_users_self(self.observer)
self._check_response(self.observer, response)

def test_api_v1_users_self_no_auth(self):
response = self._run_api_v1_users_self(None)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

class UserGetAPITestCase(APITestCase):
def setUp(self):
self.client = APIClient()

@classmethod
def setUpTestData(cls):
create_db_users(cls)

class UserGetAPITestCase(UserAPITestCase):
def _run_api_v1_users_id(self, user, user_id):
with ForceLogin(user, self.client):
response = self.client.get('/api/v1/users/{}'.format(user_id))

return response

def _check_response(self, user, response):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["id"], user.id)
self.assertEqual(response.data["username"], user.username)

def test_api_v1_users_id_admin(self):
response = self._run_api_v1_users_id(self.admin, self.user.id)
self._check_response(self.user, response)
self._check_response(self.user, response, True)

response = self._run_api_v1_users_id(self.admin, self.admin.id)
self._check_response(self.admin, response)
self._check_response(self.admin, response, True)

response = self._run_api_v1_users_id(self.admin, self.owner.id)
self._check_response(self.owner, response)
self._check_response(self.owner, response, True)

def test_api_v1_users_id_user(self):
response = self._run_api_v1_users_id(self.user, self.user.id)
self._check_response(self.user, response)
self._check_response(self.user, response, True)

response = self._run_api_v1_users_id(self.user, self.owner.id)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self._check_response(self.owner, response, False)

def test_api_v1_users_id_annotator(self):
response = self._run_api_v1_users_id(self.annotator, self.annotator.id)
self._check_response(self.annotator, response)
self._check_response(self.annotator, response, True)

response = self._run_api_v1_users_id(self.annotator, self.user.id)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self._check_response(self.user, response, False)

def test_api_v1_users_id_no_auth(self):
response = self._run_api_v1_users_id(None, self.user.id)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

class UserUpdateAPITestCase(APITestCase):
class UserPartialUpdateAPITestCase(APITestCase):
def setUp(self):
self.client = APIClient()
create_db_users(self)

def _run_api_v1_users_id(self, user, user_id, data):
with ForceLogin(user, self.client):
response = self.client.put('/api/v1/users/{}'.format(user_id), data=data)

return response

def test_api_v1_users_id_admin(self):
data = {"username": "user09", "groups": ["user", "admin"],
"first_name": "my name"}
response = self._run_api_v1_users_id(self.admin, self.user.id, data)

self.assertEqual(response.status_code, status.HTTP_200_OK)
user09 = User.objects.get(id=self.user.id)
self.assertEqual(user09.username, data["username"])
self.assertEqual(user09.first_name, data["first_name"])

def test_api_v1_users_id_user(self):
data = {"username": "user10", "groups": ["user", "annotator"],
"first_name": "my name"}
response = self._run_api_v1_users_id(self.user, self.user.id, data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_api_v1_users_id_annotator(self):
data = {"username": "user11", "groups": ["annotator"],
"first_name": "my name"}
response = self._run_api_v1_users_id(self.annotator, self.user.id, data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_api_v1_users_id_no_auth(self):
data = {"username": "user12", "groups": ["user", "observer"],
"first_name": "my name"}
response = self._run_api_v1_users_id(None, self.user.id, data)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

class UserPartialUpdateAPITestCase(UserUpdateAPITestCase):
def _run_api_v1_users_id(self, user, user_id, data):
with ForceLogin(user, self.client):
response = self.client.patch('/api/v1/users/{}'.format(user_id), data=data)
Expand All @@ -624,13 +597,31 @@ def test_api_v1_users_id_admin_partial(self):
response = self._run_api_v1_users_id(self.admin, self.user.id, data)

self.assertEqual(response.status_code, status.HTTP_200_OK)
user09 = User.objects.get(id=self.user.id)
self.assertEqual(user09.username, data["username"])
self.assertEqual(user09.last_name, data["last_name"])
user = User.objects.get(id=self.user.id)
self.assertEqual(user.username, data["username"])
self.assertEqual(user.last_name, data["last_name"])

def test_api_v1_users_id_user_partial(self):
data = {"username": "user10", "first_name": "my name"}
response = self._run_api_v1_users_id(self.user, self.user.id, data)
user = User.objects.get(id=self.user.id)
self.assertEqual(user.username, data["username"])
self.assertEqual(user.first_name, data["first_name"])

data = {"is_staff": True}
response = self._run_api_v1_users_id(self.user, self.user.id, data)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

data = {"username": "admin", "is_superuser": True}
response = self._run_api_v1_users_id(self.user, self.user.id, data)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

data = {"username": "non_active", "is_active": False}
response = self._run_api_v1_users_id(self.user, self.user.id, data)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

data = {"username": "annotator01", "first_name": "slave"}
response = self._run_api_v1_users_id(self.user, self.annotator.id, data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_api_v1_users_id_no_auth_partial(self):
Expand Down
34 changes: 24 additions & 10 deletions cvat/apps/engine/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
ExceptionSerializer, AboutSerializer, JobSerializer, ImageMetaSerializer,
RqStatusSerializer, TaskDataSerializer, LabeledDataSerializer,
PluginSerializer, FileInfoSerializer, LogEventSerializer,
ProjectSerializer)
ProjectSerializer, BasicUserSerializer)
from cvat.apps.annotation.serializers import AnnotationFileSerializer
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
Expand Down Expand Up @@ -503,23 +503,37 @@ def annotations(self, request, pk):
return Response(data)

class UserViewSet(viewsets.GenericViewSet, mixins.ListModelMixin,
mixins.RetrieveModelMixin, mixins.UpdateModelMixin):
mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin):
queryset = User.objects.all().order_by('id')
serializer_class = UserSerializer
http_method_names = ['get', 'post', 'head', 'patch', 'delete']

def get_serializer_class(self):
user = self.request.user
if user.is_staff:
return UserSerializer
else:
is_self = int(self.kwargs.get("pk", 0)) == user.id or \
self.action == "self"
if is_self and self.request.method in SAFE_METHODS:
return UserSerializer
else:
return BasicUserSerializer

def get_permissions(self):
permissions = [IsAuthenticated]
if not self.action in ["self"]:
user = self.request.user
if self.action != "retrieve" or int(self.kwargs.get("pk", 0)) != user.id:
user = self.request.user

if not self.request.method in SAFE_METHODS:
is_self = int(self.kwargs.get("pk", 0)) == user.id
if not is_self:
permissions.append(auth.AdminRolePermission)

return [perm() for perm in permissions]

@staticmethod
@action(detail=False, methods=['GET'], serializer_class=UserSerializer)
def self(request):
serializer = UserSerializer(request.user, context={ "request": request })
@action(detail=False, methods=['GET'])
def self(self, request):
serializer_class = self.get_serializer_class()
serializer = serializer_class(request.user, context={ "request": request })
return Response(serializer.data)

class PluginViewSet(viewsets.ModelViewSet):
Expand Down

0 comments on commit 4d9f0e2

Please sign in to comment.