From 3e4f4e1870dd083673bb2ced241a6873295aa3be Mon Sep 17 00:00:00 2001 From: cliveseldon Date: Mon, 23 May 2022 06:19:12 +0100 Subject: [PATCH] add locks around stream send (#226) --- scheduler/pkg/agent/server.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scheduler/pkg/agent/server.go b/scheduler/pkg/agent/server.go index bdd8c032c4..fd75dcbf0e 100644 --- a/scheduler/pkg/agent/server.go +++ b/scheduler/pkg/agent/server.go @@ -43,8 +43,8 @@ type SchedulerAgent interface { type AgentSubscriber struct { finished chan<- bool - //mutext sync.Mutex // grpc streams are not thread safe for sendMsg https://github.com/grpc/grpc-go/issues/2355 - stream pb.AgentService_SubscribeServer + mutext sync.Mutex // grpc streams are not thread safe for sendMsg https://github.com/grpc/grpc-go/issues/2355 + stream pb.AgentService_SubscribeServer } func NewAgentServer( @@ -122,10 +122,12 @@ func (s *Server) Sync(modelName string) { continue } + as.mutext.Lock() err = as.stream.Send(&pb.ModelOperationMessage{ Operation: pb.ModelOperationMessage_LOAD_MODEL, ModelVersion: &pb.ModelVersion{Model: latestModel.GetModel(), Version: latestModel.GetVersion()}, }) + as.mutext.Unlock() if err != nil { logger.WithError(err).Errorf("stream message send failed for model %s and replicaidx %d", modelName, replicaIdx) continue @@ -147,10 +149,12 @@ func (s *Server) Sync(modelName string) { logger.Errorf("Failed to find server replica for %s:%d", modelVersion.Server(), replicaIdx) continue } + as.mutext.Lock() err = as.stream.Send(&pb.ModelOperationMessage{ Operation: pb.ModelOperationMessage_UNLOAD_MODEL, ModelVersion: &pb.ModelVersion{Model: modelVersion.GetModel(), Version: modelVersion.GetVersion()}, }) + as.mutext.Unlock() if err != nil { logger.WithError(err).Errorf("stream message send failed for model %s and replicaidx %d", modelName, replicaIdx) continue