diff --git a/.azure-pipelines/scripts/codeScan/pyspelling/inc_dict.txt b/.azure-pipelines/scripts/codeScan/pyspelling/inc_dict.txt index 1ba7cd7a55e..834e40928aa 100644 --- a/.azure-pipelines/scripts/codeScan/pyspelling/inc_dict.txt +++ b/.azure-pipelines/scripts/codeScan/pyspelling/inc_dict.txt @@ -2677,3 +2677,4 @@ jJA wWLes xHKe PR +hostname diff --git a/neural_solution/backend/cluster.py b/neural_solution/backend/cluster.py index cee9a23dc9a..3c3d17e8ba2 100644 --- a/neural_solution/backend/cluster.py +++ b/neural_solution/backend/cluster.py @@ -84,12 +84,54 @@ def free_resource(self, reserved_resource_lst): """ self.cursor.execute(sql, (free_resources[node_id], free_resources[node_id], node_id)) self.conn.commit() + # delete nodes with status of remove, some version without RETURNING syntax + self.cursor.execute("SELECT id FROM cluster WHERE status='remove' AND busy_sockets=0") + deleted_ids = self.cursor.fetchall() + deleted_ids = [str(id_tuple[0]) for id_tuple in deleted_ids] + self.cursor.execute("DELETE FROM cluster WHERE status='remove' AND busy_sockets=0") + self.conn.commit() + + # remove deleted nodes from socket queue + socket_queue_delete_ids = [socket for socket in self.socket_queue if socket.split()[0] in deleted_ids] + if len(socket_queue_delete_ids) > 0: + logger.info(f"[Cluster] remove node-list {socket_queue_delete_ids} from socket_queue: {self.socket_queue}") + self.socket_queue = [socket for socket in self.socket_queue if socket.split()[0] not in deleted_ids] logger.info(f"[Cluster] free resource {reserved_resource_lst}, now have free resource {self.socket_queue}") @synchronized def get_free_socket(self, num_sockets: int) -> List[str]: """Get the free sockets list.""" booked_socket_lst = [] + + # detect and append new resource + self.cursor.execute(f"SELECT id, name, total_sockets FROM cluster where status = 'join'") + new_node_lst = self.cursor.fetchall() + for index, name, total_sockets in new_node_lst: + sql = """ + UPDATE cluster + SET status = ? + WHERE id = ? + """ + self.cursor.execute(sql, ('alive', index)) + self.conn.commit() + self.socket_queue += [str(index) + " " + name] * total_sockets + logger.info(f"[Cluster] add new node-id {index} to socket_queue: {self.socket_queue}") + + # do not assign nodes with status of remove + # remove to-delete nodes from socket queue + self.cursor.execute("SELECT id FROM cluster WHERE status='remove'") + deleted_ids = self.cursor.fetchall() + deleted_ids = [str(id_tuple[0]) for id_tuple in deleted_ids] + + socket_queue_delete_ids = [socket for socket in self.socket_queue if socket.split()[0] in deleted_ids] + if len(socket_queue_delete_ids) > 0: + logger.info(f"[Cluster] remove node-list {socket_queue_delete_ids} from socket_queue: {self.socket_queue}") + self.socket_queue = [socket for socket in self.socket_queue if socket.split()[0] not in deleted_ids] + + # delete nodes with status of remove + self.cursor.execute("DELETE FROM cluster WHERE status='remove' AND busy_sockets=0") + self.conn.commit() + if len(self.socket_queue) < num_sockets: logger.info(f"Can not allocate {num_sockets} sockets, due to only {len(self.socket_queue)} left.") return 0 @@ -111,6 +153,7 @@ def initial_cluster_from_node_lst(self, node_lst): self.cursor.execute("drop table if exists cluster ") self.cursor.execute( r"create table cluster(id INTEGER PRIMARY KEY AUTOINCREMENT," + + "name varchar(100)," + "node_info varchar(500)," + "status varchar(100)," + "free_sockets int," @@ -121,9 +164,9 @@ def initial_cluster_from_node_lst(self, node_lst): for index, node in enumerate(self.node_lst): self.socket_queue += [str(index + 1) + " " + node.name] * node.num_sockets self.cursor.execute( - r"insert into cluster(node_info, status, free_sockets, busy_sockets, total_sockets)" - + "values ('{}', '{}', {}, {}, {})".format( - repr(node).replace("Node", f"Node{index+1}"), "alive", node.num_sockets, 0, node.num_sockets + r"insert into cluster(name, node_info, status, free_sockets, busy_sockets, total_sockets)" + + "values ('{}', '{}', '{}', {}, {}, {})".format( + node.name, repr(node).replace("Node", f"Node{index+1}"), "alive", node.num_sockets, 0, node.num_sockets ) ) diff --git a/neural_solution/docs/source/README.md b/neural_solution/docs/source/README.md index 9418cb161c9..2e599268405 100644 --- a/neural_solution/docs/source/README.md +++ b/neural_solution/docs/source/README.md @@ -10,6 +10,11 @@ - [Query task status](#query-task-status) - [Stop service](#stop-service) - [Inspect logs](#inspect-logs) + - [Manage resource](#manage-resource) + - [Node States](#node-states) + - [Query cluster](#query-cluster) + - [Add node](#add-node) + - [Remove node](#remove-node) ## Install Neural Solution ### Prerequisites @@ -126,3 +131,40 @@ There are several logs under workspace: ``` +## Manage resource +Neural Solution supports cluster management for service maintainers, providing several command-line tools for efficient resource management. + +### Node States + +Each node in the cluster can have three different states: + +- Alive: Represents a node that is functioning properly and available to handle requests. +- Join: Indicates that a node is in the process of being added to the cluster but has not fully joined yet. +- Remove: Indicates that a node is scheduled to be removed from the cluster. + +Below are some commonly used commands and their usage: + +### Query cluster +This command is used to query the current status of the cluster. No additional parameters are required, simply enter the following command: +```shell +neural_solution cluster --query +``` +### Add node +This command is used to add nodes to the cluster. You can either specify a host file or provide a list of nodes separated by ";". The node format consists of three parts: hostname, number_of_sockets, and cores_per_socket. Here's a breakdown of each part: + +- hostname: This refers to the name or IP address of the node that you want to add to the cluster. It identifies the specific machine or server that will be part of the cluster. + +- number_of_sockets: This indicates the number of physical CPU sockets available on the node. A socket is a physical component that houses one or more CPU cores. It represents a physical processor unit. + +- cores_per_socket: This specifies the number of CPU cores present in each socket. A core is an individual processing unit within a CPU. + +For example: +```shell +neural_solution cluster --join "host1 2 20; host2 4 20" +``` +### Remove node +This command is used to remove nodes from the cluster based on the IDs obtained from the query. The IDs can be passed as a parameter to the command. For example: +```shell +neural_solution cluster --remove +``` +Please note that the above commands are just examples and may require additional parameters or configurations based on your specific setup. \ No newline at end of file diff --git a/neural_solution/examples/custom_models_optimized/tf_example1/README.md b/neural_solution/examples/custom_models_optimized/tf_example1/README.md index 2a56f61f93e..6d6dfce1bf7 100644 --- a/neural_solution/examples/custom_models_optimized/tf_example1/README.md +++ b/neural_solution/examples/custom_models_optimized/tf_example1/README.md @@ -8,6 +8,7 @@ In this example, we show how to quantize a [custom model](https://github.com/int - Demonstrate how to start the Neural Solution Service. - Demonstrate how to prepare an optimization task request and submit it to Neural Solution Service. - Demonstrate how to query the status of the task and fetch the optimization result. +- Demonstrate how to query and manage the resource of the cluster. ### Requirements Customizing the model requires preparing the following folders and files. @@ -48,12 +49,12 @@ neural_solution -h usage: neural_solution {start,stop} [-h] [--hostfile HOSTFILE] [--restful_api_port RESTFUL_API_PORT] [--grpc_api_port GRPC_API_PORT] [--result_monitor_port RESULT_MONITOR_PORT] [--task_monitor_port TASK_MONITOR_PORT] [--api_type API_TYPE] - [--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH] + [--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH] [--query] [--join JOIN] [--remove REMOVE] Neural Solution positional arguments: - {start,stop} start/stop service + {start,stop,cluster} start/stop/management service optional arguments: -h, --help show this help message and exit @@ -73,6 +74,9 @@ optional arguments: specify the running environment for the task --upload_path UPLOAD_PATH specify the file path for the tasks + --query [cluster parameter] query cluster information + --join JOIN [cluster parameter] add new node into cluster + --remove REMOVE [cluster parameter] remove from cluster ``` @@ -155,6 +159,19 @@ When using distributed quantization, the `workers` needs to be set to greater th # download quantized_model.zip ``` +### Manage resource +```shell +# query cluster information +neural_solution cluster --query + +# add new node into cluster +# parameter: " ; " +neural_solution cluster --join "host1 2 20; host2 5 20" + +# remove node from cluster according to id +neural_solution cluster --remove +``` + ### Stop the service ```shell neural_solution stop diff --git a/neural_solution/examples/hf_models/README.md b/neural_solution/examples/hf_models/README.md index b4e84036289..97331484e3f 100644 --- a/neural_solution/examples/hf_models/README.md +++ b/neural_solution/examples/hf_models/README.md @@ -6,6 +6,7 @@ In this example, we show how to quantize a Hugging Face model with Neural Soluti - Demonstrate how to start the Neural Solution Service. - Demonstrate how to prepare an optimization task request and submit it to Neural Solution Service. - Demonstrate how to query the status of the task and fetch the optimization result. +- Demonstrate how to query and manage the resource of the cluster. ### Start the Neural Solution Service @@ -27,14 +28,14 @@ neural_solution stop neural_solution -h # Help output -usage: neural_solution {start,stop} [-h] [--hostfile HOSTFILE] [--restful_api_port RESTFUL_API_PORT] [--grpc_api_port GRPC_API_PORT] +usage: neural_solution {start,stop,cluster} [-h] [--hostfile HOSTFILE] [--restful_api_port RESTFUL_API_PORT] [--grpc_api_port GRPC_API_PORT] [--result_monitor_port RESULT_MONITOR_PORT] [--task_monitor_port TASK_MONITOR_PORT] [--api_type API_TYPE] - [--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH] + [--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH] [--query] [--join JOIN] [--remove REMOVE] Neural Solution positional arguments: - {start,stop} start/stop service + {start,stop,cluster} start/stop/management service optional arguments: -h, --help show this help message and exit @@ -54,6 +55,9 @@ optional arguments: specify the running environment for the task --upload_path UPLOAD_PATH specify the file path for the tasks + --query [cluster parameter] query cluster information + --join JOIN [cluster parameter] add new node into cluster + --remove REMOVE [cluster parameter] remove from cluster ``` @@ -118,6 +122,19 @@ optional arguments: ``` shell [user@server tf_example1]$ curl -X GET http://localhost:8000/download/{task_id} --output quantized_model.zip # download quantized_model.zip +``` +### Manage resource +```shell +# query cluster information +neural_solution cluster --query + +# add new node into cluster +# parameter: " ; " +neural_solution cluster --join "host1 2 20; host2 5 20" + +# remove node from cluster according to id +neural_solution cluster --remove + ``` ### Stop the service ```shell diff --git a/neural_solution/launcher.py b/neural_solution/launcher.py index 3711bed982f..218c631b97d 100644 --- a/neural_solution/launcher.py +++ b/neural_solution/launcher.py @@ -15,14 +15,17 @@ """The entry of Neural Solution.""" import argparse import os +import psutil import shlex import socket +import sqlite3 import subprocess import sys import time from datetime import datetime +from neural_solution.utils.utility import get_db_path +from prettytable import PrettyTable -import psutil def check_ports(args): @@ -252,13 +255,138 @@ def start_service(args): print("Neural Solution Service Started!") print(f'Service log saving path is in "{os.path.abspath(serve_log_dir)}"') print(f"To submit task at: {ip_address}:{args.restful_api_port}/task/submit/") - print("[For information] neural_solution help") + print("[For information] neural_solution -h") + +def query_cluster(db_path: str): + """Query cluster information from database. + + Args: + db_path (str): the database path + """ + conn = sqlite3.connect(f"{db_path}") + cursor = conn.cursor() + cursor.execute(r"select * from cluster") + conn.commit() + results = cursor.fetchall() + + table = PrettyTable() + table.field_names = [i[0] for i in cursor.description] + + for row in results: + table.add_row(row) + + table.title = "Neural Solution Cluster Management System" + print(table) + cursor.close() + conn.close() + + +def create_node(line: str): + """Parse line to create node. + + Args: + line (str): node information, e.g. "localhost 2 20" + + Returns: + Node: node object + """ + from neural_solution.backend.cluster import Node + hostname, num_sockets, num_cores_per_socket = line.strip().split(" ") + num_sockets, num_cores_per_socket = int(num_sockets), int(num_cores_per_socket) + node = Node(name=hostname, num_sockets=num_sockets, num_cores_per_socket=num_cores_per_socket) + return node + +def join_node_to_cluster(db_path: str, args): + """Append new node into cluster. + + Args: + db_path (str): the database path + """ + is_file = os.path.isfile(args.join) + node_lst = [] + if is_file: + num_threads_per_process = 5 + with open(args.join, 'r') as f: + for line in f: + node_lst.append(create_node(line)) + else: + for line in args.join.split(";"): + node_lst.append(create_node(line)) + + # Insert node into cluster table. + for count, node in enumerate(node_lst): + print(node) + conn = sqlite3.connect(f"{db_path}") + cursor = conn.cursor() + if count == 0: + cursor.execute("SELECT id FROM cluster ORDER BY id DESC LIMIT 1") + result = cursor.fetchone() + index = result[0] if result else 0 + + cursor.execute(r"insert into cluster(name, node_info, status, free_sockets, busy_sockets, total_sockets)" + + "values ('{}', '{}', '{}', {}, {}, {})".format(node.name, + repr(node).replace("Node", f"Node{index+1}"), + "join", + node.num_sockets, + 0, + node.num_sockets)) + conn.commit() + index += 1 + print(f"Insert node-id: {index} successfully!") + + cursor.close() + conn.close() + +def remove_node_from_cluster(db_path: str, node_id: int): + """Remove one node from cluster table. In the future, it will be deleted in the Cluster class. + + Args: + db_path (str): the database path + node_id (int): the node id + """ + conn = sqlite3.connect(f"{db_path}") + cursor = conn.cursor() + + cursor.execute(f"SELECT status, busy_sockets FROM cluster where id = {node_id}") + results = cursor.fetchone() + + if results is None: + print(f"No node-id {node_id} in cluster table.") + return + elif results[1] == 0: + sql = f"UPDATE cluster SET status = 'remove' WHERE id = {node_id}" + cursor.execute(sql) + print(f"Remove node-id {node_id} successfully.") + else: + sql = f"UPDATE cluster SET status = 'remove' WHERE id = {node_id}" + cursor.execute(sql) + print(f"Resource occupied, will be removed after resource release") + conn.commit() + + cursor.close() + conn.close() + +def manage_cluster(args): + """Neural Solution resource management. query/join/remove node. + + Args: + args (argparse.Namespace): configuration + """ + db_path = get_db_path(args.workspace) + if args.query: + query_cluster(db_path) + if args.join: + join_node_to_cluster(db_path, args) + if args.remove: + remove_node_from_cluster(db_path, node_id=args.remove) def main(): """Implement the main function.""" parser = argparse.ArgumentParser(description="Neural Solution") - parser.add_argument("action", choices=["start", "stop"], help="start/stop service") + parser.add_argument( + 'action', choices=['start', 'stop', "cluster"], help='start/stop/management service' + ) parser.add_argument( "--hostfile", default=None, help="start backend serve host file which contains all available nodes" ) @@ -280,12 +408,27 @@ def main(): default=2222, help="start serve for task monitor at {task_monitor_port}, default 2222", ) - parser.add_argument("--api_type", default="all", help="start web serve with all/grpc/restful, default all") + parser.add_argument( + "--api_type", default="all", help="start web serve with all/grpc/restful, default all" + ) parser.add_argument( "--workspace", default="./ns_workspace", help='neural solution workspace, default "./ns_workspace"' ) - parser.add_argument("--conda_env", default=None, help="specify the running environment for the task") - parser.add_argument("--upload_path", default="examples", help="specify the file path for the tasks") + parser.add_argument( + "--conda_env", default=None, help="specify the running environment for the task" + ) + parser.add_argument( + "--upload_path", default="examples", help="specify the file path for the tasks" + ) + parser.add_argument( + "--query", action="store_true", help="[cluster parameter] query cluster information" + ) + parser.add_argument( + "--join", help="[cluster parameter] add new node into cluster" + ) + parser.add_argument( + "--remove", help="[cluster parameter] remove from cluster" + ) args = parser.parse_args() # Check parameters ending in '_port' @@ -295,7 +438,8 @@ def main(): start_service(args) elif args.action == "stop": stop_service() - - + elif args.action == "cluster": + manage_cluster(args) + if __name__ == "__main__": main()