-
Notifications
You must be signed in to change notification settings - Fork 834
/
Copy pathclient.go
180 lines (161 loc) · 5.42 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/*
Copyright (c) 2024 Seldon Technologies Ltd.
Use of this software is governed by
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/
package status
import (
"context"
"fmt"
"math"
"sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
"github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)
// SubscriberName is the subscriber name this client sends in its
// PipelineSubscriptionRequest when subscribing to pipeline status events.
const SubscriberName = "seldon-pipelinegateway"
// PipelineSchedulerClient subscribes to pipeline status events from the
// scheduler over gRPC and forwards each decoded pipeline version to a
// PipelineStatusUpdater.
type PipelineSchedulerClient struct {
	// logger is tagged with source=PipelineSchedulerClient by the constructor.
	logger logrus.FieldLogger
	// conn is the current gRPC connection to the scheduler; it is replaced on
	// every (re)connect and closed by Stop.
	conn *grpc.ClientConn
	// callOptions holds the per-call gRPC options built by
	// NewPipelineSchedulerClient (math.MaxInt32 send/receive message sizes).
	callOptions []grpc.CallOption
	// pipelineStatusUpdater receives every pipeline version decoded from the
	// status stream.
	pipelineStatusUpdater PipelineStatusUpdater
	// certificateStore is created only when the control-plane security
	// protocol is SSL; while it is nil the client connects in plaintext.
	certificateStore *seldontls.CertificateStore
	// stop is set by Stop and polled by the Start and subscription loops.
	stop atomic.Bool
}
// NewPipelineSchedulerClient returns a client that reports pipeline status
// changes to pipelineStatusUpdater. No connection is made here; Start
// establishes the connection to the scheduler.
func NewPipelineSchedulerClient(logger logrus.FieldLogger, pipelineStatusUpdater PipelineStatusUpdater) *PipelineSchedulerClient {
	client := &PipelineSchedulerClient{
		logger:                logger.WithField("source", "PipelineSchedulerClient"),
		pipelineStatusUpdater: pipelineStatusUpdater,
	}
	// Raise the per-call send and receive message size limits to math.MaxInt32.
	client.callOptions = append(client.callOptions,
		grpc.MaxCallSendMsgSize(math.MaxInt32),
		grpc.MaxCallRecvMsgSize(math.MaxInt32),
	)
	return client
}
// connectToScheduler (re)creates the gRPC client connection to the scheduler,
// closing any previous connection first.
//
// The control-plane security protocol is read from the environment. When it
// is SSL a certificate store is created and tlsPort is used; otherwise the
// plaintext port is used. The client's call options (message size limits) are
// installed as default call options on the new connection. Retrying is left
// to the caller; grpc.NewClient itself does not wait for the server, so a nil
// error does not imply the scheduler is reachable.
func (pc *PipelineSchedulerClient) connectToScheduler(host string, plainTxtPort int, tlsPort int) error {
	logger := pc.logger.WithField("func", "ConnectToScheduler")

	if pc.conn != nil {
		if err := pc.conn.Close(); err != nil {
			logger.WithError(err).Error("Failed to close previous grpc connection to scheduler")
		}
		// Drop the stale reference so a failure below cannot leave an
		// already-closed connection in place.
		pc.conn = nil
	}

	protocol := seldontls.GetSecurityProtocolFromEnv(seldontls.EnvSecurityPrefixControlPlane)
	if protocol == seldontls.SecurityProtocolSSL {
		certificateStore, err := seldontls.NewCertificateStore(
			seldontls.Prefix(seldontls.EnvSecurityPrefixControlPlaneClient),
			seldontls.ValidationPrefix(seldontls.EnvSecurityPrefixControlPlaneServer),
		)
		if err != nil {
			return fmt.Errorf("creating control-plane certificate store: %w", err)
		}
		pc.certificateStore = certificateStore
	}

	var (
		transCreds credentials.TransportCredentials
		port       int
	)
	if pc.certificateStore == nil {
		logger.Info("Starting plaintxt client to scheduler")
		transCreds = insecure.NewCredentials()
		port = plainTxtPort
	} else {
		logger.Info("Starting TLS client to scheduler")
		transCreds = pc.certificateStore.CreateClientTransportCredentials()
		port = tlsPort
	}

	// note: retry is done in the caller
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(transCreds),
		grpc.WithKeepaliveParams(util.GetClientKeepAliveParameters()),
		// Apply the call options built in NewPipelineSchedulerClient to every
		// call made on this connection.
		grpc.WithDefaultCallOptions(pc.callOptions...),
	}

	target := fmt.Sprintf("%s:%d", host, port)
	logger.Infof("Connecting to scheduler at %s", target)
	conn, err := grpc.NewClient(target, opts...)
	if err != nil {
		return fmt.Errorf("creating grpc client for scheduler at %s: %w", target, err)
	}
	pc.conn = conn
	return nil
}
// Stop requests shutdown. It sets the stop flag polled by Start and
// SubscribePipelineEvents, then closes the current scheduler connection if
// there is one. The error from closing the connection is discarded.
func (pc *PipelineSchedulerClient) Stop() {
	pc.stop.Store(true)
	conn := pc.conn
	if conn == nil {
		return
	}
	_ = conn.Close()
}
// Start connects to the scheduler and keeps a pipeline status subscription
// running until Stop is called.
//
// Each iteration (re)creates the connection and then runs
// SubscribePipelineEvents under the client exponential backoff. When a
// subscription ends, the loop reconnects. Once Stop has been called, Start
// returns nil. Unrecoverable failures are logged with Fatal. The error is
// also returned, in case the logger has been configured not to exit.
func (pc *PipelineSchedulerClient) Start(host string, plainTxtPort int, tlsPort int) error {
	logger := pc.logger.WithField("func", "Start")

	logFailure := func(err error, delay time.Duration) {
		logger.WithError(err).WithField("retryIn", delay).Errorf("Scheduler not ready")
	}

	// subscribe ends the backoff loop once Stop has been requested, instead of
	// retrying against the connection that Stop closed (and, if the backoff
	// eventually gives up, hitting the Fatal below).
	subscribe := func() error {
		if pc.stop.Load() {
			return nil
		}
		err := pc.SubscribePipelineEvents()
		if err != nil && pc.stop.Load() {
			return nil
		}
		return err
	}

	for {
		if pc.stop.Load() {
			logger.Info("Stopping")
			return nil
		}
		if err := pc.connectToScheduler(host, plainTxtPort, tlsPort); err != nil {
			logger.WithError(err).Fatal("Failed to connect to scheduler")
			return err
		}
		if err := backoff.RetryNotify(subscribe, util.GetClientExponentialBackoff(), logFailure); err != nil {
			logger.WithError(err).Fatal("Failed to start pipeline gateway client")
			return err
		}
		logger.Info("Subscribe ended")
	}
}
// SubscribePipelineEvents opens a SubscribePipelineStatus stream on the current
// scheduler connection and forwards every received pipeline version to the
// PipelineStatusUpdater.
//
// It returns an error only if the subscription cannot be opened. Once the
// stream is open, it returns nil when Stop has been requested or when Recv
// fails. The caller decides whether to reconnect.
func (pc *PipelineSchedulerClient) SubscribePipelineEvents() error {
	logger := pc.logger.WithField("func", "SubscribePipelineEvents")
	grpcClient := scheduler.NewSchedulerClient(pc.conn)
	logger.Info("Subscribing to pipeline status events")

	// Cancelling the stream's context on return releases the stream even when
	// we stop reading before Recv has reported an error.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stream, errSub := grpcClient.SubscribePipelineStatus(
		ctx,
		&scheduler.PipelineSubscriptionRequest{SubscriberName: SubscriberName},
		grpc_retry.WithMax(util.MaxGRPCRetriesOnStream),
	)
	if errSub != nil {
		return errSub
	}
	// Runs before cancel (defers are LIFO).
	defer func() {
		_ = stream.CloseSend()
	}()

	for {
		if pc.stop.Load() {
			logger.Info("Stopping")
			break
		}
		event, err := stream.Recv()
		if err != nil {
			logger.WithError(err).Error("event recv failed")
			break
		}
		// The scheduler is expected to send only the latest version of a pipeline.
		if len(event.Versions) != 1 {
			logger.WithFields(logrus.Fields{
				"numVersions": len(event.Versions),
				"name":        event.GetPipelineName(),
			}).Info("Expected a single pipeline version")
			continue
		}
		pv, err := pipeline.CreatePipelineVersionWithStateFromProto(event.Versions[0])
		if err != nil {
			logger.WithError(err).Warningf("Failed to create pipeline state for pipeline %s with %s", event.PipelineName, protojson.Format(event))
			continue
		}
		logger.Debugf("Processing pipeline %s version %d with state %s", pv.Name, pv.Version, pv.State.Status.String())
		pc.pipelineStatusUpdater.Update(pv)
	}
	logger.Info("Closing connection to scheduler")
	return nil
}