Skip to content

Commit

Permalink
Update server snapshot creation in scheduler (#214)
Browse files Browse the repository at this point in the history
* Update server snapshot creation in scheduler

* review comments
  • Loading branch information
ukclivecox authored May 18, 2022
1 parent b370f3a commit 485bce0
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 12 deletions.
14 changes: 2 additions & 12 deletions scheduler/pkg/store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,22 +190,12 @@ func (m *MemoryStore) RemoveModel(req *pb.UnloadModelRequest) error {
}
}

func createServerSnapshot(server *Server) *ServerSnapshot {
return &ServerSnapshot{
Name: server.name,
Replicas: server.replicas,
Shared: server.shared,
ExpectedReplicas: server.expectedReplicas,
KubernetesMeta: server.kubernetesMeta,
}
}

func (m *MemoryStore) GetServers() ([]*ServerSnapshot, error) {
m.mu.RLock()
defer m.mu.RUnlock()
var servers []*ServerSnapshot
for _, server := range m.store.servers {
servers = append(servers, createServerSnapshot(server))
servers = append(servers, server.CreateSnapshot())
}
return servers, nil
}
Expand All @@ -217,7 +207,7 @@ func (m *MemoryStore) GetServer(serverKey string) (*ServerSnapshot, error) {
if server == nil {
return nil, nil
} else {
return createServerSnapshot(server), nil
return server.CreateSnapshot(), nil
}
}

Expand Down
35 changes: 35 additions & 0 deletions scheduler/pkg/store/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ type Server struct {
kubernetesMeta *pb.KubernetesMeta
}

func (s *Server) CreateSnapshot() *ServerSnapshot {
replicas := make(map[int]*ServerReplica, len(s.replicas))
for k, v := range s.replicas {
replicas[k] = v.createSnapshot()
}
return &ServerSnapshot{
Name: s.name,
Replicas: replicas,
Shared: s.shared,
ExpectedReplicas: s.expectedReplicas,
KubernetesMeta: proto.Clone(s.kubernetesMeta).(*pb.KubernetesMeta),
}
}

func (s *Server) SetExpectedReplicas(replicas int) {
s.expectedReplicas = replicas
}
Expand Down Expand Up @@ -462,6 +476,27 @@ func (s *Server) GetReplicaInferenceHttpPort(idx int) int32 {
return s.replicas[idx].inferenceHttpPort
}

func (s *ServerReplica) createSnapshot() *ServerReplica {
capabilities := make([]string, len(s.capabilities))
copy(capabilities, s.capabilities)
loadedModels := make(map[ModelVersionID]bool, len(s.loadedModels))
for k, v := range s.loadedModels {
loadedModels[k] = v
}
return &ServerReplica{
inferenceSvc: s.inferenceSvc,
inferenceHttpPort: s.inferenceHttpPort,
inferenceGrpcPort: s.inferenceGrpcPort,
replicaIdx: s.replicaIdx,
server: nil, //TODO change ServerReplica to snapshot struct
capabilities: capabilities,
memory: s.memory,
availableMemory: s.availableMemory,
loadedModels: loadedModels,
overCommitPercentage: s.overCommitPercentage,
}
}

func (s *ServerReplica) GetLoadedModelVersions() []ModelVersionID {
var models []ModelVersionID
for model := range s.loadedModels {
Expand Down
37 changes: 37 additions & 0 deletions scheduler/pkg/store/mesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package store
import (
"testing"

pb "github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"

. "github.com/onsi/gomega"
)

Expand Down Expand Up @@ -36,3 +38,38 @@ func TestCleanCapabilities(t *testing.T) {
})
}
}

func TestCreateSnapshot(t *testing.T) {
g := NewGomegaWithT(t)

server := &Server{
name: "test",
replicas: map[int]*ServerReplica{
0: {
inferenceSvc: "svc",
loadedModels: map[ModelVersionID]bool{
ModelVersionID{Name: "model1", Version: 1}: true,
ModelVersionID{Name: "model2", Version: 2}: true,
},
},
},
kubernetesMeta: &pb.KubernetesMeta{Namespace: "default"},
}

snapshot := server.CreateSnapshot()

server.replicas[1] = &ServerReplica{
inferenceSvc: "svc",
loadedModels: map[ModelVersionID]bool{
ModelVersionID{Name: "model3", Version: 1}: true,
ModelVersionID{Name: "model4", Version: 2}: true,
},
}
server.name = "foo"
server.kubernetesMeta.Namespace = "test"

g.Expect(snapshot.Name).To(Equal("test"))
g.Expect(len(snapshot.Replicas)).To(Equal(1))
g.Expect(snapshot.KubernetesMeta.Namespace).To(Equal("default"))

}

0 comments on commit 485bce0

Please sign in to comment.