Skip to content

Commit

Permalink
[core] Delete old internal kv gcs (#31841)
Browse files Browse the repository at this point in the history
The code has been migrated to the StoreClientKV in PR and the old one is not useful anymore. This PR delete the old code.
  • Loading branch information
fishbone authored Jan 23, 2023
1 parent 96854d5 commit 9ab6421
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 227 deletions.
177 changes: 1 addition & 176 deletions src/ray/gcs/gcs_server/gcs_kv_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,182 +21,6 @@

namespace ray {
namespace gcs {
namespace {

constexpr std::string_view kNamespacePrefix = "@namespace_";
constexpr std::string_view kNamespaceSep = ":";
constexpr std::string_view kClusterSeparator = "@";

} // namespace
std::string RedisInternalKV::MakeKey(const std::string &ns,
const std::string &key) const {
if (ns.empty()) {
return absl::StrCat(external_storage_namespace_, kClusterSeparator, key);
}
return absl::StrCat(external_storage_namespace_,
kClusterSeparator,
kNamespacePrefix,
ns,
kNamespaceSep,
key);
}

Status RedisInternalKV::ValidateKey(const std::string &key) const {
if (absl::StartsWith(key, kNamespacePrefix)) {
return Status::KeyError(absl::StrCat("Key can't start with ", kNamespacePrefix));
}
return Status::OK();
}

std::string RedisInternalKV::ExtractKey(const std::string &key) const {
auto view = std::string_view(key);
RAY_CHECK(absl::StartsWith(view, external_storage_namespace_))
<< "Invalid key: " << view << ". It should start with "
<< external_storage_namespace_;
view = view.substr(external_storage_namespace_.size() + kClusterSeparator.size());
if (absl::StartsWith(view, kNamespacePrefix)) {
std::vector<std::string> parts =
absl::StrSplit(key, absl::MaxSplits(kNamespaceSep, 1));
RAY_CHECK(parts.size() == 2) << "Invalid key: " << key;

return parts[1];
}
return std::string(view.begin(), view.end());
}

RedisInternalKV::RedisInternalKV(const RedisClientOptions &redis_options)
: redis_options_(redis_options),
external_storage_namespace_(::RayConfig::instance().external_storage_namespace()),
work_(io_service_) {
RAY_CHECK(!absl::StrContains(external_storage_namespace_, kClusterSeparator))
<< "Storage namespace (" << external_storage_namespace_ << ") shouldn't contain "
<< kClusterSeparator << ".";
io_thread_ = std::make_unique<std::thread>([this] {
SetThreadName("InternalKV");
io_service_.run();
});
redis_client_ = std::make_unique<RedisClient>(redis_options_);
RAY_CHECK_OK(redis_client_->Connect(io_service_));
}

void RedisInternalKV::Get(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) {
auto true_key = MakeKey(ns, key);
std::vector<std::string> cmd = {"HGET", external_storage_namespace_, true_key};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [callback = std::move(callback)](auto redis_reply) {
if (callback) {
if (!redis_reply->IsNil()) {
callback(redis_reply->ReadAsString());
} else {
callback(std::nullopt);
}
}
}));
}

void RedisInternalKV::Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) {
auto true_key = MakeKey(ns, key);
std::vector<std::string> cmd = {
overwrite ? "HSET" : "HSETNX", external_storage_namespace_, true_key, value};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [callback = std::move(callback)](auto redis_reply) {
if (callback) {
auto added_num = redis_reply->ReadAsInteger();
callback(added_num != 0);
}
}));
}

void RedisInternalKV::Del(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback) {
auto true_key = MakeKey(ns, key);
if (del_by_prefix) {
std::vector<std::string> cmd = {"HKEYS", external_storage_namespace_};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd,
[this, true_key = std::move(true_key), callback = std::move(callback)](
auto redis_reply) {
const auto &reply = redis_reply->ReadAsStringArray();
std::vector<std::string> del_cmd = {"HDEL", external_storage_namespace_};
size_t del_num = 0;
for (const auto &r : reply) {
RAY_CHECK(r.has_value());
if (absl::StartsWith(*r, true_key)) {
del_cmd.emplace_back(*r);
++del_num;
}
}

// If there are no keys with this prefix, we don't need to send
// another delete.
if (del_num == 0) {
if (callback) {
callback(0);
}
} else {
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
del_cmd, [callback = std::move(callback)](auto redis_reply) {
if (callback) {
callback(redis_reply->ReadAsInteger());
}
}));
}
}));
} else {
std::vector<std::string> cmd = {"HDEL", external_storage_namespace_, true_key};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [callback = std::move(callback)](auto redis_reply) {
if (callback) {
callback(redis_reply->ReadAsInteger());
}
}));
}
}

void RedisInternalKV::Exists(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback) {
auto true_key = MakeKey(ns, key);
std::vector<std::string> cmd = {"HEXISTS", external_storage_namespace_, true_key};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [callback = std::move(callback)](auto redis_reply) {
if (callback) {
bool exists = redis_reply->ReadAsInteger() > 0;
callback(exists);
}
}));
}

void RedisInternalKV::Keys(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback) {
auto true_prefix = MakeKey(ns, prefix);
std::vector<std::string> cmd = {"HKEYS", external_storage_namespace_};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd,
[this, true_prefix = std::move(true_prefix), callback = std::move(callback)](
auto redis_reply) {
if (callback) {
const auto &reply = redis_reply->ReadAsStringArray();
std::vector<std::string> results;
for (const auto &r : reply) {
RAY_CHECK(r.has_value());
if (absl::StartsWith(*r, true_prefix)) {
results.emplace_back(ExtractKey(*r));
}
}
callback(std::move(results));
}
}));
}

void GcsInternalKVManager::HandleInternalKVGet(
rpc::InternalKVGetRequest request,
Expand Down Expand Up @@ -294,6 +118,7 @@ void GcsInternalKVManager::HandleInternalKVKeys(
}

Status GcsInternalKVManager::ValidateKey(const std::string &key) const {
constexpr std::string_view kNamespacePrefix = "@namespace_";
if (absl::StartsWith(key, kNamespacePrefix)) {
return Status::KeyError(absl::StrCat("Key can't start with ", kNamespacePrefix));
}
Expand Down
48 changes: 0 additions & 48 deletions src/ray/gcs/gcs_server/gcs_kv_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,54 +87,6 @@ class InternalKVInterface {
virtual ~InternalKVInterface(){};
};

class RedisInternalKV : public InternalKVInterface {
public:
explicit RedisInternalKV(const RedisClientOptions &redis_options);

~RedisInternalKV() {
io_service_.stop();
io_thread_->join();
redis_client_.reset();
io_thread_.reset();
}

void Get(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) override;

void Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) override;

void Del(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback) override;

void Exists(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback) override;

void Keys(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback) override;

private:
std::string MakeKey(const std::string &ns, const std::string &key) const;
Status ValidateKey(const std::string &key) const;
std::string ExtractKey(const std::string &key) const;

RedisClientOptions redis_options_;
std::string external_storage_namespace_;
std::unique_ptr<RedisClient> redis_client_;
// The io service used by internal kv.
instrumented_io_context io_service_;
std::unique_ptr<std::thread> io_thread_;
boost::asio::io_service::work work_;
};

/// This implementation class of `InternalKVHandler`.
class GcsInternalKVManager : public rpc::InternalKVHandler {
public:
Expand Down
4 changes: 1 addition & 3 deletions src/ray/gcs/gcs_server/test/gcs_kv_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class GcsKVManagerTest : public ::testing::TestWithParam<std::string> {
ray::gcs::RedisClientOptions redis_client_options(
"127.0.0.1", ray::TEST_REDIS_SERVER_PORTS.front(), "", false);
if (GetParam() == "redis") {
kv_instance = std::make_unique<ray::gcs::RedisInternalKV>(redis_client_options);
} else if (GetParam() == "redis_client") {
auto client = std::make_shared<ray::gcs::RedisClient>(redis_client_options);
RAY_CHECK_OK(client->Connect(io_service));
kv_instance = std::make_unique<ray::gcs::StoreClientInternalKV>(
Expand Down Expand Up @@ -107,7 +105,7 @@ TEST_P(GcsKVManagerTest, TestInternalKV) {

INSTANTIATE_TEST_SUITE_P(GcsKVManagerTestFixture,
GcsKVManagerTest,
::testing::Values("redis", "redis_client", "memory"));
::testing::Values("redis", "memory"));

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
Expand Down

0 comments on commit 9ab6421

Please sign in to comment.