From ddfca3f19521fee22b3f0e08b2ab2b674bc6d0ed Mon Sep 17 00:00:00 2001 From: pem70 Date: Thu, 30 May 2024 09:56:16 -0400 Subject: [PATCH 1/6] Refactor files.py into separate classes Signed-off-by: pem70 --- .../zowe/zos_files_for_zowe_sdk/__init__.py | 3 + .../zowe/zos_files_for_zowe_sdk/datasets.py | 614 +++++++++++++ .../zos_files_for_zowe_sdk/file_system.py | 150 ++++ .../zowe/zos_files_for_zowe_sdk/files.py | 828 ++---------------- .../zowe/zos_files_for_zowe_sdk/uss.py | 163 ++++ tests/unit/test_zos_files.py | 30 +- 6 files changed, 1025 insertions(+), 763 deletions(-) create mode 100644 src/zos_files/zowe/zos_files_for_zowe_sdk/datasets.py create mode 100644 src/zos_files/zowe/zos_files_for_zowe_sdk/file_system.py create mode 100644 src/zos_files/zowe/zos_files_for_zowe_sdk/uss.py diff --git a/src/zos_files/zowe/zos_files_for_zowe_sdk/__init__.py b/src/zos_files/zowe/zos_files_for_zowe_sdk/__init__.py index a772ba07..518be574 100644 --- a/src/zos_files/zowe/zos_files_for_zowe_sdk/__init__.py +++ b/src/zos_files/zowe/zos_files_for_zowe_sdk/__init__.py @@ -4,3 +4,6 @@ from . import constants, exceptions from .files import Files +from .datasets import Datasets +from .uss import USSFiles +from .file_system import FileSystems \ No newline at end of file diff --git a/src/zos_files/zowe/zos_files_for_zowe_sdk/datasets.py b/src/zos_files/zowe/zos_files_for_zowe_sdk/datasets.py new file mode 100644 index 00000000..4881c4f4 --- /dev/null +++ b/src/zos_files/zowe/zos_files_for_zowe_sdk/datasets.py @@ -0,0 +1,614 @@ +"""Zowe Python Client SDK. + +This program and the accompanying materials are made available under the terms of the +Eclipse Public License v2.0 which accompanies this distribution, and is available at + +https://www.eclipse.org/legal/epl-v20.html + +SPDX-License-Identifier: EPL-2.0 + +Copyright Contributors to the Zowe Project. 
+""" + +import os + +from zowe.core_for_zowe_sdk import SdkApi +from zowe.core_for_zowe_sdk.exceptions import FileNotFound +from zowe.zos_files_for_zowe_sdk import constants, exceptions +from zowe.zos_files_for_zowe_sdk.constants import FileType, zos_file_constants + +_ZOWE_FILES_DEFAULT_ENCODING = "utf-8" + +class Datasets(SdkApi): + """ + Class used to represent the base z/OSMF Datasets API + which includes all operations related to datasets. + + ... + + Attributes + ---------- + connection + connection object + """ + + def __init__(self, connection): + """ + Construct a Datasets object. + + Parameters + ---------- + connection + The z/OSMF connection object (generated by the ZoweSDK object) + + Also update header to accept gzip encoded responses + """ + super().__init__(connection, "/zosmf/restfiles/", logger_name=__name__) + self.default_headers["Accept-Encoding"] = "gzip" + + def list(self, name_pattern, return_attributes=False): + """Retrieve a list of datasets based on a given pattern. + + Parameters + ---------- + name_pattern : str + The pattern to match dataset names. + return_attributes : bool, optional + Whether to return dataset attributes along with the names. Defaults to False. + + Returns + ------- + list of dict + + A JSON with a list of dataset names (and attributes if specified) matching the given pattern. + """ + custom_args = self._create_custom_request_arguments() + custom_args["params"] = {"dslevel": self._encode_uri_component(name_pattern)} + custom_args["url"] = "{}ds".format(self.request_endpoint) + + if return_attributes: + custom_args["headers"]["X-IBM-Attributes"] = "base" + + response_json = self.request_handler.perform_request("GET", custom_args) + return response_json + + def list_members(self, dataset_name, member_pattern=None, member_start=None, limit=1000, attributes="member"): + """Retrieve the list of members on a given PDS/PDSE. 
+ + Returns + ------- + json + A JSON with a list of members from a given PDS/PDSE + """ + custom_args = self._create_custom_request_arguments() + additional_parms = {} + if member_start is not None: + additional_parms["start"] = member_start + if member_pattern is not None: + additional_parms["pattern"] = member_pattern + custom_args["params"] = additional_parms + custom_args["url"] = "{}ds/{}/member".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + custom_args["headers"]["X-IBM-Max-Items"] = "{}".format(limit) + custom_args["headers"]["X-IBM-Attributes"] = attributes + response_json = self.request_handler.perform_request("GET", custom_args) + return response_json["items"] # type: ignore + + def copy_dataset_or_member( + self, + from_dataset_name, + to_dataset_name, + from_member_name=None, + volser=None, + alias=None, + to_member_name=None, + enq=None, + replace=False, + ): + """ + Copy a dataset or member to another dataset or member. + Parameters + ---------- + from_dataset_name: str + Name of the dataset to copy from + to_dataset_name: str + Name of the dataset to copy to + from_member_name: str + Name of the member to copy from + volser: str + Volume serial number of the dataset to copy from + alias: bool + Alias of the dataset to copy from + to_member_name: str + Name of the member to copy to + enq: str + Enqueue type for the dataset to copy from + replace: bool + If true, members in the target data set are replaced. 
+ Returns + ------- + json + A JSON containing the result of the operation + """ + + data = { + "request": "copy", + "from-dataset": {"dsn": from_dataset_name.strip(), "member": from_member_name}, + "replace": replace, + } + + path_to_member = f"{to_dataset_name}({to_member_name})" if to_member_name else to_dataset_name + if enq: + if enq in ("SHR", "SHRW", "EXCLU"): + data["enq"] = enq + else: + self.logger.error("Invalid value for enq.") + raise ValueError("Invalid value for enq.") + if volser: + data["from-dataset"]["volser"] = volser + if alias is not None: # because it can be false so + data["from-dataset"]["alias"] = alias + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(path_to_member)) + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json + + def get_content(self, dataset_name, stream=False): + """Retrieve the contents of a given dataset. + + Returns + ------- + json + A JSON with the contents of a given dataset + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + response_json = self.request_handler.perform_request("GET", custom_args, stream=stream) + return response_json + + def create(self, dataset_name, options={}): + """ + Create a sequential or partitioned dataset. 
+ Parameters + ---------- + dataset_name + Returns + ------- + json + """ + + if options.get("like") is None: + if options.get("primary") is None or options.get("lrecl") is None: + self.logger.error("If 'like' is not specified, you must specify 'primary' or 'lrecl'.") + raise ValueError("If 'like' is not specified, you must specify 'primary' or 'lrecl'.") + + for opt in ( + "volser", + "unit", + "dsorg", + "alcunit", + "primary", + "secondary", + "dirblk", + "avgblk", + "recfm", + "blksize", + "lrecl", + "storclass", + "mgntclass", + "dataclass", + "dsntype", + "like", + ): + if opt == "dsorg": + if options.get(opt) is not None and options[opt] not in ("PO", "PS"): + self.logger.error(f"{opt} is not 'PO' or 'PS'.") + raise KeyError + + elif opt == "alcunit": + if options.get(opt) is None: + options[opt] = "TRK" + else: + if options[opt] not in ("CYL", "TRK"): + self.logger.error(f"{opt} is not 'CYL' or 'TRK'.") + raise KeyError + + elif opt == "primary": + if options.get(opt) is not None: + if options["primary"] > 16777215: + self.logger.error("Specified value exceeds limit.") + raise ValueError + + elif opt == "secondary": + if options.get("primary") is not None: + if options.get(opt) is None: + options["secondary"] = int(options["primary"] / 10) + if options["secondary"] > 16777215: + self.logger.error("Specified value exceeds limit.") + raise ValueError + + elif opt == "dirblk": + if options.get(opt) is not None: + if options.get("dsorg") == "PS": + if options["dirblk"] != 0: + self.logger.error("Can't allocate directory blocks for files.") + raise ValueError + elif options.get("dsorg") == "PO": + if options["dirblk"] == 0: + self.logger.error("Can't allocate empty directory blocks.") + raise ValueError + + elif opt == "recfm": + if options.get(opt) is None: + options[opt] = "F" + else: + if options[opt] not in ("F", "FB", "V", "VB", "U", "FBA", "FBM", "VBA", "VBM"): + self.logger.error("Invalid record format.") + raise KeyError + + elif opt == "blksize": + if 
options.get(opt) is None and options.get("lrecl") is not None: + options[opt] = options["lrecl"] + + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + custom_args["json"] = options + response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) + return response_json + + def create_default(self, dataset_name: str, default_type: str): + """ + Create a dataset with default options set. + Default options depend on the requested type. + + Parameters + ---------- + dataset_name: str + default_type: str + "partitioned", "sequential", "classic", "c" or "binary" + + Returns + ------- + json - A JSON containing the result of the operation + """ + + if default_type not in ("partitioned", "sequential", "classic", "c", "binary"): + self.logger.error("Invalid type for default data set.") + raise ValueError("Invalid type for default data set.") + + custom_args = self._create_custom_request_arguments() + + if default_type == "partitioned": + custom_args["json"] = { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "dirblk": 5, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + } + elif default_type == "sequential": + custom_args["json"] = { + "alcunit": "CYL", + "dsorg": "PS", + "primary": 1, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + } + elif default_type == "classic": + custom_args["json"] = { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + "dirblk": 25, + } + elif default_type == "c": + custom_args["json"] = { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 1, + "recfm": "VB", + "blksize": 32760, + "lrecl": 260, + "dirblk": 25, + } + elif default_type == "binary": + custom_args["json"] = { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 10, + "recfm": "U", + "blksize": 27998, + "lrecl": 27998, + "dirblk": 25, + } + + custom_args["url"] = 
"{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) + return response_json + + def get_binary_content(self, dataset_name, stream = False, with_prefixes=False): + """ + Retrieve the contents of a given dataset as a binary bytes object. + + Parameters + ---------- + dataset_name: str - Name of the dataset to retrieve + with_prefixes: boolean - if True include a 4 byte big endian record len prefix + default: False + Returns + ------- + response + A response object from the requests library + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + custom_args["headers"]["Accept"] = "application/octet-stream" + if with_prefixes: + custom_args["headers"]["X-IBM-Data-Type"] = "record" + else: + custom_args["headers"]["X-IBM-Data-Type"] = "binary" + response = self.request_handler.perform_request("GET", custom_args, stream=stream) + return response + + def write(self, dataset_name, data, encoding=_ZOWE_FILES_DEFAULT_ENCODING): + """Write content to an existing dataset. 
+ + Returns + ------- + json + A JSON containing the result of the operation + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + custom_args["data"] = data + custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204, 201]) + return response_json + + def download(self, dataset_name, output_file): + """Retrieve the contents of a dataset and saves it to a given file.""" + response = self.get_content(dataset_name, stream = True) + with open(output_file, "w", encoding="utf-8") as f: + for chunk in response.iter_content(chunk_size=4096, decode_unicode=True): + f.write(chunk) + + def download_binary(self, dataset_name, output_file, with_prefixes=False): + """Retrieve the contents of a binary dataset and saves it to a given file. + + Parameters + ---------- + dataset_name:str - Name of the dataset to download + output_file:str - Name of the local file to create + with_prefixes:boolean - If true, include a four big endian bytes record length prefix. + The default is False + """ + response = self.get_binary_content(dataset_name, with_prefixes=with_prefixes, stream=True) + with open(output_file, "wb") as f: + for chunk in response.iter_content(chunk_size=4096): + f.write(chunk) + + def upload_file(self, input_file, dataset_name, encoding=_ZOWE_FILES_DEFAULT_ENCODING): + """Upload contents of a given file and uploads it to a dataset.""" + if os.path.isfile(input_file): + with open(input_file, "rb") as in_file: + response_json = self.write(dataset_name, in_file) + else: + self.logger.error(f"File {input_file} not found.") + raise FileNotFound(input_file) + + def recall_migrated(self, dataset_name: str, wait=False): + """ + Recalls a migrated data set. 
+ + Parameters + ---------- + dataset_name: str + Name of the data set + + wait: bool + If true, the function waits for completion of the request, otherwise the request is queued + + Returns + ------- + json - A JSON containing the result of the operation + """ + + data = {"request": "hrecall", "wait": wait} + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json + + def delete_migrated(self, dataset_name: str, purge=False, wait=False): + """ + Deletes migrated data set. + + Parameters + ---------- + dataset_name: str + Name of the data set + + purge: bool + If true, the function uses the PURGE=YES on ARCHDEL request, otherwise it uses the PURGE=NO. + + wait: bool + If true, the function waits for completion of the request, otherwise the request is queued. + + Returns + ------- + json - A JSON containing the result of the operation + """ + + data = { + "request": "hdelete", + "purge": purge, + "wait": wait, + } + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json + + def migrate(self, dataset_name: str, wait=False): + """ + Migrates the data set. + + Parameters + ---------- + dataset_name: str + Name of the data set + + wait: bool + If true, the function waits for completion of the request, otherwise the request is queued. 
+ + Returns + ------- + json - A JSON containing the result of the operation + """ + + data = {"request": "hmigrate", "wait": wait} + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json + + def rename(self, before_dataset_name: str, after_dataset_name: str): + """ + Renames the data set. + + Parameters + ---------- + before_dataset_name: str + The source data set name. + + after_dataset_name: str + New name for the source data set. + + Returns + ------- + json - A JSON containing the result of the operation + """ + + data = {"request": "rename", "from-dataset": {"dsn": before_dataset_name.strip()}} + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format( + self.request_endpoint, self._encode_uri_component(after_dataset_name).strip() + ) + + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json + + def rename_member(self, dataset_name: str, before_member_name: str, after_member_name: str, enq=""): + """ + Renames the data set member. + + Parameters + ---------- + dataset_name: str + Name of the data set. + + before_member_name: str + The source member name. + + after_member_name: str + New name for the source member. + + enq: str + Values can be SHRW or EXCLU. SHRW is the default for PDS members, EXCLU otherwise. 
+ + Returns + ------- + json - A JSON containing the result of the operation + """ + + data = { + "request": "rename", + "from-dataset": { + "dsn": dataset_name.strip(), + "member": before_member_name.strip(), + }, + } + + path_to_member = dataset_name.strip() + "(" + after_member_name.strip() + ")" + + if enq: + if enq in ("SHRW", "EXCLU"): + data["enq"] = enq.strip() + else: + self.logger.error("Invalid value for enq.") + raise ValueError("Invalid value for enq.") + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(path_to_member)) + + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json + + def delete(self, dataset_name, volume=None, member_name=None): + """Deletes a sequential or partitioned data.""" + custom_args = self._create_custom_request_arguments() + if member_name is not None: + dataset_name = f"{dataset_name}({member_name})" + url = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) + if volume is not None: + url = "{}ds/-{}/{}".format(self.request_endpoint, volume, self._encode_uri_component(dataset_name)) + custom_args["url"] = url + response_json = self.request_handler.perform_request("DELETE", custom_args, expected_code=[200, 202, 204]) + return response_json + + def copy_uss_to_dataset( + self, from_filename, to_dataset_name, to_member_name=None, type=FileType.TEXT, replace=False + ): + """ + Copy a USS file to dataset. + + Parameters + ---------- + from_filename: str + Name of the file to copy from. + to_dataset_name: str + Name of the dataset to copy to. + to_member_name: str + Name of the member to copy to. + type: FileType, optional + Type of the file to copy from. Default is FileType.TEXT. + replace: bool, optional + If true, members in the target dataset are replaced. 
+ + Returns + ------- + json + A JSON containing the result of the operation. + """ + + data = { + "request": "copy", + "from-file": {"filename": from_filename.strip(), "type": type.value}, + "replace": replace, + } + + path_to_member = f"{to_dataset_name}({to_member_name})" if to_member_name else to_dataset_name + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(path_to_member)) + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) + return response_json \ No newline at end of file diff --git a/src/zos_files/zowe/zos_files_for_zowe_sdk/file_system.py b/src/zos_files/zowe/zos_files_for_zowe_sdk/file_system.py new file mode 100644 index 00000000..0a35c9e3 --- /dev/null +++ b/src/zos_files/zowe/zos_files_for_zowe_sdk/file_system.py @@ -0,0 +1,150 @@ +"""Zowe Python Client SDK. + +This program and the accompanying materials are made available under the terms of the +Eclipse Public License v2.0 which accompanies this distribution, and is available at + +https://www.eclipse.org/legal/epl-v20.html + +SPDX-License-Identifier: EPL-2.0 + +Copyright Contributors to the Zowe Project. +""" + +import os + +from zowe.core_for_zowe_sdk import SdkApi +from zowe.core_for_zowe_sdk.exceptions import FileNotFound +from zowe.zos_files_for_zowe_sdk import constants, exceptions +from zowe.zos_files_for_zowe_sdk.constants import FileType, zos_file_constants + +_ZOWE_FILES_DEFAULT_ENCODING = "utf-8" + + +class FileSystems(SdkApi): + """ + Class used to represent the base z/OSMF FileSystems API + which includes all operations related to file systems. + + ... + + Attributes + ---------- + connection + connection object + """ + + def __init__(self, connection): + """ + Construct a FileSystems object. 
+ + Parameters + ---------- + connection + The z/OSMF connection object (generated by the ZoweSDK object) + + Also update header to accept gzip encoded responses + """ + super().__init__(connection, "/zosmf/restfiles/", logger_name=__name__) + self.default_headers["Accept-Encoding"] = "gzip" + + def create(self, file_system_name, options={}): + """ + Create a z/OS UNIX zFS Filesystem. + + Parameter + --------- + file_system_name: str - the name for the file system + + Returns + ------- + json - A JSON containing the result of the operation + """ + for key, value in options.items(): + if key == "perms": + if value < 0 or value > 777: + self.logger.error("Invalid Permissions Option.") + raise exceptions.InvalidPermsOption(value) + + if key == "cylsPri" or key == "cylsSec": + if value > constants.zos_file_constants["MaxAllocationQuantity"]: + self.logger.error("Maximum allocation quantity exceeded.") + raise exceptions.MaxAllocationQuantityExceeded + + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}mfs/zfs/{}".format(self.request_endpoint, file_system_name) + custom_args["json"] = options + response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) + return response_json + + def delete(self, file_system_name): + """ + Deletes a zFS Filesystem + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}mfs/zfs/{}".format(self.request_endpoint, file_system_name) + response_json = self.request_handler.perform_request("DELETE", custom_args, expected_code=[204]) + return response_json + + def mount(self, file_system_name, mount_point, options={}, encoding=_ZOWE_FILES_DEFAULT_ENCODING): + """Mounts a z/OS UNIX file system on a specified directory. 
+ Parameter + --------- + file_system_name: str - the name for the file system + mount_point: str - mount point to be used for mounting the UNIX file system + options: dict - A JSON of request body options + + Returns + ------- + json - A JSON containing the result of the operation + """ + options["action"] = "mount" + options["mount-point"] = mount_point + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}mfs/{}".format(self.request_endpoint, file_system_name) + custom_args["json"] = options + custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204]) + return response_json + + def unmount(self, file_system_name, options={}, encoding=_ZOWE_FILES_DEFAULT_ENCODING): + """Unmounts a z/OS UNIX file system on a specified directory. + + Parameter + --------- + file_system_name: str - the name for the file system + options: dict - A JSON of request body options + + Returns + ------- + json - A JSON containing the result of the operation + """ + options["action"] = "unmount" + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}mfs/{}".format(self.request_endpoint, file_system_name) + custom_args["json"] = options + custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204]) + return response_json + + def list(self, file_path_name=None, file_system_name=None): + """ + list all mounted filesystems, or the specific filesystem mounted at a given path, or the + filesystem with a given Filesystem name. + + Parameter + --------- + file_path: str - the UNIX directory that contains the files and directories to be listed. 
+ file_system_name: str - the name for the file system to be listed + + Returns + ------- + json - A JSON containing the result of the operation + """ + custom_args = self._create_custom_request_arguments() + + custom_args["params"] = {"path": file_path_name, "fsname": file_system_name} + custom_args["url"] = "{}mfs".format(self.request_endpoint) + response_json = self.request_handler.perform_request("GET", custom_args, expected_code=[200]) + return response_json + + diff --git a/src/zos_files/zowe/zos_files_for_zowe_sdk/files.py b/src/zos_files/zowe/zos_files_for_zowe_sdk/files.py index 688c24e8..c7a5d49d 100644 --- a/src/zos_files/zowe/zos_files_for_zowe_sdk/files.py +++ b/src/zos_files/zowe/zos_files_for_zowe_sdk/files.py @@ -16,6 +16,9 @@ from zowe.core_for_zowe_sdk.exceptions import FileNotFound from zowe.zos_files_for_zowe_sdk import constants, exceptions from zowe.zos_files_for_zowe_sdk.constants import FileType, zos_file_constants +from .datasets import Datasets +from .uss import USSFiles +from .file_system import FileSystems _ZOWE_FILES_DEFAULT_ENCODING = "utf-8" @@ -31,10 +34,14 @@ class Files(SdkApi): connection connection object """ + dsn: Datasets + uss: USSFiles + fs: FileSystems def __init__(self, connection): """ - Construct a Files object. + Construct a Files object, a composition of a Datasets object, + a USSFiles object, and a FileSystems object Parameters ---------- @@ -45,143 +52,37 @@ def __init__(self, connection): """ super().__init__(connection, "/zosmf/restfiles/", logger_name=__name__) self.default_headers["Accept-Encoding"] = "gzip" + self.dsn = Datasets(connection) + self.uss = USSFiles(connection) + self.fs = FileSystems(connection) def list_files(self, path): - """Retrieve a list of USS files based on a given pattern. 
- - Returns - ------- - json - A JSON with a list of dataset names matching the given pattern - """ - custom_args = self._create_custom_request_arguments() - custom_args["params"] = {"path": path} - custom_args["url"] = "{}fs".format(self.request_endpoint) - response_json = self.request_handler.perform_request("GET", custom_args) - return response_json + """Deprecated function. Please use uss.list() instead""" + return self.uss.list(path) + def get_file_content_streamed(self, file_path, binary=False): + """Deprecated function. Please use uss.get_content_streamed() instead""" + return self.uss.get_content_streamed(file_path, binary) + def get_file_content(self, filepath_name): - """Retrieve the content of a filename. The complete path must be specified. - - Returns - ------- - json - A JSON with the contents of the specified USS file - """ - custom_args = self._create_custom_request_arguments() - # custom_args["params"] = {"filepath-name": filepath_name} - custom_args["url"] = "{}fs{}".format(self.request_endpoint, filepath_name) - response_json = self.request_handler.perform_request("GET", custom_args) - return response_json + """Deprecated function. Please use uss.get_content() instead""" + return self.uss.get_content(filepath_name) def delete_uss(self, filepath_name, recursive=False): - """ - Delete a file or directory - - Parameters - ---------- - filepath of the file to be deleted - - recursive - If specified as True, all the files and sub-directories will be deleted. - - Returns - ------- - 204 - HTTP Response for No Content - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}fs/{}".format(self.request_endpoint, filepath_name.lstrip("/")) - if recursive: - custom_args["headers"]["X-IBM-Option"] = "recursive" - - response_json = self.request_handler.perform_request("DELETE", custom_args, expected_code=[204]) - return response_json + """Deprecated function. 
Please use uss.delete() instead""" + return self.uss.delete(filepath_name, recursive) def list_dsn(self, name_pattern, return_attributes=False): - """Retrieve a list of datasets based on a given pattern. - - Parameters - ---------- - name_pattern : str - The pattern to match dataset names. - return_attributes : bool, optional - Whether to return dataset attributes along with the names. Defaults to False. - - Returns - ------- - list of dict - - A JSON with a list of dataset names (and attributes if specified) matching the given pattern. - """ - custom_args = self._create_custom_request_arguments() - custom_args["params"] = {"dslevel": self._encode_uri_component(name_pattern)} - custom_args["url"] = "{}ds".format(self.request_endpoint) - - if return_attributes: - custom_args["headers"]["X-IBM-Attributes"] = "base" - - response_json = self.request_handler.perform_request("GET", custom_args) - return response_json + """Deprecated function. Please use dsn.list() instead""" + return self.dsn.list(name_pattern, return_attributes) def list_dsn_members(self, dataset_name, member_pattern=None, member_start=None, limit=1000, attributes="member"): - """Retrieve the list of members on a given PDS/PDSE. 
- - Returns - ------- - json - A JSON with a list of members from a given PDS/PDSE - """ - custom_args = self._create_custom_request_arguments() - additional_parms = {} - if member_start is not None: - additional_parms["start"] = member_start - if member_pattern is not None: - additional_parms["pattern"] = member_pattern - custom_args["params"] = additional_parms - custom_args["url"] = "{}ds/{}/member".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - custom_args["headers"]["X-IBM-Max-Items"] = "{}".format(limit) - custom_args["headers"]["X-IBM-Attributes"] = attributes - response_json = self.request_handler.perform_request("GET", custom_args) - return response_json["items"] # type: ignore - - def copy_uss_to_dataset( - self, from_filename, to_dataset_name, to_member_name=None, type=FileType.TEXT, replace=False - ): - """ - Copy a USS file to dataset. - - Parameters - ---------- - from_filename: str - Name of the file to copy from. - to_dataset_name: str - Name of the dataset to copy to. - to_member_name: str - Name of the member to copy to. - type: FileType, optional - Type of the file to copy from. Default is FileType.TEXT. - replace: bool, optional - If true, members in the target dataset are replaced. - - Returns - ------- - json - A JSON containing the result of the operation. - """ - - data = { - "request": "copy", - "from-file": {"filename": from_filename.strip(), "type": type.value}, - "replace": replace, - } + """Deprecated function. 
Please use dsn.list_members() instead""" + return self.dsn.list_members(dataset_name, member_pattern, member_start, limit, attributes) - path_to_member = f"{to_dataset_name}({to_member_name})" if to_member_name else to_dataset_name - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(path_to_member)) - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + def copy_uss_to_dataset(self, from_filename, to_dataset_name, to_member_name=None, type=FileType.TEXT, replace=False): + """Deprecated function. Please use dsn.copy_uss_to_dataset instead""" + return self.dsn.copy_uss_to_dataset(from_filename, to_dataset_name, to_member_name, type, replace) def copy_dataset_or_member( self, @@ -194,674 +95,105 @@ def copy_dataset_or_member( enq=None, replace=False, ): - """ - Copy a dataset or member to another dataset or member. - Parameters - ---------- - from_dataset_name: str - Name of the dataset to copy from - to_dataset_name: str - Name of the dataset to copy to - from_member_name: str - Name of the member to copy from - volser: str - Volume serial number of the dataset to copy from - alias: bool - Alias of the dataset to copy from - to_member_name: str - Name of the member to copy to - enq: str - Enqueue type for the dataset to copy from - replace: bool - If true, members in the target data set are replaced. 
- Returns - ------- - json - A JSON containing the result of the operation - """ - - data = { - "request": "copy", - "from-dataset": {"dsn": from_dataset_name.strip(), "member": from_member_name}, - "replace": replace, - } - - path_to_member = f"{to_dataset_name}({to_member_name})" if to_member_name else to_dataset_name - if enq: - if enq in ("SHR", "SHRW", "EXCLU"): - data["enq"] = enq - else: - self.logger.error("Invalid value for enq.") - raise ValueError("Invalid value for enq.") - if volser: - data["from-dataset"]["volser"] = volser - if alias is not None: # because it can be false so - data["from-dataset"]["alias"] = alias - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(path_to_member)) - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + """Deprecated function. Please use dsn.copy_dataset_or_member() instead""" + return self.dsn.copy_dataset_or_member(from_dataset_name, to_dataset_name, from_member_name, volser, alias, to_member_name, enq, replace) def get_dsn_content(self, dataset_name): - """Retrieve the contents of a given dataset. - - Returns - ------- - json - A JSON with the contents of a given dataset - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - response_json = self.request_handler.perform_request("GET", custom_args) - return response_json + """Deprecated function. Please use dsn.get_content() instead""" + return self.dsn.get_content(dataset_name) def create_data_set(self, dataset_name, options={}): - """ - Create a sequential or partitioned dataset. 
- Parameters - ---------- - dataset_name - Returns - ------- - json - """ - - if options.get("like") is None: - if options.get("primary") is None or options.get("lrecl") is None: - self.logger.error("If 'like' is not specified, you must specify 'primary' or 'lrecl'.") - raise ValueError("If 'like' is not specified, you must specify 'primary' or 'lrecl'.") - - for opt in ( - "volser", - "unit", - "dsorg", - "alcunit", - "primary", - "secondary", - "dirblk", - "avgblk", - "recfm", - "blksize", - "lrecl", - "storclass", - "mgntclass", - "dataclass", - "dsntype", - "like", - ): - if opt == "dsorg": - if options.get(opt) is not None and options[opt] not in ("PO", "PS"): - self.logger.error(f"{opt} is not 'PO' or 'PS'.") - raise KeyError - - elif opt == "alcunit": - if options.get(opt) is None: - options[opt] = "TRK" - else: - if options[opt] not in ("CYL", "TRK"): - self.logger.error(f"{opt} is not 'CYL' or 'TRK'.") - raise KeyError - - elif opt == "primary": - if options.get(opt) is not None: - if options["primary"] > 16777215: - self.logger.error("Specified value exceeds limit.") - raise ValueError - - elif opt == "secondary": - if options.get("primary") is not None: - if options.get(opt) is None: - options["secondary"] = int(options["primary"] / 10) - if options["secondary"] > 16777215: - self.logger.error("Specified value exceeds limit.") - raise ValueError - - elif opt == "dirblk": - if options.get(opt) is not None: - if options.get("dsorg") == "PS": - if options["dirblk"] != 0: - self.logger.error("Can't allocate directory blocks for files.") - raise ValueError - elif options.get("dsorg") == "PO": - if options["dirblk"] == 0: - self.logger.error("Can't allocate empty directory blocks.") - raise ValueError - - elif opt == "recfm": - if options.get(opt) is None: - options[opt] = "F" - else: - if options[opt] not in ("F", "FB", "V", "VB", "U", "FBA", "FBM", "VBA", "VBM"): - self.logger.error("Invalid record format.") - raise KeyError - - elif opt == "blksize": - if 
options.get(opt) is None and options.get("lrecl") is not None: - options[opt] = options["lrecl"] - - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - custom_args["json"] = options - response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) - return response_json + """Deprecated function. Please use dsn.create() instead""" + return self.dsn.create(dataset_name, options) def create_default_data_set(self, dataset_name: str, default_type: str): - """ - Create a dataset with default options set. - Default options depend on the requested type. - - Parameters - ---------- - dataset_name: str - default_type: str - "partitioned", "sequential", "classic", "c" or "binary" - - Returns - ------- - json - A JSON containing the result of the operation - """ - - if default_type not in ("partitioned", "sequential", "classic", "c", "binary"): - self.logger.error("Invalid type for default data set.") - raise ValueError("Invalid type for default data set.") - - custom_args = self._create_custom_request_arguments() - - if default_type == "partitioned": - custom_args["json"] = { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "dirblk": 5, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - } - elif default_type == "sequential": - custom_args["json"] = { - "alcunit": "CYL", - "dsorg": "PS", - "primary": 1, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - } - elif default_type == "classic": - custom_args["json"] = { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - "dirblk": 25, - } - elif default_type == "c": - custom_args["json"] = { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 1, - "recfm": "VB", - "blksize": 32760, - "lrecl": 260, - "dirblk": 25, - } - elif default_type == "binary": - custom_args["json"] = { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 10, - 
"recfm": "U", - "blksize": 27998, - "lrecl": 27998, - "dirblk": 25, - } - - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) - return response_json + """Deprecated function. Please use dsn.create_default() instead""" + return self.dsn.create_default(dataset_name, default_type) def create_uss(self, file_path, type, mode=None): - """ - Add a file or directory - Parameters - ---------- - file_path of the file to add - type = "file" or "dir" - mode Ex:- rwxr-xr-x - - """ - - data = {"type": type, "mode": mode} - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}fs/{}".format(self.request_endpoint, file_path.lstrip("/")) - response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) - return response_json + """Deprecated function. Please use uss.create() instead""" + return self.uss.create(file_path, type, mode) def get_dsn_content_streamed(self, dataset_name): - """Retrieve the contents of a given dataset streamed. - - Returns - ------- - response - A response object from the requests library - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - response = self.request_handler.perform_request("GET", custom_args, stream = True) - return response + """Deprecated function. Please use dsn.get_content() instead""" + return self.dsn.get_content(dataset_name, stream=True) def get_dsn_binary_content(self, dataset_name, with_prefixes=False): - """ - Retrieve the contents of a given dataset as a binary bytes object. 
- - Parameters - ---------- - dataset_name: str - Name of the dataset to retrieve - with_prefixes: boolean - if True include a 4 byte big endian record len prefix - default: False - Returns - ------- - response - A response object from the requests library - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - custom_args["headers"]["Accept"] = "application/octet-stream" - if with_prefixes: - custom_args["headers"]["X-IBM-Data-Type"] = "record" - else: - custom_args["headers"]["X-IBM-Data-Type"] = "binary" - response = self.request_handler.perform_request("GET", custom_args) - return response + """Deprecated function. Please use dsn.get_binary_content() instead""" + return self.dsn.get_binary_content(dataset_name, with_prefixes) def get_dsn_binary_content_streamed(self, dataset_name, with_prefixes=False): - """ - Retrieve the contents of a given dataset as a binary bytes object streamed. - - Parameters - ---------- - dataset_name: str - Name of the dataset to retrieve - with_prefixes: boolean - if True include a 4 byte big endian record len prefix - default: False - Returns - ------- - response - A response object from the requests library - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - custom_args["headers"]["Accept"] = "application/octet-stream" - if with_prefixes: - custom_args["headers"]["X-IBM-Data-Type"] = "record" - else: - custom_args["headers"]["X-IBM-Data-Type"] = "binary" - response = self.request_handler.perform_request("GET", custom_args, stream = True) - return response + """Deprecated function. 
Please use dsn.get_binary_content() instead""" + return self.dsn.get_binary_content(dataset_name, with_prefixes, stream=True) def write_to_dsn(self, dataset_name, data, encoding=_ZOWE_FILES_DEFAULT_ENCODING): - """Write content to an existing dataset. - - Returns - ------- - json - A JSON containing the result of the operation - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - custom_args["data"] = data - custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204, 201]) - return response_json + """Deprecated function. Please use dsn.write() instead""" + return self.dsn.write(dataset_name, data, encoding) def download_dsn(self, dataset_name, output_file): - """Retrieve the contents of a dataset and saves it to a given file.""" - response = self.get_dsn_content_streamed(dataset_name) - with open(output_file, "w", encoding="utf-8") as f: - for chunk in response.iter_content(chunk_size=4096, decode_unicode=True): - f.write(chunk) + """Deprecated function. Please use dsn.download() instead""" + self.dsn.download(dataset_name, output_file) def download_binary_dsn(self, dataset_name, output_file, with_prefixes=False): - """Retrieve the contents of a binary dataset and saves it to a given file. - - Parameters - ---------- - dataset_name:str - Name of the dataset to download - output_file:str - Name of the local file to create - with_prefixes:boolean - If true, include a four big endian bytes record length prefix. - The default is False - """ - response = self.get_dsn_binary_content_streamed(dataset_name, with_prefixes=with_prefixes) - with open(output_file, "wb") as f: - for chunk in response.iter_content(chunk_size=4096): - f.write(chunk) + """Deprecated function. 
Please use dsn.download_binary() instead""" + self.dsn.download_binary(dataset_name, output_file, with_prefixes) def upload_file_to_dsn(self, input_file, dataset_name, encoding=_ZOWE_FILES_DEFAULT_ENCODING): - """Upload contents of a given file and uploads it to a dataset.""" - if os.path.isfile(input_file): - with open(input_file, "rb") as in_file: - response_json = self.write_to_dsn(dataset_name, in_file) - else: - self.logger.error(f"File {input_file} not found.") - raise FileNotFound(input_file) + """Deprecated function. Please use dsn.upload_file() instead""" + self.dsn.upload_file(input_file, dataset_name, encoding) def write_to_uss(self, filepath_name, data, encoding=_ZOWE_FILES_DEFAULT_ENCODING): - """Write content to an existing UNIX file. - Returns - ------- - json - A JSON containing the result of the operation - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}fs/{}".format(self.request_endpoint, filepath_name.lstrip("/")) - custom_args["data"] = data - custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204, 201]) - return response_json + """Deprecated function. Please use uss.write() instead""" + return self.uss.write(filepath_name, data, encoding) def upload_file_to_uss(self, input_file, filepath_name, encoding=_ZOWE_FILES_DEFAULT_ENCODING): - """Upload contents of a given file and uploads it to UNIX file""" - if os.path.isfile(input_file): - with open(input_file, "r", encoding="utf-8") as in_file: - response_json = self.write_to_uss(filepath_name, in_file) - else: - self.logger.error(f"File {input_file} not found.") - raise FileNotFound(input_file) - - def get_file_content_streamed(self, file_path, binary=False): - """Retrieve the contents of a given USS file streamed. 
- - Returns - ------- - response - A response object from the requests library - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}fs/{}".format(self.request_endpoint, self._encode_uri_component(file_path.lstrip("/"))) - if binary: - custom_args["headers"]["X-IBM-Data-Type"] = "binary" - response = self.request_handler.perform_request("GET", custom_args, stream=True) - return response + """Deprecated function. Please use uss.upload() instead""" + self.uss.upload(input_file, filepath_name, encoding) def download_uss(self, file_path, output_file, binary=False): - """Retrieve the contents of a USS file and saves it to a local file.""" - response = self.get_file_content_streamed(file_path, binary) - with open(output_file, "wb" if binary else "w", encoding="utf-8") as f: - for chunk in response.iter_content(chunk_size=4096, decode_unicode=not binary): - f.write(chunk) + """Deprecated function. Please use uss.download() instead""" + self.uss.download(file_path, output_file, binary) def delete_data_set(self, dataset_name, volume=None, member_name=None): - """Deletes a sequential or partitioned data.""" - custom_args = self._create_custom_request_arguments() - if member_name is not None: - dataset_name = f"{dataset_name}({member_name})" - url = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - if volume is not None: - url = "{}ds/-{}/{}".format(self.request_endpoint, volume, self._encode_uri_component(dataset_name)) - custom_args["url"] = url - response_json = self.request_handler.perform_request("DELETE", custom_args, expected_code=[200, 202, 204]) - return response_json + """Deprecated function. Please use dsn.delete() instead""" + return self.dsn.delete(dataset_name, volume, member_name) def create_zFS_file_system(self, file_system_name, options={}): - """ - Create a z/OS UNIX zFS Filesystem. 
- - Parameter - --------- - file_system_name: str - the name for the file system - - Returns - ------- - json - A JSON containing the result of the operation - """ - for key, value in options.items(): - if key == "perms": - if value < 0 or value > 777: - self.logger.error("Invalid Permissions Option.") - raise exceptions.InvalidPermsOption(value) - - if key == "cylsPri" or key == "cylsSec": - if value > constants.zos_file_constants["MaxAllocationQuantity"]: - self.logger.error("Maximum allocation quantity exceeded.") - raise exceptions.MaxAllocationQuantityExceeded - - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}mfs/zfs/{}".format(self.request_endpoint, file_system_name) - custom_args["json"] = options - response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) - return response_json + """Deprecated function. Please use fs.create() instead""" + return self.fs.create(file_system_name, options) def delete_zFS_file_system(self, file_system_name): - """ - Deletes a zFS Filesystem - """ - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}mfs/zfs/{}".format(self.request_endpoint, file_system_name) - response_json = self.request_handler.perform_request("DELETE", custom_args, expected_code=[204]) - return response_json + """Deprecated function. Please use fs.delete() instead""" + return self.fs.delete(file_system_name) def mount_file_system(self, file_system_name, mount_point, options={}, encoding=_ZOWE_FILES_DEFAULT_ENCODING): - """Mounts a z/OS UNIX file system on a specified directory. 
- Parameter - --------- - file_system_name: str - the name for the file system - mount_point: str - mount point to be used for mounting the UNIX file system - options: dict - A JSON of request body options - - Returns - ------- - json - A JSON containing the result of the operation - """ - options["action"] = "mount" - options["mount-point"] = mount_point - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}mfs/{}".format(self.request_endpoint, file_system_name) - custom_args["json"] = options - custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204]) - return response_json + """Deprecated function. Please use fs.mount() instead""" + return self.fs.mount(file_system_name, mount_point, options, encoding) def unmount_file_system(self, file_system_name, options={}, encoding=_ZOWE_FILES_DEFAULT_ENCODING): - """Unmounts a z/OS UNIX file system on a specified directory. - - Parameter - --------- - file_system_name: str - the name for the file system - options: dict - A JSON of request body options - - Returns - ------- - json - A JSON containing the result of the operation - """ - options["action"] = "unmount" - custom_args = self._create_custom_request_arguments() - custom_args["url"] = "{}mfs/{}".format(self.request_endpoint, file_system_name) - custom_args["json"] = options - custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204]) - return response_json + """Deprecated function. Please use fs.unmount() instead""" + return self.fs.unmount(file_system_name, options, encoding) def list_unix_file_systems(self, file_path_name=None, file_system_name=None): - """ - list all mounted filesystems, or the specific filesystem mounted at a given path, or the - filesystem with a given Filesystem name. 
- - Parameter - --------- - file_path: str - the UNIX directory that contains the files and directories to be listed. - file_system_name: str - the name for the file system to be listed - - Returns - ------- - json - A JSON containing the result of the operation - """ - custom_args = self._create_custom_request_arguments() - - custom_args["params"] = {"path": file_path_name, "fsname": file_system_name} - custom_args["url"] = "{}mfs".format(self.request_endpoint) - response_json = self.request_handler.perform_request("GET", custom_args, expected_code=[200]) - return response_json + """Deprecated function. Please use fs.list() instead""" + return self.fs.list(file_path_name, file_system_name) def recall_migrated_dataset(self, dataset_name: str, wait=False): - """ - Recalls a migrated data set. - - Parameters - ---------- - dataset_name: str - Name of the data set - - wait: bool - If true, the function waits for completion of the request, otherwise the request is queued - - Returns - ------- - json - A JSON containing the result of the operation - """ - - data = {"request": "hrecall", "wait": wait} - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + """Deprecated function. Please use dsn.recall_migrated() instead""" + return self.dsn.recall_migrated(dataset_name, wait) def delete_migrated_data_set(self, dataset_name: str, purge=False, wait=False): - """ - Deletes migrated data set. - - Parameters - ---------- - dataset_name: str - Name of the data set - - purge: bool - If true, the function uses the PURGE=YES on ARCHDEL request, otherwise it uses the PURGE=NO. - - wait: bool - If true, the function waits for completion of the request, otherwise the request is queued. 
- - Returns - ------- - json - A JSON containing the result of the operation - """ - - data = { - "request": "hdelete", - "purge": purge, - "wait": wait, - } - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + """Deprecated function. Please use dsn.delete_migrated() instead""" + return self.dsn.delete_migrated(dataset_name, purge, wait) def migrate_data_set(self, dataset_name: str, wait=False): - """ - Migrates the data set. - - Parameters - ---------- - dataset_name: str - Name of the data set - - wait: bool - If true, the function waits for completion of the request, otherwise the request is queued. - - Returns - ------- - json - A JSON containing the result of the operation - """ - - data = {"request": "hmigrate", "wait": wait} - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(dataset_name)) - - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + """Deprecated function. Please use dsn.migrate() instead""" + return self.dsn.migrate(dataset_name, wait) def rename_dataset(self, before_dataset_name: str, after_dataset_name: str): - """ - Renames the data set. - - Parameters - ---------- - before_dataset_name: str - The source data set name. - - after_dataset_name: str - New name for the source data set. 
- - Returns - ------- - json - A JSON containing the result of the operation - """ - - data = {"request": "rename", "from-dataset": {"dsn": before_dataset_name.strip()}} - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format( - self.request_endpoint, self._encode_uri_component(after_dataset_name).strip() - ) - - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + """Deprecated function. Please use dsn.rename() instead""" + return self.dsn.rename(before_dataset_name, after_dataset_name) def rename_dataset_member(self, dataset_name: str, before_member_name: str, after_member_name: str, enq=""): - """ - Renames the data set member. - - Parameters - ---------- - dataset_name: str - Name of the data set. - - before_member_name: str - The source member name. - - after_member_name: str - New name for the source member. - - enq: str - Values can be SHRW or EXCLU. SHRW is the default for PDS members, EXCLU otherwise. - - Returns - ------- - json - A JSON containing the result of the operation - """ - - data = { - "request": "rename", - "from-dataset": { - "dsn": dataset_name.strip(), - "member": before_member_name.strip(), - }, - } - - path_to_member = dataset_name.strip() + "(" + after_member_name.strip() + ")" - - if enq: - if enq in ("SHRW", "EXCLU"): - data["enq"] = enq.strip() - else: - self.logger.error("Invalid value for enq.") - raise ValueError("Invalid value for enq.") - - custom_args = self._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "{}ds/{}".format(self.request_endpoint, self._encode_uri_component(path_to_member)) - - response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[200]) - return response_json + """Deprecated function. 
Please use dsn.rename_member() instead""" + return self.dsn.rename_member(dataset_name, before_member_name, after_member_name, enq) diff --git a/src/zos_files/zowe/zos_files_for_zowe_sdk/uss.py b/src/zos_files/zowe/zos_files_for_zowe_sdk/uss.py new file mode 100644 index 00000000..d4df54a3 --- /dev/null +++ b/src/zos_files/zowe/zos_files_for_zowe_sdk/uss.py @@ -0,0 +1,163 @@ +"""Zowe Python Client SDK. + +This program and the accompanying materials are made available under the terms of the +Eclipse Public License v2.0 which accompanies this distribution, and is available at + +https://www.eclipse.org/legal/epl-v20.html + +SPDX-License-Identifier: EPL-2.0 + +Copyright Contributors to the Zowe Project. +""" + +import os + +from zowe.core_for_zowe_sdk import SdkApi +from zowe.core_for_zowe_sdk.exceptions import FileNotFound +from zowe.zos_files_for_zowe_sdk import constants, exceptions +from zowe.zos_files_for_zowe_sdk.constants import FileType, zos_file_constants + +_ZOWE_FILES_DEFAULT_ENCODING = "utf-8" + +class USSFiles(SdkApi): + """ + Class used to represent the base z/OSMF USSFiles API + which includes all operations related to USS files. + + ... + + Attributes + ---------- + connection + connection object + """ + + def __init__(self, connection): + """ + Construct a USSFiles object. + + Parameters + ---------- + connection + The z/OSMF connection object (generated by the ZoweSDK object) + + Also update header to accept gzip encoded responses + """ + super().__init__(connection, "/zosmf/restfiles/", logger_name=__name__) + self.default_headers["Accept-Encoding"] = "gzip" + + def list(self, path): + """Retrieve a list of USS files based on a given pattern. 
+ + Returns + ------- + json + A JSON with a list of dataset names matching the given pattern + """ + custom_args = self._create_custom_request_arguments() + custom_args["params"] = {"path": path} + custom_args["url"] = "{}fs".format(self.request_endpoint) + response_json = self.request_handler.perform_request("GET", custom_args) + return response_json + + def delete(self, filepath_name, recursive=False): + """ + Delete a file or directory + + Parameters + ---------- + filepath of the file to be deleted + + recursive + If specified as True, all the files and sub-directories will be deleted. + + Returns + ------- + 204 + HTTP Response for No Content + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}fs/{}".format(self.request_endpoint, filepath_name.lstrip("/")) + if recursive: + custom_args["headers"]["X-IBM-Option"] = "recursive" + + response_json = self.request_handler.perform_request("DELETE", custom_args, expected_code=[204]) + return response_json + + def create(self, file_path, type, mode=None): + """ + Add a file or directory + Parameters + ---------- + file_path of the file to add + type = "file" or "dir" + mode Ex:- rwxr-xr-x + + """ + + data = {"type": type, "mode": mode} + + custom_args = self._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "{}fs/{}".format(self.request_endpoint, file_path.lstrip("/")) + response_json = self.request_handler.perform_request("POST", custom_args, expected_code=[201]) + return response_json + + def write(self, filepath_name, data, encoding=_ZOWE_FILES_DEFAULT_ENCODING): + """Write content to an existing UNIX file. 
+ Returns + ------- + json + A JSON containing the result of the operation + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}fs/{}".format(self.request_endpoint, filepath_name.lstrip("/")) + custom_args["data"] = data + custom_args["headers"]["Content-Type"] = "text/plain; charset={}".format(encoding) + response_json = self.request_handler.perform_request("PUT", custom_args, expected_code=[204, 201]) + return response_json + + def get_content(self, filepath_name): + """Retrieve the content of a filename. The complete path must be specified. + + Returns + ------- + json + A JSON with the contents of the specified USS file + """ + custom_args = self._create_custom_request_arguments() + # custom_args["params"] = {"filepath-name": filepath_name} + custom_args["url"] = "{}fs{}".format(self.request_endpoint, filepath_name) + response_json = self.request_handler.perform_request("GET", custom_args) + return response_json + + def get_content_streamed(self, file_path, binary=False): + """Retrieve the contents of a given USS file streamed. 
+ + Returns + ------- + response + A response object from the requests library + """ + custom_args = self._create_custom_request_arguments() + custom_args["url"] = "{}fs/{}".format(self.request_endpoint, self._encode_uri_component(file_path.lstrip("/"))) + if binary: + custom_args["headers"]["X-IBM-Data-Type"] = "binary" + response = self.request_handler.perform_request("GET", custom_args, stream=True) + return response + + def download(self, file_path, output_file, binary=False): + """Retrieve the contents of a USS file and saves it to a local file.""" + response = self.get_content_streamed(file_path, binary) + with open(output_file, "wb" if binary else "w", encoding="utf-8") as f: + for chunk in response.iter_content(chunk_size=4096, decode_unicode=not binary): + f.write(chunk) + + def upload(self, input_file, filepath_name, encoding=_ZOWE_FILES_DEFAULT_ENCODING): + """Upload contents of a given file and uploads it to UNIX file""" + if os.path.isfile(input_file): + with open(input_file, "r", encoding="utf-8") as in_file: + response_json = self.write(filepath_name, in_file) + else: + self.logger.error(f"File {input_file} not found.") + raise FileNotFound(input_file) \ No newline at end of file diff --git a/tests/unit/test_zos_files.py b/tests/unit/test_zos_files.py index 504d9285..eda0991b 100644 --- a/tests/unit/test_zos_files.py +++ b/tests/unit/test_zos_files.py @@ -2,7 +2,7 @@ import re from unittest import TestCase, mock -from zowe.zos_files_for_zowe_sdk import Files, exceptions +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets class TestFilesClass(TestCase): @@ -176,7 +176,7 @@ def test_recall_migrated_dataset_parameterized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() data = {"request": "hrecall", "wait": test_case[1]} @@ -184,7 +184,7 @@ def 
test_recall_migrated_dataset_parameterized(self): custom_args = files_test_profile._create_custom_request_arguments() custom_args["json"] = data custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "PUT", custom_args, expected_code=[200] ) @@ -211,7 +211,7 @@ def test_delete_migrated_data_set_parameterized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() data = { "request": "hdelete", @@ -223,7 +223,7 @@ def test_delete_migrated_data_set_parameterized(self): custom_args = files_test_profile._create_custom_request_arguments() custom_args["json"] = data custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "PUT", custom_args, expected_code=[200] ) @@ -248,7 +248,7 @@ def test_migrate_data_set_parameterized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() data = { "request": "hmigrate", @@ -260,7 +260,7 @@ def test_migrate_data_set_parameterized(self): custom_args = files_test_profile._create_custom_request_arguments() custom_args["json"] = data custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "PUT", custom_args, expected_code=[200] ) @@ -283,7 
+283,7 @@ def test_rename_dataset_parametrized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() data = { "request": "rename", @@ -297,7 +297,7 @@ def test_rename_dataset_parametrized(self): custom_args = files_test_profile._create_custom_request_arguments() custom_args["json"] = data custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][1]) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "PUT", custom_args, expected_code=[200] ) @@ -328,7 +328,7 @@ def test_rename_dataset_member_parametrized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() data = { "request": "rename", @@ -349,7 +349,7 @@ def test_rename_dataset_member_parametrized(self): self.assertNotRegex(ds_path_adjusted, r"[\$\@\#]") self.assertRegex(ds_path_adjusted, r"[\(" + re.escape(test_case[0][2]) + r"\)]") custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(ds_path_adjusted) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "PUT", custom_args, expected_code=[200] ) else: @@ -523,14 +523,14 @@ def test_create_dataset_parameterized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() if test_case[1]: files_test_profile.create_data_set(*test_case[0]) custom_args = files_test_profile._create_custom_request_arguments() 
custom_args["json"] = test_case[0][1] custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][0]) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "POST", custom_args, expected_code=[201] ) else: @@ -562,7 +562,7 @@ def test_create_default_dataset_parameterized(self): files_test_profile = Files(self.test_profile) for test_case in test_values: - files_test_profile.request_handler.perform_request = mock.Mock() + files_test_profile.dsn.request_handler.perform_request = mock.Mock() options = { "partitioned": { @@ -616,7 +616,7 @@ def test_create_default_dataset_parameterized(self): custom_args = files_test_profile._create_custom_request_arguments() custom_args["json"] = options.get(test_case[0][1]) custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][0]) - files_test_profile.request_handler.perform_request.assert_called_once_with( + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( "POST", custom_args, expected_code=[201] ) else: From 58a0854c157cb187fe81cc453a69400b4f6c2026 Mon Sep 17 00:00:00 2001 From: pem70 Date: Thu, 30 May 2024 10:04:06 -0400 Subject: [PATCH 2/6] Update CHANGELOG.md Signed-off-by: pem70 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bc3d0ae..97c16b80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ All notable changes to the Zowe Client Python SDK will be documented in this fil ### Enhancements -- Added logger class to core SDK [#185](https://github.com/zowe/zowe-client-python-sdk/issues/185) +- Refactor Files class into proper classes [#264](https://github.com/zowe/zowe-client-python-sdk/issues/264) ## `1.0.0-dev15` From a4380d517f0b599c9228899dd26f3da8f37b7862 Mon Sep 17 00:00:00 2001 From: pem70 Date: Thu, 30 May 2024 14:45:57 -0400 Subject: [PATCH 3/6] 
organize some tests Signed-off-by: pem70 --- tests/unit/core/test_api_connection.py | 35 +++ tests/unit/core/test_config.py | 71 +++++ tests/unit/core/test_request_handler.py | 70 +++++ tests/unit/files/datasets/test_copy.py | 74 +++++ tests/unit/files/datasets/test_create.py | 285 ++++++++++++++++++ tests/unit/files/datasets/test_list.py | 29 ++ tests/unit/files/datasets/test_migrate.py | 127 ++++++++ tests/unit/files/datasets/test_rename.py | 111 +++++++ .../files/file_systems/test_file_systems.py | 69 +++++ tests/unit/files/uss/test_uss.py | 27 ++ tests/unit/test_zowe_core.py | 2 +- 11 files changed, 899 insertions(+), 1 deletion(-) create mode 100644 tests/unit/core/test_api_connection.py create mode 100644 tests/unit/core/test_config.py create mode 100644 tests/unit/core/test_request_handler.py create mode 100644 tests/unit/files/datasets/test_copy.py create mode 100644 tests/unit/files/datasets/test_create.py create mode 100644 tests/unit/files/datasets/test_list.py create mode 100644 tests/unit/files/datasets/test_migrate.py create mode 100644 tests/unit/files/datasets/test_rename.py create mode 100644 tests/unit/files/file_systems/test_file_systems.py create mode 100644 tests/unit/files/uss/test_uss.py diff --git a/tests/unit/core/test_api_connection.py b/tests/unit/core/test_api_connection.py new file mode 100644 index 00000000..0eb5de2a --- /dev/null +++ b/tests/unit/core/test_api_connection.py @@ -0,0 +1,35 @@ +import unittest + +from zowe.core_for_zowe_sdk import ( + ApiConnection, + exceptions +) + +class TestApiConnectionClass(unittest.TestCase): + """ApiConnection class unit tests.""" + + def setUp(self): + """Setup ApiConnection fixtures.""" + self.url = "https://mock-url.com" + self.user = "Username" + self.password = "Password" + + def test_object_should_be_instance_of_class(self): + """Created object should be instance of ApiConnection class.""" + api_connection = ApiConnection(self.url, self.user, self.password) + 
self.assertIsInstance(api_connection, ApiConnection) + + def test_object_should_raise_custom_error_without_url(self): + """Instantiation of ApiConnection object should raise MissingConnectionArgs if host_url is blank.""" + with self.assertRaises(exceptions.MissingConnectionArgs): + ApiConnection(host_url="", user=self.user, password=self.password) + + def test_object_should_raise_custom_error_without_user(self): + """Instantiation of ApiConnection object should raise MissingConnectionArgs if user is blank.""" + with self.assertRaises(exceptions.MissingConnectionArgs): + ApiConnection(host_url=self.url, user="", password=self.password) + + def test_object_should_raise_custom_error_without_password(self): + """Instantiation of ApiConnection object should raise MissingConnectionArgs if password is blank.""" + with self.assertRaises(exceptions.MissingConnectionArgs): + ApiConnection(host_url=self.url, user=self.user, password="") \ No newline at end of file diff --git a/tests/unit/core/test_config.py b/tests/unit/core/test_config.py new file mode 100644 index 00000000..559faa89 --- /dev/null +++ b/tests/unit/core/test_config.py @@ -0,0 +1,71 @@ +import importlib.util +import os + +import commentjson +from jsonschema import ValidationError, validate +from pyfakefs.fake_filesystem_unittest import TestCase +from zowe.core_for_zowe_sdk.validators import validate_config_json + +FIXTURES_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "fixtures") +CWD = os.getcwd() +CRED_DICT: dict = {} +SECURE_CONFIG_PROPS: bytes + + +def keyring_get_password(serviceName: str, username: str): + global SECURE_CONFIG_PROPS + return SECURE_CONFIG_PROPS + + +def keyring_get_password_exception(): + raise Exception + +class TestValidateConfigJsonClass(TestCase): + """Testing the validate_config_json function""" + + def setUp(self): + self.setUpPyfakefs() + loader = importlib.util.find_spec("jsonschema") + module_path = loader.origin + 
self.fs.add_real_directory(os.path.dirname(module_path)) + self.original_file_path = os.path.join(FIXTURES_PATH, "zowe.config.json") + self.original_schema_file_path = os.path.join(FIXTURES_PATH, "zowe.schema.json") + self.fs.add_real_file(self.original_file_path) + self.fs.add_real_file(self.original_schema_file_path) + + def test_validate_config_json_valid(self): + """Test validate_config_json with valid config.json matching schema.json""" + config_json = commentjson.load(open(self.original_file_path)) + schema_json = commentjson.load(open(self.original_schema_file_path)) + + expected = validate(config_json, schema_json) + result = validate_config_json(self.original_file_path, self.original_schema_file_path, cwd=FIXTURES_PATH) + + self.assertEqual(result, expected) + + def test_validate_config_json_invalid(self): + """Test validate_config_json with invalid config.json that does not match schema.json""" + custom_dir = os.path.dirname(FIXTURES_PATH) + path_to_invalid_config = os.path.join(custom_dir, "invalid.zowe.config.json") + path_to_invalid_schema = os.path.join(custom_dir, "invalid.zowe.schema.json") + + with open(self.original_file_path, "r") as f: + original_config = commentjson.load(f) + original_config["$schema"] = "invalid.zowe.schema.json" + original_config["profiles"]["zosmf"]["properties"]["port"] = "10443" + with open(path_to_invalid_config, "w") as f: + commentjson.dump(original_config, f) + with open(self.original_schema_file_path, "r") as f: + original_schema = commentjson.load(f) + with open(path_to_invalid_schema, "w") as f: + commentjson.dump(original_schema, f) + invalid_config_json = commentjson.load(open(path_to_invalid_config)) + invalid_schema_json = commentjson.load(open(path_to_invalid_schema)) + + with self.assertRaises(ValidationError) as expected_info: + validate(invalid_config_json, invalid_schema_json) + + with self.assertRaises(ValidationError) as actual_info: + validate_config_json(path_to_invalid_config, path_to_invalid_schema, 
cwd=FIXTURES_PATH) + + self.assertEqual(str(actual_info.exception), str(expected_info.exception)) \ No newline at end of file diff --git a/tests/unit/core/test_request_handler.py b/tests/unit/core/test_request_handler.py new file mode 100644 index 00000000..952ab0c1 --- /dev/null +++ b/tests/unit/core/test_request_handler.py @@ -0,0 +1,70 @@ +"""Unit tests for the Zowe Python SDK Core package.""" + +# Including necessary paths +import unittest +from unittest import mock + +from zowe.core_for_zowe_sdk import ( + RequestHandler, + exceptions +) + +class TestRequestHandlerClass(unittest.TestCase): + """RequestHandler class unit tests.""" + + def setUp(self): + """Setup fixtures for RequestHandler class.""" + self.session_arguments = {"verify": False} + + def test_object_should_be_instance_of_class(self): + """Created object should be instance of RequestHandler class.""" + request_handler = RequestHandler(self.session_arguments) + self.assertIsInstance(request_handler, RequestHandler) + + @mock.patch("logging.Logger.debug") + @mock.patch("logging.Logger.error") + @mock.patch("requests.Session.send") + def test_perform_streamed_request(self, mock_send_request, mock_logger_error: mock.MagicMock, mock_logger_debug: mock.MagicMock): + """Performing a streamed request should call 'send_request' method""" + mock_send_request.return_value = mock.Mock(status_code=200) + request_handler = RequestHandler(self.session_arguments) + request_handler.perform_request("GET", {"url": "https://www.zowe.org"}, stream = True) + + mock_logger_error.assert_not_called() + mock_logger_debug.assert_called() + self.assertIn("Request method: GET", mock_logger_debug.call_args[0][0]) + mock_send_request.assert_called_once() + self.assertTrue(mock_send_request.call_args[1]["stream"]) + + + @mock.patch("logging.Logger.error") + def test_logger_unmatched_status_code(self, mock_logger_error: mock.MagicMock): + """Test logger with unexpected status code""" + request_handler = 
RequestHandler(self.session_arguments) + try: + request_handler.perform_request("GET", {"url": "https://www.zowe.org"}, expected_code= [0], stream = True) + except exceptions.UnexpectedStatus: + mock_logger_error.assert_called_once() + self.assertIn("The status code", mock_logger_error.call_args[0][0]) + + @mock.patch("logging.Logger.error") + def test_logger_perform_request_invalid_method(self, mock_logger_error: mock.MagicMock): + """Test logger with invalid request method""" + request_handler = RequestHandler(self.session_arguments) + try: + request_handler.perform_request("Invalid method", {"url": "https://www.zowe.org"}, stream = True) + except exceptions.InvalidRequestMethod: + mock_logger_error.assert_called_once() + self.assertIn("Invalid HTTP method input", mock_logger_error.call_args[0][0]) + + @mock.patch("logging.Logger.error") + @mock.patch("requests.Session.send") + def test_logger_invalid_status_code(self, mock_send_request, mock_logger_error: mock.MagicMock): + mock_send_request.return_value = mock.Mock(ok=False) + request_handler = RequestHandler(self.session_arguments) + try: + request_handler.perform_request("GET", {"url": "https://www.zowe.org"}, stream = True) + except exceptions.RequestFailed: + mock_logger_error.assert_called_once() + self.assertIn("HTTP Request has failed", mock_logger_error.call_args[0][0]) + mock_logger_error.assert_called_once \ No newline at end of file diff --git a/tests/unit/files/datasets/test_copy.py b/tests/unit/files/datasets/test_copy.py new file mode 100644 index 00000000..2e229d31 --- /dev/null +++ b/tests/unit/files/datasets/test_copy.py @@ -0,0 +1,74 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestCreateClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + 
"rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_copy_uss_to_dataset(self, mock_send_request): + """Test copy_uss_to_dataset sends a request""" + + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).copy_uss_to_dataset("from_filename", "to_dataset_name", "to_member_name", replace=True) + + mock_send_request.assert_called_once() + + def test_copy_dataset_or_member_raises_exception(self): + """Test copying a data set or member raises error when assigning invalid values to enq parameter""" + + test_case = { + "from_dataset_name": "MY.OLD.DSN", + "to_dataset_name": "MY.NEW.DSN", + "from_member_name": "MYMEM1", + "to_member_name": "MYMEM2", + "enq": "RANDOM", + "replace": True, + } + with self.assertRaises(ValueError) as e_info: + Files(self.test_profile).copy_dataset_or_member(**test_case) + self.assertEqual(str(e_info.exception), "Invalid value for enq.") + + @mock.patch("requests.Session.send") + def test_copy_dataset_or_member(self, mock_send_request): + """Test copying a data set or member sends a request""" + + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + test_values = [ + { + "from_dataset_name": "MY.OLD.DSN", + "to_dataset_name": "MY.NEW.DSN", + "from_member_name": "MYMEM1", + "to_member_name": "MYMEM2", + "volser": "ABC", + "alias": False, + "enq": "SHRW", + "replace": False, + }, + { + "from_dataset_name": "MY.OLD.DSN", + "to_dataset_name": "MY.NEW.DSN", + "from_member_name": "MYMEM1", + "to_member_name": "MYMEM2", + "volser": "ABC", + "alias": True, + "enq": "SHRW", + "replace": True, + }, + ] + for test_case in test_values: + Files(self.test_profile).copy_dataset_or_member(**test_case) + mock_send_request.assert_called() \ No newline at end of file diff --git a/tests/unit/files/datasets/test_create.py b/tests/unit/files/datasets/test_create.py new file mode 100644 index 
00000000..d6e24127 --- /dev/null +++ b/tests/unit/files/datasets/test_create.py @@ -0,0 +1,285 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestCreateClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_create_data_set_accept_valid_recfm(self, mock_send_request): + """Test if create dataset does accept all accepted record formats""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) + for recfm in ["F", "FB", "V", "VB", "U", "FBA", "FBM", "VBA", "VBM"]: + Files(self.test_profile).create_data_set( + "DSNAME123", + options={ + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "dirblk": 5, + "recfm": recfm, + "blksize": 6160, + "lrecl": 80, + }, + ) + mock_send_request.assert_called() + + def test_create_data_set_does_not_accept_invalid_recfm(self): + """Test if create dataset raises an error for invalid record formats""" + with self.assertRaises(KeyError): + Files(self.test_profile).create_data_set( + "DSNAME123", + options={ + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "dirblk": 5, + "recfm": "XX", + "blksize": 6160, + "lrecl": 80, + }, + ) + + def test_create_data_set_raises_error_without_required_arguments(self): + """Test not providing required arguments raises an error""" + with self.assertRaises(ValueError) as e_info: + obj = Files(self.test_profile).create_data_set( + "DSNAME123", options={"alcunit": "CYL", "dsorg": "PO", "recfm": "FB", "blksize": 6160, "dirblk": 25} + ) + self.assertEqual(str(e_info.exception), "If 'like' is not specified, you must specify 'primary' or 'lrecl'.") + + def 
test_create_data_set_raises_error_with_invalid_arguments_parameterized(self): + """Test not providing valid arguments raises an error""" + test_values = [ + { + "alcunit": "invalid", + "dsorg": "PO", + "primary": 1, + "dirblk": 5, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + }, + { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 1, + "recfm": "invalid", + "blksize": 32760, + "lrecl": 260, + "dirblk": 25, + }, + { + "alcunit": "CYL", + "dsorg": "invalid", + "primary": 1, + "dirblk": 5, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + }, + { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 10, + "recfm": "U", + "blksize": 27998, + "lrecl": 27998, + "dirblk": 0, + }, + { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 99777215, + "dirblk": 5, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + }, + ] + + for test_case in test_values: + with self.assertRaises((KeyError, ValueError)): + obj = Files(self.test_profile).create_data_set("MY.OLD.DSN", options=test_case) + + def test_create_dataset_parameterized(self): + """Test create dataset with different values""" + test_values = [ + ( + ( + "DSN", + { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "dirblk": 5, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + }, + ), + True, + ), + ( + ( + "DSN", + { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + "dirblk": 25, + }, + ), + True, + ), + ( + ( + "DSN", + { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 1, + "recfm": "VB", + "blksize": 32760, + "lrecl": 260, + "dirblk": 25, + }, + ), + True, + ), + ( + ("DSN", {"alcunit": "CYL", "dsorg": "PS", "primary": 1, "recfm": "FB", "blksize": 6160, "lrecl": 80}), + True, + ), + ( + ( + "DSN", + { + "alcunit": "CYL", + "dsorg": "PS", + "recfm": "FB", + "blksize": 6160, + }, + ), + False, + ), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = 
mock.Mock() + + if test_case[1]: + files_test_profile.create_data_set(*test_case[0]) + custom_args = files_test_profile._create_custom_request_arguments() + custom_args["json"] = test_case[0][1] + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][0]) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "POST", custom_args, expected_code=[201] + ) + else: + with self.assertRaises(ValueError) as e_info: + files_test_profile.create_data_set(*test_case[0]) + self.assertEqual( + str(e_info.exception), "If 'like' is not specified, you must specify 'primary' or 'lrecl'." + ) + + @mock.patch("requests.Session.send") + def test_create_default_dataset(self, mock_send_request): + """Test creating a default data set sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) + + Files(self.test_profile).create_default_data_set("dataset_name", "partitioned") + mock_send_request.assert_called_once() + + def test_create_default_dataset_parameterized(self): + """Test create default dataset with different values""" + test_values = [ + (("DSN", "partitioned"), True), + (("DSN", "sequential"), True), + (("DSN", "classic"), True), + (("DSN", "c"), True), + (("DSN", "binary"), True), + (("DSN", "invalid"), False), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = mock.Mock() + + options = { + "partitioned": { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "dirblk": 5, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + }, + "sequential": { + "alcunit": "CYL", + "dsorg": "PS", + "primary": 1, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + }, + "classic": { + "alcunit": "CYL", + "dsorg": "PO", + "primary": 1, + "recfm": "FB", + "blksize": 6160, + "lrecl": 80, + "dirblk": 25, + }, + "c": { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 1, + 
"recfm": "VB", + "blksize": 32760, + "lrecl": 260, + "dirblk": 25, + }, + "binary": { + "dsorg": "PO", + "alcunit": "CYL", + "primary": 10, + "recfm": "U", + "blksize": 27998, + "lrecl": 27998, + "dirblk": 25, + }, + } + + if test_case[1]: + files_test_profile.create_default_data_set(*test_case[0]) + custom_args = files_test_profile._create_custom_request_arguments() + custom_args["json"] = options.get(test_case[0][1]) + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][0]) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "POST", custom_args, expected_code=[201] + ) + else: + with self.assertRaises(ValueError) as e_info: + files_test_profile.create_default_data_set(*test_case[0]) + self.assertEqual(str(e_info.exception), "Invalid type for default data set.") \ No newline at end of file diff --git a/tests/unit/files/datasets/test_list.py b/tests/unit/files/datasets/test_list.py new file mode 100644 index 00000000..8fd33a07 --- /dev/null +++ b/tests/unit/files/datasets/test_list.py @@ -0,0 +1,29 @@ +"""Unit tests for the Zowe Python SDK z/OS Files package.""" +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestFilesClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_list_dsn(self, mock_send_request): + """Test list DSN sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + test_values = [("MY.DSN", False), ("MY.DSN", True)] + for test_case in test_values: + Files(self.test_profile).list_dsn(*test_case) + mock_send_request.assert_called() \ No newline at end of file diff --git 
a/tests/unit/files/datasets/test_migrate.py b/tests/unit/files/datasets/test_migrate.py new file mode 100644 index 00000000..be8a3597 --- /dev/null +++ b/tests/unit/files/datasets/test_migrate.py @@ -0,0 +1,127 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestCreateClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_recall_migrated_dataset(self, mock_send_request): + """Test recalling migrated data set sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).recall_migrated_dataset("dataset_name") + mock_send_request.assert_called_once() + + def test_recall_migrated_dataset_parameterized(self): + """Testing recall migrated_dataset with different values""" + + test_values = [ + ("MY.OLD.DSN", False), + ("MY.OLD.DSN", True), + ("MY.NEW.DSN", False), + ("MY.NEW.DSN", True), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = mock.Mock() + + data = {"request": "hrecall", "wait": test_case[1]} + + files_test_profile.recall_migrated_dataset(test_case[0], test_case[1]) + custom_args = files_test_profile._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "PUT", custom_args, expected_code=[200] + ) + + @mock.patch("requests.Session.send") + def test_delete_migrated_data_set(self, mock_send_request): + """Test deleting a migrated data set sends a request""" + 
mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).delete_migrated_data_set("dataset_name") + mock_send_request.assert_called_once() + + def test_delete_migrated_data_set_parameterized(self): + """Test deleting a migrated data set with different values""" + + test_values = [ + ("MY.OLD.DSN", False, False), + ("MY.OLD.DSN", False, True), + ("MY.OLD.DSN", True, True), + ("MY.NEW.DSN", True, True), + ("MY.NEW.DSN", False, True), + ("MY.NEW.DSN", False, False), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = mock.Mock() + + data = { + "request": "hdelete", + "purge": test_case[1], + "wait": test_case[2], + } + + files_test_profile.delete_migrated_data_set(test_case[0], test_case[1], test_case[2]) + custom_args = files_test_profile._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "PUT", custom_args, expected_code=[200] + ) + + @mock.patch("requests.Session.send") + def test_migrate_data_set(self, mock_send_request): + """Test migrating a data set sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).migrate_data_set("dataset_name") + mock_send_request.assert_called_once() + + def test_migrate_data_set_parameterized(self): + """Test migrating a data set with different values""" + + test_values = [ + ("MY.OLD.DSN", False), + ("MY.OLD.DSN", True), + ("MY.NEW.DSN", True), + ("MY.NEW.DSN", False), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = mock.Mock() + + data = { + "request": "hmigrate", + "wait": 
test_case[1], + } + + files_test_profile.migrate_data_set(test_case[0], test_case[1]) + + custom_args = files_test_profile._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "PUT", custom_args, expected_code=[200] + ) \ No newline at end of file diff --git a/tests/unit/files/datasets/test_rename.py b/tests/unit/files/datasets/test_rename.py new file mode 100644 index 00000000..30b3b727 --- /dev/null +++ b/tests/unit/files/datasets/test_rename.py @@ -0,0 +1,111 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestCreateClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_rename_dataset(self, mock_send_request): + """Test renaming dataset sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).rename_dataset("MY.OLD.DSN", "MY.NEW.DSN") + mock_send_request.assert_called_once() + + def test_rename_dataset_parametrized(self): + """Test renaming a dataset with different values""" + test_values = [ + (("DSN.OLD", "DSN.NEW"), True), + (("DS.NAME.CURRENT", "DS.NAME.NEW"), True), + (("MY.OLD.DSN", "MY.NEW.DSN"), True), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = mock.Mock() + + data = { + "request": "rename", + "from-dataset": { + "dsn": test_case[0][0].strip(), + }, + } + + files_test_profile.rename_dataset(test_case[0][0], test_case[0][1]) + + custom_args 
= files_test_profile._create_custom_request_arguments() + custom_args["json"] = data + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][1]) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "PUT", custom_args, expected_code=[200] + ) + + @mock.patch("requests.Session.send") + def test_rename_dataset_member(self, mock_send_request): + """Test renaming dataset member sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).rename_dataset_member("MY.DS.NAME", "MEMBEROLD", "MEMBERNEW") + mock_send_request.assert_called_once() + + def test_rename_dataset_member_raises_exception(self): + """Test renaming a dataset member raises error when assigning invalid values to enq parameter""" + with self.assertRaises(ValueError) as e_info: + Files(self.test_profile).rename_dataset_member("MY.DS.NAME", "MEMBER1", "MEMBER1N", "RANDOM") + self.assertEqual(str(e_info.exception), "Invalid value for enq.") + + def test_rename_dataset_member_parametrized(self): + """Test renaming a dataset member with different values""" + test_values = [ + (("DSN", "MBROLD$", "MBRNEW$", "EXCLU"), True), + (("DSN", "MBROLD#", "MBRNE#", "SHRW"), True), + (("DSN", "MBROLD", "MBRNEW", "INVALID"), False), + (("DATA.SET.@NAME", "MEMBEROLD", "MEMBERNEW"), True), + (("DS.NAME", "MONAME", "MNNAME"), True), + ] + + files_test_profile = Files(self.test_profile) + + for test_case in test_values: + files_test_profile.dsn.request_handler.perform_request = mock.Mock() + + data = { + "request": "rename", + "from-dataset": { + "dsn": test_case[0][0].strip(), + "member": test_case[0][1].strip(), + }, + } + + if len(test_case[0]) > 3: + data["enq"] = test_case[0][3].strip() + if test_case[1]: + files_test_profile.rename_dataset_member(*test_case[0]) + custom_args = files_test_profile._create_custom_request_arguments() + custom_args["json"] = data + 
ds_path = "{}({})".format(test_case[0][0], test_case[0][2]) + ds_path_adjusted = files_test_profile._encode_uri_component(ds_path) + self.assertNotRegex(ds_path_adjusted, r"[\$\@\#]") + self.assertRegex(ds_path_adjusted, r"[\(" + re.escape(test_case[0][2]) + r"\)]") + custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(ds_path_adjusted) + files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( + "PUT", custom_args, expected_code=[200] + ) + else: + with self.assertRaises(ValueError) as e_info: + files_test_profile.rename_dataset_member(*test_case[0]) + self.assertEqual(str(e_info.exception), "Invalid value for enq.") \ No newline at end of file diff --git a/tests/unit/files/file_systems/test_file_systems.py b/tests/unit/files/file_systems/test_file_systems.py new file mode 100644 index 00000000..9e49596b --- /dev/null +++ b/tests/unit/files/file_systems/test_file_systems.py @@ -0,0 +1,69 @@ +"""Unit tests for the Zowe Python SDK z/OS Files package.""" +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestFilesClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_create_zFS_file_system(self, mock_send_request): + """Test creating a zfs sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) + + Files(self.test_profile).create_zFS_file_system( + "file_system_name", {"perms": 100, "cylsPri": 16777213, "cylsSec": 16777215} + ) + mock_send_request.assert_called_once() + + @mock.patch("requests.Session.send") + def test_delete_zFS_file_system(self, mock_send_request): + """Test deleting a zfs sends a request""" + 
mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) + + Files(self.test_profile).delete_zFS_file_system("file_system_name") + mock_send_request.assert_called_once() + + def test_invalid_permission(self): + """Test that the correct exception is raised when an invalid permission option is provided""" + with self.assertRaises(exceptions.InvalidPermsOption) as e_info: + Files(self.test_profile).create_zFS_file_system( + "file_system_name", {"perms": -1, "cylsPri": 16777213, "cylsSec": 16777215} + ) + self.assertEqual(str(e_info.exception), "Invalid zos-files create command 'perms' option: -1") + + def test_invalid_memory_allocation(self): + """Test that the correct exception is raised when an invalid memory allocation option is provided""" + with self.assertRaises(exceptions.MaxAllocationQuantityExceeded) as e_info: + Files(self.test_profile).create_zFS_file_system( + "file_system_name", {"perms": 775, "cylsPri": 1677755513, "cylsSec": 16777215} + ) + self.assertEqual(str(e_info.exception), "Maximum allocation quantity of 16777215 exceeded") + + @mock.patch("requests.Session.send") + def test_mount_zFS_file_system(self, mock_send_request): + """Test mounting a zfs sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) + + Files(self.test_profile).mount_file_system("file_system_name", "mount_point") + mock_send_request.assert_called_once() + + @mock.patch("requests.Session.send") + def test_unmount_zFS_file_system(self, mock_send_request): + """Test unmounting a zfs sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) + + Files(self.test_profile).unmount_file_system("file_system_name") + mock_send_request.assert_called_once() \ No newline at end of file diff --git a/tests/unit/files/uss/test_uss.py b/tests/unit/files/uss/test_uss.py new file mode 100644 index 
00000000..39a2a4e4 --- /dev/null +++ b/tests/unit/files/uss/test_uss.py @@ -0,0 +1,27 @@ +"""Unit tests for the Zowe Python SDK z/OS Files package.""" +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestFilesClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_delete_uss(self, mock_send_request): + """Test deleting a directory recursively sends a request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) + + Files(self.test_profile).delete_uss("filepath_name", recursive=True) + mock_send_request.assert_called_once() \ No newline at end of file diff --git a/tests/unit/test_zowe_core.py b/tests/unit/test_zowe_core.py index 4e216fec..d04d4291 100644 --- a/tests/unit/test_zowe_core.py +++ b/tests/unit/test_zowe_core.py @@ -1022,7 +1022,7 @@ class TestValidateConfigJsonClass(TestCase): def setUp(self): self.setUpPyfakefs() - + self.original_file_path = os.path.join(FIXTURES_PATH, "zowe.config.json") self.original_schema_file_path = os.path.join(FIXTURES_PATH, "zowe.schema.json") self.fs.add_real_file(self.original_file_path) From 6a709b81eba34954223375086f036173a6bd7522 Mon Sep 17 00:00:00 2001 From: pem70 Date: Fri, 31 May 2024 09:21:26 -0400 Subject: [PATCH 4/6] Refactor files tests and core tests Signed-off-by: pem70 --- tests/unit/core/test_logger.py | 20 + tests/unit/core/test_profile.py | 20 + .../test_profile_manager.py} | 266 +------- tests/unit/core/test_sdk_api.py | 93 +++ tests/unit/test_zos_files.py | 625 ------------------ 5 files changed, 137 insertions(+), 887 deletions(-) create mode 100644 tests/unit/core/test_logger.py create mode 100644 
tests/unit/core/test_profile.py rename tests/unit/{test_zowe_core.py => core/test_profile_manager.py} (76%) create mode 100644 tests/unit/core/test_sdk_api.py delete mode 100644 tests/unit/test_zos_files.py diff --git a/tests/unit/core/test_logger.py b/tests/unit/core/test_logger.py new file mode 100644 index 00000000..5c4b8dba --- /dev/null +++ b/tests/unit/core/test_logger.py @@ -0,0 +1,20 @@ +"""Unit tests for the Zowe Python SDK Core package.""" + +# Including necessary paths +import logging + +from pyfakefs.fake_filesystem_unittest import TestCase +from zowe.core_for_zowe_sdk import ( + ProfileManager, + logger +) + +class test_logger_setLoggerLevel(TestCase): + + def test_logger_setLoggerLevel(self): + """Test setLoggerLevel""" + profile = ProfileManager() + test_logger = logging.getLogger("zowe.core_for_zowe_sdk.profile_manager") + test_value = logging.DEBUG + logger.Log.setLoggerLevel(test_value) + self.assertEqual(test_logger.level, test_value) \ No newline at end of file diff --git a/tests/unit/core/test_profile.py b/tests/unit/core/test_profile.py new file mode 100644 index 00000000..32a0c414 --- /dev/null +++ b/tests/unit/core/test_profile.py @@ -0,0 +1,20 @@ +"""Unit tests for the Zowe Python SDK Core package.""" + +import os +import unittest + +from zowe.core_for_zowe_sdk import ( + ZosmfProfile +) + +class TestZosmfProfileClass(unittest.TestCase): + """ZosmfProfile class unit tests.""" + + def setUp(self): + """Setup fixtures for ZosmfProfile class.""" + self.profile_name = "MOCK" + + def test_object_should_be_instance_of_class(self): + """Created object should be instance of ZosmfProfile class.""" + zosmf_profile = ZosmfProfile(self.profile_name) + self.assertIsInstance(zosmf_profile, ZosmfProfile) \ No newline at end of file diff --git a/tests/unit/test_zowe_core.py b/tests/unit/core/test_profile_manager.py similarity index 76% rename from tests/unit/test_zowe_core.py rename to tests/unit/core/test_profile_manager.py index d04d4291..76313474 100644 
--- a/tests/unit/test_zowe_core.py +++ b/tests/unit/core/test_profile_manager.py @@ -6,31 +6,21 @@ import json import os import shutil -import unittest from unittest import mock -import logging import commentjson -from jsonschema import SchemaError, ValidationError, validate +from jsonschema import SchemaError, ValidationError from pyfakefs.fake_filesystem_unittest import TestCase from zowe.core_for_zowe_sdk import ( - ApiConnection, ConfigFile, CredentialManager, ProfileManager, - RequestHandler, - SdkApi, - ZosmfProfile, constants, custom_warnings, - exceptions, - session_constants, - logger + exceptions ) -from zowe.core_for_zowe_sdk.validators import validate_config_json -from zowe.secrets_for_zowe_sdk import keyring -FIXTURES_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures") +FIXTURES_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "fixtures") CWD = os.getcwd() CRED_DICT: dict = {} SECURE_CONFIG_PROPS: bytes @@ -40,198 +30,10 @@ def keyring_get_password(serviceName: str, username: str): global SECURE_CONFIG_PROPS return SECURE_CONFIG_PROPS - def keyring_get_password_exception(): raise Exception -class TestApiConnectionClass(unittest.TestCase): - """ApiConnection class unit tests.""" - - def setUp(self): - """Setup ApiConnection fixtures.""" - self.url = "https://mock-url.com" - self.user = "Username" - self.password = "Password" - - def test_object_should_be_instance_of_class(self): - """Created object should be instance of ApiConnection class.""" - api_connection = ApiConnection(self.url, self.user, self.password) - self.assertIsInstance(api_connection, ApiConnection) - - def test_object_should_raise_custom_error_without_url(self): - """Instantiation of ApiConnection object should raise MissingConnectionArgs if host_url is blank.""" - with self.assertRaises(exceptions.MissingConnectionArgs): - ApiConnection(host_url="", user=self.user, password=self.password) - - def 
test_object_should_raise_custom_error_without_user(self): - """Instantiation of ApiConnection object should raise MissingConnectionArgs if user is blank.""" - with self.assertRaises(exceptions.MissingConnectionArgs): - ApiConnection(host_url=self.url, user="", password=self.password) - - def test_object_should_raise_custom_error_without_password(self): - """Instantiation of ApiConnection object should raise MissingConnectionArgs if password is blank.""" - with self.assertRaises(exceptions.MissingConnectionArgs): - ApiConnection(host_url=self.url, user=self.user, password="") - - -class TestSdkApiClass(TestCase): - """SdkApi class unit tests.""" - - def setUp(self): - """Setup fixtures for SdkApi class.""" - common_props = {"host": "mock-url.com", "port": 443, "protocol": "https", "rejectUnauthorized": True} - self.basic_props = {**common_props, "user": "Username", "password": "Password"} - self.bearer_props = {**common_props, "tokenValue": "BearerToken"} - self.token_props = { - **common_props, - "tokenType": "MyToken", - "tokenValue": "TokenValue", - } - self.default_url = "https://default-api.com/" - - def test_object_should_be_instance_of_class(self): - """Created object should be instance of SdkApi class.""" - sdk_api = SdkApi(self.basic_props, self.default_url) - self.assertIsInstance(sdk_api, SdkApi) - - @mock.patch("logging.Logger.error") - def test_session_no_host_logger(self, mock_logger_error: mock.MagicMock): - props = {} - try: - sdk_api = SdkApi(props, self.default_url) - except Exception: - mock_logger_error.assert_called() - self.assertIn("Host", mock_logger_error.call_args[0][0]) - - @mock.patch("logging.Logger.error") - def test_session_no_authentication_logger(self, mock_logger_error: mock.MagicMock): - props = {"host": "test"} - try: - sdk_api = SdkApi(props, self.default_url) - except Exception: - mock_logger_error.assert_called() - self.assertIn("Authentication", mock_logger_error.call_args[0][0]) - - def test_should_handle_basic_auth(self): - 
"""Created object should handle basic authentication.""" - sdk_api = SdkApi(self.basic_props, self.default_url) - self.assertEqual(sdk_api.session.type, session_constants.AUTH_TYPE_BASIC) - self.assertEqual( - sdk_api.request_arguments["auth"], - (self.basic_props["user"], self.basic_props["password"]), - ) - - def test_should_handle_bearer_auth(self): - """Created object should handle bearer authentication.""" - sdk_api = SdkApi(self.bearer_props, self.default_url) - self.assertEqual(sdk_api.session.type, session_constants.AUTH_TYPE_BEARER) - self.assertEqual( - sdk_api.default_headers["Authorization"], - "Bearer " + self.bearer_props["tokenValue"], - ) - - def test_should_handle_token_auth(self): - """Created object should handle token authentication.""" - sdk_api = SdkApi(self.token_props, self.default_url) - self.assertEqual(sdk_api.session.type, session_constants.AUTH_TYPE_TOKEN) - self.assertEqual( - sdk_api.default_headers["Cookie"], - self.token_props["tokenType"] + "=" + self.token_props["tokenValue"], - ) - - def test_encode_uri_component(self): - """Test string is being adjusted to the correct URL parameter""" - - sdk_api = SdkApi(self.basic_props, self.default_url) - - actual_not_empty = sdk_api._encode_uri_component("MY.STRING@.TEST#.$HERE(MBR#NAME)") - expected_not_empty = "MY.STRING%40.TEST%23.%24HERE(MBR%23NAME)" - self.assertEqual(actual_not_empty, expected_not_empty) - - actual_wildcard = sdk_api._encode_uri_component("GET.#DS.*") - expected_wildcard = "GET.%23DS.*" - self.assertEqual(actual_wildcard, expected_wildcard) - - actual_none = sdk_api._encode_uri_component(None) - expected_none = None - self.assertEqual(actual_none, expected_none) - - -class TestRequestHandlerClass(unittest.TestCase): - """RequestHandler class unit tests.""" - - def setUp(self): - """Setup fixtures for RequestHandler class.""" - self.session_arguments = {"verify": False} - - def test_object_should_be_instance_of_class(self): - """Created object should be instance of 
RequestHandler class.""" - request_handler = RequestHandler(self.session_arguments) - self.assertIsInstance(request_handler, RequestHandler) - - @mock.patch("logging.Logger.debug") - @mock.patch("logging.Logger.error") - @mock.patch("requests.Session.send") - def test_perform_streamed_request(self, mock_send_request, mock_logger_error: mock.MagicMock, mock_logger_debug: mock.MagicMock): - """Performing a streamed request should call 'send_request' method""" - mock_send_request.return_value = mock.Mock(status_code=200) - request_handler = RequestHandler(self.session_arguments) - request_handler.perform_request("GET", {"url": "https://www.zowe.org"}, stream = True) - - mock_logger_error.assert_not_called() - mock_logger_debug.assert_called() - self.assertIn("Request method: GET", mock_logger_debug.call_args[0][0]) - mock_send_request.assert_called_once() - self.assertTrue(mock_send_request.call_args[1]["stream"]) - - - @mock.patch("logging.Logger.error") - def test_logger_unmatched_status_code(self, mock_logger_error: mock.MagicMock): - """Test logger with unexpeceted status code""" - request_handler = RequestHandler(self.session_arguments) - try: - request_handler.perform_request("GET", {"url": "https://www.zowe.org"}, expected_code= [0], stream = True) - except exceptions.UnexpectedStatus: - mock_logger_error.assert_called_once() - self.assertIn("The status code", mock_logger_error.call_args[0][0]) - - @mock.patch("logging.Logger.error") - def test_logger_perform_request_invalid_method(self, mock_logger_error: mock.MagicMock): - """Test logger with invalid request method""" - request_handler = RequestHandler(self.session_arguments) - try: - request_handler.perform_request("Invalid method", {"url": "https://www.zowe.org"}, stream = True) - except exceptions.InvalidRequestMethod: - mock_logger_error.assert_called_once() - self.assertIn("Invalid HTTP method input", mock_logger_error.call_args[0][0]) - - @mock.patch("logging.Logger.error") - 
@mock.patch("requests.Session.send") - def test_logger_invalid_status_code(self, mock_send_request, mock_logger_error: mock.MagicMock): - mock_send_request.return_value = mock.Mock(ok=False) - request_handler = RequestHandler(self.session_arguments) - try: - request_handler.perform_request("GET", {"url": "https://www.zowe.org"}, stream = True) - except exceptions.RequestFailed: - mock_logger_error.assert_called_once() - self.assertIn("HTTP Request has failed", mock_logger_error.call_args[0][0]) - mock_logger_error.assert_called_once - - -class TestZosmfProfileClass(unittest.TestCase): - """ZosmfProfile class unit tests.""" - - def setUp(self): - """Setup fixtures for ZosmfProfile class.""" - self.profile_name = "MOCK" - - def test_object_should_be_instance_of_class(self): - """Created object should be instance of ZosmfProfile class.""" - zosmf_profile = ZosmfProfile(self.profile_name) - self.assertIsInstance(zosmf_profile, ZosmfProfile) - - class TestZosmfProfileManager(TestCase): """ProfileManager class unit tests.""" @@ -1014,64 +816,4 @@ def test_config_file_save(self, mock_save_secure_props): self.assertNotIn("user", config_file.jsonc["profiles"]["lpar1"]["properties"]) self.assertEqual( ["port"], list(config_file.jsonc["profiles"]["lpar1"]["profiles"]["zosmf"]["properties"].keys()) - ) - - -class TestValidateConfigJsonClass(TestCase): - """Testing the validate_config_json function""" - - def setUp(self): - self.setUpPyfakefs() - - self.original_file_path = os.path.join(FIXTURES_PATH, "zowe.config.json") - self.original_schema_file_path = os.path.join(FIXTURES_PATH, "zowe.schema.json") - self.fs.add_real_file(self.original_file_path) - self.fs.add_real_file(self.original_schema_file_path) - - def test_validate_config_json_valid(self): - """Test validate_config_json with valid config.json matching schema.json""" - config_json = commentjson.load(open(self.original_file_path)) - schema_json = commentjson.load(open(self.original_schema_file_path)) - - expected = 
validate(config_json, schema_json) - result = validate_config_json(self.original_file_path, self.original_schema_file_path, cwd=FIXTURES_PATH) - - self.assertEqual(result, expected) - - def test_validate_config_json_invalid(self): - """Test validate_config_json with invalid config.json that does not match schema.json""" - custom_dir = os.path.dirname(FIXTURES_PATH) - path_to_invalid_config = os.path.join(custom_dir, "invalid.zowe.config.json") - path_to_invalid_schema = os.path.join(custom_dir, "invalid.zowe.schema.json") - - with open(self.original_file_path, "r") as f: - original_config = commentjson.load(f) - original_config["$schema"] = "invalid.zowe.schema.json" - original_config["profiles"]["zosmf"]["properties"]["port"] = "10443" - with open(path_to_invalid_config, "w") as f: - commentjson.dump(original_config, f) - with open(self.original_schema_file_path, "r") as f: - original_schema = commentjson.load(f) - with open(path_to_invalid_schema, "w") as f: - commentjson.dump(original_schema, f) - invalid_config_json = commentjson.load(open(path_to_invalid_config)) - invalid_schema_json = commentjson.load(open(path_to_invalid_schema)) - - with self.assertRaises(ValidationError) as expected_info: - validate(invalid_config_json, invalid_schema_json) - - with self.assertRaises(ValidationError) as actual_info: - validate_config_json(path_to_invalid_config, path_to_invalid_schema, cwd=FIXTURES_PATH) - - self.assertEqual(str(actual_info.exception), str(expected_info.exception)) - - -class test_logger_setLoggerLevel(TestCase): - - def test_logger_setLoggerLevel(self): - """Test setLoggerLevel""" - profile = ProfileManager() - test_logger = logging.getLogger("zowe.core_for_zowe_sdk.profile_manager") - test_value = logging.DEBUG - logger.Log.setLoggerLevel(test_value) - self.assertEqual(test_logger.level, test_value) \ No newline at end of file + ) \ No newline at end of file diff --git a/tests/unit/core/test_sdk_api.py b/tests/unit/core/test_sdk_api.py new file mode 
100644 index 00000000..eda522ef --- /dev/null +++ b/tests/unit/core/test_sdk_api.py @@ -0,0 +1,93 @@ +"""Unit tests for the Zowe Python SDK Core package.""" + +# Including necessary paths +import os +from unittest import mock + +from pyfakefs.fake_filesystem_unittest import TestCase +from zowe.core_for_zowe_sdk import ( + SdkApi, + session_constants +) + +class TestSdkApiClass(TestCase): + """SdkApi class unit tests.""" + + def setUp(self): + """Setup fixtures for SdkApi class.""" + common_props = {"host": "mock-url.com", "port": 443, "protocol": "https", "rejectUnauthorized": True} + self.basic_props = {**common_props, "user": "Username", "password": "Password"} + self.bearer_props = {**common_props, "tokenValue": "BearerToken"} + self.token_props = { + **common_props, + "tokenType": "MyToken", + "tokenValue": "TokenValue", + } + self.default_url = "https://default-api.com/" + + def test_object_should_be_instance_of_class(self): + """Created object should be instance of SdkApi class.""" + sdk_api = SdkApi(self.basic_props, self.default_url) + self.assertIsInstance(sdk_api, SdkApi) + + @mock.patch("logging.Logger.error") + def test_session_no_host_logger(self, mock_logger_error: mock.MagicMock): + props = {} + try: + sdk_api = SdkApi(props, self.default_url) + except Exception: + mock_logger_error.assert_called() + self.assertIn("Host", mock_logger_error.call_args[0][0]) + + @mock.patch("logging.Logger.error") + def test_session_no_authentication_logger(self, mock_logger_error: mock.MagicMock): + props = {"host": "test"} + try: + sdk_api = SdkApi(props, self.default_url) + except Exception: + mock_logger_error.assert_called() + self.assertIn("Authentication", mock_logger_error.call_args[0][0]) + + def test_should_handle_basic_auth(self): + """Created object should handle basic authentication.""" + sdk_api = SdkApi(self.basic_props, self.default_url) + self.assertEqual(sdk_api.session.type, session_constants.AUTH_TYPE_BASIC) + self.assertEqual( + 
sdk_api.request_arguments["auth"], + (self.basic_props["user"], self.basic_props["password"]), + ) + + def test_should_handle_bearer_auth(self): + """Created object should handle bearer authentication.""" + sdk_api = SdkApi(self.bearer_props, self.default_url) + self.assertEqual(sdk_api.session.type, session_constants.AUTH_TYPE_BEARER) + self.assertEqual( + sdk_api.default_headers["Authorization"], + "Bearer " + self.bearer_props["tokenValue"], + ) + + def test_should_handle_token_auth(self): + """Created object should handle token authentication.""" + sdk_api = SdkApi(self.token_props, self.default_url) + self.assertEqual(sdk_api.session.type, session_constants.AUTH_TYPE_TOKEN) + self.assertEqual( + sdk_api.default_headers["Cookie"], + self.token_props["tokenType"] + "=" + self.token_props["tokenValue"], + ) + + def test_encode_uri_component(self): + """Test string is being adjusted to the correct URL parameter""" + + sdk_api = SdkApi(self.basic_props, self.default_url) + + actual_not_empty = sdk_api._encode_uri_component("MY.STRING@.TEST#.$HERE(MBR#NAME)") + expected_not_empty = "MY.STRING%40.TEST%23.%24HERE(MBR%23NAME)" + self.assertEqual(actual_not_empty, expected_not_empty) + + actual_wildcard = sdk_api._encode_uri_component("GET.#DS.*") + expected_wildcard = "GET.%23DS.*" + self.assertEqual(actual_wildcard, expected_wildcard) + + actual_none = sdk_api._encode_uri_component(None) + expected_none = None + self.assertEqual(actual_none, expected_none) \ No newline at end of file diff --git a/tests/unit/test_zos_files.py b/tests/unit/test_zos_files.py deleted file mode 100644 index eda0991b..00000000 --- a/tests/unit/test_zos_files.py +++ /dev/null @@ -1,625 +0,0 @@ -"""Unit tests for the Zowe Python SDK z/OS Files package.""" -import re -from unittest import TestCase, mock - -from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets - - -class TestFilesClass(TestCase): - """File class unit tests.""" - - def setUp(self): - """Setup fixtures for File 
class.""" - self.test_profile = { - "host": "mock-url.com", - "user": "Username", - "password": "Password", - "port": 443, - "rejectUnauthorized": True, - } - - def test_object_should_be_instance_of_class(self): - """Created object should be instance of Files class.""" - files = Files(self.test_profile) - self.assertIsInstance(files, Files) - - @mock.patch("requests.Session.send") - def test_delete_uss(self, mock_send_request): - """Test deleting a directory recursively sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) - - Files(self.test_profile).delete_uss("filepath_name", recursive=True) - mock_send_request.assert_called_once() - - @mock.patch("requests.Session.send") - def test_create_zFS_file_system(self, mock_send_request): - """Test creating a zfs sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) - - Files(self.test_profile).create_zFS_file_system( - "file_system_name", {"perms": 100, "cylsPri": 16777213, "cylsSec": 16777215} - ) - mock_send_request.assert_called_once() - - @mock.patch("requests.Session.send") - def test_delete_zFS_file_system(self, mock_send_request): - """Test deleting a zfs sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) - - Files(self.test_profile).delete_zFS_file_system("file_system_name") - mock_send_request.assert_called_once() - - def test_invalid_permission(self): - """Test that the correct exception is raised when an invalid permission option is provided""" - with self.assertRaises(exceptions.InvalidPermsOption) as e_info: - Files(self.test_profile).create_zFS_file_system( - "file_system_name", {"perms": -1, "cylsPri": 16777213, "cylsSec": 16777215} - ) - self.assertEqual(str(e_info.exception), "Invalid zos-files create command 'perms' option: -1") - - def test_invalid_memory_allocation(self): - """Test 
that the correct exception is raised when an invalid memory allocation option is provided""" - with self.assertRaises(exceptions.MaxAllocationQuantityExceeded) as e_info: - Files(self.test_profile).create_zFS_file_system( - "file_system_name", {"perms": 775, "cylsPri": 1677755513, "cylsSec": 16777215} - ) - self.assertEqual(str(e_info.exception), "Maximum allocation quantity of 16777215 exceeded") - - @mock.patch("requests.Session.send") - def test_mount_zFS_file_system(self, mock_send_request): - """Test mounting a zfs sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) - - Files(self.test_profile).mount_file_system("file_system_name", "mount_point") - mock_send_request.assert_called_once() - - @mock.patch("requests.Session.send") - def test_unmount_zFS_file_system(self, mock_send_request): - """Test unmounting a zfs sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) - - Files(self.test_profile).unmount_file_system("file_system_name") - mock_send_request.assert_called_once() - - @mock.patch("requests.Session.send") - def test_list_dsn(self, mock_send_request): - """Test list DSN sends request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - test_values = [("MY.DSN", False), ("MY.DSN", True)] - for test_case in test_values: - Files(self.test_profile).list_dsn(*test_case) - mock_send_request.assert_called() - - @mock.patch("requests.Session.send") - def test_list_zFS_file_system(self, mock_send_request): - """Test unmounting a zfs sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).list_unix_file_systems("file_system_name") - mock_send_request.assert_called_once() - - @mock.patch("requests.Session.send") - def test_recall_migrated_dataset(self, 
mock_send_request): - """Test recalling migrated data set sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).recall_migrated_dataset("dataset_name") - mock_send_request.assert_called_once() - - @mock.patch("requests.Session.send") - def test_copy_uss_to_dataset(self, mock_send_request): - """Test copy_uss_to_dataset sends a request""" - - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).copy_uss_to_dataset("from_filename", "to_dataset_name", "to_member_name", replace=True) - - mock_send_request.assert_called_once() - - def test_copy_dataset_or_member_raises_exception(self): - """Test copying a data set or member raises error when assigning invalid values to enq parameter""" - - test_case = { - "from_dataset_name": "MY.OLD.DSN", - "to_dataset_name": "MY.NEW.DSN", - "from_member_name": "MYMEM1", - "to_member_name": "MYMEM2", - "enq": "RANDOM", - "replace": True, - } - with self.assertRaises(ValueError) as e_info: - Files(self.test_profile).copy_dataset_or_member(**test_case) - self.assertEqual(str(e_info.exception), "Invalid value for enq.") - - @mock.patch("requests.Session.send") - def test_copy_dataset_or_member(self, mock_send_request): - """Test copying a data set or member sends a request""" - - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - test_values = [ - { - "from_dataset_name": "MY.OLD.DSN", - "to_dataset_name": "MY.NEW.DSN", - "from_member_name": "MYMEM1", - "to_member_name": "MYMEM2", - "volser": "ABC", - "alias": False, - "enq": "SHRW", - "replace": False, - }, - { - "from_dataset_name": "MY.OLD.DSN", - "to_dataset_name": "MY.NEW.DSN", - "from_member_name": "MYMEM1", - "to_member_name": "MYMEM2", - "volser": "ABC", - "alias": True, - "enq": "SHRW", - "replace": True, - }, - ] - for test_case in 
test_values: - Files(self.test_profile).copy_dataset_or_member(**test_case) - mock_send_request.assert_called() - - def test_recall_migrated_dataset_parameterized(self): - """Testing recall migrated_dataset with different values""" - - test_values = [ - ("MY.OLD.DSN", False), - ("MY.OLD.DSN", True), - ("MY.NEW.DSN", False), - ("MY.NEW.DSN", True), - ] - - files_test_profile = Files(self.test_profile) - - for test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - data = {"request": "hrecall", "wait": test_case[1]} - - files_test_profile.recall_migrated_dataset(test_case[0], test_case[1]) - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "PUT", custom_args, expected_code=[200] - ) - - @mock.patch("requests.Session.send") - def test_delete_migrated_data_set(self, mock_send_request): - """Test deleting a migrated data set sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).delete_migrated_data_set("dataset_name") - mock_send_request.assert_called_once() - - def test_delete_migrated_data_set_parameterized(self): - """Test deleting a migrated data set with different values""" - - test_values = [ - ("MY.OLD.DSN", False, False), - ("MY.OLD.DSN", False, True), - ("MY.OLD.DSN", True, True), - ("MY.NEW.DSN", True, True), - ("MY.NEW.DSN", False, True), - ("MY.NEW.DSN", False, False), - ] - - files_test_profile = Files(self.test_profile) - - for test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - data = { - "request": "hdelete", - "purge": test_case[1], - "wait": test_case[2], - } - - files_test_profile.delete_migrated_data_set(test_case[0], test_case[1], 
test_case[2]) - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "PUT", custom_args, expected_code=[200] - ) - - @mock.patch("requests.Session.send") - def test_migrate_data_set(self, mock_send_request): - """Test migrating a data set sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).migrate_data_set("dataset_name") - mock_send_request.assert_called_once() - - def test_migrate_data_set_parameterized(self): - """Test migrating a data set with different values""" - - test_values = [ - ("MY.OLD.DSN", False), - ("MY.OLD.DSN", True), - ("MY.NEW.DSN", True), - ("MY.NEW.DSN", False), - ] - - files_test_profile = Files(self.test_profile) - - for test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - data = { - "request": "hmigrate", - "wait": test_case[1], - } - - files_test_profile.migrate_data_set(test_case[0], test_case[1]) - - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0]) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "PUT", custom_args, expected_code=[200] - ) - - @mock.patch("requests.Session.send") - def test_rename_dataset(self, mock_send_request): - """Test renaming dataset sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).rename_dataset("MY.OLD.DSN", "MY.NEW.DSN") - mock_send_request.assert_called_once() - - def test_rename_dataset_parametrized(self): - """Test renaming a dataset with different values""" - 
test_values = [ - (("DSN.OLD", "DSN.NEW"), True), - (("DS.NAME.CURRENT", "DS.NAME.NEW"), True), - (("MY.OLD.DSN", "MY.NEW.DSN"), True), - ] - - files_test_profile = Files(self.test_profile) - - for test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - data = { - "request": "rename", - "from-dataset": { - "dsn": test_case[0][0].strip(), - }, - } - - files_test_profile.rename_dataset(test_case[0][0], test_case[0][1]) - - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = data - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][1]) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "PUT", custom_args, expected_code=[200] - ) - - @mock.patch("requests.Session.send") - def test_rename_dataset_member(self, mock_send_request): - """Test renaming dataset member sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) - - Files(self.test_profile).rename_dataset_member("MY.DS.NAME", "MEMBEROLD", "MEMBERNEW") - mock_send_request.assert_called_once() - - def test_rename_dataset_member_raises_exception(self): - """Test renaming a dataset member raises error when assigning invalid values to enq parameter""" - with self.assertRaises(ValueError) as e_info: - Files(self.test_profile).rename_dataset_member("MY.DS.NAME", "MEMBER1", "MEMBER1N", "RANDOM") - self.assertEqual(str(e_info.exception), "Invalid value for enq.") - - def test_rename_dataset_member_parametrized(self): - """Test renaming a dataset member with different values""" - test_values = [ - (("DSN", "MBROLD$", "MBRNEW$", "EXCLU"), True), - (("DSN", "MBROLD#", "MBRNE#", "SHRW"), True), - (("DSN", "MBROLD", "MBRNEW", "INVALID"), False), - (("DATA.SET.@NAME", "MEMBEROLD", "MEMBERNEW"), True), - (("DS.NAME", "MONAME", "MNNAME"), True), - ] - - files_test_profile = Files(self.test_profile) - - for 
test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - data = { - "request": "rename", - "from-dataset": { - "dsn": test_case[0][0].strip(), - "member": test_case[0][1].strip(), - }, - } - - if len(test_case[0]) > 3: - data["enq"] = test_case[0][3].strip() - if test_case[1]: - files_test_profile.rename_dataset_member(*test_case[0]) - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = data - ds_path = "{}({})".format(test_case[0][0], test_case[0][2]) - ds_path_adjusted = files_test_profile._encode_uri_component(ds_path) - self.assertNotRegex(ds_path_adjusted, r"[\$\@\#]") - self.assertRegex(ds_path_adjusted, r"[\(" + re.escape(test_case[0][2]) + r"\)]") - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(ds_path_adjusted) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "PUT", custom_args, expected_code=[200] - ) - else: - with self.assertRaises(ValueError) as e_info: - files_test_profile.rename_dataset_member(*test_case[0]) - self.assertEqual(str(e_info.exception), "Invalid value for enq.") - - @mock.patch("requests.Session.send") - def test_create_data_set_accept_valid_recfm(self, mock_send_request): - """Test if create dataset does accept all accepted record formats""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) - for recfm in ["F", "FB", "V", "VB", "U", "FBA", "FBM", "VBA", "VBM"]: - Files(self.test_profile).create_data_set( - "DSNAME123", - options={ - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "dirblk": 5, - "recfm": recfm, - "blksize": 6160, - "lrecl": 80, - }, - ) - mock_send_request.assert_called() - - def test_create_data_set_does_not_accept_invalid_recfm(self): - """Test if create dataset raises an error for invalid record formats""" - with self.assertRaises(KeyError): - Files(self.test_profile).create_data_set( - "DSNAME123", - 
options={ - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "dirblk": 5, - "recfm": "XX", - "blksize": 6160, - "lrecl": 80, - }, - ) - - def test_create_data_set_raises_error_without_required_arguments(self): - """Test not providing required arguments raises an error""" - with self.assertRaises(ValueError) as e_info: - obj = Files(self.test_profile).create_data_set( - "DSNAME123", options={"alcunit": "CYL", "dsorg": "PO", "recfm": "FB", "blksize": 6160, "dirblk": 25} - ) - self.assertEqual(str(e_info.exception), "If 'like' is not specified, you must specify 'primary' or 'lrecl'.") - - def test_create_data_set_raises_error_with_invalid_arguments_parameterized(self): - """Test not providing valid arguments raises an error""" - test_values = [ - { - "alcunit": "invalid", - "dsorg": "PO", - "primary": 1, - "dirblk": 5, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - }, - { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 1, - "recfm": "invalid", - "blksize": 32760, - "lrecl": 260, - "dirblk": 25, - }, - { - "alcunit": "CYL", - "dsorg": "invalid", - "primary": 1, - "dirblk": 5, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - }, - { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 10, - "recfm": "U", - "blksize": 27998, - "lrecl": 27998, - "dirblk": 0, - }, - { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 99777215, - "dirblk": 5, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - }, - ] - - for test_case in test_values: - with self.assertRaises((KeyError, ValueError)): - obj = Files(self.test_profile).create_data_set("MY.OLD.DSN", options=test_case) - - def test_create_dataset_parameterized(self): - """Test create dataset with different values""" - test_values = [ - ( - ( - "DSN", - { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "dirblk": 5, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - }, - ), - True, - ), - ( - ( - "DSN", - { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, 
- "dirblk": 25, - }, - ), - True, - ), - ( - ( - "DSN", - { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 1, - "recfm": "VB", - "blksize": 32760, - "lrecl": 260, - "dirblk": 25, - }, - ), - True, - ), - ( - ("DSN", {"alcunit": "CYL", "dsorg": "PS", "primary": 1, "recfm": "FB", "blksize": 6160, "lrecl": 80}), - True, - ), - ( - ( - "DSN", - { - "alcunit": "CYL", - "dsorg": "PS", - "recfm": "FB", - "blksize": 6160, - }, - ), - False, - ), - ] - - files_test_profile = Files(self.test_profile) - - for test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - if test_case[1]: - files_test_profile.create_data_set(*test_case[0]) - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = test_case[0][1] - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][0]) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "POST", custom_args, expected_code=[201] - ) - else: - with self.assertRaises(ValueError) as e_info: - files_test_profile.create_data_set(*test_case[0]) - self.assertEqual( - str(e_info.exception), "If 'like' is not specified, you must specify 'primary' or 'lrecl'." 
- ) - - @mock.patch("requests.Session.send") - def test_create_default_dataset(self, mock_send_request): - """Test creating a default data set sends a request""" - mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) - - Files(self.test_profile).create_default_data_set("dataset_name", "partitioned") - mock_send_request.assert_called_once() - - def test_create_default_dataset_parameterized(self): - """Test create default dataset with different values""" - test_values = [ - (("DSN", "partitioned"), True), - (("DSN", "sequential"), True), - (("DSN", "classic"), True), - (("DSN", "c"), True), - (("DSN", "binary"), True), - (("DSN", "invalid"), False), - ] - - files_test_profile = Files(self.test_profile) - - for test_case in test_values: - files_test_profile.dsn.request_handler.perform_request = mock.Mock() - - options = { - "partitioned": { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "dirblk": 5, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - }, - "sequential": { - "alcunit": "CYL", - "dsorg": "PS", - "primary": 1, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - }, - "classic": { - "alcunit": "CYL", - "dsorg": "PO", - "primary": 1, - "recfm": "FB", - "blksize": 6160, - "lrecl": 80, - "dirblk": 25, - }, - "c": { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 1, - "recfm": "VB", - "blksize": 32760, - "lrecl": 260, - "dirblk": 25, - }, - "binary": { - "dsorg": "PO", - "alcunit": "CYL", - "primary": 10, - "recfm": "U", - "blksize": 27998, - "lrecl": 27998, - "dirblk": 25, - }, - } - - if test_case[1]: - files_test_profile.create_default_data_set(*test_case[0]) - custom_args = files_test_profile._create_custom_request_arguments() - custom_args["json"] = options.get(test_case[0][1]) - custom_args["url"] = "https://mock-url.com:443/zosmf/restfiles/ds/{}".format(test_case[0][0]) - files_test_profile.dsn.request_handler.perform_request.assert_called_once_with( - "POST", custom_args, 
expected_code=[201] - ) - else: - with self.assertRaises(ValueError) as e_info: - files_test_profile.create_default_data_set(*test_case[0]) - self.assertEqual(str(e_info.exception), "Invalid type for default data set.") From a01dc05080bf1997ece9e937aa4179ee4e8c88f3 Mon Sep 17 00:00:00 2001 From: pem70 Date: Fri, 31 May 2024 12:28:58 -0400 Subject: [PATCH 5/6] Add more unit tests Signed-off-by: pem70 --- tests/unit/files/datasets/test_delete.py | 48 ++++++++++++++++++++++++ tests/unit/files/datasets/test_get.py | 39 +++++++++++++++++++ tests/unit/files/datasets/test_list.py | 27 ++++++++++++- tests/unit/files/datasets/test_write.py | 28 ++++++++++++++ tests/unit/files/uss/test_uss.py | 33 +++++++++++++++- 5 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 tests/unit/files/datasets/test_delete.py create mode 100644 tests/unit/files/datasets/test_get.py create mode 100644 tests/unit/files/datasets/test_write.py diff --git a/tests/unit/files/datasets/test_delete.py b/tests/unit/files/datasets/test_delete.py new file mode 100644 index 00000000..1422be9f --- /dev/null +++ b/tests/unit/files/datasets/test_delete.py @@ -0,0 +1,48 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestDeleteClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_delete(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) + + Files(self.test_profile).delete_data_set(dataset_name="ds_name", member_name="member_name") + mock_send_request.assert_called_once() + + @mock.patch("requests.Session.send") + def test_delete(self, 
mock_send_request): + """Test list members sends request""" + self.files_instance = Files(self.test_profile) + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + mock_send_request.return_value.json.return_value = {} + + test_cases = [ + ("MY.PDS", 1000, "m1"), + ("MY.C", 100, "m2"), + ("MY.D", 1000, "member"), + ("MY.E", 500, "extended") + ] + + for dataset_name, volume, member_name in test_cases: + result = self.files_instance.delete_data_set(dataset_name, volume, member_name) + self.assertEqual(result, {}) + mock_send_request.assert_called() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "DELETE") + diff --git a/tests/unit/files/datasets/test_get.py b/tests/unit/files/datasets/test_get.py new file mode 100644 index 00000000..46438b62 --- /dev/null +++ b/tests/unit/files/datasets/test_get.py @@ -0,0 +1,39 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestGetClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_get(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).get_dsn_content(dataset_name="ds_name") + mock_send_request.assert_called_once() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "GET") + + @mock.patch("requests.Session.send") + def test_binary_get(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, 
status_code=200) + + Files(self.test_profile).get_dsn_binary_content(dataset_name="ds_name") + mock_send_request.assert_called_once() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "GET") + self.assertEqual(prepared_request.headers["X-IBM-Data-Type"], "binary") \ No newline at end of file diff --git a/tests/unit/files/datasets/test_list.py b/tests/unit/files/datasets/test_list.py index 8fd33a07..3e6d1349 100644 --- a/tests/unit/files/datasets/test_list.py +++ b/tests/unit/files/datasets/test_list.py @@ -26,4 +26,29 @@ def test_list_dsn(self, mock_send_request): test_values = [("MY.DSN", False), ("MY.DSN", True)] for test_case in test_values: Files(self.test_profile).list_dsn(*test_case) - mock_send_request.assert_called() \ No newline at end of file + mock_send_request.assert_called() + + @mock.patch("requests.Session.send") + def test_list_members(self, mock_send_request): + """Test list members sends request""" + self.files_instance = Files(self.test_profile) + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + mock_send_request.return_value.json.return_value = {"items": ["MEMBER1", "MEMBER2"]} + + test_cases = [ + ("MY.PDS", None, None, 1000, "member"), + ("MY.PDS", "MEM*", None, 1000, "member"), + ("MY.PDS", None, "MEMBER1", 1000, "member"), + ("MY.PDS", "MEM*", "MEMBER1", 500, "extended") + ] + + for dataset_name, member_pattern, member_start, limit, attributes in test_cases: + result = self.files_instance.list_dsn_members(dataset_name, member_pattern, member_start, limit, attributes) + self.assertEqual(result, ["MEMBER1", "MEMBER2"]) + mock_send_request.assert_called() + + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "GET") + self.assertIn(dataset_name, prepared_request.url) + self.assertEqual(prepared_request.headers["X-IBM-Max-Items"], str(limit)) + 
self.assertEqual(prepared_request.headers["X-IBM-Attributes"], attributes) \ No newline at end of file diff --git a/tests/unit/files/datasets/test_write.py b/tests/unit/files/datasets/test_write.py new file mode 100644 index 00000000..1f0c84b2 --- /dev/null +++ b/tests/unit/files/datasets/test_write.py @@ -0,0 +1,28 @@ +import re +from unittest import TestCase, mock + +from zowe.zos_files_for_zowe_sdk import Files, exceptions, Datasets + + +class TestWriteClass(TestCase): + """File class unit tests.""" + + def setUp(self): + """Setup fixtures for File class.""" + self.test_profile = { + "host": "mock-url.com", + "user": "Username", + "password": "Password", + "port": 443, + "rejectUnauthorized": True, + } + + @mock.patch("requests.Session.send") + def test_write(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) + + Files(self.test_profile).write_to_dsn(dataset_name="ds_name", data="test") + mock_send_request.assert_called_once() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "PUT") \ No newline at end of file diff --git a/tests/unit/files/uss/test_uss.py b/tests/unit/files/uss/test_uss.py index 39a2a4e4..3a9ab204 100644 --- a/tests/unit/files/uss/test_uss.py +++ b/tests/unit/files/uss/test_uss.py @@ -24,4 +24,35 @@ def test_delete_uss(self, mock_send_request): mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=204) Files(self.test_profile).delete_uss("filepath_name", recursive=True) - mock_send_request.assert_called_once() \ No newline at end of file + mock_send_request.assert_called_once() + + @mock.patch("requests.Session.send") + def test_get(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + 
Files(self.test_profile).get_file_content("uss_name") + mock_send_request.assert_called_once() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "GET") + + @mock.patch("requests.Session.send") + def test_get_streamed(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=200) + + Files(self.test_profile).get_file_content_streamed("uss_name", binary=True) + mock_send_request.assert_called_once() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "GET") + self.assertEqual(prepared_request.headers["X-IBM-Data-Type"], "binary") + + @mock.patch("requests.Session.send") + def test_write(self, mock_send_request): + """Test list members sends request""" + mock_send_request.return_value = mock.Mock(headers={"Content-Type": "application/json"}, status_code=201) + + Files(self.test_profile).write_to_uss(filepath_name="test", data="test") + mock_send_request.assert_called_once() + prepared_request = mock_send_request.call_args[0][0] + self.assertEqual(prepared_request.method, "PUT") \ No newline at end of file From 6a13b4df7c3892e2c2e896c179f690e362462257 Mon Sep 17 00:00:00 2001 From: pem70 Date: Fri, 31 May 2024 12:46:13 -0400 Subject: [PATCH 6/6] Update CHANGELOG.md Signed-off-by: pem70 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97c16b80..3ae0fcb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to the Zowe Client Python SDK will be documented in this fil - Refactor Files class into proper classes [#264](https://github.com/zowe/zowe-client-python-sdk/issues/264) +- Refactor testings into proper folders and files [#265](https://github.com/zowe/zowe-client-python-sdk/issues/265) + ## `1.0.0-dev15` ### Bug Fixes