Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend QueueAdapter to support dynamic configuration #345

Merged
merged 8 commits into from
Sep 28, 2024
147 changes: 93 additions & 54 deletions pysqa/queueadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from jinja2 import Template

from pysqa.base.abstract import QueueAdapterAbstractClass
from pysqa.base.config import QueueAdapterWithConfig, read_config
from pysqa.base.core import execute_command
from pysqa.base.config import QueueAdapterWithConfig, Queues, read_config
from pysqa.base.core import QueueAdapterCore, execute_command
from pysqa.base.modular import ModularQueueAdapter


Expand Down Expand Up @@ -39,7 +39,10 @@ class QueueAdapter(QueueAdapterAbstractClass):
"""

def __init__(
self, directory: str = "~/.queues", execute_command: callable = execute_command
self,
directory: Optional[str] = None,
queue_type: Optional[str] = None,
execute_command: callable = execute_command,
):
"""
Comment on lines +42 to 47
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the __init__ docstring to include the queue_type parameter.

The __init__ method now accepts queue_type as an optional parameter, but the docstring does not mention it. Please update the docstring to reflect this new parameter and its purpose.

Apply this diff to update the docstring:

 def __init__(
     self,
     directory: Optional[str] = None,
     queue_type: Optional[str] = None,
     execute_command: callable = execute_command,
 ):
     """
     Initialize the QueueAdapter.

     Args:
-        directory (str): Directory containing the queue.yaml files and corresponding templates.
-        execute_command (callable): Function to execute commands.
+        directory (Optional[str]): Directory containing the queue.yaml files and corresponding templates.
+        queue_type (Optional[str]): Type of the queue system (e.g., "SLURM", "SGE"). Required if `directory` is not provided.
+        execute_command (callable): Function to execute commands.
     """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self,
directory: Optional[str] = None,
queue_type: Optional[str] = None,
execute_command: callable = execute_command,
):
"""
self,
directory: Optional[str] = None,
queue_type: Optional[str] = None,
execute_command: callable = execute_command,
):
"""
Initialize the QueueAdapter.
Args:
directory (Optional[str]): Directory containing the queue.yaml files and corresponding templates.
queue_type (Optional[str]): Type of the queue system (e.g., "SLURM", "SGE"). Required if `directory` is not provided.
execute_command (callable): Function to execute commands.
"""

Initialize the QueueAdapter.
Expand All @@ -48,35 +51,41 @@ def __init__(
directory (str): Directory containing the queue.yaml files and corresponding templates.
execute_command (callable): Function to execute commands.
"""
queue_yaml = os.path.join(directory, "queue.yaml")
clusters_yaml = os.path.join(directory, "clusters.yaml")
self._adapter = None
if os.path.exists(queue_yaml):
self._queue_dict = {
"default": set_queue_adapter(
config=read_config(file_name=queue_yaml),
directory=directory,
execute_command=execute_command,
)
}
primary_queue = "default"
elif os.path.exists(clusters_yaml):
config = read_config(file_name=clusters_yaml)
self._queue_dict = {
k: set_queue_adapter(
config=read_config(file_name=os.path.join(directory, v)),
directory=directory,
execute_command=execute_command,
if directory is not None:
queue_yaml = os.path.join(directory, "queue.yaml")
clusters_yaml = os.path.join(directory, "clusters.yaml")
self._adapter = None
if os.path.exists(queue_yaml):
self._queue_dict = {
"default": set_queue_adapter(
config=read_config(file_name=queue_yaml),
directory=directory,
execute_command=execute_command,
)
}
primary_queue = "default"
elif os.path.exists(clusters_yaml):
config = read_config(file_name=clusters_yaml)
self._queue_dict = {
k: set_queue_adapter(
config=read_config(file_name=os.path.join(directory, v)),
directory=directory,
execute_command=execute_command,
)
for k, v in config["cluster"].items()
}
primary_queue = config["cluster_primary"]
else:
raise ValueError(
"Neither a queue.yaml file nor a clusters.yaml file were found in "
+ directory
)
for k, v in config["cluster"].items()
}
primary_queue = config["cluster_primary"]
self._adapter = self._queue_dict[primary_queue]
elif queue_type is not None:
self._queue_dict = {}
self._adapter = QueueAdapterCore(queue_type=queue_type.upper())
else:
raise ValueError(
"Neither a queue.yaml file nor a clusters.yaml file were found in "
+ directory
)
self._adapter = self._queue_dict[primary_queue]
raise ValueError()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle the case when both directory and queue_type are provided.

Currently, if both directory and queue_type are provided, the directory takes precedence, and queue_type is ignored. Consider raising a warning or error if both are provided to avoid confusion.

Add a check to alert the user:

 if directory is not None:
     # Existing logic
+    if queue_type is not None:
+        raise ValueError("Provide only one of 'directory' or 'queue_type', not both.")
 elif queue_type is not None:
     # Existing logic

Committable suggestion was skipped due to low confidence.

Comment on lines +88 to 89
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add an informative error message to ValueError.

When neither directory nor queue_type is provided, the constructor raises a ValueError without a message. Including an informative message will help users understand the error.

Apply this diff to add an error message:

 else:
-    raise ValueError()
+    raise ValueError("Either 'directory' or 'queue_type' must be provided to initialize QueueAdapter.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raise ValueError()
else:
raise ValueError("Either 'directory' or 'queue_type' must be provided to initialize QueueAdapter.")

def list_clusters(self) -> List[str]:
"""
Expand All @@ -97,14 +106,17 @@ def switch_cluster(self, cluster_name: str):
self._adapter = self._queue_dict[cluster_name]

@property
def config(self) -> dict:
def config(self) -> Union[dict, None]:
"""
Get the QueueAdapter configuration.

Returns:
dict: The QueueAdapter configuration.
"""
return self._adapter.config
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.config
else:
return None
Comment on lines +109 to +119
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the docstring of the config property to reflect the possible None return value.

The config property may return None if _adapter is not an instance of QueueAdapterWithConfig. The docstring should indicate this possibility.

Apply this diff to update the docstring:

 @property
 def config(self) -> Union[dict, None]:
     """
     Get the QueueAdapter configuration.

     Returns:
-        dict: The QueueAdapter configuration.
+        Union[dict, None]: The QueueAdapter configuration, or `None` if not available.
     """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def config(self) -> Union[dict, None]:
"""
Get the QueueAdapter configuration.
Returns:
dict: The QueueAdapter configuration.
"""
return self._adapter.config
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.config
else:
return None
def config(self) -> Union[dict, None]:
"""
Get the QueueAdapter configuration.
Returns:
Union[dict, None]: The QueueAdapter configuration, or `None` if not available.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.config
else:
return None


@property
def ssh_delete_file_on_remote(self) -> bool:
Expand All @@ -114,7 +126,10 @@ def ssh_delete_file_on_remote(self) -> bool:
Returns:
bool: The value of ssh_delete_file_on_remote property.
"""
return self._adapter.ssh_delete_file_on_remote
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.ssh_delete_file_on_remote
else:
return False

@property
def remote_flag(self) -> bool:
Expand All @@ -124,37 +139,49 @@ def remote_flag(self) -> bool:
Returns:
bool: The value of remote_flag property.
"""
return self._adapter.remote_flag
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.remote_flag
else:
return False

@property
def queue_list(self) -> List[str]:
def queue_list(self) -> Union[List[str], None]:
"""
Get the list of available queues.

Returns:
List[str]: The list of available queues.
"""
return self._adapter.queue_list
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_list
else:
return None
Comment on lines +148 to +158
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the docstring of the queue_list property to reflect the possible None return value.

Since queue_list may return None, the docstring should be updated accordingly.

Apply this diff to update the docstring:

 @property
 def queue_list(self) -> Union[List[str], None]:
     """
     Get the list of available queues.

     Returns:
-        List[str]: The list of available queues.
+        Union[List[str], None]: The list of available queues, or `None` if not available.
     """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def queue_list(self) -> Union[List[str], None]:
"""
Get the list of available queues.
Returns:
List[str]: The list of available queues.
"""
return self._adapter.queue_list
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_list
else:
return None
def queue_list(self) -> Union[List[str], None]:
"""
Get the list of available queues.
Returns:
Union[List[str], None]: The list of available queues, or `None` if not available.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_list
else:
return None


@property
def queue_view(self) -> pandas.DataFrame:
def queue_view(self) -> Union[pandas.DataFrame, None]:
"""
Get the Pandas DataFrame representation of the available queues.

Returns:
pandas.DataFrame: The Pandas DataFrame representation of the available queues.
"""
return self._adapter.queue_view
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_view
else:
return None
Comment on lines +161 to +171
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the docstring of the queue_view property to reflect the possible None return value.

The queue_view property might return None; the docstring should indicate this possibility.

Apply this diff to update the docstring:

 @property
 def queue_view(self) -> Union[pandas.DataFrame, None]:
     """
     Get the Pandas DataFrame representation of the available queues.

     Returns:
-        pandas.DataFrame: The Pandas DataFrame representation of the available queues.
+        Union[pandas.DataFrame, None]: The DataFrame representation of the available queues, or `None` if not available.
     """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def queue_view(self) -> Union[pandas.DataFrame, None]:
"""
Get the Pandas DataFrame representation of the available queues.
Returns:
pandas.DataFrame: The Pandas DataFrame representation of the available queues.
"""
return self._adapter.queue_view
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_view
else:
return None
def queue_view(self) -> Union[pandas.DataFrame, None]:
"""
Get the Pandas DataFrame representation of the available queues.
Returns:
Union[pandas.DataFrame, None]: The DataFrame representation of the available queues, or `None` if not available.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queue_view
else:
return None


@property
def queues(self) -> List[str]:
def queues(self) -> Union[Queues, None]:
"""
Get the list of available queues.

Returns:
List[str]: The list of available queues.
"""
return self._adapter.queues
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queues
else:
return None
Comment on lines +174 to +184
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the docstring of the queues property to reflect the possible None return value.

Since queues may return None, the docstring should be updated to represent this.

Apply this diff to update the docstring:

 @property
 def queues(self) -> Union[Queues, None]:
     """
     Get the available queues.

     Returns:
-        List[str]: The list of available queues.
+        Union[Queues, None]: The available queues object, or `None` if not available.
     """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def queues(self) -> Union[Queues, None]:
"""
Get the list of available queues.
Returns:
List[str]: The list of available queues.
"""
return self._adapter.queues
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queues
else:
return None
def queues(self) -> Union[Queues, None]:
"""
Get the available queues.
Returns:
Union[Queues, None]: The available queues object, or `None` if not available.
"""
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.queues
else:
return None


def submit_job(
self,
Expand Down Expand Up @@ -220,7 +247,10 @@ def get_job_from_remote(self, working_directory: str):
Args:
working_directory (str): The working directory.
"""
self._adapter.get_job_from_remote(working_directory=working_directory)
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.get_job_from_remote(working_directory=working_directory)
else:
raise TypeError()

Comment on lines +250 to 254
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add an informative error message to TypeError in get_job_from_remote.

Raising a TypeError without a message can make debugging difficult. Include a message to clarify the issue.

Apply this diff to add an error message:

 else:
-    raise TypeError()
+    raise TypeError("'get_job_from_remote' is not available when '_adapter' is not an instance of 'QueueAdapterWithConfig'.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.get_job_from_remote(working_directory=working_directory)
else:
raise TypeError()
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.get_job_from_remote(working_directory=working_directory)
else:
raise TypeError("'get_job_from_remote' is not available when '_adapter' is not an instance of 'QueueAdapterWithConfig'.")

def transfer_file_to_remote(
self,
Expand All @@ -236,11 +266,14 @@ def transfer_file_to_remote(
transfer_back (bool): Whether to transfer the file back.
delete_file_on_remote (bool): Whether to delete the file on the remote host.
"""
self._adapter.transfer_file(
file=file,
transfer_back=transfer_back,
delete_file_on_remote=delete_file_on_remote,
)
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.transfer_file(
file=file,
transfer_back=transfer_back,
delete_file_on_remote=delete_file_on_remote,
)
else:
raise TypeError()
Comment on lines +269 to +276
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add an informative error message to TypeError in transfer_file_to_remote.

Include a message in the raised TypeError to provide clarity on why the error occurred.

Apply this diff to add an error message:

 else:
-    raise TypeError()
+    raise TypeError("'transfer_file_to_remote' is not available when '_adapter' is not an instance of 'QueueAdapterWithConfig'.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.transfer_file(
file=file,
transfer_back=transfer_back,
delete_file_on_remote=delete_file_on_remote,
)
else:
raise TypeError()
if isinstance(self._adapter, QueueAdapterWithConfig):
self._adapter.transfer_file(
file=file,
transfer_back=transfer_back,
delete_file_on_remote=delete_file_on_remote,
)
else:
raise TypeError("'transfer_file_to_remote' is not available when '_adapter' is not an instance of 'QueueAdapterWithConfig'.")


def convert_path_to_remote(self, path: str) -> str:
"""
Expand All @@ -252,7 +285,10 @@ def convert_path_to_remote(self, path: str) -> str:
Returns:
str: The remote path.
"""
return self._adapter.convert_path_to_remote(path=path)
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.convert_path_to_remote(path=path)
else:
raise TypeError()
Comment on lines +288 to +291
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add an informative error message to TypeError in convert_path_to_remote.

Providing an error message will help users understand why the method is not available.

Apply this diff to add an error message:

 else:
-    raise TypeError()
+    raise TypeError("'convert_path_to_remote' is not available when '_adapter' is not an instance of 'QueueAdapterWithConfig'.")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.convert_path_to_remote(path=path)
else:
raise TypeError()
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.convert_path_to_remote(path=path)
else:
raise TypeError("'convert_path_to_remote' is not available when '_adapter' is not an instance of 'QueueAdapterWithConfig'.")


def delete_job(self, process_id: int) -> str:
"""
Expand Down Expand Up @@ -334,13 +370,16 @@ def check_queue_parameters(
Returns:
List: A list containing the checked parameters [cores, run_time_max, memory_max].
"""
return self._adapter.check_queue_parameters(
queue=queue,
cores=cores,
run_time_max=run_time_max,
memory_max=memory_max,
active_queue=active_queue,
)
if isinstance(self._adapter, QueueAdapterWithConfig):
return self._adapter.check_queue_parameters(
queue=queue,
cores=cores,
run_time_max=run_time_max,
memory_max=memory_max,
active_queue=active_queue,
)
else:
return cores, run_time_max, memory_max
Comment on lines +373 to +382
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the return type in the docstring of check_queue_parameters to match the actual return type.

The method returns a tuple, but the docstring specifies a list. Additionally, the method may return None values. Update the docstring to reflect this accurately.

Apply this diff to correct the docstring:

 Returns:
-    List: A list containing the checked parameters [cores, run_time_max, memory_max].
+    Tuple[Union[float, int, None], Union[float, int, None], Union[float, int, None]]: A tuple containing the checked parameters (cores, run_time_max, memory_max).

Committable suggestion was skipped due to low confidence.



def set_queue_adapter(
Expand Down
4 changes: 4 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def test_missing_config(self):
with self.assertRaises(ValueError):
QueueAdapter(directory=os.path.join(self.path, "config/error"))

def test_no_config(self):
with self.assertRaises(ValueError):
QueueAdapter()

def test_bad_queue_template(self):
with self.assertRaises(TemplateSyntaxError):
QueueAdapter(directory=os.path.join(self.path, "config/bad_template"))
Expand Down
Loading
Loading