diff --git a/libs/cluster/CmdStrings.cs b/libs/cluster/CmdStrings.cs index f6ee4347f9..9b8d3a765d 100644 --- a/libs/cluster/CmdStrings.cs +++ b/libs/cluster/CmdStrings.cs @@ -52,6 +52,7 @@ static class CmdStrings public static ReadOnlySpan RESP_ERR_INVALID_SLOT => "ERR Invalid or out of range slot"u8; public static ReadOnlySpan RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER => "ERR value is not an integer or out of range."u8; public static ReadOnlySpan RESP_ERR_GENERIC_VALUE_IS_NOT_BOOLEAN => "ERR value is not a boolean."u8; + public static ReadOnlySpan RESP_ERR_RESET_WITH_KEYS_ASSIGNED => "-ERR CLUSTER RESET can't be called with master nodes containing keys\r\n"u8; public static ReadOnlySpan RESP_SYNTAX_ERROR => "ERR syntax error"u8; /// diff --git a/libs/cluster/Server/ClusterConfig.cs b/libs/cluster/Server/ClusterConfig.cs index 4ce43ee449..56c418113d 100644 --- a/libs/cluster/Server/ClusterConfig.cs +++ b/libs/cluster/Server/ClusterConfig.cs @@ -319,7 +319,7 @@ public List GetRemoteNodeIds() /// /// /// Integer representing offset of worker in worker list. - public int GetWorkerIdFromNodeId(string nodeId) + public ushort GetWorkerIdFromNodeId(string nodeId) { for (ushort i = 1; i <= NumWorkers; i++) { @@ -733,7 +733,11 @@ private string GetSlotRange(ushort workerId) return result; } - private List GetSlotList(ushort workerId) + /// + /// Retrieve a list of slots served by this node. + /// + /// List of slots. 
+ public List GetSlotList(ushort workerId) { List result = []; for (var i = 0; i < MAX_HASH_SLOT_VALUE; i++) diff --git a/libs/cluster/Server/ClusterManagerWorkerState.cs b/libs/cluster/Server/ClusterManagerWorkerState.cs index eb6eea67cd..309d102c44 100644 --- a/libs/cluster/Server/ClusterManagerWorkerState.cs +++ b/libs/cluster/Server/ClusterManagerWorkerState.cs @@ -100,19 +100,23 @@ public ReadOnlySpan TryReset(bool soft, int expirySeconds = 60) { PauseConfigMerge(); var resp = CmdStrings.RESP_OK; - while (true) { var current = currentConfig; + var localSlots = current.GetSlotList(1); + if (clusterProvider.storeWrapper.HasKeysInSlots(localSlots)) + { + return CmdStrings.RESP_ERR_RESET_WITH_KEYS_ASSIGNED; + } + + this.clusterConnectionStore.CloseAll(); + var newNodeId = soft ? current.LocalNodeId : Generator.CreateHexId(); var address = current.LocalNodeIp; var port = current.LocalNodePort; var configEpoch = soft ? current.LocalNodeConfigEpoch : 0; var expiry = DateTimeOffset.UtcNow.Ticks + TimeSpan.FromSeconds(expirySeconds).Ticks; - foreach (var nodeId in current.GetRemoteNodeIds()) - _ = workerBanList.AddOrUpdate(nodeId, expiry, (key, oldValue) => expiry); - var newConfig = new ClusterConfig().InitializeLocalWorker( newNodeId, address, diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index 95f2c6dcbe..9fe42f614d 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -267,7 +267,8 @@ public MetricsItem[] GetGossipStats(bool metricsDisabled) new("gossip_full_send", metricsDisabled ? "0" : gossipStats.gossip_full_send.ToString()), new("gossip_empty_send", metricsDisabled ? "0" : gossipStats.gossip_empty_send.ToString()), new("gossip_bytes_send", metricsDisabled ? "0" : gossipStats.gossip_bytes_send.ToString()), - new("gossip_bytes_recv", metricsDisabled ? "0" : gossipStats.gossip_bytes_recv.ToString()) + new("gossip_bytes_recv", metricsDisabled ? 
/// <summary>
/// Checks whether the main store or the object store currently holds at least one key
/// whose hash slot is contained in <paramref name="slots"/>.
/// </summary>
/// <param name="slots">Hash slots to look for. A null or empty list yields <c>false</c> without scanning.</param>
/// <returns>
/// <c>true</c> as soon as one key mapping to a listed slot is found (main store is scanned first,
/// then the object store when it exists); otherwise <c>false</c>.
/// </returns>
public bool HasKeysInSlots(List<int> slots)
{
    if (slots is null || slots.Count == 0)
        return false;

    // The slot list is probed once per scanned key, so use hashed membership instead of a linear List.Contains.
    var slotSet = new HashSet<int>(slots);

    // NOTE(review): the generic type arguments on Iterate/NewSession/SimpleSessionFunctions were not visible
    // in the reviewed text (angle-bracket content was stripped); they are reconstructed here - confirm against the file.
    using (var mainStoreIter = store.Iterate<SpanByte, SpanByte, Empty, SimpleSessionFunctions<SpanByte, SpanByte, Empty>>(new SimpleSessionFunctions<SpanByte, SpanByte, Empty>()))
    {
        while (mainStoreIter.GetNext(out RecordInfo _))
        {
            ref var key = ref mainStoreIter.GetKey();
            if (slotSet.Contains(HashSlotUtils.HashSlot(ref key)))
                return true;
        }
    }

    if (objectStore == null)
        return false;

    var functionsState = CreateFunctionsState();
    var objstorefunctions = new ObjectSessionFunctions(functionsState);

    // Both the session and its iterator are scoped so they are released on every exit path,
    // mirroring how the main-store iterator above is handled.
    using var objectStoreSession = objectStore.NewSession<ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions>(objstorefunctions);
    using var objectStoreIter = objectStoreSession.Iterate();
    while (objectStoreIter.GetNext(out RecordInfo _))
    {
        ref var key = ref objectStoreIter.GetKey();
        if (slotSet.Contains(HashSlotUtils.HashSlot(key.AsSpan())))
            return true;
    }

    return false;
}
/// <summary>
/// Verifies that CLUSTER RESET (soft and hard) is rejected while node 0 still stores a key in one of
/// its slots, and that the node's cluster view and data are unchanged afterwards.
/// </summary>
[Test, Order(4)]
public void ClusterResetFailsForMasterWithKeysInSlotsTest()
{
    var node_count = 4;
    context.CreateInstances(node_count);
    context.CreateConnection();
    var (_, _) = context.clusterTestUtils.SimpleSetupCluster(node_count, 0, logger: context.logger);

    var expectedSlotRange = new SlotRange(0, 4095);
    var config = context.clusterTestUtils.ClusterNodes(0, context.logger);
    var node = config.Nodes.First();
    ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);

    // Write a key restricted to the first slot served by node 0 so the reset guard has something to find.
    byte[] key = new byte[16];
    context.clusterTestUtils.RandomBytesRestrictedToSlot(ref key, node.Slots.First().From);

    context.clusterTestUtils.GetServer(0).Execute("SET", key, "1234");
    string res = context.clusterTestUtils.GetServer(0).Execute("GET", key).ToString();
    ClassicAssert.AreEqual("1234", res);

    VerifyClusterResetFails(true);
    VerifyClusterResetFails(false);

    // Re-read the node's view after the rejected resets. Asserting on the snapshot captured
    // before the attempts would pass even if a reset had actually been applied.
    config = context.clusterTestUtils.ClusterNodes(0, context.logger);
    node = config.Nodes.First();
    var nodeIds = context.clusterTestUtils.GetNodeIds(logger: context.logger);
    ClassicAssert.AreEqual(4, config.Nodes.Count);
    ClassicAssert.AreEqual(nodeIds[0], node.NodeId);
    ClassicAssert.AreEqual(expectedSlotRange, node.Slots[0]);
    ClassicAssert.IsFalse(node.IsReplica);

    // The stored value must still be readable after the rejected resets.
    res = context.clusterTestUtils.GetServer(0).Execute("GET", key).ToString();
    ClassicAssert.AreEqual("1234", res);
}
/// <summary>
/// Issues CLUSTER RESET against node 0 and asserts that the server rejects it with the
/// "master nodes containing keys" error.
/// </summary>
/// <param name="softReset">True to send a SOFT reset, false to send a HARD reset.</param>
private void VerifyClusterResetFails(bool softReset = true)
{
    var server = context.clusterTestUtils.GetServer(0);
    // NOTE(review): the element type of this list was not visible in the reviewed text; object is assumed - confirm.
    var args = new List<object>
    {
        "reset",
        softReset ? "soft" : "hard",
        "60"
    };

    // Assert.Throws fails the test when no exception is raised. The previous try/catch only
    // asserted inside the catch block, so a reset that succeeded went unnoticed.
    var ex = Assert.Throws<RedisServerException>(() => server.Execute("cluster", args));
    ClassicAssert.AreEqual("ERR CLUSTER RESET can't be called with master nodes containing keys", ex.Message);
}
/// <summary>
/// Flushes all databases on the node at the given index in the endpoint list.
/// </summary>
/// <param name="nodeIndex">Index into <c>endpoints</c> of the node to flush.</param>
/// <param name="logger">Optional logger used to record a failure.</param>
public void FlushAll(int nodeIndex, ILogger logger = null)
    => FlushAll((IPEndPoint)endpoints[nodeIndex], logger);

/// <summary>
/// Flushes all databases on the server reachable at <paramref name="endPoint"/>.
/// Any exception is logged and turned into a test failure.
/// </summary>
/// <param name="endPoint">Endpoint of the server to flush.</param>
/// <param name="logger">Optional logger used to record a failure.</param>
public void FlushAll(IPEndPoint endPoint, ILogger logger = null)
{
    try
    {
        var server = redis.GetServer(endPoint);
        server.FlushAllDatabases();
    }
    catch (Exception ex)
    {
        logger?.LogError(ex, "An error has occured; FlushAllDatabases");
        // Surface the cause in the test report; a bare Assert.Fail() hides why the flush failed.
        Assert.Fail(ex.Message);
    }
}