From 370736e85449522716683605c66f649c31847850 Mon Sep 17 00:00:00 2001 From: dv Date: Fri, 21 May 2021 15:40:52 -0700 Subject: [PATCH] exponential Replication check (#19) * exponential Replication check * updating comment * updating comment * updating comment --- .../ConnectionMultiplexer.cs | 7 ----- src/StackExchange.Redis/PhysicalBridge.cs | 15 +++++++--- src/StackExchange.Redis/ServerEndPoint.cs | 29 +++++++++++++++++-- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index 2e6146dcf..43ef1649d 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -3175,13 +3175,6 @@ public Task PublishReconfigureAsync(CommandFlags flags = CommandFlags.None /// The to determine the hash slot for. public int GetHashSlot(RedisKey key) => ServerSelectionStrategy.HashSlot(key); - internal void MarkServerEndpointsForReplicationRoleRefresh() - { - foreach (var s in servers.Values) - { - ((ServerEndPoint)s).ForceReplicationCheck = true; - } - } } internal enum WriteResult diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index 50f0d68f0..2877b8cba 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -427,10 +427,17 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti oldState = default(State); // only defined when isCurrent = true if (isCurrent = (physical == connection)) { - Multiplexer.MarkServerEndpointsForReplicationRoleRefresh(); - Trace("Bridge noting disconnect from active connection" + (isDisposed ? " (disposed)" : "")); oldState = ChangeState(State.Disconnected); + if(oldState == State.ConnectedEstablished && !ServerEndPoint.IsReplica) + { + // if the disconnected endpoint was a master endpoint run info replication + // more frequently on it's replica with exponential increments + foreach (var r in ServerEndPoint.Replicas) + { + r.ForceExponentiallyReplicationCheck(); + } + } physical = null; if (!isDisposed && Interlocked.Increment(ref failConnectCount) == 1) @@ -538,10 +545,10 @@ internal void OnHeartbeat(bool ifConnectedOnly) } tmp.OnBridgeHeartbeat(); int writeEverySeconds = ServerEndPoint.WriteEverySeconds, - checkConfigSeconds = Multiplexer.RawConfig.ConfigCheckSeconds; + checkConfigSeconds = ServerEndPoint.ConfigCheckSeconds; if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive - && ((checkConfigSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= checkConfigSeconds) || ServerEndPoint.ForceReplicationCheck) + && ((checkConfigSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= checkConfigSeconds)) && ServerEndPoint.CheckInfoReplication()) { // that serves as a keep-alive, if it is accepted diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs index b516588af..1b516af5e 100644 --- a/src/StackExchange.Redis/ServerEndPoint.cs +++ b/src/StackExchange.Redis/ServerEndPoint.cs @@ -56,7 +56,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) databases = 0; writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60; serverType = ServerType.Standalone; - + ConfigCheckSeconds = Multiplexer.RawConfig.ConfigCheckSeconds; // overrides for twemproxy if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy) { @@ -537,10 +537,14 @@ public EndPoint MasterEndPoint set { SetConfig(ref masterEndPoint, value); } } + internal volatile int ConfigCheckSeconds; + internal bool CheckInfoReplication() { lastInfoReplicationCheckTicks = Environment.TickCount; - ForceReplicationCheck = false; + + ResetExponentiallyReplicationCheck(); + PhysicalBridge bridge; if (version >= RedisFeatures.v2_8_0 && Multiplexer.CommandMap.IsAvailable(RedisCommand.INFO) && (bridge = GetBridge(ConnectionType.Interactive, false)) != null) @@ -556,7 +560,26 @@ internal bool CheckInfoReplication() } private int lastInfoReplicationCheckTicks; - internal volatile bool ForceReplicationCheck; + [ThreadStatic] + private static Random r; + + + // Forces frequent replication check starting from 1 second upto max ConfigCheckSeconds with an exponential increment + internal void ForceExponentiallyReplicationCheck() + { + ConfigCheckSeconds = 1; // start checking info replication more frequently + } + + private void ResetExponentiallyReplicationCheck() + { + if (ConfigCheckSeconds < Multiplexer.RawConfig.ConfigCheckSeconds) + { + r = r ?? new Random(); + var newExponentialConfigCheck = ConfigCheckSeconds * 2; + var jitter = r.Next(ConfigCheckSeconds + 1, newExponentialConfigCheck); + ConfigCheckSeconds = Math.Min(jitter, Multiplexer.RawConfig.ConfigCheckSeconds); + } + } private int _heartBeatActive; internal void OnHeartbeat()