Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fixes #9887]Data retriever wrongly assign base_file value #9888

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion geonode/storage/data_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ def items(self):
return self.data_items.items()

def _unzip(self, zip_name: str) -> Mapping:
from geonode.utils import get_allowed_extensions

'''
Function to unzip the file. If is a shp or a tiff
is assigned as base_file otherwise will create the expected payloads
Expand All @@ -199,8 +201,11 @@ def _unzip(self, zip_name: str) -> Mapping:
zip_file = self.file_paths['base_file']
the_zip = zipfile.ZipFile(zip_file, allowZip64=True)
the_zip.extractall(self.temporary_folder)
available_choices = get_allowed_extensions()
not_main_files = ['xml', 'sld', 'zip']
base_file_choices = [x for x in available_choices if x not in not_main_files]
for _file in Path(self.temporary_folder).iterdir():
if _file.name.endswith('.shp') or _file.name.endswith('.tif'):
if any([_file.name.endswith(_ext) for _ext in base_file_choices]):
self.file_paths['base_file'] = Path(str(_file))
elif not zipfile.is_zipfile(str(_file)):
ext = _file.name.split(".")[-1]
Expand Down
30 changes: 29 additions & 1 deletion geonode/storage/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#########################################################################
import io
import os
import shutil
import gisdata

from unittest.mock import patch

from django.test.testcases import SimpleTestCase, TestCase
Expand Down Expand Up @@ -401,6 +401,11 @@ def test_storage_manager_replace_single_file(self, path, strg):


class TestDataRetriever(TestCase):
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
cls.project_root = os.path.abspath(os.path.dirname(__file__))

def setUp(self):
self.sut = StorageManager
self.local_files_paths = {
Expand Down Expand Up @@ -569,3 +574,26 @@ def test_storage_manager_rmtree(self):

self.sut().rmtree(_tmpdir)
self.assertFalse(os.path.exists(_tmpdir))

def test_zip_file_should_correctly_recognize_main_extension_with_csv(self):
# reinitiate the storage manager with the zip file
storage_manager = self.sut(remote_files={"base_file": os.path.join(f"{self.project_root}", "tests/data/example.zip")})
storage_manager.clone_remote_files()

self.assertIsNotNone(storage_manager.data_retriever.temporary_folder)
_files = storage_manager.get_retrieved_paths()
self.assertTrue("example.csv" in _files.get("base_file"))

def test_zip_file_should_correctly_recognize_main_extension_with_shp(self):
# zipping files
storage_manager = self.sut(remote_files=self.local_files_paths)
storage_manager.clone_remote_files()
storage_manager.data_retriever.temporary_folder
output = shutil.make_archive(f"{storage_manager.data_retriever.temporary_folder}/output", 'zip', storage_manager.data_retriever.temporary_folder)
# reinitiate the storage manager with the zip file
storage_manager = self.sut(remote_files={"base_file": output})
storage_manager.clone_remote_files()

self.assertIsNotNone(storage_manager.data_retriever.temporary_folder)
_files = storage_manager.get_retrieved_paths()
self.assertTrue("single_point.shp" in _files.get("base_file"))
Binary file added geonode/storage/tests/data/example.zip
Binary file not shown.