diff --git a/CHANGELOG.md b/CHANGELOG.md index f6fb896a7..8de554638 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ ### Core - `intelmq.lib.utils.drop_privileges`: When IntelMQ is called as `root` and dropping the privileges to user `intelmq`, also set the non-primary groups associated with the `intelmq` user. Makes the behaviour of running intelmqctl as `root` closer to the behaviour of `sudo -u intelmq ...` (PR#2507 by Mikk Margus Möll). +- `intelmq.lib.utils.unzip`: Filter out directory entries when extracting data, fixing the issue that + archives containing directories yielded empty data for each directory entry (PR# by Kamil Mankowski). ### Development diff --git a/intelmq/lib/utils.py b/intelmq/lib/utils.py index 42d551ad9..de59a223a 100644 --- a/intelmq/lib/utils.py +++ b/intelmq/lib/utils.py @@ -538,7 +538,7 @@ def extract_tar(file): def extract(filename): return tar.extractfile(filename).read() - return tuple(file.name for file in tar.getmembers()), tar, extract + return tuple(file.name for file in tar.getmembers() if file.isfile()), tar, extract def extract_gzip(file): @@ -547,7 +547,7 @@ def extract_gzip(file): def extract_zip(file): zfp = zipfile.ZipFile(io.BytesIO(file), "r") - return zfp.namelist(), zfp, zfp.read + return [member.filename for member in zfp.infolist() if not member.is_dir()], zfp, zfp.read def unzip(file: bytes, extract_files: Union[bool, list], logger=None, diff --git a/intelmq/tests/assets/subdir.tar.gz b/intelmq/tests/assets/subdir.tar.gz new file mode 100644 index 000000000..03daf10d5 Binary files /dev/null and b/intelmq/tests/assets/subdir.tar.gz differ diff --git a/intelmq/tests/assets/subdir.tar.gz.license b/intelmq/tests/assets/subdir.tar.gz.license new file mode 100644 index 000000000..056d32ec6 --- /dev/null +++ b/intelmq/tests/assets/subdir.tar.gz.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: 2024 CERT.at GmbH + +SPDX-License-Identifier: AGPL-3.0-or-later diff --git a/intelmq/tests/assets/subdir.zip 
b/intelmq/tests/assets/subdir.zip new file mode 100644 index 000000000..5fba87a85 Binary files /dev/null and b/intelmq/tests/assets/subdir.zip differ diff --git a/intelmq/tests/assets/subdir.zip.license b/intelmq/tests/assets/subdir.zip.license new file mode 100644 index 000000000..056d32ec6 --- /dev/null +++ b/intelmq/tests/assets/subdir.zip.license @@ -0,0 +1,3 @@ +SPDX-FileCopyrightText: 2024 CERT.at GmbH + +SPDX-License-Identifier: AGPL-3.0-or-later diff --git a/intelmq/tests/bots/collectors/http/test_collector.py b/intelmq/tests/bots/collectors/http/test_collector.py index fa315c693..cefdacc86 100644 --- a/intelmq/tests/bots/collectors/http/test_collector.py +++ b/intelmq/tests/bots/collectors/http/test_collector.py @@ -143,6 +143,25 @@ def test_zip(self, mocker): self.assertMessageEqual(0, output0) self.assertMessageEqual(1, output1) + def test_zip_subdirs(self, mocker): + """ + Test unzipping when the zip has subdirectories + """ + prepare_mocker(mocker) + self.run_bot(parameters={'http_url': 'http://localhost/subdir.zip', + 'name': 'Example feed', + }, + iterations=1) + + output0 = OUTPUT[0].copy() + output0['feed.url'] = 'http://localhost/subdir.zip' + output0['extra.file_name'] = 'subdir/bar' + output1 = OUTPUT[1].copy() + output1['feed.url'] = 'http://localhost/subdir.zip' + output1['extra.file_name'] = 'subdir/foo' + self.assertMessageEqual(0, output0) + self.assertMessageEqual(1, output1) + @test.skip_exotic() def test_pgp(self, mocker): """ diff --git a/intelmq/tests/lib/test_utils.py b/intelmq/tests/lib/test_utils.py index b99a50138..daba62996 100644 --- a/intelmq/tests/lib/test_utils.py +++ b/intelmq/tests/lib/test_utils.py @@ -260,6 +260,14 @@ def test_unzip_tar_gz_return_names(self): self.assertEqual(tuple(result), (('bar', b'bar text\n'), ('foo', b'foo text\n'))) + def test_unzip_tar_gz_with_subdir(self): + """ Test the unzip function with a tar gz file and return_names. 
""" + filename = os.path.join(os.path.dirname(__file__), '../assets/subdir.tar.gz') + with open(filename, 'rb') as fh: + result = utils.unzip(fh.read(), extract_files=True, return_names=True) + self.assertEqual(tuple(result), (('subdir/foo', b'foo text\n'), + ('subdir/bar', b'bar text\n'))) + def test_unzip_gz(self): """ Test the unzip function with a gz file. """ filename = os.path.join(os.path.dirname(__file__), '../assets/foobar.gz') @@ -289,6 +297,14 @@ def test_unzip_zip_return_names(self): self.assertEqual(tuple(result), (('bar', b'bar text\n'), ('foo', b'foo text\n'))) + def test_unzip_zip_with_subdir(self): + """ Test the unzip function with a zip containing a subdirectory and returning names.""" + filename = os.path.join(os.path.dirname(__file__), '../assets/subdir.zip') + with open(filename, 'rb') as fh: + result = utils.unzip(fh.read(), extract_files=True, return_names=True) + self.assertEqual(tuple(result), (('subdir/bar', b'bar text\n'), + ('subdir/foo', b'foo text\n'))) + def test_file_name_from_response(self): """ test file_name_from_response """ response = requests.Response()