diff --git a/samples/models/error-bad-capabilities.yaml b/samples/models/error-bad-capabilities.yaml
new file mode 100644
index 0000000000..1c70f19644
--- /dev/null
+++ b/samples/models/error-bad-capabilities.yaml
@@ -0,0 +1,9 @@
+apiVersion: mlops.seldon.io/v1alpha1
+kind: Model
+metadata:
+  name: badcapabilities
+  namespace: seldon-mesh
+spec:
+  storageUri: "gs://seldon-models/mlserver/iris"
+  requirements:
+  - foobar
diff --git a/scheduler/pkg/scheduler/filters/deletedserver.go b/scheduler/pkg/scheduler/filters/deletedserver.go
index a008ab01e8..d616d3f24a 100644
--- a/scheduler/pkg/scheduler/filters/deletedserver.go
+++ b/scheduler/pkg/scheduler/filters/deletedserver.go
@@ -1,6 +1,10 @@
 package filters
 
-import "github.com/seldonio/seldon-core/scheduler/pkg/store"
+import (
+	"fmt"
+
+	"github.com/seldonio/seldon-core/scheduler/pkg/store"
+)
 
 type DeletedServerFilter struct{}
 
@@ -11,3 +15,8 @@ func (e DeletedServerFilter) Name() string {
 func (e DeletedServerFilter) Filter(model *store.ModelVersion, server *store.ServerSnapshot) bool {
 	return server.ExpectedReplicas != 0
 }
+
+// Description reports the server's expected replica count next to the non-zero condition Filter checks.
+func (e DeletedServerFilter) Description(model *store.ModelVersion, server *store.ServerSnapshot) string {
+	return fmt.Sprintf("expected replicas %d != 0", server.ExpectedReplicas)
+}
diff --git a/scheduler/pkg/scheduler/filters/replicamemory.go b/scheduler/pkg/scheduler/filters/replicamemory.go
index 9e0e508515..f564ff9ca9 100644
--- a/scheduler/pkg/scheduler/filters/replicamemory.go
+++ b/scheduler/pkg/scheduler/filters/replicamemory.go
@@ -1,6 +1,10 @@
 package filters
 
-import "github.com/seldonio/seldon-core/scheduler/pkg/store"
+import (
+	"fmt"
+
+	"github.com/seldonio/seldon-core/scheduler/pkg/store"
+)
 
 type AvailableMemoryReplicaFilter struct{}
 
@@ -11,3 +15,8 @@ func (r AvailableMemoryReplicaFilter) Name() string {
 func (r AvailableMemoryReplicaFilter) Filter(model *store.ModelVersion, replica *store.ServerReplica) bool {
 	return model.GetRequiredMemory() <= replica.GetAvailableMemory()
 }
+
+// Description reports the model's required memory and the replica's available memory, the two values Filter compares.
+func (r AvailableMemoryReplicaFilter) Description(model *store.ModelVersion, replica *store.ServerReplica) string {
+	return fmt.Sprintf("model memory %d replica memory %d", model.GetRequiredMemory(), replica.GetAvailableMemory())
+}
diff --git a/scheduler/pkg/scheduler/filters/replicarequirements.go b/scheduler/pkg/scheduler/filters/replicarequirements.go
index b2a6be43ce..6b9943033f 100644
--- a/scheduler/pkg/scheduler/filters/replicarequirements.go
+++ b/scheduler/pkg/scheduler/filters/replicarequirements.go
@@ -1,6 +1,7 @@
 package filters
 
 import (
+	"fmt"
 	"strings"
 
 	"github.com/seldonio/seldon-core/scheduler/pkg/store"
@@ -27,3 +28,8 @@ func (s RequirementsReplicaFilter) Filter(model *store.ModelVersion, replica *st
 	}
 	return true
 }
+
+// Description reports the model's requirements and the replica's capabilities side by side.
+func (s RequirementsReplicaFilter) Description(model *store.ModelVersion, replica *store.ServerReplica) string {
+	return fmt.Sprintf("model requirements %v replica capabilities %v", model.GetRequirements(), replica.GetCapabilities())
+}
diff --git a/scheduler/pkg/scheduler/filters/sharing.go b/scheduler/pkg/scheduler/filters/sharing.go
index ec695382c2..0e57ee6113 100644
--- a/scheduler/pkg/scheduler/filters/sharing.go
+++ b/scheduler/pkg/scheduler/filters/sharing.go
@@ -1,6 +1,10 @@
 package filters
 
-import "github.com/seldonio/seldon-core/scheduler/pkg/store"
+import (
+	"fmt"
+
+	"github.com/seldonio/seldon-core/scheduler/pkg/store"
+)
 
 type SharingServerFilter struct{}
 
@@ -12,3 +16,12 @@ func (e SharingServerFilter) Filter(model *store.ModelVersion, server *store.Ser
 	requestedServer := model.GetRequestedServer()
 	return (requestedServer == nil && server.Shared) || (requestedServer != nil && *requestedServer == server.Name)
 }
+
+// Description reports the requested-server comparison when the model names a server, otherwise the server's sharing flag.
+func (e SharingServerFilter) Description(model *store.ModelVersion, server *store.ServerSnapshot) string {
+	requestedServer := model.GetRequestedServer()
+	if requestedServer != nil {
+		return fmt.Sprintf("requested server %s == %s", *requestedServer, server.Name)
+	}
+	return fmt.Sprintf("sharing %v", server.Shared)
+}
diff --git a/scheduler/pkg/scheduler/interface.go b/scheduler/pkg/scheduler/interface.go
index 3834c1acfa..1bb4618579 100644
--- a/scheduler/pkg/scheduler/interface.go
+++ b/scheduler/pkg/scheduler/interface.go
@@ -12,9 +12,13 @@ type Scheduler interface {
 type ReplicaFilter interface {
 	Name() string
 	Filter(model *store.ModelVersion, replica *store.ServerReplica) bool
+	// Description returns text describing the filter's check for this model and replica; the scheduler adds it to failure messages.
+	Description(model *store.ModelVersion, replica *store.ServerReplica) string
 }
 
 type ServerFilter interface {
 	Name() string
 	Filter(model *store.ModelVersion, server *store.ServerSnapshot) bool
+	// Description returns text describing the filter's check for this model and server; the scheduler adds it to failure messages.
+	Description(model *store.ModelVersion, server *store.ServerSnapshot) string
 }
diff --git a/scheduler/pkg/scheduler/scheduler.go b/scheduler/pkg/scheduler/scheduler.go
index 24d64eca74..036ca87937 100644
--- a/scheduler/pkg/scheduler/scheduler.go
+++ b/scheduler/pkg/scheduler/scheduler.go
@@ -107,19 +107,22 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
 		}
 		delete(s.failedModels, modelName) // Ensure model removed from failed models if its there
 	} else {
+		var debugTrail []string
+		var filteredServers []*store.ServerSnapshot
 		// Get all servers
 		servers, err := s.store.GetServers()
 		if err != nil {
 			return err
 		}
 		// Filter and sort servers
-		filteredServers := s.filterServers(latestModel, servers)
+		filteredServers, debugTrail = s.filterServers(latestModel, servers, debugTrail)
 		s.sortServers(latestModel, filteredServers)
 		ok := false
 		logger.Debugf("Model %s candidate servers %v", modelName, filteredServers)
 		// For each server filter and sort replicas and attempt schedule if enough replicas
 		for _, candidateServer := range filteredServers {
-			candidateReplicas := s.filterReplicas(latestModel, candidateServer)
+			var candidateReplicas *sorters.CandidateServer
+			candidateReplicas, debugTrail = s.filterReplicas(latestModel, candidateServer, debugTrail)
 			if len(candidateReplicas.ChosenReplicas) < latestModel.DesiredReplicas() {
 				continue
 			}
@@ -135,7 +138,7 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) error {
 		}
 		if !ok {
 			s.store.FailedScheduling(latestModel, "Failed to schedule")
-			return fmt.Errorf("failed to schedule model %s", modelName)
+			return fmt.Errorf("failed to schedule model %s. %v", modelName, debugTrail)
 		}
 	}
 
@@ -191,14 +194,19 @@ func (s *SimpleScheduler) sortReplicas(candidateServer *sorters.CandidateServer)
 }
 
 // Filter servers for this model
-func (s *SimpleScheduler) filterServers(model *store.ModelVersion, servers []*store.ServerSnapshot) []*store.ServerSnapshot {
+func (s *SimpleScheduler) filterServers(model *store.ModelVersion, servers []*store.ServerSnapshot, debugTrail []string) ([]*store.ServerSnapshot, []string) {
 	logger := s.logger.WithField("func", "filterServer")
 	var filteredServers []*store.ServerSnapshot
 	for _, server := range servers {
 		ok := true
 		for _, serverFilter := range s.serverFilters {
 			if !serverFilter.Filter(model, server) {
-				logger.Debugf("Scheduling for %s failed replica filter %s for server %s", model.Key(), serverFilter.Name(), server.Name)
+				msg := fmt.Sprintf("failed server filter %s for server %s : %s",
+					serverFilter.Name(),
+					server.Name,
+					serverFilter.Description(model, server))
+				logger.Debug(msg)
+				debugTrail = append(debugTrail, msg)
 				ok = false
 				break
 			}
@@ -207,17 +215,23 @@ func (s *SimpleScheduler) filterServers(model *store.ModelVersion, servers []*st
 			filteredServers = append(filteredServers, server)
 		}
 	}
-	return filteredServers
+	return filteredServers, debugTrail
 }
 
-func (s *SimpleScheduler) filterReplicas(model *store.ModelVersion, server *store.ServerSnapshot) *sorters.CandidateServer {
+func (s *SimpleScheduler) filterReplicas(model *store.ModelVersion, server *store.ServerSnapshot, debugTrail []string) (*sorters.CandidateServer, []string) {
 	logger := s.logger.WithField("func", "filterReplicas")
 	candidateServer := sorters.CandidateServer{Model: model, Server: server}
 	for _, replica := range server.Replicas {
 		ok := true
 		for _, replicaFilter := range s.replicaFilters {
 			if !replicaFilter.Filter(model, replica) {
-				logger.Debugf("Scheduling for %s failed replica filter %s for server replica %s:%d", model.Key(), replicaFilter.Name(), server.Name, replica.GetReplicaIdx())
+				msg := fmt.Sprintf("failed replica filter %s for server replica %s:%d : %s",
+					replicaFilter.Name(),
+					server.Name,
+					replica.GetReplicaIdx(),
+					replicaFilter.Description(model, replica))
+				logger.Debug(msg)
+				debugTrail = append(debugTrail, msg)
 				ok = false
 				break
 			}
@@ -226,5 +240,5 @@ func (s *SimpleScheduler) filterReplicas(model *store.ModelVersion, server *stor
 			candidateServer.ChosenReplicas = append(candidateServer.ChosenReplicas, replica)
 		}
 	}
-	return &candidateServer
+	return &candidateServer, debugTrail
 }