Skip to content

Commit

Permalink
Allow for more informative scheduling errors (#182)
Browse files Browse the repository at this point in the history
* Allow for more informative scheduling errors

* review fix
  • Loading branch information
ukclivecox authored May 5, 2022
1 parent 4f9b6a2 commit b0461c5
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 12 deletions.
9 changes: 9 additions & 0 deletions samples/models/error-bad-capabilities.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: badcapabilities
namespace: seldon-mesh
spec:
storageUri: "gs://seldon-models/mlserver/iris"
requirements:
- foobar
10 changes: 9 additions & 1 deletion scheduler/pkg/scheduler/filters/deletedserver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package filters

import "github.com/seldonio/seldon-core/scheduler/pkg/store"
import (
"fmt"

"github.com/seldonio/seldon-core/scheduler/pkg/store"
)

type DeletedServerFilter struct{}

Expand All @@ -11,3 +15,7 @@ func (e DeletedServerFilter) Name() string {
func (e DeletedServerFilter) Filter(model *store.ModelVersion, server *store.ServerSnapshot) bool {
return server.ExpectedReplicas != 0
}

func (e DeletedServerFilter) Description(model *store.ModelVersion, server *store.ServerSnapshot) string {
return fmt.Sprintf("expected replicas %d != 0", server.ExpectedReplicas)
}
10 changes: 9 additions & 1 deletion scheduler/pkg/scheduler/filters/replicamemory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package filters

import "github.com/seldonio/seldon-core/scheduler/pkg/store"
import (
"fmt"

"github.com/seldonio/seldon-core/scheduler/pkg/store"
)

type AvailableMemoryReplicaFilter struct{}

Expand All @@ -11,3 +15,7 @@ func (r AvailableMemoryReplicaFilter) Name() string {
func (r AvailableMemoryReplicaFilter) Filter(model *store.ModelVersion, replica *store.ServerReplica) bool {
return model.GetRequiredMemory() <= replica.GetAvailableMemory()
}

func (s AvailableMemoryReplicaFilter) Description(model *store.ModelVersion, replica *store.ServerReplica) string {
return fmt.Sprintf("model memory %d replica memory %d", model.GetRequiredMemory(), replica.GetAvailableMemory())
}
5 changes: 5 additions & 0 deletions scheduler/pkg/scheduler/filters/replicarequirements.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filters

import (
"fmt"
"strings"

"github.com/seldonio/seldon-core/scheduler/pkg/store"
Expand All @@ -27,3 +28,7 @@ func (s RequirementsReplicaFilter) Filter(model *store.ModelVersion, replica *st
}
return true
}

func (s RequirementsReplicaFilter) Description(model *store.ModelVersion, replica *store.ServerReplica) string {
return fmt.Sprintf("model requirements %v replica capabilities %v", model.GetRequirements(), replica.GetCapabilities())
}
15 changes: 14 additions & 1 deletion scheduler/pkg/scheduler/filters/sharing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package filters

import "github.com/seldonio/seldon-core/scheduler/pkg/store"
import (
"fmt"

"github.com/seldonio/seldon-core/scheduler/pkg/store"
)

type SharingServerFilter struct{}

Expand All @@ -12,3 +16,12 @@ func (e SharingServerFilter) Filter(model *store.ModelVersion, server *store.Ser
requestedServer := model.GetRequestedServer()
return (requestedServer == nil && server.Shared) || (requestedServer != nil && *requestedServer == server.Name)
}

func (e SharingServerFilter) Description(model *store.ModelVersion, server *store.ServerSnapshot) string {
requestedServer := model.GetRequestedServer()
if requestedServer != nil {
return fmt.Sprintf("requested server %s == %s", *requestedServer, server.Name)
} else {
return fmt.Sprintf("sharing %v", server.Shared)
}
}
2 changes: 2 additions & 0 deletions scheduler/pkg/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ type Scheduler interface {
type ReplicaFilter interface {
Name() string
Filter(model *store.ModelVersion, replica *store.ServerReplica) bool
Description(model *store.ModelVersion, replica *store.ServerReplica) string
}

type ServerFilter interface {
Name() string
Filter(model *store.ModelVersion, server *store.ServerSnapshot) bool
Description(model *store.ModelVersion, server *store.ServerSnapshot) string
}
32 changes: 23 additions & 9 deletions scheduler/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,22 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
}
delete(s.failedModels, modelName) // Ensure model removed from failed models if its there
} else {
var debugTrail []string
var filteredServers []*store.ServerSnapshot
// Get all servers
servers, err := s.store.GetServers()
if err != nil {
return err
}
// Filter and sort servers
filteredServers := s.filterServers(latestModel, servers)
filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail)
s.sortServers(latestModel, filteredServers)
ok := false
logger.Debugf("Model %s candidate servers %v", modelName, filteredServers)
// For each server filter and sort replicas and attempt schedule if enough replicas
for _, candidateServer := range filteredServers {
candidateReplicas := s.filterReplicas(latestModel, candidateServer)
var candidateReplicas *sorters.CandidateServer
candidateReplicas, debugTrail = s.filterReplicas(latestModel, candidateServer, debugTrail)
if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() {
continue
}
Expand All @@ -135,7 +138,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
}
if !ok {
s.store.FailedScheduling(latestModel, "Failed to schedule")
return fmt.Errorf("failed to schedule model %s", modelName)
return fmt.Errorf("failed to schedule model %s. %v", modelName, debugTrail)
}
}

Expand Down Expand Up @@ -191,14 +194,19 @@ func (s *SimpleScheduler) sortReplicas(candidateServer *sorters.CandidateServer)
}

// Filter servers for this model
func (s *SimpleScheduler) filterServers(model *store.ModelVersion, servers []*store.ServerSnapshot) []*store.ServerSnapshot {
func (s *SimpleScheduler) filterServers(model *store.ModelVersion, servers []*store.ServerSnapshot, debugTrail []string) ([]*store.ServerSnapshot, []string) {
logger := s.logger.WithField("func", "filterServer")
var filteredServers []*store.ServerSnapshot
for _, server := range servers {
ok := true
for _, serverFilter := range s.serverFilters {
if !serverFilter.Filter(model, server) {
logger.Debugf("Scheduling for %s failed replica filter %s for server %s", model.Key(), serverFilter.Name(), server.Name)
msg := fmt.Sprintf("failed server filter %s for server replica %s : %s",
serverFilter.Name(),
server.Name,
serverFilter.Description(model, server))
logger.Debugf(msg)
debugTrail = append(debugTrail, msg)
ok = false
break
}
Expand All @@ -207,17 +215,23 @@ func (s *SimpleScheduler) filterServers(model *store.ModelVersion, servers []*st
filteredServers = append(filteredServers, server)
}
}
return filteredServers
return filteredServers, debugTrail
}

func (s *SimpleScheduler) filterReplicas(model *store.ModelVersion, server *store.ServerSnapshot) *sorters.CandidateServer {
func (s *SimpleScheduler) filterReplicas(model *store.ModelVersion, server *store.ServerSnapshot, debugTrail []string) (*sorters.CandidateServer, []string) {
logger := s.logger.WithField("func", "filterReplicas")
candidateServer := sorters.CandidateServer{Model: model, Server: server}
for _, replica := range server.Replicas {
ok := true
for _, replicaFilter := range s.replicaFilters {
if !replicaFilter.Filter(model, replica) {
logger.Debugf("Scheduling for %s failed replica filter %s for server replica %s:%d", model.Key(), replicaFilter.Name(), server.Name, replica.GetReplicaIdx())
msg := fmt.Sprintf("failed replica filter %s for server replica %s:%d : %s",
replicaFilter.Name(),
server.Name,
replica.GetReplicaIdx(),
replicaFilter.Description(model, replica))
logger.Debugf(msg)
debugTrail = append(debugTrail, msg)
ok = false
break
}
Expand All @@ -226,5 +240,5 @@ func (s *SimpleScheduler) filterReplicas(model *store.ModelVersion, server *stor
candidateServer.ChosenReplicas = append(candidateServer.ChosenReplicas, replica)
}
}
return &candidateServer
return &candidateServer, debugTrail
}

0 comments on commit b0461c5

Please sign in to comment.