Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add the function to cleanup Redis backend. #31782

Merged
merged 7 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2723,6 +2723,7 @@ pyx_library(
"//:global_state_accessor_lib",
"//:ray_util",
"//:raylet_lib",
"//:redis_client",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
"//:stats_lib",
Expand Down
37 changes: 37 additions & 0 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,3 +577,40 @@ def use_gcs_for_bootstrap():
This function is included for the purposes of backwards compatibility.
"""
return True


def cleanup_redis_storage(
host: str, port: int, password: str, use_ssl: bool, storage_namespace: str
):
"""This function is used to cleanup the storage. Before we having
a good design for storage backend, it can be used to delete the old
data. It support redis cluster and non cluster mode.

Args:
host: The host address of the Redis.
port: The port of the Redis.
password: The password of the Redis.
use_ssl: Whether to encrypt the connection.
storage_namespace: The namespace of the storage to be deleted.
Comment on lines +589 to +594
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make these all default to the same environment variables that the GCS does?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so called ray start and this utility with the same env variables "just works"

Copy link
Contributor Author

@fishbone fishbone Jan 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's doable unless we baked everything into OS envs. I'd say maybe app layer can have a wrapper on top of this (host/port can be retrieved from RAY_REDIS_ADDRESS and storage_namespace can be retrieved from RAY_external_storage_namespace). These should be given by the user.

For password it seems not good (you might not need to use this if it's a closed network).
I'm trying to make the utils not deep coupled with the existing ray api since the current API might be updated in the future (we'll make it backward compatible) and as you can see the current API is really bad, for example, redis address and redis password are passed to ray in different style.

Let me know if it's hard to do it in app layer and we'll see what we can do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the current API is definitely pretty bad lol.

Ok, I think it's fine to not pull them from the env vars.

"""

from ray._raylet import del_key_from_storage # type: ignore

if not isinstance(host, str):
raise ValueError("Host must be a string")

if not isinstance(password, str):
raise ValueError("Password must be a string")

if port < 0:
raise ValueError(f"Invalid port: {port}")

if not isinstance(use_ssl, bool):
raise TypeError("use_ssl must be a boolean")

if not isinstance(storage_namespace, str):
raise ValueError("storage namespace must be a string")

# Right now, GCS store all data into a hash set key by storage_namespace.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment added here.

# So we only need to delete the specific key to cleanup the cluster.
return del_key_from_storage(host, port, password, use_ssl, storage_namespace)
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ from typing import Any, Awaitable, TypeVar
R = TypeVar("R")


class ObjectRef(Awaitable[R]):
class ObjectRef(Awaitable[R]): # type: ignore
pass


class ObjectID(Awaitable[R]):
class ObjectID(Awaitable[R]): # type: ignore
pass
6 changes: 5 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor

from ray.includes.global_state_accessor cimport RedisDelKeySync
from ray.includes.optional cimport (
optional
)
Expand Down Expand Up @@ -2779,3 +2779,7 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
py_callback = <object>user_callback
py_callback(result)
cpython.Py_DECREF(py_callback)


def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)
63 changes: 63 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ from libcpp.string cimport string as c_string
from libcpp cimport bool as c_bool
from libcpp.vector cimport vector as c_vector
from libcpp.memory cimport unique_ptr
from libc.stdint cimport int32_t as c_int32_t
from ray.includes.unique_ids cimport (
CActorID,
CJobID,
Expand Down Expand Up @@ -43,3 +44,65 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
CRayStatus GetNodeToConnectForDriver(
const c_string &node_ip_address,
c_string *node_to_connect)

cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/redis_client.h"
namespace ray {
namespace gcs {

class Cleanup {
public:
Cleanup(std::function<void()> f): f_(f) {}
~Cleanup() { f_(); }
private:
std::function<void()> f_;
};

bool RedisDelKeySync(const std::string& host,
int32_t port,
const std::string& password,
bool use_ssl,
const std::string& key) {
RedisClientOptions options(host, port, password, false, use_ssl);
auto cli = std::make_unique<RedisClient>(options);

instrumented_io_context io_service;

auto thread = std::make_unique<std::thread>([&]() {
boost::asio::io_service::work work(io_service);
io_service.run();
});

Cleanup _([&](){
io_service.stop();
thread->join();
});

auto status = cli->Connect(io_service);
if(!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString();
return false;
}

auto context = cli->GetShardContext(key);
auto cmd = std::vector<std::string>{"DEL", key};
auto reply = context->RunArgvSync(cmd);
if(reply->ReadAsInteger() == 1) {
RAY_LOG(INFO) << "Successfully deleted " << key;
return true;
} else {
RAY_LOG(ERROR) << "Failed to delete " << key;
return false;
}
}

}
}
"""
c_bool RedisDelKeySync(const c_string& host,
c_int32_t port,
const c_string& password,
c_bool use_ssl,
const c_string& key)
1 change: 1 addition & 0 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ from ray.includes.unique_ids cimport (

from ray.includes.global_state_accessor cimport (
CGlobalStateAccessor,
RedisDelKeySync,
)

from libcpp.string cimport string as c_string
Expand Down
39 changes: 39 additions & 0 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import grpc
import pytest
import ray
import redis
from ray._private.gcs_utils import GcsClient
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
Expand Down Expand Up @@ -211,6 +212,44 @@ async def check(expect_liveness):
)


@pytest.fixture(params=[True, False])
def redis_replicas(request, monkeypatch):
if request.param:
monkeypatch.setenv("TEST_EXTERNAL_REDIS_REPLICAS", "3")
yield


@pytest.mark.skipif(
not enable_external_redis(), reason="Only valid when start with an external redis"
)
def test_redis_cleanup(redis_replicas, shutdown_only):
addr = ray.init(
namespace="a", _system_config={"external_storage_namespace": "c1"}
).address_info["address"]
gcs_client = GcsClient(address=addr)
gcs_client.internal_kv_put(b"ABC", b"DEF", True, None)

ray.shutdown()
addr = ray.init(
namespace="a", _system_config={"external_storage_namespace": "c2"}
).address_info["address"]
gcs_client = GcsClient(address=addr)
gcs_client.internal_kv_put(b"ABC", b"XYZ", True, None)
ray.shutdown()
redis_addr = os.environ["RAY_REDIS_ADDRESS"]
host, port = redis_addr.split(":")
if os.environ.get("TEST_EXTERNAL_REDIS_REPLICAS", "1") != "1":
cli = redis.RedisCluster(host, int(port))
else:
cli = redis.Redis(host, int(port))

assert set(cli.keys()) == {b"c1", b"c2"}
gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c1")
assert set(cli.keys()) == {b"c2"}
gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c2")
assert len(cli.keys()) == 0


if __name__ == "__main__":
import sys

Expand Down
3 changes: 1 addition & 2 deletions src/ray/gcs/redis_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/status.h"
#include "ray/gcs/asio.h"
#include "ray/gcs/redis_context.h"
#include "ray/util/logging.h"

namespace ray {

namespace gcs {

class RedisContext;

class RedisClientOptions {
public:
RedisClientOptions(const std::string &ip,
Expand Down