Skip to content

Commit

Permalink
test: Ensure agent client starts (#5112)
Browse files Browse the repository at this point in the history
I've also added some comments based on my understanding of the code.
  • Loading branch information
jesse-c authored and Adrian Gonzalez-Martin committed Sep 1, 2023
1 parent ce76909 commit 7077f7f
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 56 deletions.
19 changes: 11 additions & 8 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
"time"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
envoyServerControlPlaneV3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
log "github.com/sirupsen/logrus"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/server"
envoyServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/server"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/config"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/dataflow"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner"
server2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/server"
schedulerServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/server"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline"
Expand Down Expand Up @@ -158,8 +158,8 @@ func main() {
// Create a cache
xdsCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)
ctx := context.Background()
srv := serverv3.NewServer(ctx, xdsCache, nil)
xdsServer := server.NewXDSServer(srv, logger)
srv := envoyServerControlPlaneV3.NewServer(ctx, xdsCache, nil)
xdsServer := envoyServer.NewXDSServer(srv, logger)
err = xdsServer.StartXDSServer(envoyPort)
if err != nil {
log.WithError(err).Fatalf("Failed to start envoy xDS server")
Expand Down Expand Up @@ -228,25 +228,28 @@ func main() {
log.Warn("Not running with scheduler local DB")
}

s := server2.NewSchedulerServer(logger, ss, es, ps, sched, eventHub)
s := schedulerServer.NewSchedulerServer(logger, ss, es, ps, sched, eventHub)
err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort)
if err != nil {
log.WithError(err).Fatalf("Scheduler start servers error")
log.WithError(err).Fatalf("Failed to start server gRPC servers")
}

err = as.StartGrpcServer(allowPlaintxt, agentPort, agentMtlsPort)
if err != nil {
log.Fatalf("Failed to start agent grpc server %s", err.Error())
log.WithError(err).Fatalf("Failed to start agent gRPC server")
}

// Wait for completion
<-done

log.Info("Shutting down services")

s.StopSendModelEvents()
s.StopSendServerEvents()
s.StopSendExperimentEvents()
s.StopSendPipelineEvents()
cs.StopSendPipelineEvents()
as.StopAgentStreams()

log.Info("Shutdown services")
}
32 changes: 14 additions & 18 deletions scheduler/docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
* Calls scheduler with load/unload model requests
* **Agent**
* Runs on each server pod. On start calls scheduler to inform of new server replica with given capabilities, memory.
* Hanldes load requests:
* Tell rclone server to download artifacts
* Handles load requests:
* Tell Rclone server to download artifacts
* Tell server to load/unload model
* **Sheduler**
* **Scheduler**
* Assigns models to server replicas
* Manages grpc connections to:
* Manages gRPC connections to:
* Agent to load/unload models assigned to a replica
* Envoy to update routing to replicas for models
* **Server**
Expand All @@ -26,18 +26,17 @@
* Scale from zero
* Payload logging


## Agent-scheduler design
## Agent-Scheduler design

Requirements:

* Handle server updates, e.g. user changes server configuration (more memory, different image with capabilities (sklearn, alibi etc)
* Handle server updates, e.g. user changes server configuration (more memory, different image with capabilities (sklearn, alibi etc))
* Handle server failures

Given the above requirements, the current design is:
* Agent calls schedueler on startup when pod is ready and informs scheduler of new replica with a server with given capabilities
* Scheduler tells agent of models to load
* Scheduler handles loss of grpc connection by rescheduling models (if possible)
* Agent calls scheduler on startup, when pod is ready, and informs scheduler of new replica (if any, there may be 0 replicas) with a server with given capabilities
* Scheduler tells agent of model(s) to load
* Scheduler handles loss of gRPC connection by rescheduling models (if possible)
* Scheduler reschedules failed scheduling models when replicas restart

## Scheduler design
Expand All @@ -49,17 +48,15 @@ Requirements:
* Handle syncing of Envoy and Agents when model->server changes
* Handle running as multiple pods with remote DB storage



## GRPC Services
## gRPC Services

* [Scheduler](../apis/mlops/scheduler/scheduler.proto)
* [agent](../apis/mlops/agent/agent.proto)
* [Agent](../apis/mlops/agent/agent.proto)


## Model Replica State

A model state can be in a set of states.
A model state can be in a set of states.

```golang
const (
Expand Down Expand Up @@ -98,12 +95,12 @@ The idea is the core scheduler, the scheduler-agent server and the scheduler-env

### Scheduler
#### Loading
1. Scheduler gprc receives load model rpc
1. Scheduler gRPC receives load model RPC
1. Scheduler assigns model to 1 or more replicas updating the core model->server state
1. Model replica state is set to `LoadRequested` and Model state is set to `ModelProgressing`

#### Unloading
1. Scheduler gprc receives unload model rpc
1. Scheduler gRPC receives unload model RPC
1. Scheduler removes model replicas updating the core model->server state
1. Model replica state is set to `UnloadEnvoyRequested` and Model state is set to `ModelTerminating`

Expand All @@ -124,4 +121,3 @@ When it syncs
### Scheduler-Envoy
1. Envoy syncs and updates mapping for any models it sees that have state `Loaded` to be `Available` and removes any whose state is not `Loaded`
1. Envoy sets all model replicas marked as `UnloadEnvoyRequested` to `UnloadRequested`, which would trigger Agent-server model replica unload

11 changes: 8 additions & 3 deletions scheduler/pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func (c *Client) StartService() error {

grpcClient := agent.NewAgentServiceClient(c.conn)

// Connect to the scheduler for server-side streaming
stream, err := grpcClient.Subscribe(
context.Background(),
&agent.AgentSubscribeRequest{
Expand Down Expand Up @@ -457,7 +458,7 @@ func (c *Client) StartService() error {
_, _ = clientStream.CloseAndRecv()
}()

// start stream to server
// Start the main control loop for the agent<-scheduler stream
for {
if c.stop.Load() {
logger.Info("Stopping")
Expand Down Expand Up @@ -489,6 +490,7 @@ func (c *Client) StartService() error {

case agent.ModelOperationMessage_UNLOAD_MODEL:
c.logger.Infof("calling unload model")

go func() {
err := c.UnloadModel(operation)
if err != nil {
Expand Down Expand Up @@ -677,8 +679,9 @@ func (c *Client) sendModelEventError(
event agent.ModelEventMessage_Event,
err error,
) {
c.logger.WithError(err).Errorf("Failed to load model, sending error to scheduler")
grpcClient := agent.NewAgentServiceClient(c.conn)
_, err = grpcClient.AgentEvent(context.Background(), &agent.ModelEventMessage{
modelEventResponse, err := grpcClient.AgentEvent(context.Background(), &agent.ModelEventMessage{
ServerName: c.serverName,
ReplicaIdx: c.replicaIdx,
ModelName: modelName,
Expand All @@ -688,8 +691,10 @@ func (c *Client) sendModelEventError(
AvailableMemoryBytes: c.stateManager.GetAvailableMemoryBytesWithOverCommit(),
})
if err != nil {
c.logger.WithError(err).Errorf("Failed to send error back on load model")
c.logger.WithError(err).Errorf("Failed to send error back to scheduler on load model")
return
}
c.logger.WithField("modelEventResponse", modelEventResponse).Infof("Sent agent model event to scheduler")
}

func (c *Client) sendAgentEvent(
Expand Down
36 changes: 28 additions & 8 deletions scheduler/pkg/agent/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,10 @@ func TestLoadModel(t *testing.T) {
success bool
autoscalingEnabled bool
}

smallMemory := uint64(500)
largeMemory := uint64(2000)

tests := []test{
{
name: "simple",
Expand Down Expand Up @@ -339,43 +341,60 @@ func TestLoadModel(t *testing.T) {
for tidx, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Logf("Test #%d", tidx)

// Set up dependencies
v2Client := createTestV2Client(addVerionToModels(test.models, 0), test.v2Status)
httpmock.ActivateNonDefault(v2Client.(*testing_utils.V2RestClientForTest).HttpClient)
modelRepository := FakeModelRepository{err: test.modelRepoErr}
rpHTTP := FakeDependencyService{err: nil}
rpGRPC := FakeDependencyService{err: nil}
agentDebug := FakeDependencyService{err: nil}

lags := modelscaling.ModelScalingStatsWrapper{
Stats: modelscaling.NewModelReplicaLagsKeeper(),
Operator: interfaces.Gte,
Threshold: 10,
Reset: true,
EventType: modelscaling.ScaleUpEvent,
}

lastUsed := modelscaling.ModelScalingStatsWrapper{
Stats: modelscaling.NewModelReplicaLastUsedKeeper(),
Operator: interfaces.Gte,
Threshold: 10,
Reset: false,
EventType: modelscaling.ScaleDownEvent,
}

modelScalingService := modelscaling.NewStatsAnalyserService(
[]modelscaling.ModelScalingStatsWrapper{lags, lastUsed}, logger, 10)

drainerServicePort, _ := testing_utils2.GetFreePortForTest()
drainerService := drainservice.NewDrainerService(logger, uint(drainerServicePort))

client := NewClient(
NewClientSettings("mlserver", 1, "scheduler", 9002, 9055, 1*time.Minute, 1*time.Minute, 1*time.Minute, 1, 1),
logger, modelRepository, v2Client, test.replicaConfig, "default",
rpHTTP, rpGRPC, agentDebug, modelScalingService, drainerService, newFakeMetricsHandler())

mockAgentV2Server := &mockAgentV2Server{models: []string{}}
conn, cerr := grpc.DialContext(context.Background(), "", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(dialerv2(mockAgentV2Server)))
g.Expect(cerr).To(BeNil())

client.conn = conn

go func() {
_ = client.Start()
err := client.Start()
// Regardless if this is/isn't a success test case, the client should've started without error
g.Expect(err).To(BeNil())
}()

// Give the client time to start (?)
time.Sleep(50 * time.Millisecond)

// Do the actual function call that is being tested
err := client.LoadModel(test.op)

if test.success {
g.Expect(err).To(BeNil())
g.Expect(mockAgentV2Server.loadedEvents).To(Equal(1))
Expand All @@ -386,6 +405,7 @@ func TestLoadModel(t *testing.T) {
g.Expect(proto.Clone(loadedVersions[0])).To(Equal(proto.Clone(test.op.ModelVersion)))
// we have set model stats state if autoscaling is enabled
versionedModelName := util.GetVersionedModelName(test.op.GetModelVersion().Model.Meta.Name, test.op.GetModelVersion().GetVersion())

if test.autoscalingEnabled {
_, err := lags.Stats.Get(versionedModelName)
g.Expect(err).To(BeNil())
Expand Down Expand Up @@ -428,13 +448,13 @@ func TestLoadModelWithAuth(t *testing.T) {
rcloneConfig := `{"type":"s3","name":"s3","parameters":{"provider":"minio","env_auth":"false","access_key_id":"minioadmin","secret_access_key":"minioadmin","endpoint":"http://172.18.255.2:9000"}}`
rcloneSecret := "minio-secret"
yamlSecretDataOK := `
type: s3
name: s3
parameters:
provider: minio
env_auth: false
access_key_id: minioadmin
secret_access_key: minioadmin
type: s3
name: s3
parameters:
provider: minio
env_auth: false
access_key_id: minioadmin
secret_access_key: minioadmin
endpoint: http://172.18.255.2:9000
`
smallMemory := uint64(500)
Expand Down
5 changes: 5 additions & 0 deletions scheduler/pkg/agent/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (r *RCloneClient) listenForConfigUpdates() {
}
}

// Sends a serialised operation (op) payload to Rclone's HTTP API.
func (r *RCloneClient) call(op []byte, path string) ([]byte, error) {
rcloneUrl := url.URL{
Scheme: "http",
Expand Down Expand Up @@ -188,12 +189,16 @@ func (r *RCloneClient) call(op []byte, path string) ([]byte, error) {
return b, nil
}

// Ready probes Rclone by posting a throwaway JSON payload to its no-op path.
// It returns the marshaling error if the payload cannot be encoded, otherwise
// whatever error the underlying call reports (nil on success).
func (r *RCloneClient) Ready() error {
	payload, marshalErr := json.Marshal(Noop{Foo: "bar"})
	if marshalErr != nil {
		return marshalErr
	}

	// The response body of a no-op carries nothing we need; only the error
	// from the call is propagated to the caller.
	_, callErr := r.call(payload, RcloneNoopPath)
	return callErr
}
Expand Down
9 changes: 4 additions & 5 deletions scheduler/pkg/agent/repository/triton/triton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,10 @@ func TestUpdateModelRepository(t *testing.T) {
g := NewGomegaWithT(t)

type test struct {
name string
config *pb.ModelConfig
repoConfig *pb.ModelConfig
isVersionFolder bool
modelName string
name string
config *pb.ModelConfig
repoConfig *pb.ModelConfig
modelName string
}

tests := []test{
Expand Down
10 changes: 4 additions & 6 deletions scheduler/pkg/kafka/dataflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ func TestCreatePipelineTopicSources(t *testing.T) {
g := NewGomegaWithT(t)

type test struct {
name string
server *ChainerServer
pipelineName string
inputs []string
sources []*chainer.PipelineTopic
name string
server *ChainerServer
inputs []string
sources []*chainer.PipelineTopic
}

getPtrStr := func(val string) *string { return &val }
Expand All @@ -108,7 +107,6 @@ func TestCreatePipelineTopicSources(t *testing.T) {
logger: log.New(),
topicNamer: createTopicNamer("default", "seldon"),
},
pipelineName: "p1",
inputs: []string{
"foo.inputs",
"foo.outputs",
Expand Down
14 changes: 6 additions & 8 deletions scheduler/pkg/store/pipeline/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,18 +381,16 @@ func TestCreatePipelineFromProto(t *testing.T) {
func TestUpdateExternalInputSteps(t *testing.T) {
g := NewGomegaWithT(t)
type test struct {
name string
pipelineName string
inputs []string
expected []string
name string
inputs []string
expected []string
}

tests := []test{
{
name: "test update external inputs",
pipelineName: "pipeline",
inputs: []string{"p1", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1", "p1.step.m1.outputs.t1"},
expected: []string{"p1.outputs", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1.outputs", "p1.step.m1.outputs.t1"},
name: "test update external inputs",
inputs: []string{"p1", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1", "p1.step.m1.outputs.t1"},
expected: []string{"p1.outputs", "p1.outputs", "p1.inputs", "p1.inputs.t1", "p1.step.m1.outputs", "p1.step.m1.outputs.t1"},
},
}
for _, test := range tests {
Expand Down

0 comments on commit 7077f7f

Please sign in to comment.