Sherif akoush/scv2 50/flatten versions (and various other fixes) (#86)
* agent k6 testing helpers

* add agent testing scenario

* do not re-add model to cache on unload error

* refactor test with setup and teardown

* increase setup timeout

* fetch model memory from agent state

* mlserver_exp1

* limit number of connections per host for rest rproxy

* add clients pool for reverse grpc proxy

* reduce number of models in test

* fix bug when mem is 0

* add grpc result

* tidy up protos

* failing test for memory calc

* try to lock model on evict

* add tx around evict

* add a test for evict transaction

* fix test and add EndEvict in interface

* integrate tx in state manager

* simple test to check models passed to mlserver

* fix lint and test

* remove unused func

* fix test

* add a delay

* tidy up test post merge

* remove extra check

* add helpers

* tests passing

* agent test changes

* fix to overcommit notebook

* client tidy up

* wire up envoy

* rename Evict to StartEvict

* add grpc changes

* remove versions from incoming request from envoy

* remove version from grpc payload

* notebook changes (overcommit)

* changes for k6
sakoush authored Feb 17, 2022
1 parent 592e1f7 commit 29bb402
Showing 26 changed files with 1,151 additions and 3,184 deletions.
2 changes: 1 addition & 1 deletion scheduler/Makefile
@@ -30,7 +30,7 @@ lint: ## Run go linters against code.
 	golangci-lint run --fix
 
 test:
-	go test ./pkg/... -coverprofile cover.out
+	go test -v ./pkg/... -coverprofile cover.out
 
 
 #####################################
58 changes: 50 additions & 8 deletions scheduler/notebooks/local-over-commit-test.ipynb
@@ -11,10 +11,10 @@
 "## Setup\n",
 "\n",
 "\n",
-"* `make start-proxy`\n",
-"* `make start-rclone`\n",
-"* `make start-agent`\n",
-"* `make start-mlserver`"
+"* `make start-proxy-local`\n",
+"* `make start-rclone-local`\n",
+"* `make start-agent-local`\n",
+"* `make start-mlserver-local` (note: check that the models directory reflects the one used for rclone)"
 ]
 },
 {
@@ -81,12 +81,53 @@
 "for i in {1..10}; \n",
 "do\n",
 "\n",
-"time curl -v http://0.0.0.0:9999/v2/models/iris$i/infer -H \"Content-Type: application/json\" -H \"Host: iris\"\\\n",
+"curl http://0.0.0.0:9999/v2/models/iris/infer -H \"Content-Type: application/json\" -H \"Host: iris\" \\\n",
+"  -H \"seldon-internal-model: iris${i}_1\" -H \"seldon-model: iris\" \\\n",
 "  -d '{\"inputs\": [{\"name\": \"predict\", \"shape\": [1, 4], \"datatype\": \"FP32\", \"data\": [[1, 2, 3, 4]]}]}'\n",
 "\n",
 "done"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"id": "cc1210b5",
+"metadata": {},
+"outputs": [],
+"source": [
+"%%bash\n",
+"\n",
+"curl http://0.0.0.0:9999/v2/models/iris/versions/xx/infer -H \"Content-Type: application/json\" -H \"Host: iris\" \\\n",
+"  -H \"seldon-internal-model: iris2_1\" -H \"seldon-model: iris\" \\\n",
+"  -d '{\"inputs\": [{\"name\": \"predict\", \"shape\": [1, 4], \"datatype\": \"FP32\", \"data\": [[1, 2, 3, 4]]}]}'"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"id": "535d1b78",
+"metadata": {},
+"outputs": [],
+"source": [
+"%%bash\n",
+"\n",
+"curl -v http://0.0.0.0:9999/v2/models/iris/versions/xx/ready -H \"Content-Type: application/json\" -H \"Host: iris\" \\\n",
+"  -H \"seldon-internal-model: iris2_1\" -H \"seldon-model: iris\""
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"id": "792dc8a9",
+"metadata": {},
+"outputs": [],
+"source": [
+"%%bash\n",
+"\n",
+"curl -v http://0.0.0.0:9999/v2/models/iris -H \"Content-Type: application/json\" -H \"Host: iris\" \\\n",
+"  -H \"seldon-internal-model: iris2_1\" -H \"seldon-model: iris\""
+]
+},
 {
 "cell_type": "code",
 "execution_count": null,
@@ -96,10 +137,11 @@
 "source": [
 "%%bash\n",
 "\n",
-"grpcurl -d '{\"model_name\":\"iris\",\"inputs\":[{\"name\":\"input\",\"contents\":{\"fp32_contents\":[1,2,3,4]},\"datatype\":\"FP32\",\"shape\":[1,4]}]}' \\\n",
+"grpcurl -d '{\"model_version\":\"xx\", \"model_name\":\"iris\",\"inputs\":[{\"name\":\"input\",\"contents\":{\"fp32_contents\":[1,2,3,4]},\"datatype\":\"FP32\",\"shape\":[1,4]}]}' \\\n",
 "  -plaintext \\\n",
 "  -import-path ../../apis/mlops/v2_dataplane/ \\\n",
 "  -proto v2_dataplane.proto \\\n",
+"  -rpc-header seldon-internal-model:iris2_1 \\\n",
 "  -rpc-header seldon-model:iris2 \\\n",
 "  0.0.0.0:9998 inference.GRPCInferenceService/ModelInfer"
 ]
@@ -119,8 +161,8 @@
 "\n",
 "for j in {1..10};\n",
 "do\n",
-"echo iris$j\n",
-"time curl http://0.0.0.0:9999/v2/models/iris$j/infer -H \"Content-Type: application/json\" -H \"Host: iris\"\\\n",
+"curl http://0.0.0.0:9999/v2/models/iris$j/infer -H \"Content-Type: application/json\" -H \"Host: iris\" \\\n",
+"  -H \"seldon-internal-model: iris${j}_1\" -H \"seldon-model: iris\" \\\n",
 "  -d '{\"inputs\": [{\"name\": \"predict\", \"shape\": [1, 4], \"datatype\": \"FP32\", \"data\": [[1, 2, 3, 4]]}]}' &\n",
 "done\n",
6 changes: 4 additions & 2 deletions scheduler/pkg/agent/cache/interface.go
@@ -1,8 +1,10 @@
 package cache
 
 type CacheManager interface {
-	// evict the lowest-priority node and return its key
-	Evict() (string, int64, error)
+	// evict the lowest-priority node and return its key; this starts a transaction (lock on id)
+	StartEvict() (string, int64, error)
+	// end the transaction; if true is provided (rollback) the id is put back in the queue
+	EndEvict(string, int64, bool) error
 	// add a new node with specific id and priority/value
 	Add(id string, value int64) error
 	// add a new node with specific id and default priority/value
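For orientation, here is a minimal sketch of a caller driving the new two-phase eviction. The evictOne helper and its unload callback are hypothetical, not part of this diff, but they follow the rollback semantics exercised by the tests further down:

func evictOne(cache CacheManager, unload func(id string) error) error {
	id, value, err := cache.StartEvict() // pops the LRU item and locks it (marks it dirty)
	if err != nil {
		return err
	}
	if err := unload(id); err != nil {
		// unload failed: roll back so the item is re-queued with its old priority
		return cache.EndEvict(id, value, true)
	}
	// unload succeeded: commit the eviction
	return cache.EndEvict(id, value, false)
}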
72 changes: 62 additions & 10 deletions scheduler/pkg/agent/cache/lru_cache_manager.go
@@ -8,20 +8,74 @@ import (
 )
 
 type LRUCacheManager struct {
-	pq PriorityQueue
-	mu sync.RWMutex
+	pq        PriorityQueue
+	mu        sync.RWMutex
+	itemLocks sync.Map
 }
 
-func (cache *LRUCacheManager) Evict() (string, int64, error) {
+func (cache *LRUCacheManager) itemLock(id string) error {
+	var lock sync.RWMutex
+	existingLock, loaded := cache.itemLocks.LoadOrStore(id, &lock)
+	if loaded {
+		return fmt.Errorf("Model is already dirty %s", id)
+	}
+	existingLock.(*sync.RWMutex).Lock()
+	return nil
+}
+
+func (cache *LRUCacheManager) itemUnLock(id string) {
+	existingLock, loaded := cache.itemLocks.LoadAndDelete(id)
+	if loaded {
+		existingLock.(*sync.RWMutex).Unlock()
+	}
+}
+
+func (cache *LRUCacheManager) itemWait(id string) {
+	existingLock, loaded := cache.itemLocks.Load(id)
+	if loaded {
+		existingLock.(*sync.RWMutex).RLock()
+		defer existingLock.(*sync.RWMutex).RUnlock()
+	}
+}
+
+func (cache *LRUCacheManager) StartEvict() (string, int64, error) {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 	if cache.pq.Len() > 0 {
 		item := heap.Pop(&(cache.pq)).(*Item)
+		if err := cache.itemLock(item.id); err != nil {
+			// re-add
+			cache.add(item.id, item.priority)
+			cache.itemUnLock(item.id)
+			return "", 0, fmt.Errorf("cannot evict")
+		}
 		return item.id, item.priority, nil
 	}
 	return "", 0, fmt.Errorf("empty cache, cannot evict")
 }
 
+func (cache *LRUCacheManager) EndEvict(id string, value int64, rollback bool) error {
+	_, loaded := cache.itemLocks.Load(id)
+	if !loaded {
+		// item is not dirty, abort
+		return fmt.Errorf("id %s is not dirty", id)
+	}
+	defer cache.itemUnLock(id)
+	if rollback {
+		// no locking here
+		cache.add(id, value)
+	}
+	return nil
+}
+
+func (cache *LRUCacheManager) add(id string, value int64) {
+	item := &Item{
+		id:       id,
+		priority: value,
+	}
+	heap.Push(&(cache.pq), item)
+}
+
 func (cache *LRUCacheManager) Add(id string, value int64) error {
 	if id == "" {
 		return fmt.Errorf("cannot use empty string")
@@ -31,11 +85,7 @@ func (cache *LRUCacheManager) Add(id string, value int64) error {
 	}
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
-	item := &Item{
-		id:       id,
-		priority: value,
-	}
-	heap.Push(&(cache.pq), item)
+	cache.add(id, value)
 	return nil
 }
 
@@ -66,6 +116,7 @@ func (cache *LRUCacheManager) Exists(id string) bool {
 	// TODO: make it efficient?
 	cache.mu.RLock()
 	defer cache.mu.RUnlock()
+	cache.itemWait(id)
 	for _, item := range cache.pq {
 		if item.id == id {
 			return true
Expand Down Expand Up @@ -127,8 +178,9 @@ func MakeLRU(initItems map[string]int64) *LRUCacheManager {
}
heap.Init(&pq)
return &LRUCacheManager{
pq: pq,
mu: sync.RWMutex{},
pq: pq,
mu: sync.RWMutex{},
itemLocks: sync.Map{},
}
}

Expand Down
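The per-item locking above leans on sync.Map's LoadOrStore and LoadAndDelete as a keyed try-lock: LoadOrStore succeeds only for the first caller per id, which is how StartEvict detects that a model is already being evicted. A distilled, self-contained sketch of that pattern under assumed standalone names (keyedLocks is illustrative, not from this diff):

package main

import (
	"fmt"
	"sync"
)

type keyedLocks struct{ m sync.Map }

// tryLock mirrors itemLock: only the caller that stores the lock may take it.
func (k *keyedLocks) tryLock(id string) error {
	var mu sync.RWMutex
	if _, loaded := k.m.LoadOrStore(id, &mu); loaded {
		return fmt.Errorf("%s is already locked", id)
	}
	mu.Lock()
	return nil
}

// unlock mirrors itemUnLock: remove the entry and release the writer lock.
func (k *keyedLocks) unlock(id string) {
	if v, loaded := k.m.LoadAndDelete(id); loaded {
		v.(*sync.RWMutex).Unlock()
	}
}

// wait mirrors itemWait: readers block until any in-flight writer finishes.
func (k *keyedLocks) wait(id string) {
	if v, ok := k.m.Load(id); ok {
		mu := v.(*sync.RWMutex)
		mu.RLock()
		mu.RUnlock()
	}
}

func main() {
	var locks keyedLocks
	_ = locks.tryLock("iris_1")
	fmt.Println(locks.tryLock("iris_1")) // fails: already locked
	locks.unlock("iris_1")
	fmt.Println(locks.tryLock("iris_1")) // succeeds again after unlock
}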
84 changes: 76 additions & 8 deletions scheduler/pkg/agent/cache/lru_cache_manager_test.go
@@ -47,7 +47,8 @@ func deleter(jobs <-chan int, wg *sync.WaitGroup, cache *LRUCacheManager) {
 
 func evicter(jobs <-chan int, wg *sync.WaitGroup, cache *LRUCacheManager) {
 	for range jobs {
-		_, _, _ = cache.Evict()
+		k, v, _ := cache.StartEvict()
+		_ = cache.EndEvict(k, v, false)
 		wg.Done()
 	}
 }
@@ -57,7 +58,8 @@ func randomer(jobs <-chan int, wg *sync.WaitGroup, cache *LRUCacheManager) {
 		modelId := fmt.Sprintf("model_%d", i)
 		switch rand.Intn(6) {
 		case 0:
-			_, _, _ = cache.Evict()
+			k, v, _ := cache.StartEvict()
+			_ = cache.EndEvict(k, v, false)
 		case 1:
 			_ = cache.AddDefault(modelId)
 		case 2:
@@ -92,7 +94,8 @@ func createJobs(f fn, numJobs int, numWorkers int, lruCache *LRUCacheManager) {
 func checkEvictOrder(numJobs int, lruCache *LRUCacheManager, g *WithT) {
 	counter := numJobs - 1
 	for {
-		id, value, err := lruCache.Evict()
+		id, value, err := lruCache.StartEvict()
+		_ = lruCache.EndEvict(id, value, false)
 		if err != nil {
 			break
 		}
@@ -246,7 +249,8 @@ func TestPQCacheSmoke(t *testing.T) {
 	g.Expect(priority).To(Equal(int64(7)))
 
 	// evict
-	deleted, deletedValue, _ := lruCache.Evict()
+	deleted, deletedValue, _ := lruCache.StartEvict()
+	_ = lruCache.EndEvict(deleted, deletedValue, false)
 	g.Expect(deleted).To(Equal("model_1"))
 	g.Expect(deletedValue).To(Equal(int64(7)))
 
@@ -267,12 +271,15 @@ func TestLRUCacheSmoke(t *testing.T) {
 	_ = lruCache.AddDefault("model_1")
 	_ = lruCache.AddDefault("model_2")
 	_ = lruCache.UpdateDefault("model_1")
-	deleted, _, _ := lruCache.Evict()
+	deleted, deletedValue, _ := lruCache.StartEvict()
+	_ = lruCache.EndEvict(deleted, deletedValue, false)
 	g.Expect(deleted).To(Equal("model_2"))
 	_ = lruCache.AddDefault("model_3")
-	deleted, _, _ = lruCache.Evict()
+	deleted, deletedValue, _ = lruCache.StartEvict()
+	_ = lruCache.EndEvict(deleted, deletedValue, false)
 	g.Expect(deleted).To(Equal("model_1"))
-	deleted, _, _ = lruCache.Evict()
+	deleted, deletedValue, _ = lruCache.StartEvict()
+	_ = lruCache.EndEvict(deleted, deletedValue, false)
 	g.Expect(deleted).To(Equal("model_3"))
 
 	t.Logf("Done!")
@@ -284,7 +291,7 @@ func TestLRUCacheSmokeEdgeCases(t *testing.T) {
 	//add new/update item with default priority
 	lruCache := MakeLRU(map[string]int64{})
 
-	_, _, err := lruCache.Evict()
+	_, _, err := lruCache.StartEvict()
 	g.Expect(err).ToNot(BeNil())
 
 	err = lruCache.Delete("model_1")
@@ -308,3 +315,64 @@
 
 	t.Logf("Done!")
 }
+
+func TestLRUCacheEvictTransaction(t *testing.T) {
+	g := NewGomegaWithT(t)
+
+	type test struct {
+		name     string
+		rollback bool
+	}
+	tests := []test{
+		{name: "evicted_rollback", rollback: true},
+		{name: "evicted_ok", rollback: false},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			id := "dummy_1"
+
+			lruCache := MakeLRU(map[string]int64{})
+			_ = lruCache.AddDefault(id)
+
+			result := make(chan bool, 1)
+
+			_, _, _ = lruCache.StartEvict() // only one model really
+			go func(r chan<- bool) {
+				r <- lruCache.Exists(id)
+			}(result)
+
+			_ = lruCache.EndEvict(id, 0, test.rollback)
+
+			actualResult := <-result
+			g.Expect(actualResult).To(Equal(test.rollback))
+		})
+	}
+}
+
+func TestLRUCacheEvictTransactionEdgeCases(t *testing.T) {
+	g := NewGomegaWithT(t)
+
+	id := "model_1"
+
+	lruCache := MakeLRU(map[string]int64{})
+	_ = lruCache.AddDefault(id)
+
+	_, _, err := lruCache.StartEvict()
+	g.Expect(err).To(BeNil()) // no error here
+
+	_, _, err = lruCache.StartEvict()
+	g.Expect(err).To(Equal(fmt.Errorf("empty cache, cannot evict"))) // err because cache is empty
+
+	_ = lruCache.EndEvict(id, 0, true) // rollback
+	id2, _, _ := lruCache.StartEvict()
+	g.Expect(id2).To(Equal(id)) // get the same id back now
+
+	err = lruCache.EndEvict(id, 0, false)
+	g.Expect(err).To(BeNil()) // no error here
+	err = lruCache.EndEvict(id, 0, false)
+	g.Expect(err).ToNot(BeNil()) // error because item is not dirty anymore
+
+	g.Expect(lruCache.Exists(id)).To(Equal(false))
+}