Skip to content

Commit

Permalink
[core] Add the function to cleanup Redis backend. (#31782)
Browse files Browse the repository at this point in the history
This feature is a helper tool for cleaning up the Redis storage. It is meant as an interim solution for removing old data that Ray stored in Redis, until the work on the GCS storage backend is done.

This feature supports both Redis cluster and non-cluster modes and is for Redis cleanup only. It is built with Cython, so no external libraries are needed.

Since _raylet.so already depends on redis_client implicitly, there is no size change in the Ray package.
  • Loading branch information
fishbone authored Jan 25, 2023
1 parent 477910b commit 41e1685
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 5 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2735,6 +2735,7 @@ pyx_library(
"//:global_state_accessor_lib",
"//:ray_util",
"//:raylet_lib",
"//:redis_client",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
"//:stats_lib",
Expand Down
37 changes: 37 additions & 0 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,3 +577,40 @@ def use_gcs_for_bootstrap():
This function is included for the purposes of backwards compatibility.
"""
return True


def cleanup_redis_storage(
    host: str, port: int, password: str, use_ssl: bool, storage_namespace: str
):
    """Delete the data Ray stored in Redis under one storage namespace.

    This is a stop-gap helper for removing old data until the storage
    backend has a proper design. It supports both Redis cluster and
    non-cluster mode.

    Args:
        host: The host address of the Redis.
        port: The port of the Redis (0-65535).
        password: The password of the Redis.
        use_ssl: Whether to encrypt the connection.
        storage_namespace: The namespace of the storage to be deleted.

    Returns:
        The result of ``del_key_from_storage`` for the namespace key.

    Raises:
        ValueError: If host, password or storage_namespace is not a string,
            or if port is outside the range 0-65535.
        TypeError: If port is not an integer or use_ssl is not a boolean.
    """
    # Validate every argument before importing the native extension so that
    # bad input fails fast with a clear message.
    if not isinstance(host, str):
        raise ValueError("Host must be a string")

    if not isinstance(password, str):
        raise ValueError("Password must be a string")

    # Check the type explicitly: comparing a non-integer against 0 would
    # otherwise fail with an unrelated-looking comparison error.
    if not isinstance(port, int):
        raise TypeError("Port must be an integer")

    if not 0 <= port <= 65535:
        raise ValueError(f"Invalid port: {port}")

    if not isinstance(use_ssl, bool):
        raise TypeError("use_ssl must be a boolean")

    if not isinstance(storage_namespace, str):
        raise ValueError("storage namespace must be a string")

    from ray._raylet import del_key_from_storage  # type: ignore

    # Right now, GCS store all data into a hash set key by storage_namespace.
    # So we only need to delete the specific key to cleanup the cluster.
    return del_key_from_storage(host, port, password, use_ssl, storage_namespace)
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ from typing import Any, Awaitable, TypeVar
R = TypeVar("R")


class ObjectRef(Awaitable[R]):
# Type stub: ObjectRef is declared as an Awaitable parameterized by the
# type variable R. The "type: ignore" silences type-checker errors on this line.
class ObjectRef(Awaitable[R]): # type: ignore
pass


class ObjectID(Awaitable[R]):
# Type stub: ObjectID is declared as an Awaitable parameterized by the
# type variable R. The "type: ignore" silences type-checker errors on this line.
class ObjectID(Awaitable[R]): # type: ignore
pass
6 changes: 5 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor

from ray.includes.global_state_accessor cimport RedisDelKeySync
from ray.includes.optional cimport (
optional
)
Expand Down Expand Up @@ -2793,3 +2793,7 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
py_callback = <object>user_callback
py_callback(result)
cpython.Py_DECREF(py_callback)


def del_key_from_storage(host, port, password, use_ssl, key):
    """Delete `key` from the Redis instance at `host`:`port`.

    Thin Python-visible wrapper that forwards all arguments unchanged to the
    C++ helper RedisDelKeySync and returns its boolean result.
    """
    deleted = RedisDelKeySync(host, port, password, use_ssl, key)
    return deleted
63 changes: 63 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ from libcpp.string cimport string as c_string
from libcpp cimport bool as c_bool
from libcpp.vector cimport vector as c_vector
from libcpp.memory cimport unique_ptr
from libc.stdint cimport int32_t as c_int32_t
from ray.includes.unique_ids cimport (
CActorID,
CJobID,
Expand Down Expand Up @@ -43,3 +44,65 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
CRayStatus GetNodeToConnectForDriver(
const c_string &node_ip_address,
c_string *node_to_connect)

# Verbatim C++ helper compiled into the extension: connects to Redis, issues a
# single synchronous DEL for `key`, and reports whether the key was removed.
cdef extern from * namespace "ray::gcs" nogil:
    """
    #include <functional>
    #include <memory>
    #include <string>
    #include <thread>
    #include <utility>
    #include <vector>
    #include "ray/gcs/redis_client.h"
    namespace ray {
    namespace gcs {
    // Scope guard: invokes the stored callable when it goes out of scope.
    class Cleanup {
     public:
      explicit Cleanup(std::function<void()> f) : f_(std::move(f)) {}
      ~Cleanup() { f_(); }
      // Non-copyable: a copy would run the cleanup a second time.
      Cleanup(const Cleanup &) = delete;
      Cleanup &operator=(const Cleanup &) = delete;
     private:
      std::function<void()> f_;
    };
    // Deletes `key` from the Redis instance at host:port.
    // Returns true only when Redis reports that exactly one key was deleted;
    // returns false on connection failure, a missing reply, or any other count.
    // Defined `inline` because this definition lives in header-like verbatim
    // code that may be pulled into more than one translation unit.
    inline bool RedisDelKeySync(const std::string& host,
                                int32_t port,
                                const std::string& password,
                                bool use_ssl,
                                const std::string& key) {
      // Declared before the client so the client is destroyed first and never
      // outlives the io context it was connected with.
      instrumented_io_context io_service;
      RedisClientOptions options(host, port, password, false, use_ssl);
      auto cli = std::make_unique<RedisClient>(options);
      std::thread io_thread([&io_service]() {
        // The work object keeps run() from returning while no handlers are queued.
        boost::asio::io_service::work work(io_service);
        io_service.run();
      });
      // Runs on every return path: stop the event loop, then join its thread.
      Cleanup stop_io([&io_service, &io_thread]() {
        io_service.stop();
        io_thread.join();
      });
      auto status = cli->Connect(io_service);
      if (!status.ok()) {
        RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString();
        return false;
      }
      auto context = cli->GetShardContext(key);
      auto cmd = std::vector<std::string>{"DEL", key};
      auto reply = context->RunArgvSync(cmd);
      // Guard against a missing reply before dereferencing it.
      if (!reply) {
        RAY_LOG(ERROR) << "Failed to delete " << key;
        return false;
      }
      if (reply->ReadAsInteger() == 1) {
        RAY_LOG(INFO) << "Successfully deleted " << key;
        return true;
      }
      RAY_LOG(ERROR) << "Failed to delete " << key;
      return false;
    }
    }
    }
    """
    c_bool RedisDelKeySync(const c_string& host,
                           c_int32_t port,
                           const c_string& password,
                           c_bool use_ssl,
                           const c_string& key)
1 change: 1 addition & 0 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ from ray.includes.unique_ids cimport (

from ray.includes.global_state_accessor cimport (
CGlobalStateAccessor,
RedisDelKeySync,
)

from libcpp.string cimport string as c_string
Expand Down
39 changes: 39 additions & 0 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import grpc
import pytest
import ray
import redis
from ray._private.gcs_utils import GcsClient
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
Expand Down Expand Up @@ -211,6 +212,44 @@ async def check(expect_liveness):
)


@pytest.fixture(params=[True, False])
def redis_replicas(request, monkeypatch):
    """Parametrized fixture: runs the dependent test once with
    TEST_EXTERNAL_REDIS_REPLICAS set to "3" and once without touching it."""
    with_replicas = request.param
    if with_replicas:
        monkeypatch.setenv("TEST_EXTERNAL_REDIS_REPLICAS", "3")
    yield


@pytest.mark.skipif(
    not enable_external_redis(), reason="Only valid when start with an external redis"
)
def test_redis_cleanup(redis_replicas, shutdown_only):
    """Check that cleanup_redis_storage removes exactly one namespace key.

    Two clusters are started one after another with different external storage
    namespaces; each namespace is then cleaned up in turn and the remaining
    Redis keys are verified after every step.
    """
    # Start a cluster per namespace and write one KV entry through GCS.
    for storage_ns, payload in (("c1", b"DEF"), ("c2", b"XYZ")):
        context = ray.init(
            namespace="a",
            _system_config={"external_storage_namespace": storage_ns},
        )
        gcs_client = GcsClient(address=context.address_info["address"])
        gcs_client.internal_kv_put(b"ABC", payload, True, None)
        ray.shutdown()

    host, port_text = os.environ["RAY_REDIS_ADDRESS"].split(":")
    redis_port = int(port_text)

    # Talk to Redis with the client that matches the deployment mode.
    clustered = os.environ.get("TEST_EXTERNAL_REDIS_REPLICAS", "1") != "1"
    client_cls = redis.RedisCluster if clustered else redis.Redis
    cli = client_cls(host, redis_port)

    assert set(cli.keys()) == {b"c1", b"c2"}
    gcs_utils.cleanup_redis_storage(host, redis_port, "", False, "c1")
    assert set(cli.keys()) == {b"c2"}
    gcs_utils.cleanup_redis_storage(host, redis_port, "", False, "c2")
    assert len(cli.keys()) == 0


if __name__ == "__main__":
import sys

Expand Down
3 changes: 1 addition & 2 deletions src/ray/gcs/redis_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/status.h"
#include "ray/gcs/asio.h"
#include "ray/gcs/redis_context.h"
#include "ray/util/logging.h"

namespace ray {

namespace gcs {

class RedisContext;

class RedisClientOptions {
public:
RedisClientOptions(const std::string &ip,
Expand Down

0 comments on commit 41e1685

Please sign in to comment.