Skip to content

Commit

Permalink
File Tests added
Browse files Browse the repository at this point in the history
  • Loading branch information
S-Holstein committed Aug 19, 2024
1 parent b3b416e commit d6f0599
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 34 deletions.
2 changes: 1 addition & 1 deletion lib/lib/base/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def download_data(
url, output_dir, file_name=None, chunk_size=1024 * 1000, timeout=300, auth=None, check_size=True, overwrite=False
):
"""
Download single file from USGS M2M by download url
Download single file from USGS M2M by download url
Parameters:
url: x
Expand Down
47 changes: 25 additions & 22 deletions lib/lib/base/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,39 @@ def unzip_file(zip_file, remove_zip=True, extract_dir=None):
if not os.path.exists(zip_file):
raise Exception("File does not exist: %s" % zip_file)

zip_ref = zipfile.ZipFile(zip_file, "r")
if extract_dir is None:
extract_dir = os.path.dirname(zip_file)

failed_files = dict()
failed_logs = ""
for name in zip_ref.namelist():
try:
zip_ref.extract(name, extract_dir)
except Exception as e:
failed_files[name] = str(e)
failed_logs += name + ": " + str(e) + "\n"

if len(failed_files) > 0:
raise Exception("Exceptions during unzipping: %s\n\n%s" % (zip_file, failed_logs))
else:
with zipfile.ZipFile(zip_file, "r") as zip_ref: # with block added bc os.remove raised WinErr32
for name in zip_ref.namelist():
try:
zip_ref.extract(name, extract_dir)
except Exception as e:
failed_files[name] = str(e)
failed_logs += name + ": " + str(e) + "\n"

if len(failed_files) > 0:
raise Exception("Exceptions during unzipping: %s\n\n%s" % (zip_file, failed_logs))
else: # removed try block bc sub-folder error wasn't caught
zip_folder_list = zip_ref.namelist()[0].split("/")
if len(zip_folder_list) < 2:
raise Exception("Could not find sub-folder in zip file %s" % zip_file)
zip_folder = zip_folder_list[0]

response_dict = {"scene_path": os.path.join(extract_dir, zip_folder)}

if remove_zip:
try:
zip_folder = zip_ref.namelist()[0].split("/")[0]
os.remove(zip_file)
response_dict["zip_file_removed"] = True
except Exception:
raise Exception("Could not find sub-folder in zip file" % zip_file)
response_dict = {"scene_path": os.path.join(extract_dir, zip_folder)}

if remove_zip:
try:
os.remove(zip_file)
response_dict["zip_file_removed"] = True
except Exception:
response_dict["zip_file_removed"] = False
else:
response_dict["zip_file_removed"] = False
return response_dict
else:
response_dict["zip_file_removed"] = False
return response_dict


def untar_file(tar_file, remove_tar=True, create_folder=False, base_folder=None):
Expand Down
1 change: 0 additions & 1 deletion lib/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ testpaths = [
"test",
"integration",
]
python_paths = ["lib"]

[tool.black]
line-length = 120
Expand Down
209 changes: 199 additions & 10 deletions lib/test/base/test_file.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,46 @@
import os
import shutil
import tempfile
from unittest import mock

import pytest
import zipfile
from lib.base.file import zip_directory, delete_file
from lib.base.file import zip_directory, delete_file, unzip_file, calculate_checksum, get_folder_size


# @pytest.fixture(autouse=True)
# def temp_directory():
# # Create a temporary directory for testing
# temp_dir = tempfile.mkdtemp()
# yield temp_dir
#
# # Clean up the temporary directory after the test
# for filename in os.listdir(temp_dir):
# file_path = os.path.join(temp_dir, filename)
# # Check if the path points to a file (not a directory)
# if os.path.isfile(file_path):
# # Delete the file
# os.remove(file_path)
# os.rmdir(temp_dir)


def cleanup_directory(directory):
shutil.rmtree(directory) # Löscht rekursiv das Verzeichnis und alle Inhalte


@pytest.fixture(autouse=True)
def temp_directory():
# Create a temporary directory for testing
temp_dir = tempfile.mkdtemp()
yield temp_dir
cleanup_directory(temp_dir)

# Clean up the temporary directory after the test
for filename in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, filename)
# Check if the path points to a file (not a directory)
if os.path.isfile(file_path):
# Delete the file
os.remove(file_path)
os.rmdir(temp_dir)

@pytest.fixture
def zip_file_path(temp_directory):
zip_file_path = os.path.join(temp_directory, "test.zip")
with zipfile.ZipFile(zip_file_path, "w") as zipf:
zipf.writestr("folder/test.txt", "Test content")
return zip_file_path


def test_zip_directory(temp_directory):
Expand Down Expand Up @@ -87,3 +109,170 @@ def test_delete_file_is_dir():
with pytest.raises(Exception):
delete_file(temp_dir)
os.rmdir(temp_dir)


# Test if the function raises an exception when the ZIP file does not exist
def test_unzip_file_not_existing():
with pytest.raises(Exception, match="File does not exist"):
unzip_file("non_existing_file.zip")


# Test if the function correctly unzips a valid ZIP file and returns the correct path and removal status
def test_unzip_file_success(zip_file_path):
result = unzip_file(zip_file_path)
assert os.path.exists(result["scene_path"])
assert os.path.isfile(os.path.join(result["scene_path"], "test.txt"))
assert result["zip_file_removed"] is True


# Test if the function handles errors during file extraction properly
def test_unzip_file_with_failed_files(zip_file_path):
# Simuliere einen Fehler beim Entpacken
with mock.patch("zipfile.ZipFile.extract", side_effect=Exception("Extraction failed")):
with pytest.raises(Exception, match="Exceptions during unzipping"):
unzip_file(zip_file_path)


# Test if the function removes the ZIP file when 'remove_zip=True'
def test_unzip_file_remove_zip(zip_file_path):
result = unzip_file(zip_file_path, remove_zip=True)
assert result["zip_file_removed"] is True
assert not os.path.exists(zip_file_path)


# Test if the function does not remove the ZIP file when 'remove_zip=False'
def test_unzip_file_keep_zip(zip_file_path):
result = unzip_file(zip_file_path, remove_zip=False)
assert result["zip_file_removed"] is False
assert os.path.exists(zip_file_path)


# Test if the function handles ZIP files without subfolders correctly
def test_unzip_file_no_subfolder(temp_directory):
zip_file_path = os.path.join(temp_directory, "test.zip")
with zipfile.ZipFile(zip_file_path, "w") as zipf:
zipf.writestr("test.txt", "Test content")

with pytest.raises(Exception, match="Could not find sub-folder in zip file"):
unzip_file(zip_file_path)


# Test if the function raises an exception for invalid or corrupted ZIP files
def test_unzip_file_invalid_zip(temp_directory):
invalid_zip_file = os.path.join(temp_directory, "invalid.zip")
with open(invalid_zip_file, "w") as f:
f.write("This is not a valid zip file")

with pytest.raises(zipfile.BadZipFile):
unzip_file(invalid_zip_file)


# Test if the function raises an exception for an unsupported checksum algorithm
def test_calculate_checksum_unsupported_algorithm():
# Test with an unsupported algorithm name
with pytest.raises(Exception, match="Checksum algorithm not available"):
calculate_checksum("unsupported_algorithm", "file.txt")

# Test with an empty algorithm name
with pytest.raises(Exception, match="Checksum algorithm not available"):
calculate_checksum("", "file.txt")


# Test if the function calculates the checksum correctly using a supported algorithm
def test_calculate_checksum_correct_algorithm(temp_directory):
test_file = os.path.join(temp_directory, "test.txt")
with open(test_file, "w") as f:
f.write("Test")

# Expected checksum value (e.g., MD5 hash)
expected_checksum = "0cbc6611f5540bd0809a388dc95a615b" # Example value for "Test" with MD5
result = calculate_checksum("MD5", test_file)
assert result == expected_checksum


# Test if the function handles missing files correctly
def test_calculate_checksum_file_not_found():
with pytest.raises(FileNotFoundError):
calculate_checksum("MD5", "non_existing_file.txt")


# Test if the function handles empty files correctly
def test_calculate_checksum_empty_file(temp_directory):
empty_file = os.path.join(temp_directory, "empty.txt")
open(empty_file, "w").close()

# Expected checksum value for an empty file (e.g., MD5 hash of an empty string)
expected_checksum = "d41d8cd98f00b204e9800998ecf8427e" # Example value for an empty file with MD5
result = calculate_checksum("MD5", empty_file)
assert result == expected_checksum


# Test if the function handles binary files correctly
def test_calculate_checksum_binary_file(temp_directory):
binary_file = os.path.join(temp_directory, "binary.bin")
with open(binary_file, "wb") as f:
f.write(b"\x00\x01\x02\x03")

# Expected checksum value for the binary content
expected_checksum = "37b59afd592725f9305e484a5d7f5168" # Example value for binary content with MD5
result = calculate_checksum("MD5", binary_file)
assert result == expected_checksum


# Test if the function raises an exception for a non-existent folder
def test_get_folder_size_folder_not_exist():
with pytest.raises(Exception, match="Folder .* does not exist!"):
get_folder_size("non_existing_folder")


# Test if the function returns 0 for an empty folder
def test_get_folder_size_empty_folder(temp_directory):
empty_folder = os.path.join(temp_directory, "empty")
os.makedirs(empty_folder)
size = get_folder_size(empty_folder)
assert size == 0


# Test if the function calculates the size correctly for a folder with a single file
def test_get_folder_size_single_file(temp_directory):
single_file_folder = os.path.join(temp_directory, "single_file")
os.makedirs(single_file_folder)
file_path = os.path.join(single_file_folder, "file.txt")
with open(file_path, "w") as f:
f.write("Test content")
size = get_folder_size(single_file_folder)
assert size == os.path.getsize(file_path)


# Test if the function calculates the total size correctly for a folder with multiple files
def test_get_folder_size_multiple_files(temp_directory):
multiple_files_folder = os.path.join(temp_directory, "multiple_files")
os.makedirs(multiple_files_folder)
file_paths = [os.path.join(multiple_files_folder, "file1.txt"), os.path.join(multiple_files_folder, "file2.txt")]
with open(file_paths[0], "w") as f:
f.write("Content 1")
with open(file_paths[1], "w") as f:
f.write("Content 2")
total_size = sum(os.path.getsize(fp) for fp in file_paths)
size = get_folder_size(multiple_files_folder)
assert size == total_size


# Test if the function calculates the size correctly for a folder with subfolders
def test_get_folder_size_with_subfolders(temp_directory):
main_folder = os.path.join(temp_directory, "main_folder")
os.makedirs(main_folder)
subfolder = os.path.join(main_folder, "subfolder")
os.makedirs(subfolder)

file_in_main = os.path.join(main_folder, "main_file.txt")
file_in_sub = os.path.join(subfolder, "sub_file.txt")

with open(file_in_main, "w") as f:
f.write("Main file content")
with open(file_in_sub, "w") as f:
f.write("Subfolder file content")

total_size = os.path.getsize(file_in_main) + os.path.getsize(file_in_sub)
size = get_folder_size(main_folder)
assert size == total_size

0 comments on commit d6f0599

Please sign in to comment.