Skip to content

Commit

Permalink
fix:change file will not notify to client
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Sep 18, 2023
1 parent 06e5409 commit f3707bb
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 133 deletions.
31 changes: 13 additions & 18 deletions cache/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,10 @@ func (fc *fileCache) GetGroupActiveReleases(namespace, group string) ([]*model.C
return nil, ""
}
ret := make([]*model.ConfigFileRelease, 0, 8)
groupBucket.Range(func(key string, val *model.SimpleConfigFileRelease) bool {
groupBucket.ReadRange(func(key string, val *model.SimpleConfigFileRelease) {
ret = append(ret, &model.ConfigFileRelease{
SimpleConfigFileRelease: val,
})
return true
})
groupRevisions, ok := fc.activeReleaseRevisions.Load(namespace)
if !ok {
Expand Down Expand Up @@ -431,46 +430,42 @@ func (fc *fileCache) QueryReleases(args *types.ConfigReleaseArgs) (uint32, []*mo
}

values := make([]*model.SimpleConfigFileRelease, 0, args.Limit)
fc.name2release.Range(func(namespace string, groups *utils.SyncMap[string, *utils.SyncMap[string,
*utils.SyncMap[string, *model.SimpleConfigFileRelease]]]) bool {
fc.name2release.ReadRange(func(namespace string, groups *utils.SyncMap[string, *utils.SyncMap[string,
*utils.SyncMap[string, *model.SimpleConfigFileRelease]]]) {

if args.Namespace != "" && utils.IsWildNotMatch(namespace, args.Namespace) {
return true
return
}
groups.Range(func(group string, files *utils.SyncMap[string, *utils.SyncMap[string,
*model.SimpleConfigFileRelease]]) bool {
groups.ReadRange(func(group string, files *utils.SyncMap[string, *utils.SyncMap[string,
*model.SimpleConfigFileRelease]]) {

if args.Group != "" && utils.IsWildNotMatch(group, args.Group) {
return true
return
}
files.Range(func(fileName string, releases *utils.SyncMap[string, *model.SimpleConfigFileRelease]) bool {
files.Range(func(fileName string, releases *utils.SyncMap[string, *model.SimpleConfigFileRelease]) {
if args.FileName != "" && utils.IsWildNotMatch(fileName, args.FileName) {
return true
return
}
releases.Range(func(releaseName string, item *model.SimpleConfigFileRelease) bool {
releases.Range(func(releaseName string, item *model.SimpleConfigFileRelease) {
if args.ReleaseName != "" && utils.IsWildNotMatch(item.Name, args.ReleaseName) {
return true
return
}
if args.OnlyActive && !item.Active {
return true
return
}
if len(args.Metadata) > 0 {
for k, v := range args.Metadata {
sv := item.Metadata[k]
if sv != v {
return true
return
}
}
}

values = append(values, item)
return true
})
return true
})
return true
})
return true
})

sort.Slice(values, func(i, j int) bool {
Expand Down
6 changes: 2 additions & 4 deletions cache/config/config_file_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func (fc *fileCache) reportMetricsInfo() {
}
}

fc.metricsReleaseCount.Range(func(namespace string, groups *utils.SyncMap[string, uint64]) bool {
groups.Range(func(groupName string, count uint64) bool {
fc.metricsReleaseCount.ReadRange(func(namespace string, groups *utils.SyncMap[string, uint64]) {
groups.ReadRange(func(groupName string, count uint64) {
metricValues = append(metricValues, metrics.ConfigMetrics{
Type: metrics.ReleaseFileMetric,
Total: int64(count),
Expand All @@ -79,9 +79,7 @@ func (fc *fileCache) reportMetricsInfo() {
metrics.LabelGroup: groupName,
},
})
return true
})
return true
})

plugin.GetStatis().ReportConfigMetrics(metricValues...)
Expand Down
16 changes: 7 additions & 9 deletions cache/config/config_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,32 +194,30 @@ func (fc *configGroupCache) Query(args *types.ConfigGroupArgs) (uint32, []*model
}

values := make([]*model.ConfigFileGroup, 0, 8)
fc.name2groups.Range(func(namespce string, groups *utils.SyncMap[string, *model.ConfigFileGroup]) bool {
fc.name2groups.ReadRange(func(namespce string, groups *utils.SyncMap[string, *model.ConfigFileGroup]) {
if args.Namespace != "" && utils.IsWildNotMatch(namespce, args.Namespace) {
return true
return
}
groups.Range(func(name string, group *model.ConfigFileGroup) bool {
groups.ReadRange(func(name string, group *model.ConfigFileGroup) {
if args.Name != "" && utils.IsWildNotMatch(name, args.Name) {
return true
return
}
if args.Business != "" && utils.IsWildNotMatch(group.Business, args.Business) {
return true
return
}
if args.Department != "" && utils.IsWildNotMatch(group.Department, args.Department) {
return true
return
}
if len(args.Metadata) > 0 {
for k, v := range args.Metadata {
sv, ok := group.Metadata[k]
if !ok || sv != v {
return true
return
}
}
}
values = append(values, group)
return true
})
return true
})

sort.Slice(values, func(i, j int) bool {
Expand Down
9 changes: 3 additions & 6 deletions cache/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,16 @@ func (nsCache *namespaceCache) GetVisibleNamespaces(namespace string) []*model.N

// 根据命名空间级别的可见性进行查询
// 先看精确的
nsCache.exportNamespace.Range(func(exportNs string, viewerNs *utils.SyncSet[string]) bool {
nsCache.exportNamespace.Range(func(exportNs string, viewerNs *utils.SyncSet[string]) {
exactMatch := viewerNs.Contains(namespace)
allMatch := viewerNs.Contains(types.AllMatched)
if !exactMatch && !allMatch {
return true
return
}
val := nsCache.GetNamespace(exportNs)
if val != nil {
ret[val.Name] = val
}
return true
})

values := make([]*model.Namespace, 0, len(ret))
Expand Down Expand Up @@ -209,10 +208,8 @@ func (nsCache *namespaceCache) GetNamespacesByName(names []string) []*model.Name
func (nsCache *namespaceCache) GetNamespaceList() []*model.Namespace {
nsArr := make([]*model.Namespace, 0, 8)

nsCache.ids.Range(func(key string, ns *model.Namespace) bool {
nsCache.ids.Range(func(key string, ns *model.Namespace) {
nsArr = append(nsArr, ns)

return true
})

return nsArr
Expand Down
22 changes: 5 additions & 17 deletions cache/service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,7 @@ func (ic *instanceCache) IteratorInstancesWithService(serviceID string, iterProc

// GetInstancesCount 获取实例的个数
func (ic *instanceCache) GetInstancesCount() int {
count := 0
ic.ids.Range(func(key string, value *model.Instance) bool {
count++
return true
})

return count
return ic.ids.Len()
}

// GetInstanceLabels 获取某个服务下实例的所有标签信息集合
Expand Down Expand Up @@ -548,17 +542,11 @@ func (ic *instanceCache) GetServicePorts(serviceID string) []*model.ServicePort

// iteratorInstancesProc 迭代指定的instance数据,id->instance
func iteratorInstancesProc(data *utils.SyncMap[string, *model.Instance], iterProc types.InstanceIterProc) error {
var (
cont bool
err error
)

proc := func(k string, v *model.Instance) bool {
cont, err = iterProc(k, v)
if err != nil {
return false
var err error
proc := func(k string, v *model.Instance) {
if _, err = iterProc(k, v); err != nil {
return
}
return cont
}

data.Range(proc)
Expand Down
6 changes: 3 additions & 3 deletions cache/service/instance_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func (ic *instanceCache) reportMetricsInfo() {
})

// instance count metrics
ic.instanceCounts.Range(func(serviceID string, countInfo *model.InstanceCount) bool {
ic.instanceCounts.ReadRange(func(serviceID string, countInfo *model.InstanceCount) {
svc := serviceCache.GetServiceByID(serviceID)
if svc == nil {
log.Debug("[Cache][Instance] report metrics get service not found", zap.String("svc-id", serviceID))
return true
return
}

if _, ok := onlineService[svc.Namespace]; !ok {
Expand Down Expand Up @@ -95,7 +95,7 @@ func (ic *instanceCache) reportMetricsInfo() {
},
})

return true
return
})

for ns := range allServices {
Expand Down
13 changes: 5 additions & 8 deletions cache/service/l5.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (lc *l5Cache) GetRouteByIP(ip uint32) []*model.Route {
return out
}

entry.Range(func(key string, value string) bool {
entry.ReadRange(func(key string, value string) {
// sidStr -> setID
sid, err := model.UnmarshalSid(key)
if err != nil {
return true
return
}

item := &model.Route{
Expand All @@ -126,7 +126,6 @@ func (lc *l5Cache) GetRouteByIP(ip uint32) []*model.Route {
SetID: value,
}
out = append(out, item)
return true
})

return out
Expand All @@ -140,19 +139,17 @@ func (lc *l5Cache) CheckRouteExisted(ip uint32, modID uint32, cmdID uint32) bool
}

found := false
entry.Range(func(key string, value string) bool {
entry.ReadRange(func(key string, value string) {
sid, err := model.UnmarshalSid(key)
if err != nil {
// continue range
return true
return
}

if modID == sid.ModID && cmdID == sid.CmdID {
found = true
// break range
return false
return
}
return true
})

return found
Expand Down
10 changes: 2 additions & 8 deletions cache/service/ratelimit_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,13 @@ type rateLimitRuleBucket struct {
}

func (r *rateLimitRuleBucket) foreach(proc types.RateLimitIterProc) {
r.rules.Range(func(key string, val *subRateLimitRuleBucket) bool {
r.rules.Range(func(key string, val *subRateLimitRuleBucket) {
val.foreach(proc)
return true
})
}

func (r *rateLimitRuleBucket) count() int {
count := 0
r.rules.Range(func(key string, val *subRateLimitRuleBucket) bool {
count += val.count()
return true
})
return count
return r.rules.Len()
}

func (r *rateLimitRuleBucket) saveRule(rule *model.RateLimit) {
Expand Down
45 changes: 14 additions & 31 deletions cache/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,12 @@ func (sc *serviceCache) CleanNamespace(namespace string) {

// IteratorServices 对缓存中的服务进行迭代
func (sc *serviceCache) IteratorServices(iterProc types.ServiceIterProc) error {
var (
cont bool
err error
)

proc := func(k string, svc *model.Service) bool {
var err error
proc := func(k string, svc *model.Service) {
sc.fillServicePorts(svc)
cont, err = iterProc(k, svc)
if err != nil {
return false
if _, err = iterProc(k, svc); err != nil {
return
}
return cont
}
sc.ids.Range(proc)
return err
Expand All @@ -330,13 +324,7 @@ func (sc *serviceCache) GetNamespaceCntInfo(namespace string) model.NamespaceSer

// GetServicesCount 获取缓存中服务的个数
func (sc *serviceCache) GetServicesCount() int {
count := 0
sc.ids.Range(func(key string, value *model.Service) bool {
count++
return true
})

return count
return sc.ids.Len()
}

// ListServices get service list and revision by namespace
Expand Down Expand Up @@ -505,14 +493,13 @@ func (sc *serviceCache) appendServiceCountChangeNamespace(changeNs map[string]st
sc.plock.Lock()
defer sc.plock.Unlock()
waitDel := map[string]struct{}{}
sc.pendingServices.Range(func(svcId string, _ struct{}) bool {
sc.pendingServices.ReadRange(func(svcId string, _ struct{}) {
svc, ok := sc.ids.Load(svcId)
if !ok {
return true
return
}
changeNs[svc.Namespace] = struct{}{}
waitDel[svcId] = struct{}{}
return true
})
for svcId := range waitDel {
sc.pendingServices.Delete(svcId)
Expand Down Expand Up @@ -563,12 +550,11 @@ func (sc *serviceCache) postProcessUpdatedServices(affect map[string]struct{}) {
count.ServiceCount = 0
count.InstanceCnt = &model.InstanceCount{}

value.Range(func(key string, svc *model.Service) bool {
value.ReadRange(func(key string, svc *model.Service) {
count.ServiceCount++
insCnt := sc.instCache.GetInstancesCountByServiceID(svc.ID)
count.InstanceCnt.TotalInstanceCount += insCnt.TotalInstanceCount
count.InstanceCnt.HealthyInstanceCount += insCnt.HealthyInstanceCount
return true
})
}
}
Expand Down Expand Up @@ -603,32 +589,29 @@ func (sc *serviceCache) updateCl5SidAndNames(service *model.Service) {
func (sc *serviceCache) GetVisibleServicesInOtherNamespace(svcName, namespace string) []*model.Service {
ret := make(map[string]*model.Service)
// 根据服务级别的可见性进行查询, 先查询精确匹配
sc.exportServices.Range(func(exportToNs string, services *utils.SyncMap[string, *model.Service]) bool {
sc.exportServices.ReadRange(func(exportToNs string, services *utils.SyncMap[string, *model.Service]) {
if exportToNs != namespace && exportToNs != types.AllMatched {
return true
return
}
services.Range(func(_ string, svc *model.Service) bool {
services.ReadRange(func(_ string, svc *model.Service) {
if svc.Name == svcName && svc.Namespace != namespace {
ret[svc.ID] = svc
}
return true
})
return true
})

// 根据命名空间级别的可见性进行查询, 先看精确的
sc.exportNamespace.Range(func(exportNs string, viewerNs *utils.SyncSet[string]) bool {
sc.exportNamespace.ReadRange(func(exportNs string, viewerNs *utils.SyncSet[string]) {
exactMatch := viewerNs.Contains(namespace)
allMatch := viewerNs.Contains(types.AllMatched)
if !exactMatch && !allMatch {
return true
return
}
svc := sc.GetServiceByName(svcName, exportNs)
if svc == nil {
return true
return
}
ret[svc.ID] = svc
return true
})

visibleServices := make([]*model.Service, 0, len(ret))
Expand Down
Loading

0 comments on commit f3707bb

Please sign in to comment.