diff --git a/src/sw/redis++/async_connection.h b/src/sw/redis++/async_connection.h index cfbcb49..b047d1c 100644 --- a/src/sw/redis++/async_connection.h +++ b/src/sw/redis++/async_connection.h @@ -512,8 +512,6 @@ class ClusterEvent : public CommandEvent { // i.e. ClosedError or IoError, we need to update node-slot mapping. try { std::rethrow_exception(err); - } catch (const SlotUncoveredError &) { - detail::update_shards(_key, _pool, AsyncEventUPtr(new UpdateShardsEvent)); } catch (const IoError &) { detail::update_shards(_key, _pool, AsyncEventUPtr(new UpdateShardsEvent)); } catch (const ClosedError &) { diff --git a/src/sw/redis++/async_redis_cluster.cpp b/src/sw/redis++/async_redis_cluster.cpp index 4b0dc6f..51b2d14 100644 --- a/src/sw/redis++/async_redis_cluster.cpp +++ b/src/sw/redis++/async_redis_cluster.cpp @@ -32,14 +32,24 @@ AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts, _loop = std::make_shared(); } - _pool = std::make_shared(_loop, pool_opts, opts, role); + _pool = std::make_shared(_loop, pool_opts, opts, role, ClusterOptions{}); +} + +AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts, + const ConnectionPoolOptions &pool_opts, + Role role, + const ClusterOptions &cluster_opts, + const EventLoopSPtr &loop) : _loop(loop) { + if (!_loop) { + _loop = std::make_shared(); + } + + _pool = std::make_shared(_loop, pool_opts, opts, role, cluster_opts); } AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection) { assert(_pool); - _pool->update(); - auto pool = _pool->fetch(hash_tag); if (new_connection) { // Create a new pool. @@ -52,8 +62,6 @@ AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connect AsyncSubscriber AsyncRedisCluster::subscriber() { assert(_pool); - _pool->update(); - auto opts = _pool->connection_options(); auto connection = std::make_shared(opts, _loop.get()); @@ -65,8 +73,6 @@ AsyncSubscriber AsyncRedisCluster::subscriber() { AsyncSubscriber AsyncRedisCluster::subscriber(const StringView &hash_tag) { assert(_pool); - _pool->update(); - auto opts = _pool->connection_options(hash_tag); auto connection = std::make_shared(opts, _loop.get()); diff --git a/src/sw/redis++/async_redis_cluster.h b/src/sw/redis++/async_redis_cluster.h index 650d986..1e772ce 100644 --- a/src/sw/redis++/async_redis_cluster.h +++ b/src/sw/redis++/async_redis_cluster.h @@ -39,6 +39,12 @@ class AsyncRedisCluster { Role role = Role::MASTER, const EventLoopSPtr &loop = nullptr); + AsyncRedisCluster(const ConnectionOptions &opts, + const ConnectionPoolOptions &pool_opts, + Role role, + const ClusterOptions &cluster_opts, + const EventLoopSPtr &loop = nullptr); + explicit AsyncRedisCluster(const std::string &uri) : AsyncRedisCluster(Uri(uri)) {} AsyncRedisCluster(const AsyncRedisCluster &) = delete; diff --git a/src/sw/redis++/async_shards_pool.cpp b/src/sw/redis++/async_shards_pool.cpp index 257ed16..5ce3857 100644 --- a/src/sw/redis++/async_shards_pool.cpp +++ b/src/sw/redis++/async_shards_pool.cpp @@ -29,10 +29,12 @@ const std::size_t AsyncShardsPool::SHARDS; AsyncShardsPool::AsyncShardsPool(const EventLoopSPtr &loop, const ConnectionPoolOptions &pool_opts, const ConnectionOptions &connection_opts, - Role role) : + Role role, + const ClusterOptions &cluster_opts) : _pool_opts(pool_opts), _connection_opts(connection_opts), _role(role), + _cluster_opts(cluster_opts), _loop(loop) { assert(loop); @@ -208,8 +210,12 @@ auto AsyncShardsPool::_fetch_events() -> std::queue { std::queue events; std::unique_lock lock(_mutex); - if (_events.empty()) { - _cv.wait(lock, [this]() { return !(this->_events).empty(); } ); + + if (!_cv.wait_for(lock, + _cluster_opts.slot_map_refresh_interval, + [this]() { return !(this->_events).empty(); })) { + // Reach timeout, but there's still no event, put an update event. + _events.push(RedeliverEvent{{}, AsyncEventUPtr(new UpdateShardsEvent)}); } events.swap(_events); @@ -238,7 +244,7 @@ Shards AsyncShardsPool::_get_shards(const std::string &host, int port) { auto opts = _connection_opts; opts.host = host; opts.port = port; - ShardsPool pool(_pool_opts, opts, _role); + ShardsPool pool(_pool_opts, opts, _role, _cluster_opts); return pool.shards(); } diff --git a/src/sw/redis++/async_shards_pool.h b/src/sw/redis++/async_shards_pool.h index 98c6419..5e61d51 100644 --- a/src/sw/redis++/async_shards_pool.h +++ b/src/sw/redis++/async_shards_pool.h @@ -43,7 +43,8 @@ class AsyncShardsPool { AsyncShardsPool(const EventLoopSPtr &loop, const ConnectionPoolOptions &pool_opts, const ConnectionOptions &connection_opts, - Role role); + Role role, + const ClusterOptions &cluster_opts); AsyncConnectionPoolSPtr fetch(const StringView &key); @@ -101,6 +102,8 @@ class AsyncShardsPool { Role _role = Role::MASTER; + ClusterOptions _cluster_opts; + Shards _shards; NodeMap _pools; diff --git a/src/sw/redis++/crc16.cpp b/src/sw/redis++/crc16.cpp index 81495ef..24d3dcc 100644 --- a/src/sw/redis++/crc16.cpp +++ b/src/sw/redis++/crc16.cpp @@ -42,7 +42,7 @@ * Output for "123456789" : 31C3 */ -#include "utils.h" +#include "sw/redis++/utils.h" #include namespace sw { diff --git a/src/sw/redis++/redis_cluster.cpp b/src/sw/redis++/redis_cluster.cpp index cc37305..bd1d3bd 100644 --- a/src/sw/redis++/redis_cluster.cpp +++ b/src/sw/redis++/redis_cluster.cpp @@ -32,8 +32,6 @@ RedisCluster::RedisCluster(const Uri &uri) : Redis RedisCluster::redis(const StringView &hash_tag, bool new_connection) { assert(_pool); - _pool->async_update(); - auto pool = _pool->fetch(hash_tag); if (new_connection) { // Create a new pool @@ -46,8 +44,6 @@ Redis RedisCluster::redis(const StringView &hash_tag, bool new_connection) { Pipeline RedisCluster::pipeline(const StringView &hash_tag, bool new_connection) { assert(_pool); - _pool->async_update(); - auto pool = _pool->fetch(hash_tag); if (new_connection) { // Create a new pool @@ -60,8 +56,6 @@ Pipeline RedisCluster::pipeline(const StringView &hash_tag, bool new_connection) Transaction RedisCluster::transaction(const StringView &hash_tag, bool piped, bool new_connection) { assert(_pool); - _pool->async_update(); - auto pool = _pool->fetch(hash_tag); if (new_connection) { // Create a new pool @@ -74,8 +68,6 @@ Transaction RedisCluster::transaction(const StringView &hash_tag, bool piped, bo Subscriber RedisCluster::subscriber() { assert(_pool); - _pool->async_update(); - auto opts = _pool->connection_options(); return Subscriber(Connection(opts)); } @@ -83,8 +75,6 @@ Subscriber RedisCluster::subscriber() { Subscriber RedisCluster::subscriber(const StringView &hash_tag) { assert(_pool); - _pool->async_update(); - auto opts = _pool->connection_options(hash_tag); return Subscriber(Connection(opts)); } diff --git a/src/sw/redis++/redis_cluster.h b/src/sw/redis++/redis_cluster.h index 011c030..8975808 100644 --- a/src/sw/redis++/redis_cluster.h +++ b/src/sw/redis++/redis_cluster.h @@ -47,7 +47,8 @@ class RedisCluster { public: explicit RedisCluster(const ConnectionOptions &connection_opts, const ConnectionPoolOptions &pool_opts = {}, - Role role = Role::MASTER) : _pool(new ShardsPool(pool_opts, connection_opts, role)) {} + Role role = Role::MASTER, + const ClusterOptions &cluster_opts = {}) : _pool(new ShardsPool(pool_opts, connection_opts, role, cluster_opts)) {} // Construct RedisCluster with URI: // "tcp://127.0.0.1" or "tcp://127.0.0.1:6379" diff --git a/src/sw/redis++/redis_cluster.hpp b/src/sw/redis++/redis_cluster.hpp index ab10970..3068862 100644 --- a/src/sw/redis++/redis_cluster.hpp +++ b/src/sw/redis++/redis_cluster.hpp @@ -1351,11 +1351,6 @@ ReplyUPtr RedisCluster::_command(Cmd cmd, const StringView &key, Args &&...args) SafeConnection safe_connection(*pool); return _command(cmd, safe_connection.connection(), std::forward(args)...); - } catch (const SlotUncoveredError &) { - // Some slot is not covered, update asynchronously to see if new node added. - // Check https://github.com/sewenew/redis-plus-plus/issues/255 for detail. - // TODO: should we replace other 'update's with 'async_update's? - _pool->async_update(); } catch (const IoError &) { // When master is down, one of its replicas will be promoted to be the new master. // If we try to send command to the old master, we'll get an *IoError*. diff --git a/src/sw/redis++/shards_pool.cpp b/src/sw/redis++/shards_pool.cpp index aae4932..ddd816f 100644 --- a/src/sw/redis++/shards_pool.cpp +++ b/src/sw/redis++/shards_pool.cpp @@ -26,10 +26,12 @@ const std::size_t ShardsPool::SHARDS; ShardsPool::ShardsPool(const ConnectionPoolOptions &pool_opts, const ConnectionOptions &connection_opts, - Role role) : + Role role, + const ClusterOptions &cluster_opts) : _pool_opts(pool_opts), _connection_opts(connection_opts), - _role(role) { + _role(role), + _cluster_opts(cluster_opts) { if (_connection_opts.type != ConnectionType::TCP) { throw Error("Only support TCP connection for Redis Cluster"); } @@ -47,7 +49,7 @@ ShardsPool::~ShardsPool() { { std::lock_guard lock(_mutex); - _update_status = UpdateStatus::STOP; + _stop = true; } _cv.notify_one(); @@ -172,22 +174,6 @@ std::vector ShardsPool::pools() { return nodes; } -void ShardsPool::async_update() { - bool should_update = false; - { - std::lock_guard lock(_mutex); - - if (_update_status == UpdateStatus::UPDATED) { - should_update = true; - _update_status = UpdateStatus::STALE; - } - } - - if (should_update) { - _cv.notify_one(); - } -} - void ShardsPool::_init_pool(const Shards &shards) { for (const auto &shard : shards) { _add_node(shard.second); @@ -385,34 +371,20 @@ auto ShardsPool::_add_node(const Node &node) -> NodeMap::iterator { void ShardsPool::_run() { while (true) { std::unique_lock lock(_mutex); - if (_update_status == UpdateStatus::UPDATED) { - _cv.wait(lock, [this]() { return this->_update_status != UpdateStatus::UPDATED; }); - } - if (_update_status == UpdateStatus::STOP) { + if (_cv.wait_for(lock, + _cluster_opts.slot_map_refresh_interval, + [this]() { return this->_stop; })) { break; - } else if (_update_status == UpdateStatus::STALE) { - lock.unlock(); - - _do_async_update(); - } else { - assert("invalid UpdateStatus"); } - } -} - -void ShardsPool::_do_async_update() { - try { - update(); - std::lock_guard lock(_mutex); + lock.unlock(); - if (_update_status != UpdateStatus::STOP) { - _update_status = UpdateStatus::UPDATED; + try { + update(); + } catch (...) { + // Ignore exceptions. } - } catch (...) { - // Ignore exceptions. - // TODO: should we sleep a while? } } diff --git a/src/sw/redis++/shards_pool.h b/src/sw/redis++/shards_pool.h index e114351..4e82927 100644 --- a/src/sw/redis++/shards_pool.h +++ b/src/sw/redis++/shards_pool.h @@ -18,6 +18,7 @@ #define SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H #include +#include #include #include #include @@ -34,10 +35,13 @@ namespace sw { namespace redis { +struct ClusterOptions { + // Automatically update slot map every `slot_map_refresh_interval`. + std::chrono::milliseconds slot_map_refresh_interval = std::chrono::seconds(10); +}; + class ShardsPool { public: - ShardsPool() = default; - ShardsPool(const ShardsPool &that) = delete; ShardsPool& operator=(const ShardsPool &that) = delete; @@ -48,7 +52,8 @@ class ShardsPool { ShardsPool(const ConnectionPoolOptions &pool_opts, const ConnectionOptions &connection_opts, - Role role); + Role role, + const ClusterOptions &cluster_opts); // Fetch a connection by key. ConnectionPoolSPtr fetch(const StringView &key); @@ -69,8 +74,6 @@ class ShardsPool { std::vector pools(); - void async_update(); - private: void _init_pool(const Shards &shards); @@ -117,12 +120,7 @@ class ShardsPool { NodeMap _pools; - enum class UpdateStatus { - STALE = 0, - UPDATED, - STOP - }; - UpdateStatus _update_status = UpdateStatus::UPDATED; + bool _stop = false; std::thread _worker; @@ -132,6 +130,8 @@ class ShardsPool { Role _role = Role::MASTER; + ClusterOptions _cluster_opts; + static const std::size_t SHARDS = 16383; };