Dev cluster config #71

Merged · 1 commit · Sep 21, 2023
103 changes: 103 additions & 0 deletions golem-cluster-dev.yaml
@@ -0,0 +1,103 @@
# Ray on Golem cluster name
cluster_name: golem-cluster

# The maximum number of workers the cluster will have at any given time
max_workers: 10

# The number of minutes that need to pass before an idle worker node is removed by the autoscaler
idle_timeout_minutes: 5

# The cloud provider-specific configuration properties.
provider:
  type: "external"
  use_internal_ips: true
  module: "ray_on_golem.provider.node_provider.GolemNodeProvider"
  parameters:
    # Port of the Golem webserver that connects to the Golem network
    webserver_port: 4578

    enable_registry_stats: false

    # Blockchain used for payments.
    # Goerli means running free nodes on the testnet,
    # Polygon is for mainnet operations.
    network: "goerli"

    # Maximum amount of GLM that is going to be spent (not supported yet)
    budget: 1

    # Params for creating Golem demands (same for head and worker nodes)
    node_config:
      # If not provided, image_tag will be autodetected based on the Python and Ray versions in use;
      # check available versions at https://registry.golem.network/explore/golem/ray-on-golem
      image_tag: "golem/ray-on-golem:0.1.0-alpha.1"

      # You can also provide the image hash directly (although it is not recommended :)
      # image_hash: "c6fc9b12da19d56a998eace84b6df961852360b37479e93ef4ce3913"  # approxit/ray-test:py3.10.12-ray2.3.1

      capabilities: ["vpn", "inet", "manifest-support"]
      min_mem_gib: 0
      min_cpu_threads: 0
      min_storage_gib: 0

# The files or directories to copy to the head and worker nodes
file_mounts:
  # remote_path: local_path
  {
    "/app/ray_on_golem": "./ray_on_golem",
  }

# Tells the autoscaler the allowed node types and the resources they provide
available_node_types:
  ray.head.default:
    # The minimum number of worker nodes of this type to launch
    min_workers: 0

    # The maximum number of worker nodes of this type to launch
    max_workers: 0

    # The node type's CPU and GPU resources
    resources: {"CPU": 1}

    node_config: {}  # TODO: Demand description here
  ray.worker.default:
    min_workers: 1
    max_workers: 10
    resources: {"CPU": 1}
    node_config: {}

# List of commands that will be run to initialize the nodes (before `setup_commands`)
#initialization_commands: [
#  "pip install endplay",
#]
initialization_commands: []

# List of shell commands to run to set up nodes
setup_commands: []

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands: [
  "ray start --head --node-ip-address 192.168.0.3 --include-dashboard=True --dashboard-host 0.0.0.0 --disable-usage-stats --autoscaling-config=~/ray_bootstrap_config.yaml",
]

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands: [
  "ray start --address 192.168.0.3:6379",
]

# Authentication credentials that Ray will use to launch nodes
# auth:
#   Custom username for ssh
#   ssh_user: "root"

# If ssh_private_key is not provided, a temporary key will be created and used
#   ssh_private_key: "~/.ssh/id_rsa"

# A list of paths to the files or directories to copy from the head node to the worker nodes
cluster_synced_files: []
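With a config like the one above, the standard Ray cluster-launcher workflow applies (`ray up golem-cluster-dev.yaml`, `ray attach golem-cluster-dev.yaml`, `ray down golem-cluster-dev.yaml`). The snippet below is a minimal smoke test, not part of this PR: it assumes the cluster is already up and that the script runs on the head node; the task body is purely illustrative.

import ray

# Connect to the cluster started by `ray up` (run this on the head node).
ray.init(address="auto")

@ray.remote
def which_node() -> str:
    import platform
    return platform.node()

# Fan out a few trivial tasks to confirm that worker nodes have joined.
hosts = ray.get([which_node.remote() for _ in range(8)])
print("Tasks ran on:", sorted(set(hosts)))
print("Cluster resources:", ray.cluster_resources())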
23 changes: 9 additions & 14 deletions golem-cluster.yaml
@@ -33,16 +33,14 @@ provider:
# you can also provide the image hash directly (although it is not recommended :)
# image_hash: "c6fc9b12da19d56a998eace84b6df961852360b37479e93ef4ce3913" # approxit/ray-test:py3.10.12-ray2.3.1

capabilities: ['vpn', 'inet', 'manifest-support']
capabilities: ["vpn", "inet", "manifest-support"]
min_mem_gib: 0
min_cpu_threads: 0
min_storage_gib: 0

# The files or directories to copy to the head and worker nodes
file_mounts:
{
# "/app/ray_on_golem": "/tmp/ray-on-golem",
}
# file_mounts:
# <remote_path>: <local_path>

# Tells the autoscaler the allowed node types and the resources they provide
available_node_types:
@@ -56,15 +54,12 @@ available_node_types:
# The node type's CPU and GPU resources
resources: {"CPU": 1}

node_config:
# TODO: Demand description here
kind: Node
node_config: {} # TODO: Demand description here
ray.worker.default:
min_workers: 1
max_workers: 10
resources: {"CPU": 1}
node_config:
kind: Node
node_config: {}

# List of commands that will be run to initialize the nodes (before `setup_commands`)
#initialization_commands: [
@@ -92,12 +87,12 @@ worker_start_ray_commands: [
]

# Authentication credentials that Ray will use to launch nodes
auth:
# TODO: try to remove explicit ssh_user field
ssh_user: 'root'
# auth:
# Custom username for ssh
# ssh_user: "root"

# If ssh_private_key will be not provided, temporary key will be created and used
# ssh_private_key: '~/.ssh/id_rsa'
# ssh_private_key: "~/.ssh/id_rsa"

# A list of paths to the files or directories to copy from the head node to the worker nodes
cluster_synced_files: []
15 changes: 12 additions & 3 deletions ray_on_golem/provider/node_provider.py
@@ -40,7 +40,7 @@ def __init__(self, provider_config: dict, cluster_name: str):
)

if not is_running_on_golem_network():
self._start_webserver()
self._start_webserver(provider_parameters["enable_registry_stats"])

self._ray_on_golem_client = RayOnGolemClient.get_instance(self._webserver_url)
self._ray_on_golem_client.get_running_or_create_cluster(
@@ -56,12 +56,15 @@ def bootstrap_config(cluster_config: Dict[str, Any]) -> Dict[str, Any]:

provider_parameters: Dict = config["provider"]["parameters"]
provider_parameters.setdefault("webserver_port", RAY_ON_GOLEM_PORT)
provider_parameters.setdefault("enable_registry_stats", True)
provider_parameters.setdefault("network", "goerli")
provider_parameters.setdefault("budget", 1)

ray_on_golem_client = RayOnGolemClient.get_instance(provider_parameters["webserver_port"])

auth: Dict = config["auth"]
auth.setdefault("ssh_user", "root")

if "ssh_private_key" not in auth:
ssh_key_path = TMP_PATH / get_default_ssh_key_name(config["cluster_name"])
auth["ssh_private_key"] = provider_parameters["ssh_private_key"] = str(ssh_key_path)
@@ -150,7 +153,7 @@ def external_ip(self, node_id: NodeId) -> str:
def set_node_tags(self, node_id: NodeId, tags: Dict) -> None:
self._ray_on_golem_client.set_node_tags(node_id, tags)

def _start_webserver(self) -> None:
def _start_webserver(self, registry_stats: bool) -> None:
with cli_logger.group(WEBSERVER_LOG_GROUP):
if self._is_webserver_running():
cli_logger.print("Webserver is already running")
@@ -159,7 +162,13 @@ def _start_webserver(self) -> None:
cli_logger.print("Starting webserver...")

subprocess.Popen(
[RAY_ON_GOLEM_PATH, "-p", str(self._webserver_url.port), "--self-shutdown"],
[
RAY_ON_GOLEM_PATH,
"-p",
str(self._webserver_url.port),
"--registry-stats" if registry_stats else "--no-registry-stats",
"--self-shutdown",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
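As a reading aid, here is a self-contained sketch of the launch pattern used by _start_webserver above; the helper names are illustrative and not part of the PR. The provider-level enable_registry_stats boolean is mapped onto the paired --registry-stats / --no-registry-stats flags, and the process is detached so the webserver outlives the Ray CLI invocation.

import subprocess
from typing import List

def build_webserver_command(binary: str, port: int, registry_stats: bool) -> List[str]:
    # Mirrors the argv assembled in _start_webserver().
    return [
        binary,
        "-p", str(port),
        "--registry-stats" if registry_stats else "--no-registry-stats",
        "--self-shutdown",
    ]

def launch_detached(command: List[str]) -> subprocess.Popen:
    # start_new_session=True detaches the child from the caller's session,
    # matching the subprocess.Popen call in the diff; output is discarded.
    return subprocess.Popen(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )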
25 changes: 20 additions & 5 deletions ray_on_golem/server/run.py
@@ -30,9 +30,20 @@ def parse_sys_args() -> argparse.Namespace:
parser.add_argument(
"--self-shutdown",
action="store_true",
default=False,
help="flag to enable self-shutdown after last node termination, default: %(default)s",
)
parser.add_argument("--no-self-shutdown", action="store_false", dest="self_shutdown")
parser.add_argument(
"--registry-stats",
action="store_true",
help="flag to enable collection of Golem Registry stats about resolved images, default: %(default)s",
)
parser.add_argument(
"--no-registry-stats",
action="store_false",
dest="registry_stats",
)
parser.set_defaults(self_shutdown=False, registry_stats=True)
return parser.parse_args()


@@ -43,11 +54,12 @@ def prepare_tmp_dir():
pass


def create_application(port: int, self_shutdown: bool) -> web.Application:
def create_application(port: int, self_shutdown: bool, registry_stats: bool) -> web.Application:
app = web.Application(middlewares=[error_middleware])

app["port"] = port
app["self_shutdown"] = self_shutdown
app["registry_stats"] = registry_stats

app["yagna_service"] = YagnaService(
yagna_path=YAGNA_PATH,
@@ -56,6 +68,7 @@ def create_application(port: int, self_shutdown: bool) -> web.Application:
app["golem_service"] = GolemService(
ray_on_golem_port=RAY_ON_GOLEM_PORT,
websocat_path=WEBSOCAT_PATH,
registry_stats=app["registry_stats"],
)

app["ray_service"] = RayService(
@@ -116,13 +129,15 @@ def main():
args = parse_sys_args()
prepare_tmp_dir()

app = create_application(args.port, args.self_shutdown)
app = create_application(args.port, args.self_shutdown, args.registry_stats)

logger.info("Starting server...")
logger.info(
"Starting server... {}".format(", ".join(f"{k}={v}" for k, v in args.__dict__.items()))
)

web.run_app(app, port=app["port"], print=None)

logger.info("Server stopped, bye!")
logger.info("Stopping server done, bye!")


if __name__ == "__main__":
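The flag handling added to parse_sys_args follows the standard argparse pattern for paired on/off switches: a store_true action, a store_false action writing to the same dest, and set_defaults() supplying the initial value. A standalone sketch of just that pattern (argument names reused for illustration):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--registry-stats", action="store_true",
                    help="enable collection of Golem Registry stats")
parser.add_argument("--no-registry-stats", action="store_false", dest="registry_stats")
parser.set_defaults(registry_stats=True)

print(parser.parse_args([]).registry_stats)                       # True (default)
print(parser.parse_args(["--no-registry-stats"]).registry_stats)  # False

On Python 3.9+ a single argument with action=argparse.BooleanOptionalAction generates the --no-... variant automatically; the explicit pair used here also works on older interpreters.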
33 changes: 16 additions & 17 deletions ray_on_golem/server/services/golem/golem.py
@@ -38,9 +38,10 @@


class GolemService:
def __init__(self, ray_on_golem_port: int, websocat_path: Path):
def __init__(self, ray_on_golem_port: int, websocat_path: Path, registry_stats: bool):
self._ray_on_golem_port = ray_on_golem_port
self._websocat_path = websocat_path
self._registry_stats = registry_stats

self._golem: Optional[GolemNode] = None
self._demand: Optional[Demand] = None
@@ -50,15 +51,9 @@ def __init__(self, ray_on_golem_port: int, websocat_path: Path):
self._yagna_appkey: Optional[str] = None
self._lock = asyncio.Lock()

@property
def golem(self):
return self._golem

@property
def payment_manager(self) -> DefaultPaymentManager:
return self._payment_manager

async def init(self, yagna_appkey: str) -> None:
logger.info("Starting GolemService...")

self._golem = GolemNode(app_key=yagna_appkey)
self._yagna_appkey = yagna_appkey
await self._golem.start()
@@ -76,22 +71,28 @@ async def on_event(event) -> None:
)
self._payment_manager = DefaultPaymentManager(self._golem, self._allocation)

logger.info("Starting GolemService done")

async def shutdown(self) -> None:
"""
Terminates all activities and ray on head node.
Additionally, closes reverse ssh connection from local to proxy.

:return:
"""
await self.payment_manager.terminate_agreements()
logger.info("Stopping GolemService...")

await self._payment_manager.terminate_agreements()

logger.info(f"Waiting for all invoices...")
await self.payment_manager.wait_for_invoices()
await self._payment_manager.wait_for_invoices()
logger.info(f"Waiting for all invoices done")

await self._golem.aclose()
self._golem = None

logger.info("Stopping GolemService done")

async def create_cluster(self, provider_config: CreateClusterRequestData):
"""
Manages creating cluster, creates payload from given data and creates demand basing on payload
@@ -158,12 +159,11 @@ async def _get_image_url_and_hash(self, node_config: NodeConfigData) -> Tuple[UR

return await self._get_image_url_and_hash_from_tag(image_tag)

@staticmethod
async def _get_image_url_from_hash(image_hash: str) -> URL:
async def _get_image_url_from_hash(self, image_hash: str) -> URL:
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://registry.golem.network/v1/image/info",
params={"hash": image_hash, "count": "true"},
params={"hash": image_hash, "count": str(self._registry_stats).lower()},
) as response:
response_data = await response.json()

@@ -174,12 +174,11 @@ async def _get_image_url_from_hash(image_hash: str) -> URL:
else:
raise RegistryRequestError("Can't access Golem Registry for image lookup!")

@staticmethod
async def _get_image_url_and_hash_from_tag(image_tag: str) -> Tuple[URL, str]:
async def _get_image_url_and_hash_from_tag(self, image_tag: str) -> Tuple[URL, str]:
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://registry.golem.network/v1/image/info",
params={"tag": image_tag, "count": "true"},
params={"tag": image_tag, "count": str(self._registry_stats).lower()},
) as response:
response_data = await response.json()

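For context, the registry lookup performed by these methods is a plain GET against the public image-info endpoint, with the count query parameter deciding whether the resolution is counted in Golem Registry statistics (the value is the lowercase string form of the boolean, i.e. "true" or "false"). Below is a minimal standalone sketch using the same endpoint and parameters as the diff; the helper name is illustrative.

import asyncio
import aiohttp

REGISTRY_URL = "https://registry.golem.network/v1/image/info"

async def resolve_image(tag: str, registry_stats: bool) -> dict:
    # str(True).lower() == "true", str(False).lower() == "false", as in the diff above.
    params = {"tag": tag, "count": str(registry_stats).lower()}
    async with aiohttp.ClientSession() as session:
        async with session.get(REGISTRY_URL, params=params) as response:
            response.raise_for_status()
            return await response.json()

if __name__ == "__main__":
    info = asyncio.run(resolve_image("golem/ray-on-golem:0.1.0-alpha.1", registry_stats=False))
    print(info)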
8 changes: 7 additions & 1 deletion ray_on_golem/server/services/ray.py
@@ -35,11 +35,15 @@ def __init__(self, golem_service: GolemService, tmp_path: Path):
self._ssh_public_key_path: Optional[Path] = None

async def shutdown(self) -> None:
logger.info("Stopping RayService...")

await self._stop_head_node_to_webserver_tunel()

async with self._nodes_lock:
if not self._nodes:
logger.info(f"No need to destroy activities, as no activities are running")
logger.info(
"Stopping RayService done, no need to destroy activities, as no activities are running"
)
return

logger.info(f"Destroying {len(self._nodes)} activities...")
@@ -51,6 +55,8 @@ async def shutdown(self) -> None:

self._nodes.clear()

logger.info("Stopping RayService done")

async def create_cluster_on_golem(self, provider_config: CreateClusterRequestData) -> None:
self._ssh_private_key_path = Path(provider_config.ssh_private_key)
self._ssh_public_key_path = self._ssh_private_key_path.with_suffix(".pub")