Skip to content

Commit

Permalink
test:add unit test for service visible feature (#1309)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Dec 21, 2023
1 parent dfa9741 commit 756c945
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 26 deletions.
5 changes: 4 additions & 1 deletion cache/api/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ func ComputeRevisionBySlice(h hash.Hash, slice []string) (string, error) {

// CompositeComputeRevision 将多个 revision 合并计算为一个
func CompositeComputeRevision(revisions []string) (string, error) {
h := sha1.New()
if len(revisions) == 1 {
return revisions[0], nil
}

h := sha1.New()
sort.Strings(revisions)

for i := range revisions {
Expand Down
2 changes: 1 addition & 1 deletion cache/service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func fillInternalLabels(item *model.Instance) *model.Instance {
func (ic *instanceCache) postProcessUpdatedServices(affect map[string]bool) {
progress := 0
for serviceID := range affect {
ic.svcCache.GetRevisionWorker().Notify(serviceID, true)
ic.svcCache.notifyRevisionWorker(serviceID, true)
progress++
if progress%10000 == 0 {
log.Infof("[Cache][Instance] revision notify progress(%d / %d)", progress, len(affect))
Expand Down
33 changes: 27 additions & 6 deletions cache/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type serviceCache struct {
exportNamespace *utils.SyncMap[string, *utils.SyncSet[string]]
// exportServices 某个服务对部分命名空间全部可见 exportNamespace -> svcName -> model.Service
exportServices *utils.SyncMap[string, *utils.SyncMap[string, *model.Service]]

subCtx *eventhub.SubscribtionContext
}

// NewServiceCache 返回一个serviceCache
Expand All @@ -98,14 +100,18 @@ func (sc *serviceCache) Initialize(opt map[string]interface{}) error {
sc.cl5Names = utils.NewSyncMap[string, *model.Service]()
sc.pendingServices = utils.NewSyncMap[string, struct{}]()
sc.namespaceServiceCnt = utils.NewSyncMap[string, *model.NamespaceServiceCount]()
sc.revisionWorker = newRevisionWorker(sc, sc.instCache.(*instanceCache), opt)
sc.exportNamespace = utils.NewSyncMap[string, *utils.SyncSet[string]]()
sc.exportServices = utils.NewSyncMap[string, *utils.SyncMap[string, *model.Service]]()

ctx, cancel := context.WithCancel(context.Background())
sc.cancel = cancel
sc.revisionWorker = newRevisionWorker(sc, sc.instCache.(*instanceCache), opt)
// 先启动revision计算协程
go sc.revisionWorker.revisionWorker(ctx)
subCtx, err := eventhub.SubscribeWithFunc(eventhub.CacheNamespaceEventTopic, sc.handleNamespaceChange)
if err != nil {
return err
}
sc.subCtx = subCtx
if opt == nil {
return nil
}
Expand All @@ -119,6 +125,9 @@ func (sc *serviceCache) Close() error {
if err := sc.BaseCache.Close(); err != nil {
return err
}
if sc.subCtx != nil {
sc.subCtx.Cancel()
}
if sc.cancel != nil {
sc.cancel()
}
Expand Down Expand Up @@ -413,27 +422,30 @@ func (sc *serviceCache) setServices(services map[string]*model.Service) (map[str
if service.IsAlias() {
aliases = append(aliases, service)
}
oldVal, exist := sc.ids.Load(service.ID)
if oldVal != nil {
service.OldExportTo = oldVal.ExportTo
}

spaceName := service.Namespace
changeNs[spaceName] = struct{}{}
// 发现有删除操作
if !service.Valid {
sc.removeServices(service)
sc.revisionWorker.Notify(service.ID, false)
sc.notifyRevisionWorker(service.ID, false)
del++
svcCount--
continue
}

update++
_, exist := sc.ids.Load(service.ID)
if !exist {
svcCount++
}

sc.ids.Store(service.ID, service)
sc.serviceList.addService(service)
sc.revisionWorker.Notify(service.ID, true)
sc.notifyRevisionWorker(service.ID, true)

spaces, ok := sc.names.Load(spaceName)
if !ok {
Expand All @@ -455,6 +467,7 @@ func (sc *serviceCache) setServices(services map[string]*model.Service) (map[str

sc.postProcessServiceAlias(aliases)
sc.postProcessUpdatedServices(changeNs)
sc.postProcessServiceExports(services)
sc.serviceList.reloadRevision()
return map[string]time.Time{
sc.Name(): time.Unix(lastMtime, 0),
Expand Down Expand Up @@ -633,7 +646,7 @@ func (sc *serviceCache) GetVisibleServicesInOtherNamespace(svcName, namespace st
return visibleServices
}

func (sc *serviceCache) postProcessServiceExports(services []*model.Service) {
func (sc *serviceCache) postProcessServiceExports(services map[string]*model.Service) {

for i := range services {
svc := services[i]
Expand Down Expand Up @@ -680,6 +693,14 @@ func (sc *serviceCache) handleNamespaceChange(ctx context.Context, args interfac
return nil
}

func (sc *serviceCache) notifyRevisionWorker(serviceID string, valid bool) {
revisionWorker := sc.revisionWorker
if revisionWorker == nil {
return
}
revisionWorker.Notify(serviceID, valid)
}

// GetRevisionWorker
func (sc *serviceCache) GetRevisionWorker() types.ServiceRevisionWorker {
return sc.revisionWorker
Expand Down
120 changes: 120 additions & 0 deletions cache/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package service

import (
"context"
"fmt"
"math/rand"
"reflect"
Expand All @@ -30,6 +31,7 @@ import (

types "github.com/polarismesh/polaris/cache/api"
cachemock "github.com/polarismesh/polaris/cache/mock"
"github.com/polarismesh/polaris/common/eventhub"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/store"
Expand Down Expand Up @@ -593,3 +595,121 @@ func TestComputeRevision(t *testing.T) {
assert.NotEqual(t, lhs, rhs)
})
}

func Test_serviceCache_GetVisibleServicesInOtherNamespace(t *testing.T) {
ctl := gomock.NewController(t)
storage := mock.NewMockStore(ctl)
mockCacheMgr := cachemock.NewMockCacheManager(ctl)
defer ctl.Finish()

t.Run("服务可见性查询判断", func(t *testing.T) {
serviceList := map[string]*model.Service{
"service-1": {
ID: "service-1",
Name: "service-1",
Namespace: "ns-1",
ExportTo: map[string]struct{}{
"ns-2": {},
},
Valid: true,
},
"service-2": {
ID: "service-2",
Name: "service-2",
Namespace: "ns-2",
ExportTo: map[string]struct{}{},
Valid: true,
},
"service-3": {
ID: "service-3",
Name: "service-3",
Namespace: "ns-3",
ExportTo: map[string]struct{}{
"ns-2": {},
},
Valid: true,
},
}

svcCache := NewServiceCache(storage, mockCacheMgr).(*serviceCache)
mockInstCache := NewInstanceCache(storage, mockCacheMgr)
mockCacheMgr.EXPECT().GetCacher(types.CacheInstance).Return(mockInstCache).AnyTimes()
mockCacheMgr.EXPECT().GetCacher(types.CacheService).Return(svcCache).AnyTimes()
_ = svcCache.Initialize(map[string]interface{}{})
_ = mockInstCache.Initialize(map[string]interface{}{})
t.Cleanup(func() {
_ = svcCache.Close()
_ = mockInstCache.Close()
})

_, _, _ = svcCache.setServices(serviceList)
visibles := svcCache.GetVisibleServicesInOtherNamespace("service-1", "ns-2")
assert.Equal(t, 1, len(visibles))
assert.Equal(t, "ns-1", visibles[0].Namespace)
})

t.Run("服务可见性查询判断", func(t *testing.T) {
serviceList := map[string]*model.Service{
"service-1": {
ID: "service-1",
Name: "service-1",
Namespace: "ns-1",
Valid: true,
},
"service-2": {
ID: "service-2",
Name: "service-2",
Namespace: "ns-2",
Valid: true,
},
"service-3": {
ID: "service-3",
Name: "service-3",
Namespace: "ns-3",
Valid: true,
},
"service-4": {
ID: "service-4",
Name: "service-4",
Namespace: "ns-4",
Valid: true,
},
}

svcCache := NewServiceCache(storage, mockCacheMgr).(*serviceCache)
mockInstCache := NewInstanceCache(storage, mockCacheMgr)
mockCacheMgr.EXPECT().GetCacher(types.CacheInstance).Return(mockInstCache).AnyTimes()
mockCacheMgr.EXPECT().GetCacher(types.CacheService).Return(svcCache).AnyTimes()
_ = svcCache.Initialize(map[string]interface{}{})
_ = mockInstCache.Initialize(map[string]interface{}{})
t.Cleanup(func() {
_ = svcCache.Close()
_ = mockInstCache.Close()
})

_, _, _ = svcCache.setServices(serviceList)

svcCache.handleNamespaceChange(context.Background(), &eventhub.CacheNamespaceEvent{
EventType: eventhub.EventCreated,
Item: &model.Namespace{
Name: "ns-1",
ServiceExportTo: map[string]struct{}{
"ns-2": {},
"ns-3": {},
},
},
})

visibles := svcCache.GetVisibleServicesInOtherNamespace("service-1", "ns-2")
assert.Equal(t, 1, len(visibles))
assert.Equal(t, "ns-1", visibles[0].Namespace)

visibles = svcCache.GetVisibleServicesInOtherNamespace("service-1", "ns-3")
assert.Equal(t, 1, len(visibles))
assert.Equal(t, "ns-1", visibles[0].Namespace)

visibles = svcCache.GetVisibleServicesInOtherNamespace("service-1", "ns-4")
assert.Equal(t, 0, len(visibles))
})

}
8 changes: 8 additions & 0 deletions common/model/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ import (
"github.com/polarismesh/polaris/common/utils"
)

func ExportToMap(exportTo []*wrappers.StringValue) map[string]struct{} {
ret := make(map[string]struct{})
for _, v := range exportTo {
ret[v.Value] = struct{}{}
}
return ret
}

// Namespace 命名空间结构体
type Namespace struct {
Name string
Expand Down
18 changes: 13 additions & 5 deletions namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func (s *Server) CreateNamespace(ctx context.Context, req *apimodel.Namespace) *
*/
func (s *Server) createNamespaceModel(req *apimodel.Namespace) *model.Namespace {
namespace := &model.Namespace{
Name: req.GetName().GetValue(),
Comment: req.GetComment().GetValue(),
Owner: req.GetOwners().GetValue(),
Token: utils.NewUUID(),
Name: req.GetName().GetValue(),
Comment: req.GetComment().GetValue(),
Owner: req.GetOwners().GetValue(),
Token: utils.NewUUID(),
ServiceExportTo: model.ExportToMap(req.GetServiceExportTo()),
}

return namespace
}

Expand Down Expand Up @@ -298,6 +298,13 @@ func (s *Server) updateNamespaceAttribute(req *apimodel.Namespace, namespace *mo
if req.GetOwners() != nil {
namespace.Owner = req.GetOwners().GetValue()
}

exportTo := map[string]struct{}{}
for i := range req.GetServiceExportTo() {
exportTo[req.GetServiceExportTo()[i].GetValue()] = struct{}{}
}

namespace.ServiceExportTo = exportTo
}

// UpdateNamespaceToken 更新命名空间token
Expand Down Expand Up @@ -360,6 +367,7 @@ func (s *Server) GetNamespaces(ctx context.Context, query map[string][]string) *
TotalServiceCount: utils.NewUInt32Value(nsCntInfo.ServiceCount),
TotalInstanceCount: utils.NewUInt32Value(nsCntInfo.InstanceCnt.TotalInstanceCount),
TotalHealthInstanceCount: utils.NewUInt32Value(nsCntInfo.InstanceCnt.HealthyInstanceCount),
ServiceExportTo: namespace.ListServiceExportTo(),
})
totalServiceCount += nsCntInfo.ServiceCount
totalInstanceCount += nsCntInfo.InstanceCnt.TotalInstanceCount
Expand Down
27 changes: 23 additions & 4 deletions service/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,12 @@ func (s *Server) ServiceInstancesCache(ctx context.Context, filter *apiservice.D
}

// 数据源都来自Cache,这里拿到的service,已经是源服务
aliasFor := s.getServiceCache(serviceName, namespaceName)
if aliasFor == nil {
aliasFor, visibleServices := s.findVisibleServices(serviceName, namespaceName, req)
if len(visibleServices) == 0 {
log.Infof("[Server][Service][Instance] not found name(%s) namespace(%s) service",
serviceName, namespaceName)
return api.NewDiscoverInstanceResponse(apimodel.Code_NotFoundResource, req)
}
visibleServices := s.caches.Service().GetVisibleServicesInOtherNamespace(aliasFor.Name, aliasFor.Namespace)
visibleServices = append(visibleServices, aliasFor)

revisions := make([]string, 0, len(visibleServices)+1)
finalInstances := make(map[string]*apiservice.Instance, 128)
Expand Down Expand Up @@ -264,6 +262,27 @@ func (s *Server) ServiceInstancesCache(ctx context.Context, filter *apiservice.D
return resp
}

func (s *Server) findVisibleServices(serviceName, namespaceName string, req *apiservice.Service) (*model.Service, []*model.Service) {
visibleServices := make([]*model.Service, 0, 4)
// 数据源都来自Cache,这里拿到的service,已经是源服务
aliasFor := s.getServiceCache(serviceName, namespaceName)
if aliasFor == nil {
aliasFor = &model.Service{
Name: serviceName,
Namespace: namespaceName,
}
ret := s.caches.Service().GetVisibleServicesInOtherNamespace(serviceName, namespaceName)
if len(ret) == 0 {
return nil, nil
}
visibleServices = append(visibleServices, ret...)
} else {
visibleServices = append(visibleServices, aliasFor)
}

return aliasFor, visibleServices
}

// GetRoutingConfigWithCache 获取缓存中的路由配置信息
func (s *Server) GetRoutingConfigWithCache(ctx context.Context, req *apiservice.Service) *apiservice.DiscoverResponse {
resp := createCommonDiscoverResponse(req, apiservice.DiscoverResponse_ROUTING)
Expand Down
Loading

0 comments on commit 756c945

Please sign in to comment.