Skip to content

Commit

Permalink
Add server status update batching (#307)
Browse files Browse the repository at this point in the history
* Rename method to reflect internal-only side effects

* Add server event batcher trigger & default wait duration to server

* Add map for pending server events to send

* Move server event batch control vars into the ServerEventStream definition

This is a more natural place for these fields than the top-level SchedulerServer type,
which is responsible for various types of streams.
This then provides a template for implementing similar batching functionality for
other stream types in the scheduler server.

* Initialise server event batching control vars in object initialiser

* Refactor server event update logic into fast-path & triggered subscription updater

* Allow configuration of batch wait duration for server status updates

This is useful for testing and, in the future, for setting the wait duration via arguments passed in.
  • Loading branch information
agrski authored Jun 24, 2022
1 parent 6df366a commit f481a65
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
15 changes: 12 additions & 3 deletions scheduler/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"sync"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

Expand Down Expand Up @@ -32,6 +33,7 @@ const (
serverEventHandlerName = "scheduler.server.servers"
experimentEventHandlerName = "scheduler.server.experiments"
pipelineEventHandlerName = "scheduler.server.pipelines"
defaultBatchWaitMillis = 250 * time.Millisecond
)

var (
Expand All @@ -57,8 +59,12 @@ type ModelEventStream struct {
}

type ServerEventStream struct {
mu sync.Mutex
streams map[pb.Scheduler_SubscribeServerStatusServer]*ServerSubscription
mu sync.Mutex
streams map[pb.Scheduler_SubscribeServerStatusServer]*ServerSubscription
batchWaitMillis time.Duration
trigger *time.Timer
pendingEvents map[string]struct{}
pendingLock sync.Mutex
}

type ExperimentEventStream struct {
Expand Down Expand Up @@ -127,7 +133,10 @@ func NewSchedulerServer(
streams: make(map[pb.Scheduler_SubscribeModelStatusServer]*ModelSubscription),
},
serverEventStream: ServerEventStream{
streams: make(map[pb.Scheduler_SubscribeServerStatusServer]*ServerSubscription),
streams: make(map[pb.Scheduler_SubscribeServerStatusServer]*ServerSubscription),
batchWaitMillis: defaultBatchWaitMillis,
trigger: nil,
pendingEvents: map[string]struct{}{},
},
pipelineEventStream: PipelineEventStream{
streams: make(map[pb.Scheduler_SubscribePipelineStatusServer]*PipelineSubscription),
Expand Down
55 changes: 40 additions & 15 deletions scheduler/pkg/server/server_status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"time"

"github.com/seldonio/seldon-core/scheduler/pkg/coordinator"

pb "github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"
Expand Down Expand Up @@ -144,7 +146,7 @@ func (s *SchedulerServer) handleServerEvent(event coordinator.ModelEventMsg) {

// TODO - Should this spawn a goroutine?
// Surely if we do we're risking reordering of events, e.g. load/unload -> unload/load?
err := s.sendServerStatusEvent(event)
err := s.updateServerStatus(event)
if err != nil {
logger.WithError(err).Errorf("Failed to update server status for model event %s", event.String())
}
Expand All @@ -158,8 +160,9 @@ func (s *SchedulerServer) StopSendServerEvents() {
}
}

func (s *SchedulerServer) sendServerStatusEvent(evt coordinator.ModelEventMsg) error {
func (s *SchedulerServer) updateServerStatus(evt coordinator.ModelEventMsg) error {
logger := s.logger.WithField("func", "sendServerStatusEvent")

model, err := s.modelStore.GetModel(evt.ModelName)
if err != nil {
return err
Expand All @@ -174,24 +177,46 @@ func (s *SchedulerServer) sendServerStatusEvent(evt coordinator.ModelEventMsg) e
return nil
}

ss, err := s.modelStore.GetServer(modelVersion.Server(), true, true)
if err != nil {
return err
s.serverEventStream.pendingLock.Lock()
s.serverEventStream.pendingEvents[modelVersion.Server()] = struct{}{}
if s.serverEventStream.trigger == nil {
s.serverEventStream.trigger = time.AfterFunc(defaultBatchWaitMillis, s.sendServerStatus)
}
if ss == nil {
logger.Warnf("Failed to get server %s", modelVersion.Server())
return nil
}
ssr := createServerStatusResponse(ss)
s.serverEventStream.pendingLock.Unlock()

return nil
}

func (s *SchedulerServer) sendServerStatus() {
logger := s.logger.WithField("func", "sendServerStatus")

// Sending events may be slow, so allow a new batch to start building as we send.
s.serverEventStream.pendingLock.Lock()
s.serverEventStream.trigger = nil
pendingServers := s.serverEventStream.pendingEvents
s.serverEventStream.pendingEvents = map[string]struct{}{}
s.serverEventStream.pendingLock.Unlock()

// Inform subscriber
s.serverEventStream.mu.Lock()
for stream, subscription := range s.serverEventStream.streams {
err := stream.Send(ssr)
for serverName := range pendingServers {
server, err := s.modelStore.GetServer(serverName, true, true)
if err != nil {
logger.WithError(err).Errorf("Failed to send server status event to %s", subscription.name)
logger.Errorf("Failed to get server %s", serverName)
continue
}
if server == nil {
logger.Warnf("Server %s does not exist", serverName)
continue
}
ssr := createServerStatusResponse(server)

for stream, subscription := range s.serverEventStream.streams {
err := stream.Send(ssr)
if err != nil {
logger.WithError(err).Errorf("Failed to send server status event to %s", subscription.name)
}
}
}
s.serverEventStream.mu.Unlock()

return nil
}

0 comments on commit f481a65

Please sign in to comment.