Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Reset behavior to be more consistent with Redis #685

Merged
merged 10 commits into from
Sep 26, 2024
1 change: 1 addition & 0 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_INVALID_SLOT => "ERR Invalid or out of range slot"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_VALUE_IS_NOT_BOOLEAN => "ERR value is not a boolean."u8;
public static ReadOnlySpan<byte> RESP_ERR_RESET_WITH_KEYS_ASSIGNED => "-ERR CLUSTER RESET can't be called with master nodes containing keys\r\n"u8;
public static ReadOnlySpan<byte> RESP_SYNTAX_ERROR => "ERR syntax error"u8;

/// <summary>
Expand Down
22 changes: 21 additions & 1 deletion libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,26 @@ public List<int> GetLocalPrimarySlots()
return slots;
}

/// <summary>
/// Retrieve a list of slots served by this node.
/// </summary>
/// <returns>List of slots.</returns>
public List<int> GetLocalSlots()
Mathos1432 marked this conversation as resolved.
Show resolved Hide resolved
{
var primaryId = this.LocalNodeId;
List<int> slots = [];

if (primaryId != null)
{
for (var i = 0; i < MAX_HASH_SLOT_VALUE; i++)
{
if (slotMap[i].workerId > 0 && workers[slotMap[i].workerId].Nodeid.Equals(primaryId, StringComparison.OrdinalIgnoreCase))
Mathos1432 marked this conversation as resolved.
Show resolved Hide resolved
slots.Add(i);
}
}
return slots;
}

/// <summary>
/// Find maximum config epoch from local config
/// </summary>
Expand Down Expand Up @@ -319,7 +339,7 @@ public List<string> GetRemoteNodeIds()
/// </summary>
/// <param name="nodeId"></param>
/// <returns>Integer representing offset of worker in worker list.</returns>
public int GetWorkerIdFromNodeId(string nodeId)
public ushort GetWorkerIdFromNodeId(string nodeId)
{
for (ushort i = 1; i <= NumWorkers; i++)
{
Expand Down
12 changes: 8 additions & 4 deletions libs/cluster/Server/ClusterManagerWorkerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,23 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
{
PauseConfigMerge();
var resp = CmdStrings.RESP_OK;

while (true)
{
var current = currentConfig;
var localSlots = current.GetLocalSlots();
if (clusterProvider.storeWrapper.HasKeysInSlots(localSlots))
{
return CmdStrings.RESP_ERR_RESET_WITH_KEYS_ASSIGNED;
}

this.clusterConnectionStore.CloseAll();

var newNodeId = soft ? current.LocalNodeId : Generator.CreateHexId();
var address = current.LocalNodeIp;
var port = current.LocalNodePort;

var configEpoch = soft ? current.LocalNodeConfigEpoch : 0;
var expiry = DateTimeOffset.UtcNow.Ticks + TimeSpan.FromSeconds(expirySeconds).Ticks;
foreach (var nodeId in current.GetRemoteNodeIds())
_ = workerBanList.AddOrUpdate(nodeId, expiry, (key, oldValue) => expiry);

var newConfig = new ClusterConfig().InitializeLocalWorker(
newNodeId,
address,
Expand Down
23 changes: 23 additions & 0 deletions libs/cluster/Server/GarnetClusterConnectionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,29 @@ public bool AddConnection(GarnetServerNode conn)
return true;
}

public void CloseAll()
{
try
{
_lock.WriteLock();

if (_disposed) return;

for (int i = 0; i < numConnection; i++)
connections[i].Dispose();
numConnection = 0;
Array.Clear(connections);
}
catch (Exception ex)
{
logger?.LogError(ex, "GarnetConnectionStore.CloseAll");
}
finally
{
_lock.WriteUnlock();
}
}

/// <summary>
/// Remove GarnetServerNode connection object from store.
/// </summary>
Expand Down
42 changes: 42 additions & 0 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
Expand Down Expand Up @@ -775,5 +776,46 @@ private async Task InitiateCheckpoint(bool full, CheckpointType checkpointType,

logger?.LogInformation("Completed checkpoint");
}

public bool HasKeysInSlots(List<int> slots)
{
if (slots.Count > 0)
{
bool hasKeyInSlots = false;
{
using var iter = store.Iterate<SpanByte, SpanByte, Empty, SimpleSessionFunctions<SpanByte, SpanByte, Empty>>(new SimpleSessionFunctions<SpanByte, SpanByte, Empty>());
while (!hasKeyInSlots && iter.GetNext(out RecordInfo record))
{
ref var key = ref iter.GetKey();
ushort hashSlotForKey = HashSlotUtils.HashSlot(ref key);
if (slots.Contains(hashSlotForKey))
{
hasKeyInSlots = true;
}
}
}

if (!hasKeyInSlots && objectStore != null)
{
var functionsState = CreateFunctionsState();
var objstorefunctions = new ObjectSessionFunctions(functionsState);
var objectStoreSession = objectStore?.NewSession<ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions>(objstorefunctions);
var iter = objectStoreSession.Iterate();
while (!hasKeyInSlots && iter.GetNext(out RecordInfo record))
{
ref var key = ref iter.GetKey();
ushort hashSlotForKey = HashSlotUtils.HashSlot(key.AsSpan());
if (slots.Contains(hashSlotForKey))
{
hasKeyInSlots = true;
}
}
}

return hasKeyInSlots;
}

return false;
}
}
}
185 changes: 157 additions & 28 deletions test/Garnet.test.cluster/ClusterManagementTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
using NUnit.Framework.Legacy;
Expand Down Expand Up @@ -145,20 +146,6 @@ public void ClusterResetTest()
for (var i = 1; i < node_count; i++)
_ = context.clusterTestUtils.ClusterForget(i, nodeIds[0], 10, context.logger);

try
{
// Add data to server
var resp = context.clusterTestUtils.GetServer(0).Execute("SET", "wxz", "1234");
ClassicAssert.AreEqual("OK", (string)resp);

resp = context.clusterTestUtils.GetServer(0).Execute("GET", "wxz");
ClassicAssert.AreEqual("1234", (string)resp);
}
catch (Exception ex)
{
context.logger?.LogError(ex, "An error occured at ClusterResetTest");
}

// Hard reset node state. clean db data and cluster config
_ = context.clusterTestUtils.ClusterReset(0, soft: false, 10, context.logger);
config = context.clusterTestUtils.ClusterNodes(0, context.logger);
Expand All @@ -172,29 +159,171 @@ public void ClusterResetTest()

//Add slotRange for clean node
context.clusterTestUtils.AddSlotsRange(0, slotRanges, context.logger);

// Add node back to the cluster
context.clusterTestUtils.SetConfigEpoch(0, 1, context.logger);
context.clusterTestUtils.Meet(0, 1, context.logger);
context.clusterTestUtils.WaitUntilNodeIsKnownByAllNodes(0, context.logger);
for (int i = 0; i < node_count; i++)
{
Console.WriteLine(i);
context.clusterTestUtils.WaitUntilNodeIsKnownByAllNodes(i, context.logger);
}
}

[Test, Order(4)]
public void ClusterResetFailsForMasterWithKeysInSlotsTest()
{
var node_count = 4;
context.CreateInstances(node_count);
context.CreateConnection();
var (_, _) = context.clusterTestUtils.SimpleSetupCluster(node_count, 0, logger: context.logger);

var expectedSlotRange = new SlotRange(0, 4095);
var config = context.clusterTestUtils.ClusterNodes(0, context.logger);
var node = config.Nodes.First();
ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
byte[] key = new byte[16];
context.clusterTestUtils.RandomBytesRestrictedToSlot(ref key, node.Slots.First().From);

context.clusterTestUtils.GetServer(0).Execute("SET", key, "1234");
string res = context.clusterTestUtils.GetServer(0).Execute("GET", key).ToString();
ClassicAssert.AreEqual("1234", res);

VerifyClusterResetFails(true);
VerifyClusterResetFails(false);

// soft reset node state. clean db data and cluster config
var nodeIds = context.clusterTestUtils.GetNodeIds(logger: context.logger);
ClassicAssert.AreEqual(4, config.Nodes.Count);
ClassicAssert.AreEqual(nodeIds[0], node.NodeId);
ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
ClassicAssert.IsFalse(node.IsReplica);
}

[Test, Order(4)]
public void ClusterResetFailsForMasterWithKeysInSlotsObjectStoreTest()
{
var node_count = 4;
context.CreateInstances(node_count);
context.CreateConnection();
var (_, _) = context.clusterTestUtils.SimpleSetupCluster(node_count, 0, logger: context.logger);
context.kvPairsObj = new Dictionary<string, List<int>>();
context.PopulatePrimaryWithObjects(ref context.kvPairsObj, 16, 10, 0);

var expectedSlotRange = new SlotRange(0, 4095);
var config = context.clusterTestUtils.ClusterNodes(0, context.logger);
var node = config.Nodes.First();
ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
byte[] key = new byte[16];
context.clusterTestUtils.RandomBytesRestrictedToSlot(ref key, node.Slots.First().From);

VerifyClusterResetFails(true);
VerifyClusterResetFails(false);

var nodeIds = context.clusterTestUtils.GetNodeIds(logger: context.logger);
ClassicAssert.AreEqual(4, config.Nodes.Count);
ClassicAssert.AreEqual(nodeIds[0], node.NodeId);
ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
ClassicAssert.IsFalse(node.IsReplica);
}

[Test, Order(4)]
public void ClusterResetAfterFLushAllTest()
{
var node_count = 4;
context.CreateInstances(node_count);
context.CreateConnection();
var (_, _) = context.clusterTestUtils.SimpleSetupCluster(node_count, 0, logger: context.logger);
context.kvPairsObj = new Dictionary<string, List<int>>();
context.PopulatePrimaryWithObjects(ref context.kvPairsObj, 16, 10, 0);

var expectedSlotRange = new SlotRange(0, 4095);
var config = context.clusterTestUtils.ClusterNodes(0, context.logger);
var node = config.Nodes.First();
ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
byte[] key = new byte[16];
context.clusterTestUtils.RandomBytesRestrictedToSlot(ref key, node.Slots.First().From);

VerifyClusterResetFails(true);
VerifyClusterResetFails(false);

var nodeIds = context.clusterTestUtils.GetNodeIds(logger: context.logger);
ClassicAssert.AreEqual(4, config.Nodes.Count);
ClassicAssert.AreEqual(nodeIds[0], node.NodeId);
ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
ClassicAssert.IsFalse(node.IsReplica);

context.clusterTestUtils.FlushAll(0, context.logger);
_ = context.clusterTestUtils.ClusterReset(0, soft: false, 10, context.logger);

config = context.clusterTestUtils.ClusterNodes(0, context.logger);
node = config.Nodes.First();
// Assert node 0 does not know anything about the cluster
ClassicAssert.AreEqual(1, config.Nodes.Count);
ClassicAssert.AreNotEqual(nodeIds[0], node.NodeId);
ClassicAssert.AreEqual(0, node.Slots.Count);
ClassicAssert.IsFalse(node.IsReplica);
}

private void VerifyClusterResetFails(bool softReset = true)
{
var server = context.clusterTestUtils.GetServer(0);
var args = new List<object>() {
"reset",
softReset ? "soft" : "hard",
"60"
};

try
{
// Check DB was flushed due to hard reset
var resp = context.clusterTestUtils.GetServer(0).Execute("GET", "wxz");
ClassicAssert.IsTrue(resp.IsNull, "DB not flushed after HARD reset");
_ = (string)server.Execute("cluster", args);
}
catch (RedisServerException ex)
{
ClassicAssert.AreEqual("ERR CLUSTER RESET can't be called with master nodes containing keys", ex.Message);
}
}

// Add data to server
resp = context.clusterTestUtils.GetServer(0).Execute("SET", "wxz", "1234");
ClassicAssert.AreEqual("OK", (string)resp);
[Test, Order(4)]
public async Task ClusterResetDisposesGossipConnections()
{
const string meetRecv = "meet_requests_recv";
const string gossip = "gossip_success_count";

resp = context.clusterTestUtils.GetServer(0).Execute("GET", "wxz");
ClassicAssert.AreEqual("1234", (string)resp);
var node_count = 3;
context.CreateInstances(node_count, metricsSamplingFrequency: 1);
context.CreateConnection();
var endpoints = context.clusterTestUtils.GetEndpoints();
for (int i = 0; i < endpoints.Length - 1; i++)
{
context.clusterTestUtils.Meet(i, i + 1, context.logger);
}
catch (Exception ex)

for (int i = 0; i < node_count; i++)
{
context.logger?.LogError(ex, "An error occured at ClusterResetTest");
context.clusterTestUtils.WaitUntilNodeIsKnownByAllNodes(i);
}

// Add node back to the cluster
context.clusterTestUtils.SetConfigEpoch(0, 1, context.logger);
context.clusterTestUtils.Meet(0, 1, context.logger);
context.clusterTestUtils.ClusterReset(0, soft: true);
await Task.Delay(1000);

context.clusterTestUtils.WaitUntilNodeIsKnownByAllNodes(0, context.logger);
// Let's compare and make sure there are no gossip messages coming in to the node that got reset.
var server = context.clusterTestUtils.GetServer(0);
var gossipsAfterReset = GetStat(server, "Stats", gossip);
Mathos1432 marked this conversation as resolved.
Show resolved Hide resolved

await Task.Delay(3000);
var lateMeetAfterReset = GetStat(server, "Stats", meetRecv);
var lateGossipsAfterReset = GetStat(server, "Stats", gossip);

ClassicAssert.AreEqual(gossipsAfterReset, lateGossipsAfterReset, "Expected no new gossip messages after a reset.");

ClassicAssert.AreEqual(1, context.clusterTestUtils.ClusterNodes(0).Nodes.Count(), "Expected the node to only know about itself after a reset.");
}

private string GetStat(IServer server, string section, string statName)
{
return server.Info(section).FirstOrDefault(x => x.Key == section)?.FirstOrDefault(x => x.Key == statName).Value;
}

[Test, Order(5)]
Expand Down
6 changes: 4 additions & 2 deletions test/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void CreateInstances(
X509CertificateCollection certificates = null,
ServerCredential clusterCreds = new ServerCredential(),
AadAuthenticationSettings authenticationSettings = null,
bool disablePubSub = true)
bool disablePubSub = true,
int metricsSamplingFrequency = 0)
{
endpoints = TestUtils.GetEndPoints(shards, 7000);
nodes = TestUtils.CreateGarnetCluster(
Expand Down Expand Up @@ -138,7 +139,8 @@ public void CreateInstances(
authUsername: clusterCreds.user,
authPassword: clusterCreds.password,
certificates: certificates,
authenticationSettings: authenticationSettings);
authenticationSettings: authenticationSettings,
metricsSamplingFrequency: metricsSamplingFrequency);

foreach (var node in nodes)
node.Start();
Expand Down
Loading