Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ROLE command #1551

Merged
merged 5 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2407,7 +2407,7 @@ public ConnectionMultiplexer GetSentinelMasterConnection(ConfigurationOptions co

// verify role is master according to:
// https://redis.io/topics/sentinel-clients
if (connection.GetServer(newMasterEndPoint)?.Role() == RedisLiterals.master)
if (connection.GetServer(newMasterEndPoint)?.Role().Value == RedisLiterals.master)
{
success = true;
break;
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ internal enum RedisCommand
RENAMENX,
REPLICAOF,
RESTORE,
ROLE,
RPOP,
RPOPLPUSH,
RPUSH,
Expand Down
22 changes: 13 additions & 9 deletions src/StackExchange.Redis/Interfaces/IServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public partial interface IServer : IRedis
/// Get summary statistics associates with this server
/// </summary>
ServerCounters GetCounters();

/// <summary>
/// The INFO command returns information and statistics about the server in a format that is simple to parse by computers and easy to read by humans.
/// </summary>
Expand Down Expand Up @@ -423,6 +424,18 @@ public partial interface IServer : IRedis
/// <param name="log">The log to write output to.</param>
void MakeMaster(ReplicationChangeOptions options, TextWriter log = null);

/// <summary>
/// Returns the role info for the current server.
/// </summary>
/// <remarks>https://redis.io/commands/role</remarks>
Role Role(CommandFlags flags = CommandFlags.None);

/// <summary>
/// Returns the role info for the current server.
/// </summary>
/// <remarks>https://redis.io/commands/role</remarks>
Task<Role> RoleAsync(CommandFlags flags = CommandFlags.None);

/// <summary>
/// Explicitly request the database to persist the current state to disk
/// </summary>
Expand Down Expand Up @@ -1042,14 +1055,5 @@ internal static class IServerExtensions
/// </summary>
/// <param name="server">The server to simulate failure on.</param>
public static void SimulateConnectionFailure(this IServer server) => (server as RedisServer)?.SimulateConnectionFailure();

public static string Role(this IServer server)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the existing test helper and changed the usage to use the new code

{
var result = (RedisResult[])server.Execute("ROLE");
if (result != null && result.Length > 0)
return result[0].ToString();

return null;
}
}
}
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public bool IsAdmin
case RedisCommand.KEYS:
case RedisCommand.MONITOR:
case RedisCommand.REPLICAOF:
case RedisCommand.ROLE:
case RedisCommand.SAVE:
case RedisCommand.SHUTDOWN:
case RedisCommand.SLAVEOF:
Expand Down Expand Up @@ -573,6 +574,7 @@ internal static bool RequiresDatabase(RedisCommand command)
case RedisCommand.READONLY:
case RedisCommand.READWRITE:
case RedisCommand.REPLICAOF:
case RedisCommand.ROLE:
case RedisCommand.SAVE:
case RedisCommand.SCRIPT:
case RedisCommand.SHUTDOWN:
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/RawResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ internal bool TryGetDouble(out double val)

internal bool TryGetInt64(out long value)
{
if(IsNull || Payload.IsEmpty || Payload.Length > PhysicalConnection.MaxInt64TextLen)
if (IsNull || Payload.IsEmpty || Payload.Length > PhysicalConnection.MaxInt64TextLen)
{
value = 0;
return false;
Expand Down
8 changes: 8 additions & 0 deletions src/StackExchange.Redis/RedisLiterals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ public static readonly RedisValue
REMOVE = "REMOVE",
// SET = "SET",

// replication states
connect = "connect",
connected = "connected",
connecting = "connecting",
handshake = "handshake",
none = "none",
sync = "sync",

MinusSymbol = "-",
PlusSumbol = "+",
Wildcard = "*",
Expand Down
12 changes: 12 additions & 0 deletions src/StackExchange.Redis/RedisServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,18 @@ public void MakeMaster(ReplicationChangeOptions options, TextWriter log = null)
}
}

public Role Role(CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.ROLE);
return ExecuteSync(msg, ResultProcessor.Role);
}

public Task<Role> RoleAsync(CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.ROLE);
return ExecuteAsync(msg, ResultProcessor.Role);
}

public void Save(SaveType type, CommandFlags flags = CommandFlags.None)
{
var msg = GetSaveMessage(type, flags);
Expand Down
132 changes: 132 additions & 0 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public static readonly ResultProcessor<GeoPosition?>
public static readonly ResultProcessor<TimeSpan>
ResponseTimer = new TimingProcessor();

public static readonly ResultProcessor<Role>
Role = new RoleProcessor();

public static readonly ResultProcessor<RedisResult>
ScriptResult = new ScriptResultProcessor();

Expand Down Expand Up @@ -1363,6 +1366,135 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
}

private sealed class RoleProcessor : ResultProcessor<Role>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
{
var items = result.GetItems();
if (items.IsEmpty)
{
return false;
}

ref var val = ref items[0];
Role role;
if (val.IsEqual(RedisLiterals.master)) role = ParseMaster(items);
else if (val.IsEqual(RedisLiterals.slave)) role = ParseReplica(items, RedisLiterals.slave);
else if (val.IsEqual(RedisLiterals.replica)) role = ParseReplica(items, RedisLiterals.replica); // for when "slave" is deprecated
else if (val.IsEqual(RedisLiterals.sentinel)) role = ParseSentinel(items);
else role = new Role.Unknown(val.GetString());

if (role is null) return false;
SetResult(message, role);
return true;
}

private static Role ParseMaster(in Sequence<RawResult> items)
{
if (items.Length < 3)
{
return null;
}

if (!items[1].TryGetInt64(out var offset))
{
return null;
}

var replicaItems = items[2].GetItems();
ICollection<Role.Master.Replica> replicas;
if (replicaItems.IsEmpty)
{
replicas = Array.Empty<Role.Master.Replica>();
}
else
{
replicas = new List<Role.Master.Replica>((int)replicaItems.Length);
for (int i = 0; i < replicaItems.Length; i++)
{
if (TryParseMasterReplica(replicaItems[i].GetItems(), out var replica))
{
replicas.Add(replica);
}
else
{
return null;
}
}
}

return new Role.Master(offset, replicas);
}

private static bool TryParseMasterReplica(in Sequence<RawResult> items, out Role.Master.Replica replica)
{
if (items.Length < 3)
{
replica = default;
return false;
}

var masterIp = items[0].GetString();

if (!items[1].TryGetInt64(out var masterPort) || masterPort > int.MaxValue)
{
replica = default;
return false;
}

if (!items[2].TryGetInt64(out var replicationOffset))
{
replica = default;
return false;
}

replica = new Role.Master.Replica(masterIp, (int)masterPort, replicationOffset);
return true;
}

private static Role ParseReplica(in Sequence<RawResult> items, string role)
{
if (items.Length < 5)
{
return null;
}

var masterIp = items[1].GetString();

if (!items[2].TryGetInt64(out var masterPort) || masterPort > int.MaxValue)
{
return null;
}

ref var val = ref items[3];
string replicationState;
if (val.IsEqual(RedisLiterals.connect)) replicationState = RedisLiterals.connect;
else if (val.IsEqual(RedisLiterals.connecting)) replicationState = RedisLiterals.connecting;
else if (val.IsEqual(RedisLiterals.sync)) replicationState = RedisLiterals.sync;
else if (val.IsEqual(RedisLiterals.connected)) replicationState = RedisLiterals.connected;
else if (val.IsEqual(RedisLiterals.none)) replicationState = RedisLiterals.none;
else if (val.IsEqual(RedisLiterals.handshake)) replicationState = RedisLiterals.handshake;
else replicationState = val.GetString();

if (!items[4].TryGetInt64(out var replicationOffset))
{
return null;
}

return new Role.Replica(role, masterIp, (int)masterPort, replicationState, replicationOffset);
}

private static Role ParseSentinel(in Sequence<RawResult> items)
{
if (items.Length < 2)
{
return null;
}
var masters = items[1].GetItemsAsStrings();
return new Role.Sentinel(masters);
}
}

private sealed class LeaseProcessor : ResultProcessor<Lease<byte>>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
Expand Down
133 changes: 133 additions & 0 deletions src/StackExchange.Redis/Role.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace StackExchange.Redis
{
/// <summary>
/// Result of the ROLE command. Values depend on the role: master, replica, or sentinel.
/// </summary>
/// <remarks>https://redis.io/commands/role</remarks>
public abstract class Role
{
/// <summary>
/// One of "master", "slave" (aka replica), or "sentinel".
/// </summary>
public string Value { get; }

private Role(string role) => Value = role;

/// <summary>
/// Result of the ROLE command for a master node.
/// </summary>
/// <remarks>https://redis.io/commands/role#master-output</remarks>
public sealed class Master : Role
{
/// <summary>
/// The replication offset. To be consumed by replica nodes.
/// </summary>
public long ReplicationOffset { get; }

/// <summary>
/// Connected replica nodes.
/// </summary>
public ICollection<Replica> Replicas { get; }

/// <summary>
/// A connected replica node.
/// </summary>
public new readonly struct Replica
{
/// <summary>
/// The IP address of this replica node.
/// </summary>
public string Ip { get; }

/// <summary>
/// The port number of this replica node.
/// </summary>
public int Port { get; }

/// <summary>
/// The last replication offset acked by this replica node.
/// </summary>
public long ReplicationOffset { get; }

internal Replica(string ip, int port, long offset)
{
Ip = ip;
Port = port;
ReplicationOffset = offset;
}
}

internal Master(long offset, ICollection<Replica> replicas) : base(RedisLiterals.master)
{
ReplicationOffset = offset;
Replicas = replicas;
}
}

/// <summary>
/// Result of the ROLE command for a replica node.
/// </summary>
/// <remarks>https://redis.io/commands/role#output-of-the-command-on-replicas</remarks>
public sealed class Replica : Role
{
/// <summary>
/// The IP address of the master node for this replica.
/// </summary>
public string MasterIp { get; }

/// <summary>
/// The port number of the master node for this replica.
/// </summary>
public int MasterPort { get; }

/// <summary>
/// This replica's replication state.
/// </summary>
public string State { get; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be an enum, but returning the raw value seemed better for allowing the caller to do something useful with an unrecognized value.


/// <summary>
/// The last replication offset received by this replica.
/// </summary>
public long ReplicationOffset { get; }

internal Replica(string role, string ip, int port, string state, long offset) : base(role)
{
MasterIp = ip;
MasterPort = port;
State = state;
ReplicationOffset = offset;
}
}

/// <summary>
/// Result of the ROLE command for a sentinel node.
/// </summary>
/// <remarks>https://redis.io/commands/role#sentinel-output</remarks>
public sealed class Sentinel : Role
{
/// <summary>
/// Master names monitored by this sentinel node.
/// </summary>
public ICollection<string> MonitoredMasters { get; }

internal Sentinel(ICollection<string> masters) : base(RedisLiterals.sentinel)
{
MonitoredMasters = masters;
}
}

/// <summary>
/// An unexpected result of the ROLE command.
/// </summary>
public sealed class Unknown : Role
{
internal Unknown(string role) : base(role) { }
}
}
}
Loading