From 7077f7ffa6e4be3af8e13b7ed5bab601c1551f89 Mon Sep 17 00:00:00 2001 From: Jesse Claven Date: Thu, 31 Aug 2023 14:12:06 +0100 Subject: [PATCH] test: Ensure agent client starts (#5112) I've also added some comments based on my understanding of the code. --- scheduler/cmd/scheduler/main.go | 19 +++++----- scheduler/docs/design.md | 32 ++++++++--------- scheduler/pkg/agent/client.go | 11 ++++-- scheduler/pkg/agent/client_test.go | 36 ++++++++++++++----- scheduler/pkg/agent/rclone/rclone.go | 5 +++ .../agent/repository/triton/triton_test.go | 9 +++-- scheduler/pkg/kafka/dataflow/server_test.go | 10 +++--- scheduler/pkg/store/pipeline/utils_test.go | 14 ++++---- 8 files changed, 80 insertions(+), 56 deletions(-) diff --git a/scheduler/cmd/scheduler/main.go b/scheduler/cmd/scheduler/main.go index d139f88ac3..77d97f860c 100644 --- a/scheduler/cmd/scheduler/main.go +++ b/scheduler/cmd/scheduler/main.go @@ -24,19 +24,19 @@ import ( "time" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" + envoyServerControlPlaneV3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" log "github.com/sirupsen/logrus" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent" "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/server" + envoyServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/server" "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache" "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/config" "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/dataflow" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner" - server2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/server" + schedulerServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/server" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline" @@ -158,8 +158,8 @@ func main() { // Create a cache xdsCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger) ctx := context.Background() - srv := serverv3.NewServer(ctx, xdsCache, nil) - xdsServer := server.NewXDSServer(srv, logger) + srv := envoyServerControlPlaneV3.NewServer(ctx, xdsCache, nil) + xdsServer := envoyServer.NewXDSServer(srv, logger) err = xdsServer.StartXDSServer(envoyPort) if err != nil { log.WithError(err).Fatalf("Failed to start envoy xDS server") @@ -228,25 +228,28 @@ func main() { log.Warn("Not running with scheduler local DB") } - s := server2.NewSchedulerServer(logger, ss, es, ps, sched, eventHub) + s := schedulerServer.NewSchedulerServer(logger, ss, es, ps, sched, eventHub) err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort) if err != nil { - log.WithError(err).Fatalf("Scheduler start servers error") + log.WithError(err).Fatalf("Failed to start server gRPC servers") } err = as.StartGrpcServer(allowPlaintxt, agentPort, agentMtlsPort) if err != nil { - log.Fatalf("Failed to start agent grpc server %s", err.Error()) + log.WithError(err).Fatalf("Failed to start agent gRPC server") } // Wait for completion <-done log.Info("Shutting down services") + s.StopSendModelEvents() s.StopSendServerEvents() s.StopSendExperimentEvents() s.StopSendPipelineEvents() cs.StopSendPipelineEvents() as.StopAgentStreams() + + 
log.Info("Shutdown services") } diff --git a/scheduler/docs/design.md b/scheduler/docs/design.md index fb58ccf130..c1a881e173 100644 --- a/scheduler/docs/design.md +++ b/scheduler/docs/design.md @@ -6,12 +6,12 @@ * Calls scheduler with load/unload model requests * **Agent** * Runs on each server pod. On start calls scheduler to inform of new server replica with given capabilities, memory. - * Hanldes load requests: - * Tell rclone server to download artifacts + * Handles load requests: + * Tell Rclone server to download artifacts * Tell server to load/unload model - * **Sheduler** + * **Scheduler** * Assigns models to server replicas - * Manages grpc connections to: + * Manages gRPC connections to: * Agent to load/unload models assigned to a replica * Envoy to update routing to replicas for models * **Server** @@ -26,18 +26,17 @@ * Scale from zero * Payload logging - -## Agent-scheduler design +## Agent-Scheduler design Requirements: - * Handle server updates, e.g. user changes server configuration (more memory, different image with capabilities (sklearn, alibi etc) + * Handle server updates, e.g. user changes server configuration (more memory, different image with capabilities (sklearn, alibi etc)) * Handle server failures Due to above current design: - * Agent calls schedueler on startup when pod is ready and informs scheduler of new replica with a server with given capabilities - * Scheduler tells agent of models to load - * Scheduler handles loss of grpc connection by rescheduling models (if possible) + * Agent calls scheduler on startup, when pod is ready, and informs scheduler of new replica (if any, there may be 0 replicas) with a server with given capabilities + * Scheduler tells agent of model(s) to load + * Scheduler handles loss of gRPC connection by rescheduling models (if possible) * Scheduler reschedules failed scheduling models when replicas restart ## Scheduler design @@ -49,17 +48,15 @@ Requirements: * Handle syncing of Envoy and Agents when model->server changes * Handle running as multiple pods with remote DB storage - - -## GRPC Services +## gRPC Services * [Scheduler](../apis/mlops/scheduler/scheduler.proto) - * [agent](../apis/mlops/agent/agent.proto) + * [Agent](../apis/mlops/agent/agent.proto) ## Model Replica State -A model state can be in a set of states. +A model state can be in a set of states. ```golang const ( @@ -98,12 +95,12 @@ The idea is the core scheduler, the scheduler-agent server and the scheduler-env ### Scheduler #### Loading - 1. Scheduler gprc receives load model rpc + 1. Scheduler gprc receives load model RPC 1. Scheduler assigns model to 1 or more replicas updating the core model->server state 1. Model replica state it set to `LoadRequested` and Model state is set to `ModelProgressing` #### Unloading - 1. Scheduler gprc receives unload model rpc + 1. Scheduler gprc receives unload model RPC 1. Scheduler removes model replicas updating the core model->server state 1. Model replica state it set to `UnloadEnvoyRequested` and Model state is set to `ModelTerminating` @@ -124,4 +121,3 @@ When it syncs ### Scheduler-Envoy 1. Envoy syncs and updates mapping for any models it sees that have state `Loaded` to be `Available` and removes any whose state is not `Loaded` 1. 
Envoy sets all model replicas marked as `UnloadEnvoyRequested` to `UnloadRequested`, which would trigger Agent-server model replica unload - diff --git a/scheduler/pkg/agent/client.go b/scheduler/pkg/agent/client.go index 8aaaa0d56e..78909ff335 100644 --- a/scheduler/pkg/agent/client.go +++ b/scheduler/pkg/agent/client.go @@ -428,6 +428,7 @@ func (c *Client) StartService() error { grpcClient := agent.NewAgentServiceClient(c.conn) + // Connect to the scheduler for server-side streaming stream, err := grpcClient.Subscribe( context.Background(), &agent.AgentSubscribeRequest{ @@ -457,7 +458,7 @@ func (c *Client) StartService() error { _, _ = clientStream.CloseAndRecv() }() - // start stream to server + // Start the main control loop for the agent<-scheduler stream for { if c.stop.Load() { logger.Info("Stopping") @@ -489,6 +490,7 @@ func (c *Client) StartService() error { case agent.ModelOperationMessage_UNLOAD_MODEL: c.logger.Infof("calling unload model") + go func() { err := c.UnloadModel(operation) if err != nil { @@ -677,8 +679,9 @@ func (c *Client) sendModelEventError( event agent.ModelEventMessage_Event, err error, ) { + c.logger.WithError(err).Errorf("Failed to load model, sending error to scheduler") grpcClient := agent.NewAgentServiceClient(c.conn) - _, err = grpcClient.AgentEvent(context.Background(), &agent.ModelEventMessage{ + modelEventResponse, err := grpcClient.AgentEvent(context.Background(), &agent.ModelEventMessage{ ServerName: c.serverName, ReplicaIdx: c.replicaIdx, ModelName: modelName, @@ -688,8 +691,10 @@ func (c *Client) sendModelEventError( AvailableMemoryBytes: c.stateManager.GetAvailableMemoryBytesWithOverCommit(), }) if err != nil { - c.logger.WithError(err).Errorf("Failed to send error back on load model") + c.logger.WithError(err).Errorf("Failed to send error back to scheduler on load model") + return } + c.logger.WithField("modelEventResponse", modelEventResponse).Infof("Sent agent model event to scheduler") } func (c *Client) sendAgentEvent( diff --git a/scheduler/pkg/agent/client_test.go b/scheduler/pkg/agent/client_test.go index 99710278c0..544372fd8f 100644 --- a/scheduler/pkg/agent/client_test.go +++ b/scheduler/pkg/agent/client_test.go @@ -257,8 +257,10 @@ func TestLoadModel(t *testing.T) { success bool autoscalingEnabled bool } + smallMemory := uint64(500) largeMemory := uint64(2000) + tests := []test{ { name: "simple", @@ -339,12 +341,15 @@ func TestLoadModel(t *testing.T) { for tidx, test := range tests { t.Run(test.name, func(t *testing.T) { t.Logf("Test #%d", tidx) + + // Set up dependencies v2Client := createTestV2Client(addVerionToModels(test.models, 0), test.v2Status) httpmock.ActivateNonDefault(v2Client.(*testing_utils.V2RestClientForTest).HttpClient) modelRepository := FakeModelRepository{err: test.modelRepoErr} rpHTTP := FakeDependencyService{err: nil} rpGRPC := FakeDependencyService{err: nil} agentDebug := FakeDependencyService{err: nil} + lags := modelscaling.ModelScalingStatsWrapper{ Stats: modelscaling.NewModelReplicaLagsKeeper(), Operator: interfaces.Gte, @@ -352,6 +357,7 @@ func TestLoadModel(t *testing.T) { Reset: true, EventType: modelscaling.ScaleUpEvent, } + lastUsed := modelscaling.ModelScalingStatsWrapper{ Stats: modelscaling.NewModelReplicaLastUsedKeeper(), Operator: interfaces.Gte, @@ -359,23 +365,36 @@ func TestLoadModel(t *testing.T) { Reset: false, EventType: modelscaling.ScaleDownEvent, } + modelScalingService := modelscaling.NewStatsAnalyserService( []modelscaling.ModelScalingStatsWrapper{lags, lastUsed}, logger, 10) + 
drainerServicePort, _ := testing_utils2.GetFreePortForTest() drainerService := drainservice.NewDrainerService(logger, uint(drainerServicePort)) + client := NewClient( NewClientSettings("mlserver", 1, "scheduler", 9002, 9055, 1*time.Minute, 1*time.Minute, 1*time.Minute, 1, 1), logger, modelRepository, v2Client, test.replicaConfig, "default", rpHTTP, rpGRPC, agentDebug, modelScalingService, drainerService, newFakeMetricsHandler()) + mockAgentV2Server := &mockAgentV2Server{models: []string{}} conn, cerr := grpc.DialContext(context.Background(), "", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(dialerv2(mockAgentV2Server))) g.Expect(cerr).To(BeNil()) + client.conn = conn + go func() { - _ = client.Start() + err := client.Start() + // Regardless if this is/isn't a success test case, the client should've started without error + g.Expect(err).To(BeNil()) }() + + // Give the client time to start (?) time.Sleep(50 * time.Millisecond) + + // Do the actual function call that is being tested err := client.LoadModel(test.op) + if test.success { g.Expect(err).To(BeNil()) g.Expect(mockAgentV2Server.loadedEvents).To(Equal(1)) @@ -386,6 +405,7 @@ func TestLoadModel(t *testing.T) { g.Expect(proto.Clone(loadedVersions[0])).To(Equal(proto.Clone(test.op.ModelVersion))) // we have set model stats state if autoscaling is enabled versionedModelName := util.GetVersionedModelName(test.op.GetModelVersion().Model.Meta.Name, test.op.GetModelVersion().GetVersion()) + if test.autoscalingEnabled { _, err := lags.Stats.Get(versionedModelName) g.Expect(err).To(BeNil()) @@ -428,13 +448,13 @@ func TestLoadModelWithAuth(t *testing.T) { rcloneConfig := `{"type":"s3","name":"s3","parameters":{"provider":"minio","env_auth":"false","access_key_id":"minioadmin","secret_access_key":"minioadmin","endpoint":"http://172.18.255.2:9000"}}` rcloneSecret := "minio-secret" yamlSecretDataOK := ` -type: s3 -name: s3 -parameters: - provider: minio - env_auth: false - access_key_id: minioadmin - secret_access_key: minioadmin +type: s3 +name: s3 +parameters: + provider: minio + env_auth: false + access_key_id: minioadmin + secret_access_key: minioadmin endpoint: http://172.18.255.2:9000 ` smallMemory := uint64(500) diff --git a/scheduler/pkg/agent/rclone/rclone.go b/scheduler/pkg/agent/rclone/rclone.go index dd5657d42b..3056f0b863 100644 --- a/scheduler/pkg/agent/rclone/rclone.go +++ b/scheduler/pkg/agent/rclone/rclone.go @@ -157,6 +157,7 @@ func (r *RCloneClient) listenForConfigUpdates() { } } +// Sends a serialised operation (op) payload to Rclone's HTTP API. func (r *RCloneClient) call(op []byte, path string) ([]byte, error) { rcloneUrl := url.URL{ Scheme: "http", @@ -188,12 +189,16 @@ func (r *RCloneClient) call(op []byte, path string) ([]byte, error) { return b, nil } +// Ready sends a no-op request to the RClone client and checks if it is ready. +// It returns an error if there was a problem marshaling the request or if the client is not ready. +// It uses Rclone's built-in no-op HTTP endpoint. func (r *RCloneClient) Ready() error { noop := Noop{Foo: "bar"} b, err := json.Marshal(noop) if err != nil { return err } + // It's a no-op, so ignore the response. 
_, err = r.call(b, RcloneNoopPath) return err } diff --git a/scheduler/pkg/agent/repository/triton/triton_test.go b/scheduler/pkg/agent/repository/triton/triton_test.go index 35ad84d723..22df360716 100644 --- a/scheduler/pkg/agent/repository/triton/triton_test.go +++ b/scheduler/pkg/agent/repository/triton/triton_test.go @@ -120,11 +120,10 @@ func TestUpdateModelRepository(t *testing.T) { g := NewGomegaWithT(t) type test struct { - name string - config *pb.ModelConfig - repoConfig *pb.ModelConfig - isVersionFolder bool - modelName string + name string + config *pb.ModelConfig + repoConfig *pb.ModelConfig + modelName string } tests := []test{ diff --git a/scheduler/pkg/kafka/dataflow/server_test.go b/scheduler/pkg/kafka/dataflow/server_test.go index b373731020..43e1baa160 100644 --- a/scheduler/pkg/kafka/dataflow/server_test.go +++ b/scheduler/pkg/kafka/dataflow/server_test.go @@ -88,11 +88,10 @@ func TestCreatePipelineTopicSources(t *testing.T) { g := NewGomegaWithT(t) type test struct { - name string - server *ChainerServer - pipelineName string - inputs []string - sources []*chainer.PipelineTopic + name string + server *ChainerServer + inputs []string + sources []*chainer.PipelineTopic } getPtrStr := func(val string) *string { return &val } @@ -108,7 +107,6 @@ func TestCreatePipelineTopicSources(t *testing.T) { logger: log.New(), topicNamer: createTopicNamer("default", "seldon"), }, - pipelineName: "p1", inputs: []string{ "foo.inputs", "foo.outputs", diff --git a/scheduler/pkg/store/pipeline/utils_test.go b/scheduler/pkg/store/pipeline/utils_test.go index 460ef15675..3d8bfd2a2e 100644 --- a/scheduler/pkg/store/pipeline/utils_test.go +++ b/scheduler/pkg/store/pipeline/utils_test.go @@ -381,18 +381,16 @@ func TestCreatePipelineFromProto(t *testing.T) { func TestUpdateExternalInputSteps(t *testing.T) { g := NewGomegaWithT(t) type test struct { - name string - pipelineName string - inputs []string - expected []string + name string + inputs []string + expected []string } tests := []test{ { - name: "test update external inputs", - pipelineName: "pipeline", - inputs: []string{"p1", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1", "p1.step.m1.outputs.t1"}, - expected: []string{"p1.outputs", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1.outputs", "p1.step.m1.outputs.t1"}, + name: "test update external inputs", + inputs: []string{"p1", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1", "p1.step.m1.outputs.t1"}, + expected: []string{"p1.outputs", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1.outputs", "p1.step.m1.outputs.t1"}, }, } for _, test := range tests {
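
The main.go changes above keep the same overall shape: start the gRPC servers, fail fast with `log.WithError(err).Fatalf(...)` if either cannot come up, block until shutdown, then stop the event streams. A minimal, self-contained sketch of that start-then-block-then-stop pattern, with a plain `grpc.Server` standing in for the scheduler and agent servers and an illustrative port:

```golang
package main

import (
	"net"
	"os"
	"os/signal"
	"syscall"

	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
)

func main() {
	// Fail fast if the server cannot come up, mirroring the Fatalf calls in main.go.
	lis, err := net.Listen("tcp", ":9005") // illustrative port only
	if err != nil {
		log.WithError(err).Fatalf("Failed to start agent gRPC server")
	}
	srv := grpc.NewServer()
	go func() {
		if err := srv.Serve(lis); err != nil {
			log.WithError(err).Error("gRPC server stopped with error")
		}
	}()

	// Wait for completion, then shut services down in order.
	done := make(chan os.Signal, 1)
	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
	<-done
	log.Info("Shutting down services")
	srv.GracefulStop()
	log.Info("Shutdown services")
}
```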
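The client.go comments describe the agent's control loop: open a server-side stream to the scheduler with `Subscribe`, then dispatch each `ModelOperationMessage` it receives. A sketch of that loop is below; the generated-API import path, the fields of `AgentSubscribeRequest` (server name, replica index, capabilities, memory), and the `GetOperation()` accessor are assumptions based on the patch, not verified against the proto.

```golang
package agentclient

import (
	"context"

	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"

	// Assumed import path for the generated agent API used in client.go.
	agent "github.com/seldonio/seldon-core/apis/go/v2/mlops/agent"
)

// subscribeLoop connects to the scheduler and handles pushed load/unload operations.
func subscribeLoop(conn *grpc.ClientConn, logger log.FieldLogger) error {
	grpcClient := agent.NewAgentServiceClient(conn)

	// Connect to the scheduler for server-side streaming; the real request also
	// carries the server name, replica index, capabilities and memory.
	stream, err := grpcClient.Subscribe(context.Background(), &agent.AgentSubscribeRequest{})
	if err != nil {
		return err
	}

	// Main control loop for the agent<-scheduler stream.
	for {
		operation, err := stream.Recv()
		if err != nil {
			// Connection lost; the caller decides whether to resubscribe.
			return err
		}
		switch operation.GetOperation() {
		case agent.ModelOperationMessage_LOAD_MODEL:
			logger.Info("calling load model")
			// go c.LoadModel(operation) in the real client
		case agent.ModelOperationMessage_UNLOAD_MODEL:
			logger.Info("calling unload model")
			// go c.UnloadModel(operation) in the real client
		}
	}
}
```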
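The headline change in client_test.go is that the goroutine running `client.Start()` now checks the returned error, so a client that fails to start no longer goes unnoticed. A generic sketch of one way to make that check on the test goroutine itself, using a hypothetical blocking service in place of the agent client; the channel/`select` shape is illustrative, not the repo's test:

```golang
package agent

import (
	"testing"
	"time"

	. "github.com/onsi/gomega"
)

// fakeService stands in for the agent client: Start blocks while the service runs.
type fakeService struct{ startErr error }

func (s *fakeService) Start() error {
	if s.startErr != nil {
		return s.startErr
	}
	select {} // block forever, like a healthy client serving its streams
}

func TestServiceStarts(t *testing.T) {
	g := NewGomegaWithT(t)
	svc := &fakeService{}

	// Run Start in a goroutine, as the agent client test does, but funnel any
	// error back so the assertion runs on the test goroutine.
	errCh := make(chan error, 1)
	go func() { errCh <- svc.Start() }()

	select {
	case err := <-errCh:
		// Start returned early; it must not have returned an error.
		g.Expect(err).To(BeNil())
	case <-time.After(50 * time.Millisecond):
		// Still blocking after the grace period: the service started cleanly.
	}
}
```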
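The test obtains a port for the drainer service via `testing_utils2.GetFreePortForTest()`. A common way such helpers work, and a reasonable guess at what this one does, is to listen on port 0 and let the OS pick:

```golang
package testutil

import "net"

// getFreePort asks the kernel for an ephemeral port by listening on port 0,
// then closes the listener and returns the chosen port. Generic sketch only;
// the repo's GetFreePortForTest may differ in detail.
func getFreePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}
```

Closing the listener before the port is used leaves a small window in which another process could grab it, which is usually an acceptable trade-off in tests.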
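The new comments on `RCloneClient.Ready()` explain that readiness is checked by hitting Rclone's built-in no-op remote-control endpoint. A standalone sketch of the same idea against the rc HTTP API; the host and port are assumptions (rclone's default rc address is localhost:5572), and the repo's client goes through its own `call` helper rather than `http.Post` directly:

```golang
package rcloneprobe

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// checkRcloneReady POSTs a small JSON body to rc/noop, which simply echoes its
// parameters back; a 200 response means the rc server is up and serving.
func checkRcloneReady(host string, port int) error {
	body, err := json.Marshal(map[string]string{"foo": "bar"})
	if err != nil {
		return err
	}
	u := url.URL{
		Scheme: "http",
		Host:   fmt.Sprintf("%s:%d", host, port),
		Path:   "/rc/noop",
	}
	resp, err := http.Post(u.String(), "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// The response body is ignored, as in Ready(): it's a no-op either way.
		return fmt.Errorf("rclone rc/noop returned status %d", resp.StatusCode)
	}
	return nil
}
```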