Neural Solution Resource Management #1060

Merged
merged 11 commits on Jul 7, 2023
49 changes: 46 additions & 3 deletions neural_solution/backend/cluster.py
@@ -84,12 +84,54 @@ def free_resource(self, reserved_resource_lst):
"""
self.cursor.execute(sql, (free_resources[node_id], free_resources[node_id], node_id))
self.conn.commit()
# delete nodes with status 'remove' (select the ids first, since some SQLite versions lack the RETURNING syntax)
self.cursor.execute("SELECT id FROM cluster WHERE status='remove' AND busy_sockets=0")
deleted_ids = self.cursor.fetchall()
deleted_ids = [str(id_tuple[0]) for id_tuple in deleted_ids]
self.cursor.execute("DELETE FROM cluster WHERE status='remove' AND busy_sockets=0")
self.conn.commit()

# remove deleted nodes from socket queue
socket_queue_delete_ids = [socket for socket in self.socket_queue if socket.split()[0] in deleted_ids]
if len(socket_queue_delete_ids) > 0:
logger.info(f"[Cluster] remove node-list {socket_queue_delete_ids} from socket_queue: {self.socket_queue}")
self.socket_queue = [socket for socket in self.socket_queue if socket.split()[0] not in deleted_ids]
logger.info(f"[Cluster] free resource {reserved_resource_lst}, now have free resource {self.socket_queue}")

@synchronized
def get_free_socket(self, num_sockets: int) -> List[str]:
"""Get the free sockets list."""
booked_socket_lst = []

# detect and append new resource
self.cursor.execute(f"SELECT id, name, total_sockets FROM cluster where status = 'join'")
new_node_lst = self.cursor.fetchall()
for index, name, total_sockets in new_node_lst:
sql = """
UPDATE cluster
SET status = ?
WHERE id = ?
"""
self.cursor.execute(sql, ('alive', index))
self.conn.commit()
self.socket_queue += [str(index) + " " + name] * total_sockets
logger.info(f"[Cluster] add new node-id {index} to socket_queue: {self.socket_queue}")

# do not assign nodes with status of remove
# remove to-delete nodes from socket queue
self.cursor.execute("SELECT id FROM cluster WHERE status='remove'")
deleted_ids = self.cursor.fetchall()
deleted_ids = [str(id_tuple[0]) for id_tuple in deleted_ids]

socket_queue_delete_ids = [socket for socket in self.socket_queue if socket.split()[0] in deleted_ids]
if len(socket_queue_delete_ids) > 0:
logger.info(f"[Cluster] remove node-list {socket_queue_delete_ids} from socket_queue: {self.socket_queue}")
self.socket_queue = [socket for socket in self.socket_queue if socket.split()[0] not in deleted_ids]

# delete nodes with status of remove
self.cursor.execute("DELETE FROM cluster WHERE status='remove' AND busy_sockets=0")
self.conn.commit()

if len(self.socket_queue) < num_sockets:
logger.info(f"Can not allocate {num_sockets} sockets, due to only {len(self.socket_queue)} left.")
return 0
@@ -111,6 +153,7 @@ def initial_cluster_from_node_lst(self, node_lst):
self.cursor.execute("drop table if exists cluster ")
self.cursor.execute(
r"create table cluster(id INTEGER PRIMARY KEY AUTOINCREMENT,"
+ "name varchar(100),"
+ "node_info varchar(500),"
+ "status varchar(100),"
+ "free_sockets int,"
@@ -121,9 +164,9 @@
for index, node in enumerate(self.node_lst):
self.socket_queue += [str(index + 1) + " " + node.name] * node.num_sockets
self.cursor.execute(
r"insert into cluster(node_info, status, free_sockets, busy_sockets, total_sockets)"
+ "values ('{}', '{}', {}, {}, {})".format(
repr(node).replace("Node", f"Node{index+1}"), "alive", node.num_sockets, 0, node.num_sockets
r"insert into cluster(name, node_info, status, free_sockets, busy_sockets, total_sockets)"
+ "values ('{}', '{}', '{}', {}, {}, {})".format(
node.name, repr(node).replace("Node", f"Node{index+1}"), "alive", node.num_sockets, 0, node.num_sockets
)
)

34 changes: 34 additions & 0 deletions neural_solution/docs/source/README.md
@@ -10,6 +10,11 @@
- [Query task status](#query-task-status)
- [Stop service](#stop-service)
- [Inspect logs](#inspect-logs)
- [Start resource management](#start-resource-management)
- [Node States](#node-states)
- [Query cluster](#query-cluster)
- [Add node](#add-node)
- [Remove node](#remove-node)

## Install Neural Solution
### Prerequisites
@@ -126,3 +131,32 @@ There are several logs under workspace:

```

## Start resource management
Neural Solution supports cluster management for service maintainers, providing several command-line tools for efficient resource management.

### Node States

Each node in the cluster can be in one of three states, tracked in the `status` column of the `cluster` table:

- Alive: Represents a node that is functioning properly and available to handle requests.
- Join: Indicates that a node is in the process of being added to the cluster but has not fully joined yet.
- Remove: Indicates that a node is scheduled to be removed from the cluster.

Below are some commonly used commands and their usage:

### Query cluster
This command is used to query the current status of the cluster. No additional parameters are required; simply enter the following command:
```shell
neural_solution cluster --query
```
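Under the hood, this prints the `cluster` table from the workspace database. The same information can be read programmatically; below is a minimal sketch, assuming the default workspace `./ns_workspace` (adjust the path to match your `--workspace` setting):
```python
# Minimal sketch: read the cluster table directly instead of using the CLI.
import sqlite3

from neural_solution.utils.utility import get_db_path  # same helper the launcher uses

db_path = get_db_path("./ns_workspace")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Columns follow the schema created in cluster.py:
# id, name, node_info, status, free_sockets, busy_sockets, total_sockets
for row in cursor.execute("SELECT id, name, status, free_sockets, busy_sockets, total_sockets FROM cluster"):
    print(row)
conn.close()
```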
### Add node
This command is used to add nodes to the cluster. You can either specify a host file or provide a list of nodes separated by ";", where each node is described as `<hostname> <number of sockets> <cores per socket>`. For example:
```shell
neural_solution cluster --join "host1 2 20; host2 4 20"
```
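`--join` also accepts the path to a host file (the launcher detects this with `os.path.isfile`), with one node per line in the same `<hostname> <number of sockets> <cores per socket>` format. A minimal sketch, assuming a hypothetical file named `hosts.txt`:
```shell
# hosts.txt (hypothetical contents, one node per line):
#   host1 2 20
#   host2 4 20
neural_solution cluster --join hosts.txt
```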
### Remove node
This command removes a node from the cluster, identified by the node ID obtained from the query. Pass the ID as a parameter. For example:
```shell
neural_solution cluster --rm <query_id>
```
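For example, to remove the node whose `id` in the query output is 2 (a hypothetical id):
```shell
neural_solution cluster --rm 2
```
If the node still has busy sockets, it is only marked with the `remove` status and its row is deleted once the resources are released (see `remove_node_from_cluster` in `launcher.py` and `Cluster.free_resource` in `cluster.py`).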
Please note that the above commands are just examples and may require additional parameters or configurations based on your specific setup.
160 changes: 152 additions & 8 deletions neural_solution/launcher.py
@@ -15,14 +15,17 @@
"""The entry of Neural Solution."""
import argparse
import os
import psutil
import shlex
import socket
import sqlite3
import subprocess
import sys
import time
from datetime import datetime
from neural_solution.utils.utility import get_db_path
from prettytable import PrettyTable

import psutil


def check_ports(args):
@@ -252,13 +255,138 @@ def start_service(args):
print("Neural Solution Service Started!")
print(f'Service log saving path is in "{os.path.abspath(serve_log_dir)}"')
print(f"To submit task at: {ip_address}:{args.restful_api_port}/task/submit/")
print("[For information] neural_solution help")
print("[For information] neural_solution -h")

def query_cluster(db_path: str):
"""Query cluster information from database.

Args:
db_path (str): the database path
"""
conn = sqlite3.connect(f"{db_path}")
cursor = conn.cursor()
cursor.execute(r"select * from cluster")
conn.commit()
results = cursor.fetchall()

table = PrettyTable()
table.field_names = [i[0] for i in cursor.description]

for row in results:
table.add_row(row)

table.title = "Neural Solution Cluster Management System"
print(table)
cursor.close()
conn.close()


def create_node(line: str):
"""Parse line to create node.

Args:
line (str): node information, e.g. "localhost 2 20"

Returns:
Node: node object
"""
from neural_solution.backend.cluster import Node
hostname, num_sockets, num_cores_per_socket = line.strip().split(" ")
num_sockets, num_cores_per_socket = int(num_sockets), int(num_cores_per_socket)
node = Node(name=hostname, num_sockets=num_sockets, num_cores_per_socket=num_cores_per_socket)
return node

def join_node_to_cluster(db_path: str, args):
"""Append new node into cluster.

Args:
db_path (str): the database path
"""
is_file = os.path.isfile(args.join)
node_lst = []
if is_file:
num_threads_per_process = 5
with open(args.join, 'r') as f:
for line in f:
node_lst.append(create_node(line))
else:
for line in args.join.split(";"):
node_lst.append(create_node(line))

# Insert node into cluster table.
for count, node in enumerate(node_lst):
print(node)
conn = sqlite3.connect(f"{db_path}")
cursor = conn.cursor()
if count == 0:
cursor.execute("SELECT id FROM cluster ORDER BY id DESC LIMIT 1")
result = cursor.fetchone()
index = result[0] if result else 0

cursor.execute(r"insert into cluster(name, node_info, status, free_sockets, busy_sockets, total_sockets)" +
"values ('{}', '{}', '{}', {}, {}, {})".format(node.name,
repr(node).replace("Node", f"Node{index+1}"),
"join",
node.num_sockets,
0,
node.num_sockets))
conn.commit()
index += 1
print(f"Insert node-id: {index} successfully!")

cursor.close()
conn.close()

def remove_node_from_cluster(db_path: str, node_id: int):
"""Remove one node from cluster table. In the future, it will be deleted in the Cluster class.

Args:
db_path (str): the database path
node_id (int): the node id
"""
conn = sqlite3.connect(f"{db_path}")
cursor = conn.cursor()

cursor.execute(f"SELECT status, busy_sockets FROM cluster where id = {node_id}")
results = cursor.fetchone()

if results is None:
print(f"No node-id {node_id} in cluster table.")
return
elif results[1] == 0:
sql = f"UPDATE cluster SET status = 'remove' WHERE id = {node_id}"
cursor.execute(sql)
print(f"Remove node-id {node_id} successfully.")
else:
sql = f"UPDATE cluster SET status = 'remove' WHERE id = {node_id}"
cursor.execute(sql)
print(f"Resource occupied, will be removed after resource release")
conn.commit()

cursor.close()
conn.close()

def manage_cluster(args):
"""Neural Solution resource management. query/join/remove node.

Args:
args (argparse.Namespace): configuration
"""
db_path = get_db_path(args.workspace)
if args.query:
query_cluster(db_path)
if args.join:
join_node_to_cluster(db_path, args)
if args.rm:
remove_node_from_cluster(db_path, node_id=args.rm)


def main():
"""Implement the main function."""
parser = argparse.ArgumentParser(description="Neural Solution")
parser.add_argument("action", choices=["start", "stop"], help="start/stop service")
parser.add_argument(
'action', choices=['start', 'stop', "cluster"], help='start/stop/management service'
)
parser.add_argument(
"--hostfile", default=None, help="start backend serve host file which contains all available nodes"
)
@@ -280,12 +408,27 @@ def main():
default=2222,
help="start serve for task monitor at {task_monitor_port}, default 2222",
)
parser.add_argument("--api_type", default="all", help="start web serve with all/grpc/restful, default all")
parser.add_argument(
"--api_type", default="all", help="start web serve with all/grpc/restful, default all"
)
parser.add_argument(
"--workspace", default="./ns_workspace", help='neural solution workspace, default "./ns_workspace"'
)
parser.add_argument("--conda_env", default=None, help="specify the running environment for the task")
parser.add_argument("--upload_path", default="examples", help="specify the file path for the tasks")
parser.add_argument(
"--conda_env", default=None, help="specify the running environment for the task"
)
parser.add_argument(
"--upload_path", default="examples", help="specify the file path for the tasks"
)
parser.add_argument(
"--query", action="store_true", help="[cluster parameter] query cluster information"
)
parser.add_argument(
"--join", help="[cluster parameter] add new node into cluster"
)
parser.add_argument(
"--rm", help="[cluster parameter] remove <node-id> from cluster"
)
args = parser.parse_args()

# Check parameters ending in '_port'
@@ -295,7 +438,8 @@ def main():
start_service(args)
elif args.action == "stop":
stop_service()


elif args.action == "cluster":
manage_cluster(args)

if __name__ == "__main__":
main()