
fix: Add server keep alive enforcement policy #6016

Merged Oct 30, 2024 · 26 commits
33 changes: 23 additions & 10 deletions operator/scheduler/client.go
@@ -33,11 +33,16 @@ import (

const (
- // these 2 constants in combination with the backoff exponential function will give us a max backoff of 13.5 minutes
- SchedulerConnectMaxRetries = 12
- SchedulerConnectBackoffScalar = 200 * time.Millisecond
- ClientKeapAliveTime = 60 * time.Second
- ClientKeapAliveTimeout = 2 * time.Second
- ClientKeapAlivePermit = true
+ schedulerConnectMaxRetries = 100
+ schedulerConnectBackoffScalar = 200 * time.Millisecond
+ // these keep alive settings need to match the scheduler counterpart in scheduler/pkg/util/constants.go
+ clientKeepAliveTime = 60 * time.Second
+ clientKeepAliveTimeout = 2 * time.Second
+ clientKeepAlivePermit = false
+ // backoff
+ backoffMaxElapsedTime = 0 // Never stop due to large time between calls
+ backOffMaxInterval = time.Second * 15
+ backOffInitialInterval = time.Second
)

type SchedulerClient struct {
@@ -229,9 +234,9 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai
}
}
kacp := keepalive.ClientParameters{
- Time: ClientKeapAliveTime,
- Timeout: ClientKeapAliveTimeout,
- PermitWithoutStream: ClientKeapAlivePermit,
+ Time: clientKeepAliveTime,
+ Timeout: clientKeepAliveTimeout,
+ PermitWithoutStream: clientKeepAlivePermit,
}

retryOpts := []grpc_retry.CallOption{
@@ -249,7 +254,7 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
s.logger.Info("Running scheduler client in plain text mode", "port", port)
}
- opts = append(opts, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)))
+ // we don't have backoff retry on the gRPC streams as we handle this in the event handlers
opts = append(opts, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)))
opts = append(opts, grpc.WithKeepaliveParams(kacp))
s.logger.Info("Dialing scheduler", "host", host, "port", port)
@@ -313,7 +318,7 @@ func retryFn(
logFailure := func(err error, delay time.Duration) {
logger.Error(err, "Scheduler not ready")
}
- backOffExp := backoff.NewExponentialBackOff()
+ backOffExp := getClientExponentialBackoff()
fnWithArgs := func() error {
grpcClient := scheduler.NewSchedulerClient(conn)
return fn(context.Background(), grpcClient, namespace)
@@ -325,3 +330,11 @@ }
}
return nil
}

+ func getClientExponentialBackoff() *backoff.ExponentialBackOff {
+ backOffExp := backoff.NewExponentialBackOff()
+ backOffExp.MaxElapsedTime = backoffMaxElapsedTime
+ backOffExp.MaxInterval = backOffMaxInterval
+ backOffExp.InitialInterval = backOffInitialInterval
+ return backOffExp
+ }
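For context: grpc_retry.BackoffExponential(scalar) sleeps roughly scalar × 2^attempt before each unary retry, so the old cap of 12 retries added up to about 200 ms × (2^12 − 2) ≈ 13.7 minutes of cumulative delay, which is the "13.5 minutes" the removed comment referred to. With the cap raised to 100, unary calls effectively retry until their context expires, while stream (re)subscription now leans on the cenkalti/backoff loop configured above. A standalone sketch of both schedules (illustrative only, assuming the cenkalti/backoff/v4 module; this snippet is not part of the PR):

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Unary retries: grpc_retry.BackoffExponential(200ms) waits 200ms * 2^attempt,
	// so the previous limit of 12 attempts accumulated roughly 13.7 minutes.
	scalar := 200 * time.Millisecond
	var total time.Duration
	for attempt := 1; attempt < 12; attempt++ {
		total += scalar * time.Duration(1<<attempt)
	}
	fmt.Println("cumulative delay over 12 attempts:", total) // ~13m39s

	// Stream (re)subscription: exponential backoff that never expires,
	// with intervals capped at 15s, matching the constants added above.
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // never stop retrying
	b.MaxInterval = 15 * time.Second
	b.InitialInterval = time.Second
	for i := 0; i < 6; i++ {
		fmt.Println("next interval:", b.NextBackOff())
	}
}
```

Capping MaxInterval at 15s keeps reconnection attempts frequent without the unbounded doubling a pure exponential would produce.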
4 changes: 2 additions & 2 deletions operator/scheduler/control_plane.go
@@ -31,8 +31,8 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC
stream, err := grpcClient.SubscribeControlPlane(
ctx,
&scheduler.ControlPlaneSubscriptionRequest{SubscriberName: "seldon manager"},
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
12 changes: 6 additions & 6 deletions operator/scheduler/experiment.go
@@ -43,8 +43,8 @@ func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alp
_, err = grpcClient.StartExperiment(
ctx,
req,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
}
@@ -66,8 +66,8 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph
_, err = grpcClient.StopExperiment(
ctx,
req,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
}
@@ -79,8 +79,8 @@ func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcCli
stream, err := grpcClient.SubscribeExperimentStatus(
ctx,
&scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"},
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
12 changes: 6 additions & 6 deletions operator/scheduler/model.go
@@ -60,8 +60,8 @@ func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model,
_, err = grpcClient.LoadModel(
ctx,
&loadModelRequest,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return s.checkErrorRetryable(model.Kind, model.Name, err), err
@@ -102,8 +102,8 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model
_, err = grpcClient.UnloadModel(
ctx,
modelRef,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return s.checkErrorRetryable(model.Kind, model.Name, err), err
@@ -117,8 +117,8 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s
stream, err := grpcClient.SubscribeModelStatus(
ctx,
&scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"},
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
12 changes: 6 additions & 6 deletions operator/scheduler/pipeline.go
@@ -42,8 +42,8 @@ func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.P
_, err = grpcClient.LoadPipeline(
ctx,
&req,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
return s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err), err
}
@@ -62,8 +62,8 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1
_, err = grpcClient.UnloadPipeline(
ctx,
&req,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err)
@@ -85,8 +85,8 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, grpcClien
stream, err := grpcClient.SubscribePipelineStatus(
ctx,
&scheduler.PipelineSubscriptionRequest{SubscriberName: "seldon manager"},
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
8 changes: 4 additions & 4 deletions operator/scheduler/server.go
@@ -64,8 +64,8 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler
_, err := grpcClient.ServerNotify(
ctx,
request,
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
logger.Error(err, "Failed to send notify server to scheduler")
@@ -82,8 +82,8 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient
stream, err := grpcClient.SubscribeServerStatus(
ctx,
&scheduler.ServerSubscriptionRequest{SubscriberName: "seldon manager"},
- grpc_retry.WithMax(SchedulerConnectMaxRetries),
- grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
+ grpc_retry.WithMax(schedulerConnectMaxRetries),
+ grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
7 changes: 6 additions & 1 deletion scheduler/config/envoy.yaml
@@ -11,7 +11,12 @@ static_resources:
socket_address:
address: seldon-scheduler
port_value: 9002
- http2_protocol_options: {}
+ http2_protocol_options: {
+   connection_keepalive: {
+     interval: 60s,
+     timeout: 2s,
+   }
+ }
name: xds_cluster
- connect_timeout: 0.250s
type: LOGICAL_DNS
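Envoy's connection_keepalive is the HTTP/2 analogue of the gRPC client keepalive used elsewhere in this PR: Envoy now sends a PING frame to the scheduler's xDS server every 60s and drops the connection if the ack takes longer than 2s. In gRPC-Go terms it corresponds roughly to the dial options below (an illustrative sketch, not code from the PR; the address is taken from the envoy.yaml above). The server side must explicitly permit pings at this rate, which is what the enforcement policy added further down provides.

```go
package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
)

func main() {
	// Roughly what envoy.yaml's connection_keepalive expresses for the
	// xDS connection: ping every 60s, fail if the ack takes longer than 2s.
	kacp := keepalive.ClientParameters{
		Time:    60 * time.Second,
		Timeout: 2 * time.Second,
		// false: only ping while streams are open, matching clientKeepAlivePermit.
		PermitWithoutStream: false,
	}
	conn, err := grpc.NewClient(
		"seldon-scheduler:9002", // the xDS address from envoy.yaml
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(kacp),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}
```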
PipelineSubscriber.kt (scheduler data-flow module)
@@ -37,6 +37,7 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
+ import java.util.concurrent.TimeUnit
import io.klogging.logger as coLogger

@OptIn(FlowPreview::class)
@@ -60,6 +61,9 @@ class PipelineSubscriber(
.defaultServiceConfig(grpcServiceConfig)
.usePlaintext() // Use TLS
.enableRetry()
+ // these keep alive settings need to match the go counterpart in scheduler/pkg/util/constants.go
+ .keepAliveTime(60L, TimeUnit.SECONDS)
+ .keepAliveTimeout(2L, TimeUnit.SECONDS)
.build()
private val client = ChainerGrpcKt.ChainerCoroutineStub(channel)

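One parity detail worth noting: grpc-java's keepAliveWithoutCalls option defaults to false, so the Kotlin channel above only pings while RPCs are in flight. That matches clientKeepAlivePermit = false on the Go side and the PermitWithoutStream setting of the server enforcement policy sketched below.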
12 changes: 3 additions & 9 deletions scheduler/pkg/agent/client.go
@@ -24,7 +24,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/encoding/protojson"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/agent"
@@ -237,8 +236,7 @@ func (c *Client) Start() error {
logFailure := func(err error, delay time.Duration) {
c.logger.WithError(err).Errorf("Scheduler not ready")
}
- backOffExp := backoff.NewExponentialBackOff()
- backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
+ backOffExp := util.GetClientExponentialBackoff()
err := backoff.RetryNotify(c.StartService, backOffExp, logFailure)
if err != nil {
c.logger.WithError(err).Fatal("Failed to start client")
@@ -417,11 +415,7 @@ func (c *Client) getConnection(host string, plainTxtPort int, tlsPort int) (*grp

logger.Infof("Connecting (non-blocking) to scheduler at %s:%d", host, port)

- kacp := keepalive.ClientParameters{
- Time: util.ClientKeapAliveTime,
- Timeout: util.ClientKeapAliveTimeout,
- PermitWithoutStream: util.ClientKeapAlivePermit,
- }
+ kacp := util.GetClientKeepAliveParameters()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(transCreds),
@@ -453,7 +447,7 @@ func (c *Client) StartService() error {
Shared: true,
AvailableMemoryBytes: c.stateManager.GetAvailableMemoryBytesWithOverCommit(),
},
- grpc_retry.WithMax(1),
+ grpc_retry.WithMax(util.MaxGRPCRetriesOnStream),
) // TODO make configurable
if err != nil {
return err
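The util helpers referenced here (GetClientKeepAliveParameters, GetClientExponentialBackoff) and the MaxGRPCRetriesOnStream constant live in scheduler/pkg/util and are not shown in this diff. Judging from their call sites and the operator-side constants above, they presumably look roughly like the sketch below; the value of MaxGRPCRetriesOnStream is inferred from the literal 100 it replaces in the gateway client further down, and all names and values in this sketch are assumptions rather than the PR's actual code:

```go
package util

import (
	"time"

	"github.com/cenkalti/backoff/v4"
	"google.golang.org/grpc/keepalive"
)

const (
	// these keep alive settings need to match the operator counterpart
	clientKeepAliveTime    = 60 * time.Second
	clientKeepAliveTimeout = 2 * time.Second
	clientKeepAlivePermit  = false

	// assumed value, inferred from the grpc_retry.WithMax(100) it replaces
	MaxGRPCRetriesOnStream = 100
)

// GetClientKeepAliveParameters returns the shared client-side keepalive settings.
func GetClientKeepAliveParameters() keepalive.ClientParameters {
	return keepalive.ClientParameters{
		Time:                clientKeepAliveTime,
		Timeout:             clientKeepAliveTimeout,
		PermitWithoutStream: clientKeepAlivePermit,
	}
}

// GetClientExponentialBackoff returns a retry schedule that never gives up,
// with intervals capped at 15s.
func GetClientExponentialBackoff() *backoff.ExponentialBackOff {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // never stop due to large time between calls
	b.MaxInterval = 15 * time.Second
	b.InitialInterval = time.Second
	return b
}
```

Centralising these settings is the point of the refactor: every client pings on the same 60s/2s schedule, so a single server-side enforcement policy can cover all of them.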
8 changes: 0 additions & 8 deletions scheduler/pkg/agent/modelserver_controlplane/oip/v2.go
@@ -19,7 +19,6 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"
@@ -51,19 +50,12 @@ func CreateV2GrpcConnection(v2Config V2Config) (*grpc.ClientConn, error) {
grpc_retry.WithMax(v2Config.GRPRetryMaxCount),
}

- kacp := keepalive.ClientParameters{
- Time: util.ClientKeapAliveTime,
- Timeout: util.ClientKeapAliveTimeout,
- PermitWithoutStream: util.ClientKeapAlivePermit,
- }
-
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(v2Config.GRPCMaxMsgSizeBytes), grpc.MaxCallSendMsgSize(v2Config.GRPCMaxMsgSizeBytes)),
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
- grpc.WithKeepaliveParams(kacp),
}
conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", v2Config.Host, v2Config.GRPCPort), opts...)
if err != nil {
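Note that the keepalive parameters here are removed outright rather than switched to the shared helper. This connection targets a model server's v2 inference endpoint (for example MLServer or Triton), not a Seldon-controlled gRPC server, so presumably no matching enforcement policy can be assumed on the other side, and unsolicited pings would risk being answered with a GOAWAY ("too_many_pings").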
4 changes: 4 additions & 0 deletions scheduler/pkg/agent/server.go
@@ -165,12 +165,16 @@ func (s *Server) startServer(port uint, secure bool) error {
if err != nil {
return err
}

+ kaep := util.GetServerKeepAliveEnforcementPolicy()

opts := []grpc.ServerOption{}
if secure {
opts = append(opts, grpc.Creds(s.certificateStore.CreateServerTransportCredentials()))
}
opts = append(opts, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
+ opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep))
grpcServer := grpc.NewServer(opts...)
pb.RegisterAgentServiceServer(grpcServer, s)
s.logger.Printf("Agent server running on %d mtls:%v", port, secure)
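GetServerKeepAliveEnforcementPolicy, the helper this PR is named for, is also not shown in the diff; the real values live in scheduler/pkg/util/constants.go. The key constraint is that the server's MinTime must not exceed the clients' 60s ping interval, otherwise the server answers with GOAWAY (ENHANCE_YOUR_CALM, debug data "too_many_pings") and keepalive would tear down the very connections it is meant to protect. A hedged sketch, with assumed values:

```go
package util

import (
	"time"

	"google.golang.org/grpc/keepalive"
)

// GetServerKeepAliveEnforcementPolicy returns a policy under which clients
// pinging every 60s (operator, agents, gateways, Envoy) are not treated as
// abusive. gRPC-Go's default MinTime is 5 minutes, which would reject 60s
// pings; lowering it to the clients' interval is what this PR enforces.
func GetServerKeepAliveEnforcementPolicy() keepalive.EnforcementPolicy {
	return keepalive.EnforcementPolicy{
		MinTime:             60 * time.Second, // assumed; must be <= the clients' ping interval
		PermitWithoutStream: false,            // matches clientKeepAlivePermit on the client side
	}
}
```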
4 changes: 4 additions & 0 deletions scheduler/pkg/envoy/server/server.go
@@ -24,6 +24,8 @@ import (
"google.golang.org/grpc"

seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)

const (
@@ -66,12 +68,14 @@ func (x *XDSServer) StartXDSServer(port uint) error {
return err
}
}
+ kaep := util.GetServerKeepAliveEnforcementPolicy()
secure := x.certificateStore != nil
var grpcOptions []grpc.ServerOption
if secure {
grpcOptions = append(grpcOptions, grpc.Creds(x.certificateStore.CreateServerTransportCredentials()))
}
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
+ grpcOptions = append(grpcOptions, grpc.KeepaliveEnforcementPolicy(kaep))
grpcServer := grpc.NewServer(grpcOptions...)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
4 changes: 4 additions & 0 deletions scheduler/pkg/kafka/dataflow/server.go
@@ -82,8 +82,12 @@ func (c *ChainerServer) StartGrpcServer(agentPort uint) error {
if err != nil {
log.Fatalf("failed to create listener: %v", err)
}

+ kaep := util.GetServerKeepAliveEnforcementPolicy()

var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
+ grpcOptions = append(grpcOptions, grpc.KeepaliveEnforcementPolicy(kaep))
grpcServer := grpc.NewServer(grpcOptions...)
chainer.RegisterChainerServer(grpcServer, c)
c.logger.Printf("Chainer server running on %d", agentPort)
19 changes: 7 additions & 12 deletions scheduler/pkg/kafka/gateway/client.go
@@ -22,7 +22,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"
@@ -81,11 +80,7 @@ func (kc *KafkaSchedulerClient) ConnectToScheduler(host string, plainTxtPort int
port = tlsPort
}

- kacp := keepalive.ClientParameters{
- Time: util.ClientKeapAliveTime,
- Timeout: util.ClientKeapAliveTimeout,
- PermitWithoutStream: util.ClientKeapAlivePermit,
- }
+ kacp := util.GetClientKeepAliveParameters()

// note: retry is done in the caller
opts := []grpc.DialOption{
@@ -123,11 +118,7 @@ func (kc *KafkaSchedulerClient) Start() error {
logFailure := func(err error, delay time.Duration) {
kc.logger.WithError(err).Errorf("Scheduler not ready")
}
- backOffExp := backoff.NewExponentialBackOff()
- // Set some reasonable settings for trying to reconnect to scheduler
- backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
- backOffExp.MaxInterval = time.Second * 15
- backOffExp.InitialInterval = time.Second
+ backOffExp := util.GetClientExponentialBackoff()
err := backoff.RetryNotify(kc.SubscribeModelEvents, backOffExp, logFailure)
if err != nil {
kc.logger.WithError(err).Fatal("Failed to start modelgateway client")
@@ -141,7 +132,11 @@ func (kc *KafkaSchedulerClient) SubscribeModelEvents() error {
logger := kc.logger.WithField("func", "SubscribeModelEvents")
grpcClient := scheduler.NewSchedulerClient(kc.conn)
logger.Info("Subscribing to model status events")
- stream, errSub := grpcClient.SubscribeModelStatus(context.Background(), &scheduler.ModelSubscriptionRequest{SubscriberName: SubscriberName}, grpc_retry.WithMax(100))
+ stream, errSub := grpcClient.SubscribeModelStatus(
+ context.Background(),
+ &scheduler.ModelSubscriptionRequest{SubscriberName: SubscriberName},
+ grpc_retry.WithMax(util.MaxGRPCRetriesOnStream),
+ )
if errSub != nil {
return errSub
}