Skip to content

Commit

Permalink
More major improvements into Temp_Folder, Temp_Zip classes
Browse files Browse the repository at this point in the history
Added new class Temp_Zip_In_Memory
  • Loading branch information
DinisCruz committed Oct 27, 2023
1 parent 7a31ac6 commit 61fd9a0
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 27 deletions.
71 changes: 56 additions & 15 deletions osbot_utils/testing/Temp_Folder.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,73 @@
import random

from osbot_utils.utils.Misc import random_string

from osbot_utils.utils.Files import path_combine, temp_folder_current, safe_file_name, folder_exists, folder_create, \
folder_delete_recursively, files_list, file_create, folder_files, folders_recursive, files_find, files_recursive
folder_delete_recursively, files_list, file_create, folder_files, folders_recursive, files_find, files_recursive, \
temp_file_in_folder, create_folder, filter_parent_folder
from osbot_utils.utils.Zip import zip_folder


class Temp_Folder:

def __init__(self, folder_name=None, parent_folder=None, temp_prefix=None, delete_on_exit=True):
def __init__(self, folder_name=None, parent_folder=None, temp_prefix='', delete_on_exit=True, temp_files_to_add=0):
if type(parent_folder) is Temp_Folder:
parent_folder = parent_folder.path()
self.folder_name = folder_name or f"temp_folder_{random_string(prefix=temp_prefix)}"
self.parent_folder = parent_folder or temp_folder_current()
self.full_path = path_combine(self.parent_folder, safe_file_name(self.folder_name))
self.delete_on_exit = delete_on_exit
self.temp_files_to_add = temp_files_to_add


def __enter__(self):
folder_create(self.full_path)
self.add_temp_files_and_folders(max_total_files=self.temp_files_to_add)
return self

def __exit__(self, type, value, traceback):
if self.delete_on_exit:
folder_delete_recursively(self.full_path)

def add_temp_files(self, count=0):
if count is None: count = 1
for i in range(count):
self.add_file()

def add_temp_files_and_folders(self, target_folder=None, max_depth_param=5, max_files_per_folder=4, max_total_files=20, current_depth=0):
if max_total_files <=0: return max_total_files
if target_folder is None: target_folder = self.full_path

# Base case: if max_depth_param is 0 or we've reached max_total_files, we stop
if max_depth_param == 0 or max_total_files <= 0:
return max_total_files

# Randomly decide the number of subfolders and files for this folder
num_subfolders = random.randint(1, max_depth_param)
num_files = random.randint(1, min(max_files_per_folder, max_total_files))
#print(f'Creating {num_subfolders} subfolders and {num_files} files in {target_folder}')
# Create the random files
for _ in range(num_files):
temp_file_in_folder(target_folder,prefix= f'temp_file__{max_total_files}')
max_total_files -= 1
if max_total_files <= 0:
return max_total_files

current_depth +=1
# Recursively create subfolders and their contents
for _ in range(0, num_subfolders):
subfolder_name = f"_[{current_depth}]_temp_folder_{str(random.randint(0, 512))}_"
subfolder_path = path_combine(target_folder, subfolder_name)
create_folder(subfolder_path)

# Recursive call with decremented depth and updated max_total_files
max_total_files = self.add_temp_files_and_folders(subfolder_path, max_depth_param - 1, max_files_per_folder, max_total_files, current_depth=current_depth)

if max_total_files <= 0:
break

return max_total_files

def add_file(self, file_name=None, contents=None):
if file_name is None: file_name = f"temp_file_{random_string()}.txt"
if contents is None: contents = random_string()
Expand All @@ -35,6 +79,7 @@ def add_folder(self, name=None):
new_folder = path_combine(self.path(), safe_file_name(name))
return folder_create(new_folder)


def exists(self):
return folder_exists(self.full_path)

Expand All @@ -43,25 +88,21 @@ def path(self):

def files(self, show_parent_folder=False, include_folders=False):
all_files = files_recursive(self.path(), include_folders=include_folders)
return self.filter_parent_folder(all_files, show_parent_folder)
if show_parent_folder:
return all_files
return filter_parent_folder(all_files, self.path())

def files_and_folders(self, show_parent_folder=False):
all_files = files_find(self.path())
all_folders = folders_recursive(self.path())

return self.filter_parent_folder(all_files, show_parent_folder)

def filter_parent_folder(self, items, show_parent_folder):
all_files_and_folders = files_recursive(self.path(), include_folders=True)
if show_parent_folder:
return items
all_relative_items = []
for item in items:
all_relative_items.append(item.replace(self.path(),'')[1:])
return all_relative_items
return all_files_and_folders
return filter_parent_folder(all_files_and_folders, self.path())

def folders(self, show_parent_folder=False):
all_folders = folders_recursive(self.path())
return self.filter_parent_folder(all_folders, show_parent_folder)
if show_parent_folder:
return all_folders
return filter_parent_folder(all_folders, self.path())

def zip(self):
return zip_folder(self.path())
3 changes: 1 addition & 2 deletions osbot_utils/testing/Temp_Zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@


class Temp_Zip():
def __init__(self, target=None, target_zip_file=None, zip_to_memory=False, delete_zip_file=True):
def __init__(self, target=None, target_zip_file=None, delete_zip_file=True):
if type(target) is Temp_Folder:
target = target.path()
self.target = target
self.delete_zip_file = delete_zip_file
self.target_zip_file = target_zip_file
self.zip_to_memory = zip_to_memory
self.zip_file = None
self.zip_bytes = None
self.target_zipped = False
Expand Down
57 changes: 57 additions & 0 deletions osbot_utils/testing/Temp_Zip_In_Memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from osbot_utils.testing.Temp_Folder import Temp_Folder
from osbot_utils.utils.Files import is_file, is_folder, files_recursive, filter_parent_folder, temp_file
from osbot_utils.utils.Zip import zip_files_to_bytes


class Temp_Zip_In_Memory:

def __init__(self, targets=None):
self.targets = targets or []
self.root_path = None

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
pass

def add_file(self, file):
if is_file(file):
self.targets.append(file)
return self

def add_folder(self, folder):
if type(folder) is Temp_Folder:
folder = folder.path()
if is_folder(folder):
self.targets.append(folder)
return self

def all_source_files(self):
all_files = []
for target in self.targets:
if is_file(target):
all_files.append(target)
elif is_folder(target):
all_files.extend(files_recursive(target))
return all_files

def create_zip_file(self, target_zip_file=None):
if target_zip_file is None:
target_zip_file = temp_file(extension='.zip')
with open(target_zip_file, 'wb') as f:
f.write(self.zip_bytes())
return target_zip_file

def set_root_path(self, root_path):
if type(root_path) is Temp_Folder:
root_path = root_path.path()
self.root_path = root_path
return self

def zip_bytes(self):
return self.zip_buffer().getvalue()

def zip_buffer(self):
target_files = self.all_source_files()
return zip_files_to_bytes(target_files, root_path=self.root_path)
21 changes: 20 additions & 1 deletion osbot_utils/utils/Files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import tempfile
from os.path import abspath, join
from pathlib import Path
from osbot_utils.utils.Misc import bytes_to_base64, base64_to_bytes
from osbot_utils.utils.Misc import bytes_to_base64, base64_to_bytes, random_string


class Files:
Expand Down Expand Up @@ -138,6 +138,13 @@ def file_size(path):
def file_stats(path):
return os.stat(path)

@staticmethod
def filter_parent_folder(items, folder):
all_relative_items = []
for item in items:
all_relative_items.append(item.replace(folder, '')[1:])
return sorted(all_relative_items)

@staticmethod
def files_recursive(parent_dir, include_folders=False):
all_files = []
Expand All @@ -154,6 +161,7 @@ def files_recursive(parent_dir, include_folders=False):

return sorted(all_files)


@staticmethod
def folder_exists(path):
return is_folder(path)
Expand Down Expand Up @@ -354,6 +362,15 @@ def temp_file(extension = '.tmp', contents=None, target_folder=None):
file_create(tmp_file, contents)
return tmp_file

@staticmethod
def temp_file_in_folder(target_folder, prefix="temp_file_", postfix='.txt'):
if is_folder(target_folder):
path_to_file = path_combine(target_folder, random_string(prefix=prefix, postfix=postfix))
file_create(path_to_file, random_string())
return path_to_file



@staticmethod
def temp_filename(extension='.tmp'):
return file_name(temp_file(extension), check_if_exists=False)
Expand Down Expand Up @@ -440,6 +457,7 @@ def write_gz(path=None, contents=None):
file_write = Files.write
file_write_bytes = Files.write_bytes
file_write_gz = Files.write_gz
filter_parent_folder = Files.filter_parent_folder
files_find = Files.find
files_recursive = Files.files_recursive
files_list = Files.files
Expand Down Expand Up @@ -487,6 +505,7 @@ def write_gz(path=None, contents=None):
sub_folders = Files.sub_folders

temp_file = Files.temp_file
temp_file_in_folder = Files.temp_file_in_folder
temp_filename = Files.temp_filename
temp_folder = Files.temp_folder
temp_folder_current = Files.temp_folder_current
Expand Down
13 changes: 7 additions & 6 deletions osbot_utils/utils/Misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,12 +625,13 @@ def random_password(length=24, prefix=''):
password = password.replace(item, '_')
return password

def random_string(length:int=8,prefix:str=None):
length -= 1 # so that we get the exact length when the value is provided
postfix = '_' + ''.join(random.choices(string.ascii_uppercase, k=length)).lower()
if prefix:
return prefix + postfix
return postfix
def random_string(length:int=8, prefix:str='', postfix:str=''):
if is_int(length):
length -= 1 # so that we get the exact length when the value is provided
else:
length = 7 # default length
value = '_' + ''.join(random.choices(string.ascii_uppercase, k=length)).lower()
return f"{prefix}{value}{postfix}"

def random_string_and_numbers(length:int=6,prefix:str=''):
return prefix + ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))
Expand Down
15 changes: 14 additions & 1 deletion osbot_utils/utils/Zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,22 @@ def unzip_file(zip_file, target_folder=None, format='zip'):
shutil.unpack_archive(zip_file, extract_dir=target_folder, format=format)
return target_folder

def zip_files_to_bytes(file_paths, root_path=None):
zip_buffer = io.BytesIO() # Create a BytesIO buffer to hold the zipped file
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: # Create a ZipFile object with the buffer as the target
for file_path in file_paths:
if root_path:
arcname = file_path.replace(root_path,'') # Define the arcname, which is the name inside the zip file
else:
arcname = file_path # if root_path is not provided, use the full file path
zf.write(file_path, arcname) # Add the file to the zip file
zip_buffer.seek(0)
return zip_buffer

def zip_folder(root_dir, format='zip'):
return shutil.make_archive(base_name=root_dir, format=format, root_dir=root_dir)

def zip_folder_to_bytes(root_dir):
def zip_folder_to_bytes(root_dir): # todo add unit test
zip_buffer = io.BytesIO() # Create a BytesIO buffer to hold the zipped file
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: # Create a ZipFile object with the buffer as the target
for foldername, subfolders, filenames in os.walk(root_dir): # Walk the root_dir and add all files and folders to the zip file
Expand All @@ -24,6 +36,7 @@ def zip_folder_to_bytes(root_dir):
arcname = os.path.relpath(absolute_path, root_dir) # Define the arcname, which is the name inside the zip file
zf.write(absolute_path, arcname) # Add the file to the zip file
zip_buffer.seek(0) # Reset buffer position
return zip_buffer

def zip_file_list(path):
if is_file(path):
Expand Down
3 changes: 1 addition & 2 deletions tests/testing/test_Temp_Zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ def test__with__default_params(self):
target_zip_file = None ,
target_zipped = False ,
zip_bytes = None ,
zip_file = None ,
zip_to_memory = False )
zip_file = None )


def test__using_folder(self):
Expand Down
41 changes: 41 additions & 0 deletions tests/testing/test_Temp_Zip_In_Memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from unittest import TestCase

from osbot_utils.utils.Files import file_exists, file_delete, file_extension

from osbot_utils.utils.Dev import pprint

from osbot_utils.testing.Temp_Folder import Temp_Folder
from osbot_utils.testing.Temp_Zip_In_Memory import Temp_Zip_In_Memory
from osbot_utils.utils.Zip import zip_files_to_bytes, zip_file_list


class test_Temp_Zip_In_Memory(TestCase):

def test__with__default_params(self):
with Temp_Zip_In_Memory() as _:
assert _.targets == []

def test_all_source_files(self):
with Temp_Folder() as temp_folder:
max_total_files = 30
temp_folder.add_temp_files_and_folders(max_total_files=max_total_files)
assert len(temp_folder.files()) == max_total_files
with Temp_Zip_In_Memory() as temp_zip_in_memory:
temp_zip_in_memory.add_folder(temp_folder)
target_files_to_zip = temp_zip_in_memory.all_source_files()

assert temp_folder.files(show_parent_folder=True) == target_files_to_zip
assert len(temp_zip_in_memory.zip_bytes()) > 11000

def test_create_zip_file(self):
with Temp_Folder(temp_files_to_add=3) as temp_folder:
with Temp_Zip_In_Memory() as _:
_.add_folder(temp_folder)
_.set_root_path(temp_folder)
target_zip_file = _.create_zip_file() # save in memory zip into disk
assert file_exists(target_zip_file) # make sure it exists
assert file_extension(target_zip_file) == '.zip' # make sure it has the right extension
assert temp_folder.files() == zip_file_list(target_zip_file) # confirm that all files inside temp_folder are inside the in memory zip file

assert file_delete(target_zip_file) is True

0 comments on commit 61fd9a0

Please sign in to comment.