From 59dc0a96a8cb50160ee0fbc79625d2a519613ec4 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 19 Jan 2023 19:39:17 +0000 Subject: [PATCH 1/7] add the feature Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_private/gcs_utils.py | 21 ++++++++++++++++ python/ray/tests/test_gcs_utils.py | 39 ++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 55558332578f..c7338ef3b802 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -577,3 +577,24 @@ def use_gcs_for_bootstrap(): This function is included for the purposes of backwards compatibility. """ return True + + +def cleanup_redis_storage(host, port, storage_namespace, **kwargs): + """This function is used to cleanup the storage. Before we having + a good design for storage backend, it can be used to delete the old + data. It support redis cluster and non cluster mode. + + Args: + host: The host address of the Redis. + port: The port of the Redis. + storage_namespace: The namespace of the storage to be deleted. + """ + + import redis + + try: + cli = redis.RedisCluster(host, port, **kwargs) + except redis.exceptions.RedisClusterException: + cli = redis.Redis(host, port, **kwargs) + + cli.delete(storage_namespace) diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py index a2f9a5037b6a..a1c5274550d1 100644 --- a/python/ray/tests/test_gcs_utils.py +++ b/python/ray/tests/test_gcs_utils.py @@ -7,6 +7,7 @@ import grpc import pytest import ray +import redis from ray._private.gcs_utils import GcsClient import ray._private.gcs_utils as gcs_utils from ray._private.test_utils import ( @@ -211,6 +212,44 @@ async def check(expect_liveness): ) +@pytest.fixture(params=[True, False]) +def redis_replicas(request, monkeypatch): + if request.param: + monkeypatch.setenv("TEST_EXTERNAL_REDIS_REPLICAS", "3") + yield + + +@pytest.mark.skipif( + not enable_external_redis(), reason="Only valid when start with an external redis" +) +def test_redis_cleanup(redis_replicas, shutdown_only): + addr = ray.init( + namespace="a", _system_config={"external_storage_namespace": "c1"} + ).address_info["address"] + gcs_client = GcsClient(address=addr) + gcs_client.internal_kv_put(b"ABC", b"DEF", True, None) + + ray.shutdown() + addr = ray.init( + namespace="a", _system_config={"external_storage_namespace": "c2"} + ).address_info["address"] + gcs_client = GcsClient(address=addr) + gcs_client.internal_kv_put(b"ABC", b"XYZ", True, None) + ray.shutdown() + redis_addr = os.environ["RAY_REDIS_ADDRESS"] + host, port = redis_addr.split(":") + if os.environ.get("TEST_EXTERNAL_REDIS_REPLICAS", "1") != "1": + cli = redis.RedisCluster(host, port) + else: + cli = redis.Redis(host, port) + + assert set(cli.keys()) == {b"c1", b"c2"} + gcs_utils.cleanup_redis_storage(host, port, "c1") + assert set(cli.keys()) == {b"c2"} + gcs_utils.cleanup_redis_storage(host, port, "c2") + assert len(cli.keys()) == 0 + + if __name__ == "__main__": import sys From cef46fc343d3b6c5424c7ef798a561740348ae9c Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 24 Jan 2023 01:39:48 +0000 Subject: [PATCH 2/7] fix --- BUILD.bazel | 1 + python/ray/_private/gcs_utils.py | 26 +++++--- python/ray/_raylet.pyx | 5 +- python/ray/includes/global_state_accessor.pxd | 63 +++++++++++++++++++ python/ray/includes/global_state_accessor.pxi | 1 + python/ray/tests/test_gcs_utils.py | 8 +-- src/ray/gcs/redis_client.h | 3 +- 7 files changed, 93 insertions(+), 14 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 10d3b0e621fc..a41b6c758f72 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2722,6 +2722,7 @@ pyx_library( "//:exported_internal", "//:global_state_accessor_lib", "//:ray_util", + "//:redis_client", "//:raylet_lib", "//:src/ray/ray_exported_symbols.lds", "//:src/ray/ray_version_script.lds", diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index c7338ef3b802..6eb01054d9d5 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -579,7 +579,7 @@ def use_gcs_for_bootstrap(): return True -def cleanup_redis_storage(host, port, storage_namespace, **kwargs): +def cleanup_redis_storage(host: str, port: int, password: str, use_ssl: bool, storage_namespace: str): """This function is used to cleanup the storage. Before we having a good design for storage backend, it can be used to delete the old data. It support redis cluster and non cluster mode. @@ -587,14 +587,26 @@ def cleanup_redis_storage(host, port, storage_namespace, **kwargs): Args: host: The host address of the Redis. port: The port of the Redis. + password: The password of the Redis. + use_ssl: Whether to encrypt the connection. storage_namespace: The namespace of the storage to be deleted. """ - import redis + from ray._raylet import del_key_from_storage - try: - cli = redis.RedisCluster(host, port, **kwargs) - except redis.exceptions.RedisClusterException: - cli = redis.Redis(host, port, **kwargs) + if not isinstance(host, str): + raise ValueError("Host must be a string") + + if not isinstance(password, str): + raise ValueError("Password must be a string") + + if port < 0: + raise ValueError(f"Invalid port: {port}") + + if not isinstance(use_ssl, bool): + raise TypeError("use_ssl must be a boolean") + + if not isinstance(storage_namespace, str): + raise ValueError("storage namespace must be a string") - cli.delete(storage_namespace) + return del_key_from_storage(host, port, password, use_ssl, storage_namespace) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 55625dd18f87..4bee725244ca 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -109,7 +109,7 @@ from ray.includes.libcoreworker cimport ( from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor - +from ray.includes.global_state_accessor cimport RedisDelKeySync from ray.includes.optional cimport ( optional ) @@ -2779,3 +2779,6 @@ cdef void async_callback(shared_ptr[CRayObject] obj, py_callback = user_callback py_callback(result) cpython.Py_DECREF(py_callback) + +def del_key_from_storage(host, port, password, use_ssl, key): + return RedisDelKeySync(host, port, password, use_ssl, key) diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 8e3364592253..f05d88516589 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -2,6 +2,7 @@ from libcpp.string cimport string as c_string from libcpp cimport bool as c_bool from libcpp.vector cimport vector as c_vector from libcpp.memory cimport unique_ptr +from libc.stdint cimport int32_t as c_int32_t from ray.includes.unique_ids cimport ( CActorID, CJobID, @@ -43,3 +44,65 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil: CRayStatus GetNodeToConnectForDriver( const c_string &node_ip_address, c_string *node_to_connect) + +cdef extern from * namespace "ray::gcs" nogil: + """ + #include + #include "ray/gcs/redis_client.h" + namespace ray { + namespace gcs { + + class Cleanup { + public: + Cleanup(std::function f): f_(f) {} + ~Cleanup() { f_(); } + private: + std::function f_; + }; + + bool RedisDelKeySync(const std::string& host, + int32_t port, + const std::string& password, + bool use_ssl, + const std::string& key) { + RedisClientOptions options(host, port, password, false, use_ssl); + auto cli = std::make_unique(options); + + instrumented_io_context io_service; + + auto thread = std::make_unique([&]() { + boost::asio::io_service::work work(io_service); + io_service.run(); + }); + + Cleanup _([&](){ + io_service.stop(); + thread->join(); + }); + + auto status = cli->Connect(io_service); + if(!status.ok()) { + RAY_LOG(ERROR) << "Failed to connect to redis: " << status.ToString(); + return false; + } + + auto context = cli->GetShardContext(key); + auto cmd = std::vector{"DEL", key}; + auto reply = context->RunArgvSync(cmd); + if(reply->ReadAsInteger() == 1) { + RAY_LOG(INFO) << "Successfully deleted " << key; + return true; + } else { + RAY_LOG(ERROR) << "Failed to delete " << key; + return false; + } + } + + } + } + """ + c_bool RedisDelKeySync(const c_string& host, + c_int32_t port, + const c_string& password, + c_bool use_ssl, + const c_string& key) diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index f98a5181d60a..8492ee56a89b 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -12,6 +12,7 @@ from ray.includes.unique_ids cimport ( from ray.includes.global_state_accessor cimport ( CGlobalStateAccessor, + RedisDelKeySync, ) from libcpp.string cimport string as c_string diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py index a1c5274550d1..cbc3b2d757fd 100644 --- a/python/ray/tests/test_gcs_utils.py +++ b/python/ray/tests/test_gcs_utils.py @@ -239,14 +239,14 @@ def test_redis_cleanup(redis_replicas, shutdown_only): redis_addr = os.environ["RAY_REDIS_ADDRESS"] host, port = redis_addr.split(":") if os.environ.get("TEST_EXTERNAL_REDIS_REPLICAS", "1") != "1": - cli = redis.RedisCluster(host, port) + cli = redis.RedisCluster(host, int(port)) else: - cli = redis.Redis(host, port) + cli = redis.Redis(host, int(port)) assert set(cli.keys()) == {b"c1", b"c2"} - gcs_utils.cleanup_redis_storage(host, port, "c1") + gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c1") assert set(cli.keys()) == {b"c2"} - gcs_utils.cleanup_redis_storage(host, port, "c2") + gcs_utils.cleanup_redis_storage(host, int(port), "", False, "c2") assert len(cli.keys()) == 0 diff --git a/src/ray/gcs/redis_client.h b/src/ray/gcs/redis_client.h index 90c6a9226735..ccf7a43b55fb 100644 --- a/src/ray/gcs/redis_client.h +++ b/src/ray/gcs/redis_client.h @@ -20,14 +20,13 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/status.h" #include "ray/gcs/asio.h" +#include "ray/gcs/redis_context.h" #include "ray/util/logging.h" namespace ray { namespace gcs { -class RedisContext; - class RedisClientOptions { public: RedisClientOptions(const std::string &ip, From 92e09f9281cec6a73a93f03df1e815940a7446c8 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 24 Jan 2023 23:42:15 +0000 Subject: [PATCH 3/7] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_private/gcs_utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 6eb01054d9d5..9f69256fe7fd 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -579,7 +579,9 @@ def use_gcs_for_bootstrap(): return True -def cleanup_redis_storage(host: str, port: int, password: str, use_ssl: bool, storage_namespace: str): +def cleanup_redis_storage( + host: str, port: int, password: str, use_ssl: bool, storage_namespace: str +): """This function is used to cleanup the storage. Before we having a good design for storage backend, it can be used to delete the old data. It support redis cluster and non cluster mode. @@ -609,4 +611,6 @@ def cleanup_redis_storage(host: str, port: int, password: str, use_ssl: bool, st if not isinstance(storage_namespace, str): raise ValueError("storage namespace must be a string") + # Right now, GCS store all data into a hash set key by storage_namespace. + # So we only need to delete the specific key to cleanup the cluster. return del_key_from_storage(host, port, password, use_ssl, storage_namespace) From 52d2de2f097c6cec0117f0e746696b79b82ab7fb Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 24 Jan 2023 23:42:52 +0000 Subject: [PATCH 4/7] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_raylet.pyx | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 4bee725244ca..eb405769a5ef 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2780,5 +2780,6 @@ cdef void async_callback(shared_ptr[CRayObject] obj, py_callback(result) cpython.Py_DECREF(py_callback) + def del_key_from_storage(host, port, password, use_ssl, key): return RedisDelKeySync(host, port, password, use_ssl, key) From a42f0dd0ee0638c451c37c25b6506574fa57d7cd Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 25 Jan 2023 02:05:41 +0000 Subject: [PATCH 5/7] try to fix lint Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_private/gcs_utils.py | 2 +- python/ray/_raylet.pyi | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 9f69256fe7fd..13d092cc7df2 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -594,7 +594,7 @@ def cleanup_redis_storage( storage_namespace: The namespace of the storage to be deleted. """ - from ray._raylet import del_key_from_storage + from ray._raylet import del_key_from_storage # type: ignore if not isinstance(host, str): raise ValueError("Host must be a string") diff --git a/python/ray/_raylet.pyi b/python/ray/_raylet.pyi index 691620b27717..0c5c78362557 100644 --- a/python/ray/_raylet.pyi +++ b/python/ray/_raylet.pyi @@ -3,9 +3,9 @@ from typing import Any, Awaitable, TypeVar R = TypeVar("R") -class ObjectRef(Awaitable[R]): +class ObjectRef(Awaitable[R]): # type: ignore pass -class ObjectID(Awaitable[R]): +class ObjectID(Awaitable[R]): # type: ignore pass From 429dea5d3074994bce988f0701849ddb51b14966 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 25 Jan 2023 02:05:53 +0000 Subject: [PATCH 6/7] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/_private/gcs_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_private/gcs_utils.py b/python/ray/_private/gcs_utils.py index 13d092cc7df2..71cbc14dddbd 100644 --- a/python/ray/_private/gcs_utils.py +++ b/python/ray/_private/gcs_utils.py @@ -594,7 +594,7 @@ def cleanup_redis_storage( storage_namespace: The namespace of the storage to be deleted. """ - from ray._raylet import del_key_from_storage # type: ignore + from ray._raylet import del_key_from_storage # type: ignore if not isinstance(host, str): raise ValueError("Host must be a string") From 4fcf0f73d2d5abda635be3128f420470c2dcc33a Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 25 Jan 2023 02:29:56 +0000 Subject: [PATCH 7/7] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BUILD.bazel b/BUILD.bazel index a41b6c758f72..64121a54db08 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2722,8 +2722,8 @@ pyx_library( "//:exported_internal", "//:global_state_accessor_lib", "//:ray_util", - "//:redis_client", "//:raylet_lib", + "//:redis_client", "//:src/ray/ray_exported_symbols.lds", "//:src/ray/ray_version_script.lds", "//:stats_lib",