diff --git a/pkg/clusterservice/cluster.go b/pkg/clusterservice/cluster.go index b51367c033203..a70a6973704c5 100644 --- a/pkg/clusterservice/cluster.go +++ b/pkg/clusterservice/cluster.go @@ -16,6 +16,7 @@ package clusterservice import ( "context" + "sort" "sync" "sync/atomic" "time" @@ -343,13 +344,22 @@ func (c *cluster) refresh() { c.logger.Debug("cn service added", zap.String("cn", v.DebugString())) } } + // sort as the tick, with the bigger one at the front. + sort.Slice(details.TNStores, func(i, j int) bool { + return details.TNStores[i].Tick > details.TNStores[j].Tick + }) for _, tn := range details.TNStores { - if tn.State == logpb.NormalState { - v := newTNService(tn) - new.addTN([]metadata.TNService{v}) + v := newTNService(tn) + new.addTN([]metadata.TNService{v}) + if c.logger.Enabled(zap.DebugLevel) { c.logger.Debug("dn service added", zap.String("dn", v.DebugString())) } } + // if there are multiple tn services, only take the first one, which has + // the biggest tick. + if len(new.tn) > 1 { + new.tn = new.tn[:1] + } c.services.Store(new) c.readyOnce.Do(func() { close(c.readyC) diff --git a/pkg/clusterservice/cluster_test.go b/pkg/clusterservice/cluster_test.go index c8e140953aee5..fc9f5439ff18e 100644 --- a/pkg/clusterservice/cluster_test.go +++ b/pkg/clusterservice/cluster_test.go @@ -78,7 +78,7 @@ func TestClusterRefresh(t *testing.T) { c.GetTNService(NewServiceIDSelector("dn0"), apply) assert.Equal(t, 0, cnt) - hc.addTN(logpb.NormalState, "dn0") + hc.addTN(0, "dn0") time.Sleep(time.Millisecond * 100) c.GetTNService(NewServiceIDSelector("dn0"), apply) assert.Equal(t, 1, cnt) @@ -96,7 +96,7 @@ func BenchmarkGetService(b *testing.B) { } c.GetTNService(NewServiceIDSelector("dn0"), apply) - hc.addTN(logpb.NormalState, "dn0") + hc.addTN(0, "dn0") c.ForceRefresh(true) b.ResetTimer() @@ -150,18 +150,20 @@ func TestCluster_GetTNService(t *testing.T) { runClusterTest( time.Hour, func(hc *testHAKeeperClient, c *cluster) { - hc.addTN(logpb.NormalState, "dn0") - hc.addTN(logpb.TimeoutState, "dn1") + hc.addTN(100, "dn0") + hc.addTN(200, "dn1") + hc.addTN(50, "dn2") c.ForceRefresh(true) - var count int + var tns []metadata.TNService c.GetTNService( NewSelector(), func(service metadata.TNService) bool { - count++ + tns = append(tns, service) return true }, ) - require.Equal(t, 1, count) + require.Equal(t, 1, len(tns)) + require.Equal(t, "dn1", tns[0].ServiceID) }, ) } @@ -199,13 +201,13 @@ func (c *testHAKeeperClient) addCN(serviceIDs ...string) { } } -func (c *testHAKeeperClient) addTN(state logpb.NodeState, serviceIDs ...string) { +func (c *testHAKeeperClient) addTN(tick uint64, serviceIDs ...string) { c.Lock() defer c.Unlock() for _, id := range serviceIDs { c.value.TNStores = append(c.value.TNStores, logpb.TNStore{ - UUID: id, - State: state, + UUID: id, + Tick: tick, }) } }