Skip to content

Commit

Permalink
suport sending command to all nodes in cluster with the for_each inte…
Browse files Browse the repository at this point in the history
…rface.
  • Loading branch information
sewenew committed Nov 19, 2023
1 parent b67d9b3 commit 5eccac2
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/sw/redis++/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ class RedisCluster {

Subscriber subscriber();

/// @brief Run the given callback with each node in the cluster.
/// The following is the prototype of the callback: void (Redis &r);
///
/// Example:
/// @code{.cpp}
/// cluster.for_each([](Redis &r) { r.ping(); });
/// @endcode
template <typename Callback>
void for_each(Callback &&cb);

template <typename Cmd, typename Key, typename ...Args>
auto command(Cmd cmd, Key &&key, Args &&...args)
-> typename std::enable_if<!std::is_convertible<Cmd, StringView>::value, ReplyUPtr>::type;
Expand Down
14 changes: 14 additions & 0 deletions src/sw/redis++/redis_cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ namespace sw {

namespace redis {

template <typename Callback>
void RedisCluster::for_each(Callback &&cb) {
// Update the underlying slot-node mapping to ensure we get the latest one.
_pool.update();

auto pools = _pool.pools();
for (auto &pool : pools) {
auto connection = std::make_shared<GuardedConnection>(pool);
auto r = Redis(connection);

cb(r);
}
}

template <typename Cmd, typename Key, typename ...Args>
auto RedisCluster::command(Cmd cmd, Key &&key, Args &&...args)
-> typename std::enable_if<!std::is_convertible<Cmd, StringView>::value, ReplyUPtr>::type {
Expand Down
12 changes: 12 additions & 0 deletions src/sw/redis++/shards_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ Shards ShardsPool::shards() {
return _shards;
}

std::vector<ConnectionPoolSPtr> ShardsPool::pools() {
std::lock_guard<std::mutex> lock(_mutex);

std::vector<ConnectionPoolSPtr> nodes;
nodes.reserve(_pools.size());
for (const auto &pool : _pools) {
nodes.push_back(pool.second);
}

return nodes;
}

void ShardsPool::_move(ShardsPool &&that) {
_pool_opts = that._pool_opts;
_connection_opts = that._connection_opts;
Expand Down
3 changes: 3 additions & 0 deletions src/sw/redis++/shards_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <random>
#include <memory>
#include <vector>
#include "sw/redis++/reply.h"
#include "sw/redis++/connection_pool.h"
#include "sw/redis++/shards.h"
Expand Down Expand Up @@ -63,6 +64,8 @@ class ShardsPool {

Shards shards();

std::vector<ConnectionPoolSPtr> pools();

private:
void _move(ShardsPool &&that);

Expand Down
57 changes: 57 additions & 0 deletions test/src/sw/redis++/cluster_test.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**************************************************************************
Copyright (c) 2023 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/

#ifndef SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_H
#define SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_H

#include <sw/redis++/redis++.h>

namespace sw {

namespace redis {

namespace test {

template <typename RedisInstance>
class ClusterTest {
public:
explicit ClusterTest(RedisInstance &instance) : _redis(instance) {}

void run();

private:
RedisInstance &_redis;
};

template <>
class ClusterTest<sw::redis::Redis> {
public:
explicit ClusterTest(sw::redis::Redis &) {}

void run() {
// Do nothing, since this is cluster specific test.
}
};

}

}

}

#include "cluster_test.hpp"

#endif // end SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_H
41 changes: 41 additions & 0 deletions test/src/sw/redis++/cluster_test.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**************************************************************************
Copyright (c) 2023 sewenew
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************/

#ifndef SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_HPP
#define SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_HPP

#include "utils.h"

namespace sw {

namespace redis {

namespace test {

template <typename RedisInstance>
void ClusterTest<RedisInstance>::run() {
_redis.for_each([](sw::redis::Redis &r) {
REDIS_ASSERT(r.ping() == "PONG", "failed to test for_each");
});
}

}

}

}

#endif // end SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_HPP
6 changes: 6 additions & 0 deletions test/src/sw/redis++/test_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "pipeline_transaction_test.h"
#include "threads_test.h"
#include "stream_cmds_test.h"
#include "cluster_test.h"
#include "benchmark_test.h"

#ifdef REDIS_PLUS_PLUS_RUN_ASYNC_TEST
Expand Down Expand Up @@ -451,6 +452,11 @@ void run_test(const sw::redis::ConnectionOptions &opts, const TestOptions &test_
stream_test.run();

std::cout << "Pass stream commands tests" << std::endl;

sw::redis::test::ClusterTest<RedisInstance> cluster_test(instance);
cluster_test.run();

std::cout << "Pass cluster specific tests" << std::endl;
}

template <typename RedisInstance>
Expand Down

0 comments on commit 5eccac2

Please sign in to comment.