-
Notifications
You must be signed in to change notification settings - Fork 835
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* adjust and fix tests for flattened versions * tidy up state manager * model_state.go cleanup * re-instate getVersionsForAllModels * fix model_state_test.go * fix rproxy_grpc_test.go * fixes after sorting build * lint * sperate lock in a different stuct * fix tests * remove LRU dep from state manager * add a test for reload lock * remove reload lock * tidy up names * add defer * add logging * add transaction for unload * wait for item in case of of get and delete * add a test to check state of models * set log level to info for now * fix failing test * add state check to remaining tests * wait for item lock outside of global mutex * revert change as will be considered in another PR * post merge fixes * display diff in model states (test) * clean up load model from versions * tidy up failure in loadModel * dont do remove model version in client.go * move message to debug * combine cache and tx implementation * working without unloading vs evict * state is inconsistent still (evict - unload) * adding peek , state is ok * fix lrucache test * simplify apis * simplify unload logic * only hold write lock when model is not in memory * remove extra internal locks * remove control plane lock! * re introduce txmanager * tidy up comments * further tidy up * fix lint * minor fix log msgs * fix lint * add scheduler address and port * tidy up post merge * Review comments
- Loading branch information
Showing
11 changed files
with
455 additions
and
344 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package cache | ||
|
||
import ( | ||
"sync" | ||
|
||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
type CacheTransactionManager struct { | ||
cache CacheManager | ||
itemLocks sync.Map | ||
logger log.FieldLogger | ||
} | ||
|
||
func (tx *CacheTransactionManager) itemLock(id string) { | ||
var lock sync.RWMutex | ||
existingLock, _ := tx.itemLocks.LoadOrStore(id, &lock) | ||
existingLock.(*sync.RWMutex).Lock() | ||
} | ||
|
||
func (tx *CacheTransactionManager) itemRLock(id string) { | ||
var lock sync.RWMutex | ||
existingLock, _ := tx.itemLocks.LoadOrStore(id, &lock) | ||
existingLock.(*sync.RWMutex).RLock() | ||
} | ||
|
||
func (tx *CacheTransactionManager) itemRUnLock(id string) { | ||
existingLock, loaded := tx.itemLocks.Load(id) | ||
if loaded { | ||
existingLock.(*sync.RWMutex).RUnlock() | ||
} | ||
} | ||
|
||
func (tx *CacheTransactionManager) itemUnLock(id string) { | ||
existingLock, loaded := tx.itemLocks.Load(id) | ||
if loaded { | ||
existingLock.(*sync.RWMutex).Unlock() | ||
} | ||
} | ||
|
||
func (tx *CacheTransactionManager) itemWait(id string) { | ||
existingLock, loaded := tx.itemLocks.Load(id) | ||
if loaded { | ||
existingLock.(*sync.RWMutex).RLock() | ||
defer existingLock.(*sync.RWMutex).RUnlock() | ||
} | ||
} | ||
|
||
func (tx *CacheTransactionManager) Lock(id string) { | ||
tx.itemLock(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) Unlock(id string) { | ||
tx.itemUnLock(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) RLock(id string) { | ||
tx.itemRLock(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) RUnlock(id string) { | ||
tx.itemRUnLock(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) StartEvict(id string) (func(), error) { | ||
// we also return a function to call at the end of the transaction | ||
|
||
tx.itemLock(id) | ||
// TODO: this can be made efficient as top of queue could still be id? | ||
return func() { | ||
tx.itemUnLock(id) | ||
}, tx.cache.Delete(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) StartReloadIfNotExists(id string) (func(), bool) { | ||
// TODO: how can we simplify the logic in this function, perhaps by introducing another lock? | ||
// we also return a function to call at the end of the transaction | ||
|
||
exists := tx.Exists(id, true) | ||
if !exists { | ||
tx.itemLock(id) | ||
// check again here if item is still not in cache | ||
exists = tx.Exists(id, false) | ||
|
||
if exists { | ||
// TODO: what will happen if something happens between unlock and rlock? | ||
// 404? | ||
tx.itemUnLock(id) | ||
tx.itemRLock(id) | ||
} | ||
} else { | ||
// TODO: it is possible because of race conditions that the item is not anymore | ||
// in the cache. this will probably fail downstream somewhere | ||
// perhaps it is fine because of have retries anyway | ||
tx.itemRLock(id) | ||
} | ||
if exists { | ||
return func() { | ||
tx.itemRUnLock(id) | ||
}, true | ||
} else { | ||
return func() { | ||
tx.itemUnLock(id) | ||
}, false | ||
} | ||
|
||
} | ||
|
||
func (tx *CacheTransactionManager) Exists(id string, waitOnItem bool) bool { | ||
if waitOnItem { | ||
tx.itemWait(id) | ||
} | ||
return tx.cache.Exists(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) UpdateDefault(id string) error { | ||
return tx.cache.UpdateDefault(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) AddDefault(id string) error { | ||
return tx.cache.AddDefault(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) Delete(id string) error { | ||
return tx.cache.Delete(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) GetItems() ([]string, []int64) { | ||
return tx.cache.GetItems() | ||
} | ||
|
||
func (tx *CacheTransactionManager) Get(id string) (int64, error) { | ||
return tx.cache.Get(id) | ||
} | ||
|
||
func (tx *CacheTransactionManager) Peek() (string, int64, error) { | ||
return tx.cache.Peek() | ||
} | ||
|
||
func newCacheTransactionManager(cache CacheManager, logger log.FieldLogger) *CacheTransactionManager { | ||
return &CacheTransactionManager{ | ||
cache: cache, | ||
itemLocks: sync.Map{}, | ||
logger: logger.WithField("Source", "CacheTransactionManager"), | ||
} | ||
} | ||
|
||
func NewLRUCacheTransactionManager(logger log.FieldLogger) *CacheTransactionManager { | ||
lru := MakeLRU(map[string]int64{}) | ||
return newCacheTransactionManager(lru, logger) | ||
} |
105 changes: 105 additions & 0 deletions
105
scheduler/pkg/agent/cache/cache_transaction_manager_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package cache | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
. "github.com/onsi/gomega" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
func setupTxManager() *CacheTransactionManager { | ||
logger := log.New() | ||
logger.SetLevel(log.DebugLevel) | ||
|
||
return NewLRUCacheTransactionManager(logger) | ||
} | ||
|
||
func TestLRUCacheReloadTransaction(t *testing.T) { | ||
g := NewGomegaWithT(t) | ||
|
||
type test struct { | ||
name string | ||
} | ||
tests := []test{ | ||
{name: "evicted_ok"}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
id := "dummy_1" | ||
|
||
txManager := setupTxManager() | ||
|
||
result := make(chan bool, 1) | ||
|
||
endReloadFn, exists := txManager.StartReloadIfNotExists(id) | ||
g.Expect(exists).To(Equal(false)) | ||
go func(r chan<- bool) { | ||
// this should be waiting | ||
_, result := txManager.StartReloadIfNotExists(id) | ||
r <- result | ||
}(result) | ||
_ = txManager.AddDefault(id) | ||
endReloadFn() | ||
|
||
actualResult := <-result | ||
g.Expect(actualResult).To(Equal(true)) | ||
|
||
}) | ||
} | ||
} | ||
|
||
func TestLRUCacheEvictTransaction(t *testing.T) { | ||
g := NewGomegaWithT(t) | ||
|
||
type test struct { | ||
name string | ||
} | ||
tests := []test{ | ||
{name: "evicted_ok"}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
id := "dummy_1" | ||
|
||
txManager := setupTxManager() | ||
_ = txManager.AddDefault(id) | ||
|
||
result := make(chan bool, 1) | ||
|
||
itemFromCache, _, _ := txManager.Peek() | ||
endEvictFn, _ := txManager.StartEvict(itemFromCache) // only one model really | ||
go func(r chan<- bool) { | ||
r <- txManager.Exists(itemFromCache, true) | ||
}(result) | ||
|
||
endEvictFn() | ||
|
||
actualResult := <-result | ||
g.Expect(actualResult).To(Equal(false)) | ||
|
||
}) | ||
} | ||
} | ||
|
||
func TestLRUCacheEvictTransactionEdgeCases(t *testing.T) { | ||
g := NewGomegaWithT(t) | ||
|
||
id := "model_1" | ||
|
||
txManager := setupTxManager() | ||
|
||
_ = txManager.AddDefault(id) | ||
|
||
endEvictFn, err := txManager.StartEvict(id) | ||
g.Expect(err).To(BeNil()) // no error here | ||
endEvictFn() | ||
|
||
endEvictFn, err = txManager.StartEvict(id) | ||
g.Expect(err).To(Equal(fmt.Errorf("could not find item model_1"))) | ||
endEvictFn() | ||
|
||
g.Expect(txManager.Exists(id, true)).To(Equal(false)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,24 @@ | ||
package cache | ||
|
||
type CacheManager interface { | ||
// evict the least priority node and return key, this should start a transaction (lock on id) | ||
StartEvict() (string, int64, error) | ||
// end the transaction, if false is provided the id should be put back in the queue | ||
EndEvict(string, int64, bool) error | ||
// add a new node with specific id and priority/value | ||
Add(id string, value int64) error | ||
Add(string, int64) error | ||
// add a new node with specific id and default priority/value | ||
AddDefault(id string) error | ||
AddDefault(string) error | ||
// update value for given id, which would reflect in order | ||
Update(id string, value int64) error | ||
// default bump value for given id, which would reflect in order | ||
UpdateDefault(id string) error | ||
UpdateDefault(string) error | ||
// check if value exists | ||
Exists(id string) bool | ||
Exists(string) bool | ||
// get value/priority of given id | ||
Get(id string) (int64, error) | ||
Get(string) (int64, error) | ||
// delete item with id from cache | ||
Delete(id string) error | ||
// get a list of all keys / values | ||
GetItems() ([]string, []int64) | ||
// peek top of queue (no evict) | ||
Peek() (string, int64, error) | ||
// evict | ||
Evict() (string, int64, error) | ||
} |
Oops, something went wrong.