Skip to content

Commit

Permalink
Merge pull request #1 from polarismesh/release-v1.14.0
Browse files Browse the repository at this point in the history
Release v1.14.0
  • Loading branch information
andrewshan authored Feb 25, 2023
2 parents 453bb3b + 3130527 commit 57c49e3
Show file tree
Hide file tree
Showing 45 changed files with 1,751 additions and 1,444 deletions.
1 change: 1 addition & 0 deletions apiserver/eurekaserver/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestEurekaServer_renew(t *testing.T) {
},
}, nil)

s.EXPECT().GetServicesCount().AnyTimes().Return(uint32(1), nil)
s.EXPECT().GetInstancesCount().AnyTimes().Return(uint32(1), nil)
s.EXPECT().GetUnixSecond().AnyTimes().Return(time.Now().Unix(), nil)
s.EXPECT().Destroy().Return(nil)
Expand Down
5 changes: 5 additions & 0 deletions auth/defaultauth/auth_mgn_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_defaultAuthChecker_VerifyCredential(t *testing.T) {

storage := storemock.NewMockStore(ctrl)

storage.EXPECT().GetServicesCount().Return(uint32(0), nil).AnyTimes()
storage.EXPECT().GetUnixSecond().AnyTimes().Return(time.Now().Unix(), nil)
storage.EXPECT().GetUsersForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(users, nil)
storage.EXPECT().GetGroupsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return([]*model.UserGroupDetail{}, nil)
Expand Down Expand Up @@ -216,6 +217,7 @@ func Test_defaultAuthChecker_CheckPermission_Write_NoStrict(t *testing.T) {

cfg, storage := initCache(ctrl)

storage.EXPECT().GetServicesCount().Return(uint32(0), nil).AnyTimes()
storage.EXPECT().GetUsersForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(users, nil)
storage.EXPECT().GetGroupsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(groups, nil)
storage.EXPECT().GetStrategyDetailsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(strategies, nil)
Expand Down Expand Up @@ -456,6 +458,7 @@ func Test_defaultAuthChecker_CheckPermission_Write_Strict(t *testing.T) {

cfg, storage := initCache(ctrl)

storage.EXPECT().GetServicesCount().Return(uint32(0), nil).AnyTimes()
storage.EXPECT().GetUsersForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(users, nil)
storage.EXPECT().GetGroupsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(groups, nil)
storage.EXPECT().GetStrategyDetailsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(strategies, nil)
Expand Down Expand Up @@ -652,6 +655,7 @@ func Test_defaultAuthChecker_CheckPermission_Read_NoStrict(t *testing.T) {

cfg, storage := initCache(ctrl)

storage.EXPECT().GetServicesCount().Return(uint32(0), nil).AnyTimes()
storage.EXPECT().GetUsersForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(users, nil)
storage.EXPECT().GetGroupsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(groups, nil)
storage.EXPECT().GetStrategyDetailsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(strategies, nil)
Expand Down Expand Up @@ -869,6 +873,7 @@ func Test_defaultAuthChecker_CheckPermission_Read_Strict(t *testing.T) {

cfg, storage := initCache(ctrl)

storage.EXPECT().GetServicesCount().Return(uint32(0), nil).AnyTimes()
storage.EXPECT().GetUsersForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(users, nil)
storage.EXPECT().GetGroupsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(groups, nil)
storage.EXPECT().GetStrategyDetailsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(strategies, nil)
Expand Down
1 change: 1 addition & 0 deletions auth/defaultauth/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func newStrategyTest(t *testing.T) *StrategyTest {

storage := storemock.NewMockStore(ctrl)

storage.EXPECT().GetServicesCount().AnyTimes().Return(uint32(0), nil)
storage.EXPECT().GetUnixSecond().AnyTimes().Return(time.Now().Unix(), nil)
storage.EXPECT().GetUsersForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(users, nil)
storage.EXPECT().GetGroupsForCache(gomock.Any(), gomock.Any()).AnyTimes().Return(groups, nil)
Expand Down
190 changes: 143 additions & 47 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"hash"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/polarismesh/polaris/common/metrics"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/store"
)
Expand All @@ -46,6 +46,7 @@ var (
_ StrategyCache = (*strategyCache)(nil)
_ L5Cache = (*l5Cache)(nil)
_ FileCache = (*fileCache)(nil)
_ FaultDetectCache = (*faultDetectCache)(nil)
)

const (
Expand All @@ -71,45 +72,42 @@ const (
type CacheName string

const (
CacheNameService CacheName = "Service"
CacheNameInstance CacheName = "Instance"
CacheNameRoutingConfig CacheName = "RoutingConfig"
CacheNameCL5 CacheName = "CL5"
CacheNameRateLimit CacheName = "RateLimit"
CacheNameCircuitBreaker CacheName = "CircuitBreaker"
CacheNameUser CacheName = "User"
CacheNameAuthStrategy CacheName = "AuthStrategy"
CacheNameNamespace CacheName = "Namespace"
CacheNameClient CacheName = "Client"
CacheNameConfigFile CacheName = "ConfigFile"
CacheNameService CacheName = "Service"
CacheNameInstance CacheName = "Instance"
CacheNameRoutingConfig CacheName = "RoutingConfig"
CacheNameCL5 CacheName = "CL5"
CacheNameRateLimit CacheName = "RateLimit"
CacheNameCircuitBreaker CacheName = "CircuitBreaker"
CacheNameUser CacheName = "User"
CacheNameAuthStrategy CacheName = "AuthStrategy"
CacheNameNamespace CacheName = "Namespace"
CacheNameClient CacheName = "Client"
CacheNameConfigFile CacheName = "ConfigFile"
CacheNameFaultDetectRule CacheName = "FaultDetectRule"
)

var (
cacheIndexMap = map[CacheName]int{
CacheNameService: CacheService,
CacheNameInstance: CacheInstance,
CacheNameRoutingConfig: CacheRoutingConfig,
CacheNameCL5: CacheCL5,
CacheNameRateLimit: CacheRateLimit,
CacheNameCircuitBreaker: CacheCircuitBreaker,
CacheNameUser: CacheUser,
CacheNameAuthStrategy: CacheAuthStrategy,
CacheNameNamespace: CacheNamespace,
CacheNameClient: CacheClient,
CacheNameConfigFile: CacheConfigFile,
CacheNameService: CacheService,
CacheNameInstance: CacheInstance,
CacheNameRoutingConfig: CacheRoutingConfig,
CacheNameCL5: CacheCL5,
CacheNameRateLimit: CacheRateLimit,
CacheNameCircuitBreaker: CacheCircuitBreaker,
CacheNameUser: CacheUser,
CacheNameAuthStrategy: CacheAuthStrategy,
CacheNameNamespace: CacheNamespace,
CacheNameClient: CacheClient,
CacheNameConfigFile: CacheConfigFile,
CacheNameFaultDetectRule: CacheFaultDetector,
}
)

const (
var (
// DefaultTimeDiff default time diff
DefaultTimeDiff = -10 * time.Second
DefaultTimeDiff = -5 * time.Second
)

type Args struct {
// StoreTimeRollbackSec 存储层时钟回拨情况
StoreTimeRollbackSec time.Duration
}

// Cache 缓存接口
type Cache interface {
// initialize
Expand All @@ -119,7 +117,7 @@ type Cache interface {
addListener(listeners []Listener)

// update
update(storeRollbackSec time.Duration) error
update() error

// clear
clear() error
Expand All @@ -130,19 +128,119 @@ type Cache interface {

// baseCache 对于 Cache 中的一些 func 做统一实现,避免重复逻辑
type baseCache struct {
manager *listenerManager
lock sync.RWMutex
firtstUpdate bool
s store.Store
lastFetchTime int64
lastMtimes map[string]time.Time
manager *listenerManager
}

func newBaseCache(s store.Store) *baseCache {
c := &baseCache{
s: s,
}

c.initialize()
return c
}

func (bc *baseCache) initialize() {
bc.lock.Lock()
defer bc.lock.Unlock()

bc.lastFetchTime = 0
bc.firtstUpdate = true
bc.manager = &listenerManager{
listeners: make([]Listener, 0, 4),
}
bc.lastMtimes = map[string]time.Time{}
}

var (
zeroTime = time.Unix(0, 0)
)

func (bc *baseCache) resetLastMtime(label string) {
bc.lock.Lock()
defer bc.lock.Unlock()
bc.lastMtimes[label] = time.Unix(0, 0)
}

func (bc *baseCache) LastMtime(label string) time.Time {
bc.lock.RLock()
defer bc.lock.RUnlock()
v, ok := bc.lastMtimes[label]
if ok {
return v
}

return time.Unix(0, 0)
}

func (bc *baseCache) LastFetchTime() time.Time {
lastTime := time.Unix(bc.lastFetchTime, 0)
tmp := lastTime.Add(DefaultTimeDiff)
if zeroTime.After(tmp) {
return lastTime
}
lastTime = tmp
return lastTime
}

func (bc *baseCache) isFirstUpdate() bool {
return bc.firtstUpdate
}

func newBaseCache() *baseCache {
return &baseCache{
manager: &listenerManager{
listeners: make([]Listener, 0, 4),
},
// update
func (bc *baseCache) doCacheUpdate(name string, executor func() (map[string]time.Time, int64, error)) error {
curStoreTime, err := bc.s.GetUnixSecond()
if err != nil {
curStoreTime = bc.lastFetchTime
log.Warnf("[Cache][%s] get store timestamp fail, skip update lastMtime, err : %v", name, err)
}
defer func() {
bc.lastFetchTime = curStoreTime
}()

start := time.Now()
lastMtimes, total, err := executor()
if err != nil {
return err
}

bc.lock.Lock()
defer bc.lock.Unlock()
if len(lastMtimes) != 0 {
if len(bc.lastMtimes) != 0 {
for label, lastMtime := range lastMtimes {
preLastMtime := bc.lastMtimes[label]
log.Infof("[Cache][%s] lastMtime update from %s to %s",
label, preLastMtime, lastMtime)
}
}
bc.lastMtimes = lastMtimes
}

if total >= 0 {
metrics.RecordCacheUpdateCost(time.Since(start), name, total)
}
bc.firtstUpdate = false
return nil
}

func (bc *baseCache) clear() {
bc.lock.Lock()
defer bc.lock.Unlock()
bc.lastMtimes = make(map[string]time.Time)
bc.lastFetchTime = 0
bc.firtstUpdate = true
}

// addListener 添加
func (bc *baseCache) addListener(listeners []Listener) {
bc.lock.Lock()
defer bc.lock.Unlock()
bc.manager.listeners = append(bc.manager.listeners, listeners...)
}

Expand Down Expand Up @@ -177,14 +275,17 @@ type CacheManager struct {
storage store.Store
caches []Cache

comRevisionCh chan *revisionNotify
revisions map[string]string // service id -> reversion (所有instance reversion 的累计计算值)
lock sync.RWMutex // for revisions rw lock
storeTimeDiffSec int64
comRevisionCh chan *revisionNotify
revisions map[string]string // service id -> reversion (所有instance reversion 的累计计算值)
lock sync.RWMutex // for revisions rw lock
}

// initialize 缓存对象初始化
func (nc *CacheManager) initialize() error {
if config.DiffTime != 0 {
DefaultTimeDiff = config.DiffTime
}

for _, obj := range nc.caches {
var option map[string]interface{}
for _, entry := range config.Resources {
Expand Down Expand Up @@ -212,10 +313,7 @@ func (nc *CacheManager) update() error {
wg.Add(1)
go func(c Cache) {
defer wg.Done()

sec := atomic.LoadInt64(&nc.storeTimeDiffSec)

_ = c.update(time.Duration(sec * int64(time.Second)))
_ = c.update()
}(nc.caches[index])
}

Expand Down Expand Up @@ -260,8 +358,6 @@ func (nc *CacheManager) Start(ctx context.Context) error {
// 先启动revision计算协程
go nc.revisionWorker(ctx)

go nc.watchStoreTime(ctx)

// 启动的时候,先更新一版缓存
log.Infof("[Cache] cache update now first time")
if err := nc.update(); err != nil {
Expand Down
Loading

0 comments on commit 57c49e3

Please sign in to comment.