Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client][placement groups] Client placement group hooks, attempt #3 #15382

Merged
merged 22 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions python/ray/_private/client_mode_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,26 @@ def client_mode_should_convert():
return client_mode_enabled and _client_hook_enabled


def client_mode_wrap(func):
"""Decorator to wrap a function call in a task.

This is useful for functions where the goal isn't to delegate
module calls to the ray client equivalent, but to instead implement
ray client features that can be executed by tasks on the server side.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a little trouble parsing this doc string. So is this meant for ray client's internal functionality that needs to run on the cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I think the doc string is supposed to distinguish this wrapper from client_mode_hook which delegates public ray APIs to Ray client's RayApiStub

I'll update the doc string.

Copy link
Contributor Author

@DmitriGekhtman DmitriGekhtman Apr 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if the new doc string is clearer.

from ray.util.client import ray

@wraps(func)
def wrapper(*args, **kwargs):
if client_mode_should_convert():
f = ray.remote(num_cpus=0)(func)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still concerned about why we need to submit these tasks to the cluster, it just seems kinda unnecessary and pretty decent chunk of overhead/complication. I understand that we can't run these functions on the client, but the client server is a driver in the cluster so it should be fine right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of this PR is to make public placement group APIs available on the client.

Copy link
Contributor Author

@DmitriGekhtman DmitriGekhtman Apr 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So a user should be able to do the following locally

import ray
from ray.util.placement_group import placement_group
ray.util.connect(...)
pg = placement_group(...)
...

Currently, that would throw an error here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wuisawesome I agree that this is slower than having a gRPC message for each of these functions, but I don't think that the minor performance difference is worth the additional complexity added to the client + server (namely having to create and manage ClientPlacementGroups).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wuisawesome Is this ok to resolve for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this seem like an abuse of Client tasks? Is this iffiness of this that tasks should be used for heavy computations
(rather than for simple RPCs which is pretty much what's going on here) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there's a reason num_cpus=1 is the default for a task, rather than num_cpus=0? What's the correct use case (if any) for a num_cpus=0 task?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasks should be used for heavy computations

There is definitely more overhead with submitting a Ray task (as opposed to a RPC call). That being said I think the extra overhead isn't that big of a deal given that if people are trying to create many, many placement groups, the real bottleneck will all likelihood be in waiting for the cluster to scale to the appropriate size to support all the placement groups.

What's the correct use case (if any) for a num_cpus=0 task?

For computationally light tasks. The functions being wrapped are either getter/setter methods (tiny overhead) or are waiting for PG creation (which can be thought of as a 'node-startup'-bound operation).

ref = f.remote(*args, **kwargs)
return ray.get(ref)
return func(*args, **kwargs)

return wrapper


def client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs):
"""Runs a preregistered ray RemoteFunction through the ray client.

Expand All @@ -80,7 +100,10 @@ def client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs):
from ray.util.client import ray

key = getattr(func_cls, RAY_CLIENT_MODE_ATTR, None)
if key is None:

# Second part of "or" is needed in case func_cls is reused between Ray
# client sessions in one Python interpreter session.
if (key is None) or (not ray._converted_key_exists(key)):
key = ray._convert_function(func_cls)
setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
client_func = ray._get_converted(key)
Expand All @@ -98,7 +121,9 @@ def client_mode_convert_actor(actor_cls, in_args, in_kwargs, **kwargs):
from ray.util.client import ray

key = getattr(actor_cls, RAY_CLIENT_MODE_ATTR, None)
if key is None:
# Second part of "or" is needed in case actor_cls is reused between Ray
# client sessions in one Python interpreter session.
if (key is None) or (not ray._converted_key_exists(key)):
key = ray._convert_actor(actor_cls)
setattr(actor_cls, RAY_CLIENT_MODE_ATTR, key)
client_actor = ray._get_converted(key)
Expand Down
51 changes: 41 additions & 10 deletions python/ray/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
import _thread

import ray.util.client.server.server as ray_client_server
from ray.tests.client_test_utils import create_remote_signal_actor
from ray.util.client.common import ClientObjectRef
from ray.util.client.ray_client_helpers import connect_to_client_or_not
from ray.util.client.ray_client_helpers import ray_start_client_server
from ray._private.client_mode_hook import _explicitly_enable_client_mode
from ray._private.client_mode_hook import client_mode_should_convert
from ray._private.client_mode_hook import enable_client_mode


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
Expand Down Expand Up @@ -179,6 +182,8 @@ def test_wait(ray_start_regular_shared):
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_remote_functions(ray_start_regular_shared):
with ray_start_client_server() as ray:
SignalActor = create_remote_signal_actor(ray)
signaler = SignalActor.remote()

@ray.remote
def plus2(x):
Expand Down Expand Up @@ -220,6 +225,18 @@ def fact(x):
all_vals = ray.get(res[0])
assert all_vals == [236, 2_432_902_008_176_640_000, 120, 3628800]

# Timeout 0 on ray.wait leads to immediate return
# (not indefinite wait for first return as with timeout None):
unready_ref = signaler.wait.remote()
res = ray.wait([unready_ref], timeout=0)
# Not ready.
assert res[0] == [] and len(res[1]) == 1
ray.get(signaler.send.remote())
ready_ref = signaler.wait.remote()
# Ready.
res = ray.wait([ready_ref], timeout=10)
assert len(res[0]) == 1 and res[1] == []


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_function_calling_function(ray_start_regular_shared):
Expand Down Expand Up @@ -523,16 +540,16 @@ def test_client_gpu_ids(call_ray_stop_only):
import ray
ray.init(num_cpus=2)

_explicitly_enable_client_mode()
# No client connection.
with pytest.raises(Exception) as e:
ray.get_gpu_ids()
assert str(e.value) == "Ray Client is not connected."\
" Please connect by calling `ray.connect`."
with enable_client_mode():
# No client connection.
with pytest.raises(Exception) as e:
ray.get_gpu_ids()
assert str(e.value) == "Ray Client is not connected."\
" Please connect by calling `ray.connect`."

with ray_start_client_server():
# Now have a client connection.
assert ray.get_gpu_ids() == []
with ray_start_client_server():
# Now have a client connection.
assert ray.get_gpu_ids() == []


def test_client_serialize_addon(call_ray_stop_only):
Expand All @@ -548,5 +565,19 @@ class User(pydantic.BaseModel):
assert ray.get(ray.put(User(name="ray"))).name == "ray"


@pytest.mark.parametrize("connect_to_client", [False, True])
def test_client_context_manager(ray_start_regular_shared, connect_to_client):
import ray
with connect_to_client_or_not(connect_to_client):
if connect_to_client:
# Client mode is on.
assert client_mode_should_convert() is True
# We're connected to Ray client.
assert ray.util.client.ray.is_connected() is True
else:
assert client_mode_should_convert() is False
assert ray.util.client.ray.is_connected() is False


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
31 changes: 19 additions & 12 deletions python/ray/tests/test_client_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,26 @@ def test_validate_port():


def test_basic_preregister(init_and_serve):
"""Tests conversion of Ray actors and remote functions to client actors
and client remote functions.

Checks that the conversion works when disconnecting and reconnecting client
sessions.
"""
from ray.util.client import ray
ray.connect("localhost:50051")
val = ray.get(hello_world.remote())
print(val)
assert val >= 20
assert val <= 200
c = C.remote(3)
x = c.double.remote()
y = c.double.remote()
ray.wait([x, y])
val = ray.get(c.get.remote())
assert val == 12
ray.disconnect()
for _ in range(2):
ray.connect("localhost:50051")
val = ray.get(hello_world.remote())
print(val)
assert val >= 20
assert val <= 200
c = C.remote(3)
x = c.double.remote()
y = c.double.remote()
ray.wait([x, y])
val = ray.get(c.get.remote())
assert val == 12
ray.disconnect()


def test_num_clients(init_and_serve_lazy):
Expand Down
Loading