Skip to content

Commit

Permalink
exponential Replication check (#19)
Browse files Browse the repository at this point in the history
* exponential Replication check

* updating comment

* updating comment

* updating comment
  • Loading branch information
deepakverma authored May 21, 2021
1 parent ed96413 commit 370736e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
7 changes: 0 additions & 7 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3175,13 +3175,6 @@ public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None
/// <param name="key">The <see cref="RedisKey"/> to determine the hash slot for.</param>
public int GetHashSlot(RedisKey key) => ServerSelectionStrategy.HashSlot(key);

internal void MarkServerEndpointsForReplicationRoleRefresh()
{
foreach (var s in servers.Values)
{
((ServerEndPoint)s).ForceReplicationCheck = true;
}
}
}

internal enum WriteResult
Expand Down
15 changes: 11 additions & 4 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,17 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
oldState = default(State); // only defined when isCurrent = true
if (isCurrent = (physical == connection))
{
Multiplexer.MarkServerEndpointsForReplicationRoleRefresh();

Trace("Bridge noting disconnect from active connection" + (isDisposed ? " (disposed)" : ""));
oldState = ChangeState(State.Disconnected);
if(oldState == State.ConnectedEstablished && !ServerEndPoint.IsReplica)
{
// if the disconnected endpoint was a master endpoint run info replication
// more frequently on it's replica with exponential increments
foreach (var r in ServerEndPoint.Replicas)
{
r.ForceExponentiallyReplicationCheck();
}
}
physical = null;

if (!isDisposed && Interlocked.Increment(ref failConnectCount) == 1)
Expand Down Expand Up @@ -538,10 +545,10 @@ internal void OnHeartbeat(bool ifConnectedOnly)
}
tmp.OnBridgeHeartbeat();
int writeEverySeconds = ServerEndPoint.WriteEverySeconds,
checkConfigSeconds = Multiplexer.RawConfig.ConfigCheckSeconds;
checkConfigSeconds = ServerEndPoint.ConfigCheckSeconds;

if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive
&& ((checkConfigSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= checkConfigSeconds) || ServerEndPoint.ForceReplicationCheck)
&& ((checkConfigSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= checkConfigSeconds))
&& ServerEndPoint.CheckInfoReplication())
{
// that serves as a keep-alive, if it is accepted
Expand Down
29 changes: 26 additions & 3 deletions src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
databases = 0;
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
serverType = ServerType.Standalone;

ConfigCheckSeconds = Multiplexer.RawConfig.ConfigCheckSeconds;
// overrides for twemproxy
if (multiplexer.RawConfig.Proxy == Proxy.Twemproxy)
{
Expand Down Expand Up @@ -537,10 +537,14 @@ public EndPoint MasterEndPoint
set { SetConfig(ref masterEndPoint, value); }
}

internal volatile int ConfigCheckSeconds;

internal bool CheckInfoReplication()
{
lastInfoReplicationCheckTicks = Environment.TickCount;
ForceReplicationCheck = false;

ResetExponentiallyReplicationCheck();

PhysicalBridge bridge;
if (version >= RedisFeatures.v2_8_0 && Multiplexer.CommandMap.IsAvailable(RedisCommand.INFO)
&& (bridge = GetBridge(ConnectionType.Interactive, false)) != null)
Expand All @@ -556,7 +560,26 @@ internal bool CheckInfoReplication()
}

private int lastInfoReplicationCheckTicks;
internal volatile bool ForceReplicationCheck;
[ThreadStatic]
private static Random r;


// Forces frequent replication check starting from 1 second upto max ConfigCheckSeconds with an exponential increment
internal void ForceExponentiallyReplicationCheck()
{
ConfigCheckSeconds = 1; // start checking info replication more frequently
}

private void ResetExponentiallyReplicationCheck()
{
if (ConfigCheckSeconds < Multiplexer.RawConfig.ConfigCheckSeconds)
{
r = r ?? new Random();
var newExponentialConfigCheck = ConfigCheckSeconds * 2;
var jitter = r.Next(ConfigCheckSeconds + 1, newExponentialConfigCheck);
ConfigCheckSeconds = Math.Min(jitter, Multiplexer.RawConfig.ConfigCheckSeconds);
}
}

private int _heartBeatActive;
internal void OnHeartbeat()
Expand Down

0 comments on commit 370736e

Please sign in to comment.