diff --git a/horaemeta/server/cluster/manager_test.go b/horaemeta/server/cluster/manager_test.go index 779e1cac62..f9cf46494c 100644 --- a/horaemeta/server/cluster/manager_test.go +++ b/horaemeta/server/cluster/manager_test.go @@ -45,6 +45,9 @@ const ( defaultTopologyType = storage.TopologyTypeStatic node1 = "127.0.0.1:8081" node2 = "127.0.0.2:8081" + node3 = "127.0.0.3:8081" + nodeTypeHoraeDB = storage.NodeTypeHoraeDB + nodeTypeCompactionServer = storage.NodeTypeCompactionServer defaultSchemaID = 0 testRootPath = "/rootPath" defaultIDAllocatorStep = 20 @@ -76,8 +79,11 @@ func TestClusterManager(t *testing.T) { testCreateCluster(ctx, re, manager, cluster1) - testRegisterNode(ctx, re, manager, cluster1, node1) - testRegisterNode(ctx, re, manager, cluster1, node2) + testRegisterNode(ctx, re, manager, cluster1, node1, nodeTypeHoraeDB) + testRegisterNode(ctx, re, manager, cluster1, node2, nodeTypeHoraeDB) + testRegisterNode(ctx, re, manager, cluster1, node3, nodeTypeCompactionServer) + + testFetchCompactionNode(ctx, re, manager, cluster1) testInitShardView(ctx, re, manager, cluster1) @@ -151,20 +157,36 @@ func testCreateCluster(ctx context.Context, re *require.Assertions, manager clus } func testRegisterNode(ctx context.Context, re *require.Assertions, manager cluster.Manager, - clusterName, nodeName string, + clusterName, nodeName string, nodeType storage.NodeType, ) { + var nodeStats storage.NodeStats + + if nodeType == nodeTypeHoraeDB { + nodeStats = storage.NewEmptyNodeStats() + } else { + nodeStats = storage.NewCompactionNodeStats() + } + node := metadata.RegisteredNode{ Node: storage.Node{ Name: nodeName, LastTouchTime: uint64(time.Now().UnixMilli()), State: storage.NodeStateOnline, - NodeStats: storage.NewEmptyNodeStats(), + NodeStats: nodeStats, }, ShardInfos: []metadata.ShardInfo{}, } err := manager.RegisterNode(ctx, clusterName, node) re.NoError(err) } +func testFetchCompactionNode(ctx context.Context, re *require.Assertions, manager cluster.Manager, + clusterName string, +) { + node, err := manager.FetchCompactionNode(ctx, clusterName) + re.NoError(err) + re.Equal(node, node3) +} + func testAllocSchemaID(ctx context.Context, re *require.Assertions, manager cluster.Manager, cluster, schema string, schemaID uint32, ) { diff --git a/horaemeta/server/cluster/metadata/cluster_metadata_test.go b/horaemeta/server/cluster/metadata/cluster_metadata_test.go index e29761491c..3655b9238b 100644 --- a/horaemeta/server/cluster/metadata/cluster_metadata_test.go +++ b/horaemeta/server/cluster/metadata/cluster_metadata_test.go @@ -41,6 +41,7 @@ func TestClusterMetadata(t *testing.T) { testRegisterNode(ctx, re, metadata) testTableOperation(ctx, re, metadata) testShardOperation(ctx, re, metadata) + testCompactionNodeOperation(ctx, re, metadata) testMetadataOperation(ctx, re, metadata) } @@ -221,6 +222,32 @@ func testShardOperation(ctx context.Context, re *require.Assertions, m *metadata re.NoError(err) } +func testCompactionNodeOperation(ctx context.Context, re *require.Assertions, m *metadata.ClusterMetadata) { + // Fetch a compaction node, it will throw error because cluster does not register compaction node yet. + nodeName, err := m.FetchCompactionNode(ctx) + re.Error(err) + re.Equal("", nodeName) + + // Register a compaction node. + newNodeName := "testCompactionNode" + lastTouchTime := uint64(time.Now().UnixMilli()) + err = m.RegisterNode(ctx, metadata.RegisteredNode{ + Node: storage.Node{ + Name: newNodeName, + NodeStats: storage.NewCompactionNodeStats(), + LastTouchTime: lastTouchTime, + State: 0, + }, + ShardInfos: nil, + }) + re.NoError(err) + + // Fetch a compaction node, it will return the new registered node with no error. + nodeName, err = m.FetchCompactionNode(ctx) + re.NoError(err) + re.Equal(newNodeName, nodeName) +} + func testMetadataOperation(ctx context.Context, re *require.Assertions, m *metadata.ClusterMetadata) { // Init cluster metadata, it will throw error because it has been init. err := m.Init(ctx) diff --git a/horaemeta/server/cluster/metadata/compaction_node_manager_test.go b/horaemeta/server/cluster/metadata/compaction_node_manager_test.go new file mode 100644 index 0000000000..0446174dc0 --- /dev/null +++ b/horaemeta/server/cluster/metadata/compaction_node_manager_test.go @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package metadata_test + +import ( + "context" + "testing" + "time" + + "github.com/apache/incubator-horaedb-meta/server/cluster/metadata" + "github.com/apache/incubator-horaedb-meta/server/storage" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const ( + TestCompactionNodeName = "TestCompactionNodeName" + TestCompactionNodeName1 = "TestCompactionNodeName1" +) + +func TestCompactionManager(t *testing.T) { + ctx := context.Background() + re := require.New(t) + + compactionNodeManager := metadata.NewCompactionNodeManagerImpl(zap.NewNop()) + + testRegisterCompactionNode(ctx, re, compactionNodeManager) + testFetchCompactionNode(ctx, re, compactionNodeManager) +} + +func testRegisterCompactionNode(_ context.Context, re *require.Assertions, manager metadata.CompactionNodeManager) { + // Register a compaction node. + lastTouchTime := uint64(time.Now().UnixMilli()) + manager.RegisterCompactionNode(metadata.RegisteredNode{ + Node: storage.Node{ + Name: TestCompactionNodeName, + NodeStats: storage.NewCompactionNodeStats(), + LastTouchTime: lastTouchTime, + State: 0, + }, + ShardInfos: nil, + }) + + keyList := manager.GetCompactionNodesKeyList() + re.Equal(1, len(keyList)) + re.Equal(TestCompactionNodeName, keyList[0]) + re.Equal(1, len(manager.GetRegisteredCompactionNodes())) + + node, exists := manager.GetRegisteredCompactionNodeByName(TestCompactionNodeName) + re.Equal(exists, true) + re.Equal(lastTouchTime, node.Node.LastTouchTime) + + // Update lastTouchTime + lastTouchTime = uint64(time.Now().UnixMilli()) + node.Node.LastTouchTime = lastTouchTime + manager.RegisterCompactionNode(node) + + keyList = manager.GetCompactionNodesKeyList() + re.Equal(1, len(keyList)) + re.Equal(TestCompactionNodeName, keyList[0]) + re.Equal(1, len(manager.GetRegisteredCompactionNodes())) + + node, exists = manager.GetRegisteredCompactionNodeByName(TestCompactionNodeName) + re.Equal(exists, true) + re.Equal(lastTouchTime, node.Node.LastTouchTime) + + // Register another compaction node. + lastTouchTime = uint64(time.Now().UnixMilli()) + manager.RegisterCompactionNode(metadata.RegisteredNode{ + Node: storage.Node{ + Name: TestCompactionNodeName1, + NodeStats: storage.NewCompactionNodeStats(), + LastTouchTime: lastTouchTime, + State: 0, + }, + ShardInfos: nil, + }) + + keyList = manager.GetCompactionNodesKeyList() + re.Equal(2, len(keyList)) + re.Equal(TestCompactionNodeName1, keyList[1]) + re.Equal(2, len(manager.GetRegisteredCompactionNodes())) +} + +func testFetchCompactionNode(_ context.Context, re *require.Assertions, manager metadata.CompactionNodeManager) { + nodeName, err := manager.FetchCompactionNode() + re.NoError(err) + re.Equal(nodeName, TestCompactionNodeName) + + nodeName, err = manager.FetchCompactionNode() + re.NoError(err) + re.Equal(nodeName, TestCompactionNodeName1) +}