Skip to content

Commit

Permalink
Agent startup bug fix (#558)
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox authored Nov 3, 2022
1 parent 89f1bcc commit 5b41a94
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 16 deletions.
9 changes: 5 additions & 4 deletions scheduler/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,14 @@ func main() {

promMetrics, err := metrics.NewPrometheusModelMetrics(cli.ServerName, cli.ReplicaIdx, logger)
if err != nil {
logger.WithError(err).Fatalf("Can't create prometheus metrics")
logger.WithError(err).Fatal("Can't create prometheus metrics")
}
go func() {
err := promMetrics.Start(cli.MetricsPort)
if errors.Is(err, http.ErrServerClosed) {
return
}
logger.WithError(err).Error("Can't start metrics server")
logger.WithError(err).Fatal("Can't start metrics server")
close(done)
}()
defer func() { _ = promMetrics.Stop() }()
Expand Down Expand Up @@ -259,14 +259,14 @@ func main() {
// Wait for required services to be ready
err = client.WaitReady()
if err != nil {
logger.WithError(err).Errorf("Failed to wait for all agent dependent services to be ready")
logger.WithError(err).Fatal("Failed to wait for all agent dependent services to be ready")
close(done)
}

// Now we are ready start config listener
err = rcloneClient.StartConfigListener(agentConfigHandler)
if err != nil {
logger.WithError(err).Error("Failed to initialise rclone config listener")
logger.WithError(err).Fatal("Failed to initialise rclone config listener")
close(done)
}

Expand All @@ -281,4 +281,5 @@ func main() {

// Wait for completion
<-done
logger.Warning("Agent shutting down")
}
4 changes: 3 additions & 1 deletion scheduler/pkg/agent/agent_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type agentDebug struct {

func NewAgentDebug(logger log.FieldLogger, port uint) *agentDebug {
return &agentDebug{
logger: logger,
logger: logger.WithField("source", "AgentDebug"),
port: port,
}
}
Expand Down Expand Up @@ -71,10 +71,12 @@ func (cd *agentDebug) Start() error {
}

func (cd *agentDebug) Stop() error {
cd.logger.Info("Start graceful shutdown")
cd.mu.Lock()
defer cd.mu.Unlock()
cd.grpcServer.GracefulStop()
cd.serverReady = false
cd.logger.Info("Finished graceful shutdown")
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions scheduler/pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (c *Client) WaitReady() error {
//TODO make retry configurable
err := backoff.RetryNotify(c.ModelRepository.Ready, backoff.NewExponentialBackOff(), logFailure)
if err != nil {
logger.WithError(err).Error("Failed to wait for model repository to be ready")
return err
}

Expand All @@ -210,37 +211,44 @@ func (c *Client) WaitReady() error {
logger.Infof("Waiting for inference server to be ready")
err = backoff.RetryNotify(c.stateManager.v2Client.Ready, backoff.NewExponentialBackOff(), logFailure)
if err != nil {
logger.WithError(err).Error("Failed to wait for inference server to be ready")
return err
}

// Unload any existing models on server to ensure we start in a clean state
logger.Infof("Unloading any existing models")
err = c.UnloadAllModels()
if err != nil {
return err
}

// http reverse proxy
if err := startSubService(c.rpHTTP, logger); err != nil {
logger.WithError(err).Error("Failed to start http proxy")
return err
}

// grpc reverse proxy
if err := startSubService(c.rpGRPC, logger); err != nil {
logger.WithError(err).Error("Failed to start grpc proxy")
return err
}

// agent debug service
if err := startSubService(c.agentDebugService, logger); err != nil {
logger.WithError(err).Error("Failed to start agent debug service")
return err
}

// model scaling service
if err := startSubService(c.modelScalingService, logger); err != nil {
logger.WithError(err).Error("Failed to start scaling service")
return err
}

// drainer service
if err := startSubService(c.drainerService, logger); err != nil {
logger.WithError(err).Error("Failed to start drainer service")
return err
}

Expand Down
4 changes: 3 additions & 1 deletion scheduler/pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type AgentConfigHandler struct {

func NewAgentConfigHandler(configPath string, namespace string, logger log.FieldLogger, clientset kubernetes.Interface) (*AgentConfigHandler, error) {
configHandler := &AgentConfigHandler{
logger: logger,
logger: logger.WithField("source", "AgentConfigHandler"),
namespace: namespace,
}
if configPath != "" {
Expand Down Expand Up @@ -110,6 +110,7 @@ func (a *AgentConfigHandler) Close() error {
if a == nil {
return nil
}
a.logger.Info("Starting graceful shutdown")
if a.fileWatcherDone != nil {
close(a.fileWatcherDone)
}
Expand All @@ -122,6 +123,7 @@ func (a *AgentConfigHandler) Close() error {
for _, c := range a.listeners {
close(c)
}
a.logger.Infof("Finished graceful shutdown")
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion scheduler/pkg/agent/drainservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewDrainerService(logger log.FieldLogger, port uint) *DrainerService {
schedulerWg.Add(1)
return &DrainerService{
port: port,
logger: logger,
logger: logger.WithField("source", "DrainerService"),
serverReady: false,
triggered: false,
drainingFinishedWg: &schedulerWg,
Expand Down Expand Up @@ -80,11 +80,13 @@ func (drainer *DrainerService) Ready() bool {
}

func (drainer *DrainerService) Stop() error {
drainer.logger.Info("Start graceful shutdown")
// Shutdown is graceful
drainer.muServerReady.Lock()
defer drainer.muServerReady.Unlock()
err := drainer.server.Shutdown(context.Background())
drainer.serverReady = false
drainer.logger.Info("Finished graceful shutdown")
return err
}

Expand Down
4 changes: 3 additions & 1 deletion scheduler/pkg/agent/modelscaling/stats_analyser.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewStatsAnalyserService(
mu: sync.RWMutex{},
isReady: false,
events: make(chan *ModelScalingEvent, channelBufferSize),
logger: logger,
logger: logger.WithField("source", "StatsAnalyzerService"),
}
}

Expand All @@ -81,7 +81,9 @@ func (ss *StatsAnalyserService) Ready() bool {
}

func (ss *StatsAnalyserService) Stop() error {
ss.logger.Info("Start graceful shutdown")
ss.done <- true
ss.logger.Info("Finished graceful shutdown")
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions scheduler/pkg/agent/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,13 @@ func (rp *reverseHTTPProxy) getBackEndPath() *url.URL {
}

func (rp *reverseHTTPProxy) Stop() error {
rp.logger.Info("Start graceful shutdown")
// Shutdown is graceful
rp.mu.Lock()
defer rp.mu.Unlock()
err := rp.server.Shutdown(context.Background())
rp.serverReady = false
rp.logger.Info("Finished graceful shutdown")
return err
}

Expand Down
2 changes: 2 additions & 0 deletions scheduler/pkg/agent/rproxy_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@ func (rp *reverseGRPCProxy) Start() error {
}

func (rp *reverseGRPCProxy) Stop() error {
rp.logger.Info("Start graceful shutdown")
// Shutdown is graceful
rp.mu.Lock()
defer rp.mu.Unlock()
rp.grpcServer.GracefulStop()
rp.serverReady = false
rp.logger.Info("Finished graceful shutdown")
return nil
}

Expand Down
39 changes: 31 additions & 8 deletions scheduler/pkg/agent/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type V2ServerError struct {
}

var ErrV2BadRequest = errors.New("V2 Bad Request")
var ErrServerNotReady = errors.New("Server not ready")

func getV2GrpcConnection(host string, plainTxtPort int) (*grpc.ClientConn, error) {
retryOpts := []grpc_retry.CallOption{
Expand Down Expand Up @@ -305,24 +306,46 @@ func (v *V2Client) unloadModelGrpc(name string) *V2Err {
}

func (v *V2Client) Ready() error {
var ready bool
var err error
if v.isGrpc {
return v.readyGrpc()
ready, err = v.readyGrpc()
} else {
return v.readyHttp()
ready, err = v.readyHttp()
}
if err != nil {
v.logger.WithError(err).Debugf("Server ready check failed on error")
return err
}
v.logger.Debugf("Server ready check returned with value %v", ready)
if ready {
return nil
} else {
return ErrServerNotReady
}
}

func (v *V2Client) readyHttp() error {
_, err := http.Get(v.getUrl("v2/health/ready").String())
return err
func (v *V2Client) readyHttp() (bool, error) {
res, err := http.Get(v.getUrl("v2/health/ready").String())
if err != nil {
return false, err
}
if res.StatusCode == http.StatusOK {
return true, nil
} else {
return false, nil
}
}

func (v *V2Client) readyGrpc() error {
func (v *V2Client) readyGrpc() (bool, error) {
ctx := context.Background()
req := &v2.ServerReadyRequest{}

_, err := v.grpcClient.ServerReady(ctx, req)
return err
res, err := v.grpcClient.ServerReady(ctx, req)
if err != nil {
return false, err
}
return res.Ready, nil
}

func (v *V2Client) GetModels() ([]MLServerModelInfo, error) {
Expand Down
1 change: 1 addition & 0 deletions scheduler/pkg/metrics/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,5 +500,6 @@ func (pm *PrometheusMetrics) Start(port int) error {
}

func (pm *PrometheusMetrics) Stop() error {
pm.logger.Info("Graceful shutdown")
return pm.server.Shutdown(context.Background())
}

0 comments on commit 5b41a94

Please sign in to comment.