-
Notifications
You must be signed in to change notification settings - Fork 1k
/
client.go
94 lines (79 loc) · 2.83 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package feast
import (
"context"
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/feast-dev/feast/sdk/go/protos/feast/serving"
"github.com/feast-dev/feast/sdk/go/protos/feast/types"
"google.golang.org/grpc"
"go.opencensus.io/plugin/ocgrpc"
)
// Client is a feast serving client.
//
// It describes the operations a caller needs from a Feast serving connection.
// *GrpcClient, declared in this file, provides all three methods.
type Client interface {
// GetOnlineFeatures sends req to the serving instance and returns the
// resulting online feature values, or an error.
GetOnlineFeatures(ctx context.Context, req *OnlineFeaturesRequest) (*OnlineFeaturesResponse, error)
// GetFeastServingInfo requests information about the serving instance the
// client is talking to.
GetFeastServingInfo(ctx context.Context, in *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error)
// Close releases the client's resources and reports any error from doing so.
Close() error
}
// GrpcClient is a grpc client for feast serving.
//
// Both fields are populated by NewGrpcClient.
type GrpcClient struct {
// cli is the serving service stub that the RPC methods are issued through.
cli serving.ServingServiceClient
// conn is the grpc connection backing cli; Close closes it.
conn *grpc.ClientConn
}
// NewGrpcClient dials the feast serving instance at host:port and returns a
// GrpcClient bound to that connection.
//
// The connection is created with grpc.WithInsecure and is instrumented with
// an OpenCensus client stats handler. If dialing fails, the dial error is
// returned and no client is created.
func NewGrpcClient(host string, port int) (*GrpcClient, error) {
	target := fmt.Sprintf("%s:%d", host, port)
	dialOpts := []grpc.DialOption{
		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
		grpc.WithInsecure(),
	}

	conn, err := grpc.Dial(target, dialOpts...)
	if err != nil {
		return nil, err
	}

	return &GrpcClient{
		cli:  serving.NewServingServiceClient(conn),
		conn: conn,
	}, nil
}
// GetOnlineFeatures gets the latest values of the requested features from the
// Feast serving instance this client is connected to.
//
// The call runs inside an opentracing span named "get_online_features".
// After the RPC returns, every field key in the response that is not an
// entity name taken from req.Entities is treated as a feature reference: it
// is parsed with parseFeatureRef and re-rendered with toFeatureRefStr
// (presumably dropping the project prefix, per the original comment; confirm
// against those helpers). Entity keys are kept as they are.
//
// A nil response and a non-nil error are returned when the request cannot be
// built, when the RPC fails, or when a returned feature reference cannot be
// parsed.
func (fc *GrpcClient) GetOnlineFeatures(ctx context.Context, req *OnlineFeaturesRequest) (*OnlineFeaturesResponse, error) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "get_online_features")
	defer span.Finish()

	featuresRequest, err := req.buildRequest()
	if err != nil {
		return nil, err
	}

	resp, err := fc.cli.GetOnlineFeatures(ctx, featuresRequest)
	if err != nil {
		// Fail fast: there is no response to post-process, and callers must not
		// receive a non-nil wrapper around a nil raw response.
		return nil, err
	}

	// Collect the unique entity refs from the entity rows so they can be told
	// apart from feature refs in the response.
	entityRefs := make(map[string]struct{})
	for _, entityRows := range req.Entities {
		for ref := range entityRows {
			entityRefs[ref] = struct{}{}
		}
	}

	// Rewrite the feature refs used as field keys; entity keys pass through.
	for _, fieldValue := range resp.GetFieldValues() {
		stripFields := make(map[string]*types.Value, len(fieldValue.Fields))
		for refStr, value := range fieldValue.Fields {
			if _, isEntity := entityRefs[refStr]; !isEntity { // is feature ref
				featureRef, parseErr := parseFeatureRef(refStr, true)
				if parseErr != nil {
					return nil, parseErr
				}
				refStr = toFeatureRefStr(featureRef)
			}
			stripFields[refStr] = value
		}
		fieldValue.Fields = stripFields
	}

	return &OnlineFeaturesResponse{RawResponse: resp}, nil
}
// GetFeastServingInfo asks the connected feast serving instance for
// information about itself.
//
// The request is forwarded unchanged to the serving stub inside an
// opentracing span named "get_info"; the stub's response and error are
// returned as-is.
func (fc *GrpcClient) GetFeastServingInfo(ctx context.Context, in *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) {
	infoSpan, spanCtx := opentracing.StartSpanFromContext(ctx, "get_info")
	defer infoSpan.Finish()

	info, err := fc.cli.GetFeastServingInfo(spanCtx, in)
	return info, err
}
// Close closes the underlying grpc connection and returns the error, if any,
// reported by that close.
func (fc *GrpcClient) Close() error {
	closeErr := fc.conn.Close()
	return closeErr
}