Skip to content

Commit

Permalink
[Backport to 3.2.x][Fixes GeoNode#7173] Improve the "get_visibile_reo…
Browse files Browse the repository at this point in the history
…urces" method: replace for… (GeoNode#7179)

* [Fixes GeoNode#7173] Improve the "get_visibile_reources" method: replace for loop with DB filtering

(cherry picked from commit 33c18dd)

* [Fixes GeoNode#7173] Improve the "get_visibile_reources" method: replace for loop with DB filtering

(cherry picked from commit 236fade)

* [Fixes GeoNode#7173] Improve the "get_visibile_reources" method: replace for loop with DB filtering

(cherry picked from commit 8386552)

# Conflicts:
#	geonode/security/tests.py
(cherry picked from commit 1ce89cb)
  • Loading branch information
Alessio Fabiani committed Mar 29, 2021
1 parent 69638fb commit 43ecdeb
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 16 deletions.
97 changes: 96 additions & 1 deletion geonode/security/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from django.conf import settings
from django.http import HttpRequest
from django.urls import reverse
from django.db.models import Q
from django.contrib.auth import get_user_model
from django.test.utils import override_settings

Expand Down Expand Up @@ -60,12 +61,12 @@

from .utils import (
get_visible_resources,
purge_geofence_all,
get_users_with_perms,
get_geofence_rules,
get_geofence_rules_count,
get_highest_priority,
set_geofence_all,
purge_geofence_all,
sync_geofence_with_guardian,
sync_resources_with_guardian
)
Expand Down Expand Up @@ -1670,6 +1671,7 @@ def test_sync_resources_with_guardian_delay_true(self):


class TestGetVisibleResources(ResourceTestCaseMixin, GeoNodeBaseTestSupport):

def setUp(self):
super(TestGetVisibleResources, self).setUp()
self.user = get_user_model().objects.get(username="admin")
Expand All @@ -1687,3 +1689,96 @@ def test_get_visible_resources_should_return_updated_resource_with_metadata_only
layers = Layer.objects.all()
actual = get_visible_resources(queryset=layers, user=self.user)
self.assertEqual(9, len(actual))

@override_settings(
ADMIN_MODERATE_UPLOADS=True,
RESOURCE_PUBLISHING=True,
GROUP_PRIVATE_RESOURCES=True)
def test_get_visible_resources_advanced_workflow(self):
admin_user = get_user_model().objects.get(username="admin")
standard_user = get_user_model().objects.get(username="bobby")

self.assertIsNotNone(admin_user)
self.assertIsNotNone(standard_user)
admin_user.is_superuser = True
admin_user.save()
layers = Layer.objects.all()

actual = get_visible_resources(
queryset=Layer.objects.all(),
user=admin_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count() - 1, actual.count())
actual = get_visible_resources(
queryset=Layer.objects.all(),
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count() - 1, actual.count())

# Test 'is_approved=False' 'is_published=False'
Layer.objects.filter(
~Q(owner=standard_user)).update(
is_approved=False, is_published=False)

actual = get_visible_resources(
queryset=Layer.objects.all(),
user=admin_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count() - 1, actual.count())
actual = get_visible_resources(
queryset=Layer.objects.all(),
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count() - 1, actual.count())
actual = get_visible_resources(
queryset=Layer.objects.all(),
user=None,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(2, actual.count())

# Test private groups
private_groups = GroupProfile.objects.filter(
access="private")
private_groups.first().leave(standard_user)
Layer.objects.filter(
~Q(owner=standard_user)).update(
group=private_groups.first().group)
actual = get_visible_resources(
queryset=Layer.objects.all(),
user=admin_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count() - 1, actual.count())
actual = get_visible_resources(
queryset=Layer.objects.all(),
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(2, actual.count())
actual = get_visible_resources(
queryset=Layer.objects.all(),
user=None,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True)
# The method returns only 'metadata_only=False' resources
self.assertEqual(2, actual.count())
25 changes: 10 additions & 15 deletions geonode/security/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
from django.conf import settings
from django.db.models import Q
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Group, Permission
from django.core.exceptions import ObjectDoesNotExist
from guardian.utils import get_user_obj_perms_model
from guardian.shortcuts import assign_perm, get_anonymous_user
from guardian.shortcuts import (
assign_perm,
get_anonymous_user,
get_objects_for_user)

from geonode.utils import get_layer_workspace
from geonode.groups.models import GroupProfile
Expand Down Expand Up @@ -73,7 +75,7 @@ def get_visible_resources(queryset,

if not is_admin:
if admin_approval_required:
if not user or user.is_anonymous:
if not user or not user.is_authenticated or user.is_anonymous:
filter_set = filter_set.filter(
Q(is_published=True) |
Q(group__in=public_groups) |
Expand All @@ -82,7 +84,7 @@ def get_visible_resources(queryset,

# Hide Unpublished Resources to Anonymous Users
if unpublished_not_visible:
if not user or user.is_anonymous:
if not user or not user.is_authenticated or user.is_anonymous:
filter_set = filter_set.exclude(Q(is_published=False))

# Hide Resources Belonging to Private Groups
Expand All @@ -102,20 +104,13 @@ def get_visible_resources(queryset,
Q(dirty_state=True) & ~(
Q(owner__username__iexact=str(user)) | Q(group__in=group_list_all))
)
elif not user or user.is_anonymous:
else:
filter_set = filter_set.exclude(Q(dirty_state=True))

if admin_approval_required or unpublished_not_visible or private_groups_not_visibile:
_allowed_resources = []
for _obj in filter_set.all():
try:
resource = _obj.get_self_resource()
if user.has_perm('base.view_resourcebase', resource) or \
user.has_perm('view_resourcebase', resource):
_allowed_resources.append(resource.id)
except (PermissionDenied, Exception) as e:
logger.debug(e)
return filter_set.filter(id__in=_allowed_resources)
if user:
_allowed_resources = get_objects_for_user(user, 'base.view_resourcebase')
return filter_set.filter(id__in=_allowed_resources.values('id'))

return filter_set

Expand Down

0 comments on commit 43ecdeb

Please sign in to comment.