diff --git a/src/sw/redis++/redis_cluster.h b/src/sw/redis++/redis_cluster.h index 3d30e92..161fe50 100644 --- a/src/sw/redis++/redis_cluster.h +++ b/src/sw/redis++/redis_cluster.h @@ -68,6 +68,16 @@ class RedisCluster { Subscriber subscriber(); + /// @brief Run the given callback with each node in the cluster. + /// The following is the prototype of the callback: void (Redis &r); + /// + /// Example: + /// @code{.cpp} + /// cluster.for_each([](Redis &r) { r.ping(); }); + /// @endcode + template + void for_each(Callback &&cb); + template auto command(Cmd cmd, Key &&key, Args &&...args) -> typename std::enable_if::value, ReplyUPtr>::type; diff --git a/src/sw/redis++/redis_cluster.hpp b/src/sw/redis++/redis_cluster.hpp index 62c454b..a0bd152 100644 --- a/src/sw/redis++/redis_cluster.hpp +++ b/src/sw/redis++/redis_cluster.hpp @@ -28,6 +28,20 @@ namespace sw { namespace redis { +template +void RedisCluster::for_each(Callback &&cb) { + // Update the underlying slot-node mapping to ensure we get the latest one. + _pool.update(); + + auto pools = _pool.pools(); + for (auto &pool : pools) { + auto connection = std::make_shared(pool); + auto r = Redis(connection); + + cb(r); + } +} + template auto RedisCluster::command(Cmd cmd, Key &&key, Args &&...args) -> typename std::enable_if::value, ReplyUPtr>::type { diff --git a/src/sw/redis++/shards_pool.cpp b/src/sw/redis++/shards_pool.cpp index 77d68a2..7fd05dd 100644 --- a/src/sw/redis++/shards_pool.cpp +++ b/src/sw/redis++/shards_pool.cpp @@ -162,6 +162,18 @@ Shards ShardsPool::shards() { return _shards; } +std::vector ShardsPool::pools() { + std::lock_guard lock(_mutex); + + std::vector nodes; + nodes.reserve(_pools.size()); + for (const auto &pool : _pools) { + nodes.push_back(pool.second); + } + + return nodes; +} + void ShardsPool::_move(ShardsPool &&that) { _pool_opts = that._pool_opts; _connection_opts = that._connection_opts; diff --git a/src/sw/redis++/shards_pool.h b/src/sw/redis++/shards_pool.h index 28b275e..cbbdb97 100644 --- a/src/sw/redis++/shards_pool.h +++ b/src/sw/redis++/shards_pool.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "sw/redis++/reply.h" #include "sw/redis++/connection_pool.h" #include "sw/redis++/shards.h" @@ -63,6 +64,8 @@ class ShardsPool { Shards shards(); + std::vector pools(); + private: void _move(ShardsPool &&that); diff --git a/test/src/sw/redis++/cluster_test.h b/test/src/sw/redis++/cluster_test.h new file mode 100644 index 0000000..1e260e8 --- /dev/null +++ b/test/src/sw/redis++/cluster_test.h @@ -0,0 +1,57 @@ +/************************************************************************** + Copyright (c) 2023 sewenew + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + *************************************************************************/ + +#ifndef SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_H +#define SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_H + +#include + +namespace sw { + +namespace redis { + +namespace test { + +template +class ClusterTest { +public: + explicit ClusterTest(RedisInstance &instance) : _redis(instance) {} + + void run(); + +private: + RedisInstance &_redis; +}; + +template <> +class ClusterTest { +public: + explicit ClusterTest(sw::redis::Redis &) {} + + void run() { + // Do nothing, since this is cluster specific test. + } +}; + +} + +} + +} + +#include "cluster_test.hpp" + +#endif // end SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_H diff --git a/test/src/sw/redis++/cluster_test.hpp b/test/src/sw/redis++/cluster_test.hpp new file mode 100644 index 0000000..3001b07 --- /dev/null +++ b/test/src/sw/redis++/cluster_test.hpp @@ -0,0 +1,41 @@ +/************************************************************************** + Copyright (c) 2023 sewenew + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + *************************************************************************/ + +#ifndef SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_HPP +#define SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_HPP + +#include "utils.h" + +namespace sw { + +namespace redis { + +namespace test { + +template +void ClusterTest::run() { + _redis.for_each([](sw::redis::Redis &r) { + REDIS_ASSERT(r.ping() == "PONG", "failed to test for_each"); + }); +} + +} + +} + +} + +#endif // end SEWENEW_REDISPLUSPLUS_TEST_CLUSTER_TEST_HPP diff --git a/test/src/sw/redis++/test_main.cpp b/test/src/sw/redis++/test_main.cpp index 5998e90..05f5258 100644 --- a/test/src/sw/redis++/test_main.cpp +++ b/test/src/sw/redis++/test_main.cpp @@ -46,6 +46,7 @@ #include "pipeline_transaction_test.h" #include "threads_test.h" #include "stream_cmds_test.h" +#include "cluster_test.h" #include "benchmark_test.h" #ifdef REDIS_PLUS_PLUS_RUN_ASYNC_TEST @@ -451,6 +452,11 @@ void run_test(const sw::redis::ConnectionOptions &opts, const TestOptions &test_ stream_test.run(); std::cout << "Pass stream commands tests" << std::endl; + + sw::redis::test::ClusterTest cluster_test(instance); + cluster_test.run(); + + std::cout << "Pass cluster specific tests" << std::endl; } template