chore: Go Sql Online Store #2446

Merged
merged 43 commits into from
Mar 29, 2022
501f59d
Initial structure for go sqlite online store
kevjumba Mar 23, 2022
0fbd8dc
Somewhat intermediate state
kevjumba Mar 23, 2022
21c8900
Add sqlite online store for go for testing
kevjumba Mar 24, 2022
01d9a1d
Revert
kevjumba Mar 24, 2022
2c4fccd
Revert
kevjumba Mar 24, 2022
34dcd15
Clean up
kevjumba Mar 24, 2022
80f4579
Address review issues
kevjumba Mar 24, 2022
17acc6a
Fix/address issues
kevjumba Mar 25, 2022
64fa88b
lint
kevjumba Mar 25, 2022
0601f0a
Fix
kevjumba Mar 25, 2022
2233ff8
Fix
kevjumba Mar 25, 2022
4de021b
Make integration test work
kevjumba Mar 25, 2022
e3f9970
Fix tests
kevjumba Mar 25, 2022
d9f5333
Fix tests
kevjumba Mar 25, 2022
a631c52
debugging
kevjumba Mar 25, 2022
993ead4
Debug
kevjumba Mar 25, 2022
63fbee6
Debug
kevjumba Mar 25, 2022
c325834
Debug
kevjumba Mar 25, 2022
f2202b8
Debug
kevjumba Mar 25, 2022
c242587
Debug
kevjumba Mar 25, 2022
08e51fb
Debug
kevjumba Mar 25, 2022
b551639
Remove feature_repo files
kevjumba Mar 28, 2022
0755e05
update gitignore
kevjumba Mar 28, 2022
eb8a320
Clean up code
kevjumba Mar 28, 2022
c3472bd
Update go mod
kevjumba Mar 28, 2022
a5b6607
Update makefile
kevjumba Mar 28, 2022
5f68689
Fix gitignore issue
kevjumba Mar 28, 2022
3896f99
Update makefile
kevjumba Mar 28, 2022
111c2d0
Update makefile
kevjumba Mar 28, 2022
c6197ce
Update makefile
kevjumba Mar 28, 2022
a5d4d28
Update makefile
kevjumba Mar 28, 2022
14d5602
Update makefile
kevjumba Mar 28, 2022
02eda7a
Revert worfklow
kevjumba Mar 28, 2022
14876a2
Update build path
kevjumba Mar 29, 2022
3bfcfad
remove
kevjumba Mar 29, 2022
8739e68
rename
kevjumba Mar 29, 2022
bd052a3
Address review
kevjumba Mar 29, 2022
189d3d2
fix tests
kevjumba Mar 29, 2022
acc86bb
Fix
kevjumba Mar 29, 2022
34ab63d
see if this fixes test
kevjumba Mar 29, 2022
fa0abf3
Fix
kevjumba Mar 29, 2022
941aea8
revert
kevjumba Mar 29, 2022
025a7e4
Will add in separate pr to update cryptography
kevjumba Mar 29, 2022
3 changes: 2 additions & 1 deletion .gitignore
@@ -105,6 +105,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
infra/scripts/*.conf
go/internal/test/feature_repo

# Translations
*.mo
@@ -204,7 +205,7 @@ ui/.pnp
ui/.pnp.js
ui/coverage
ui/build
ui/feature_repo/data/online.db
ui/feature_repo/data/online.db
ui/feature_repo/registry.db
ui/.vercel

2 changes: 2 additions & 0 deletions Makefile
@@ -141,7 +141,9 @@ compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies
python -m pip install pybindgen==0.22.0
cd sdk/python && python setup.py build_go_lib

# Needs feast package to setup the feature store
test-go: compile-protos-go
pip install -e "sdk/python[ci]"
go test ./...

format-go:
11 changes: 7 additions & 4 deletions go.mod
@@ -8,21 +8,25 @@ require (
github.com/go-python/gopy v0.4.0
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.1.2
github.com/google/uuid v1.2.0
github.com/mattn/go-sqlite3 v1.14.12
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
)

require (
github.com/apache/thrift v0.16.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/pierrec/lz4/v4 v4.1.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a // indirect
@@ -31,7 +35,6 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220118154757-00ab72f36ad5 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
215 changes: 210 additions & 5 deletions go.sum

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions go/cmd/server/server.go
@@ -2,6 +2,7 @@ package main

import (
"context"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
@@ -24,6 +25,9 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request
}, nil
}

// GetOnlineFeatures returns the response to a GetOnlineFeatures request.
// Metadata contains the feature names, one per entry in response.Results.
// Results contains, in columnar format, each feature's values, event timestamps, and statuses.
func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())
if err != nil {
@@ -36,6 +40,9 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
featuresOrService.FeatureService,
request.GetEntities(),
request.GetFullFeatureNames())
if err != nil {
return nil, err
}

resp := &serving.GetOnlineFeaturesResponse{
Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, 0),
95 changes: 78 additions & 17 deletions go/cmd/server/server_test.go
@@ -2,33 +2,42 @@ package main

import (
"context"
"net"
"path/filepath"
"reflect"
"runtime"
"testing"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/test"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"net"
"path/filepath"
"runtime"
"testing"
)

// Return absolute path to the test_repo directory regardless of the working directory
func getRepoPath() string {
func getRepoPath(basePath string) string {
// Get the file path of this source file, regardless of the working directory
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("couldn't find file path of the test file")
if basePath == "" {
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("couldn't find file path of the test file")
}
return filepath.Join(filename, "..", "..", "feature_repo")
} else {
return filepath.Join(basePath, "feature_repo")
}
return filepath.Join(filename, "..", "..", "feature_repo")
}
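
The path resolution in getRepoPath can be sketched in isolation as follows. This is a minimal illustration, not the PR's code: the name repoPath is hypothetical, it returns an error instead of panicking, and it uses filepath.Dir to reach the source file's own directory, whereas the diff joins ".." segments onto the file name.

```go
package main

import (
	"fmt"
	"path/filepath"
	"runtime"
)

// repoPath is a hypothetical helper with the same shape as getRepoPath:
// an explicit base path wins; otherwise the path is derived from the
// location of this source file, independent of the working directory.
func repoPath(basePath string) (string, error) {
	if basePath != "" {
		return filepath.Join(basePath, "feature_repo"), nil
	}
	// runtime.Caller(0) reports the file name of the current function's frame.
	_, filename, _, ok := runtime.Caller(0)
	if !ok {
		return "", fmt.Errorf("could not determine the source file path")
	}
	// Assumption for this sketch: feature_repo sits next to the source file.
	return filepath.Join(filepath.Dir(filename), "feature_repo"), nil
}

func main() {
	p, err := repoPath(".")
	if err != nil {
		panic(err)
	}
	fmt.Println(p)
}
```

Returning an error rather than panicking is a design choice for the sketch only; in a test helper, a panic as in the diff is a common shortcut.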

func getClient(ctx context.Context) (serving.ServingServiceClient, func()) {
// Starts a new grpc server, registers the serving service and returns a client.
func getClient(ctx context.Context, basePath string) (serving.ServingServiceClient, func()) {
buffer := 1024 * 1024
listener := bufconn.Listen(buffer)

server := grpc.NewServer()
config, err := feast.NewRepoConfigFromFile(getRepoPath())
config, err := feast.NewRepoConfigFromFile(getRepoPath(basePath))
if err != nil {
panic(err)
}
@@ -58,21 +67,73 @@ func getClient(ctx context.Context) (serving.ServingServiceClient, func()) {
}

func TestGetFeastServingInfo(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
ctx := context.Background()
client, closer := getClient(ctx)
// Pregenerated using `feast init`.
dir := "."
err := test.SetupFeatureRepo(dir)
assert.Nil(t, err)
defer test.CleanUpRepo(dir)
client, closer := getClient(ctx, dir)
defer closer()
response, err := client.GetFeastServingInfo(ctx, &serving.GetFeastServingInfoRequest{})
assert.Nil(t, err)
assert.Equal(t, feastServerVersion, response.Version)
}

func TestGetOnlineFeatures(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
func TestGetOnlineFeaturesSqlite(t *testing.T) {
ctx := context.Background()
client, closer := getClient(ctx)
// Pregenerated using `feast init`.
dir := "."
err := test.SetupFeatureRepo(dir)
assert.Nil(t, err)
defer test.CleanUpRepo(dir)
client, closer := getClient(ctx, dir)
defer closer()
response, err := client.GetOnlineFeatures(ctx, &serving.GetOnlineFeaturesRequest{})
entities := make(map[string]*types.RepeatedValue)
entities["driver_id"] = &types.RepeatedValue{
Val: []*types.Value{
{Val: &types.Value_Int64Val{Int64Val: 1001}},
{Val: &types.Value_Int64Val{Int64Val: 1003}},
{Val: &types.Value_Int64Val{Int64Val: 1005}},
},
}
request := &serving.GetOnlineFeaturesRequest{
Kind: &serving.GetOnlineFeaturesRequest_Features{
Features: &serving.FeatureList{
Val: []string{"driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate", "driver_hourly_stats:avg_daily_trips"},
},
},
Entities: entities,
}
response, err := client.GetOnlineFeatures(ctx, request)
expectedEntityValuesResp := []*types.Value{
{Val: &types.Value_Int64Val{Int64Val: 1001}},
{Val: &types.Value_Int64Val{Int64Val: 1003}},
{Val: &types.Value_Int64Val{Int64Val: 1005}},
}
expectedFeatureNamesResp := []string{"driver_id", "conv_rate", "acc_rate", "avg_daily_trips"}
assert.Nil(t, err)
assert.NotNil(t, response)
rows, err := test.ReadParquet(filepath.Join(dir, "feature_repo", "data", "driver_stats.parquet"))
assert.Nil(t, err)
entityKeys := map[int64]bool{1001: true, 1003: true, 1005: true}
correctFeatures := test.GetLatestFeatures(rows, entityKeys)
expectedConvRateValues := []*types.Value{}
expectedAccRateValues := []*types.Value{}
expectedAvgDailyTripsValues := []*types.Value{}

for _, key := range []int64{1001, 1003, 1005} {
expectedConvRateValues = append(expectedConvRateValues, &types.Value{Val: &types.Value_FloatVal{FloatVal: correctFeatures[key].Conv_rate}})
expectedAccRateValues = append(expectedAccRateValues, &types.Value{Val: &types.Value_FloatVal{FloatVal: correctFeatures[key].Acc_rate}})
expectedAvgDailyTripsValues = append(expectedAvgDailyTripsValues, &types.Value{Val: &types.Value_Int64Val{Int64Val: int64(correctFeatures[key].Avg_daily_trips)}})
}
// The response is columnar: expect one result per column (the entity key plus the three requested features).
assert.Equal(t, len(response.Results), 4)

assert.True(t, reflect.DeepEqual(response.Results[0].Values, expectedEntityValuesResp))
Collaborator:
Let's also add a check here for the feature values, since we read them from the source file.

Collaborator Author:
Done.

assert.True(t, reflect.DeepEqual(response.Results[1].Values, expectedConvRateValues))
assert.True(t, reflect.DeepEqual(response.Results[2].Values, expectedAccRateValues))
assert.True(t, reflect.DeepEqual(response.Results[3].Values, expectedAvgDailyTripsValues))

assert.True(t, reflect.DeepEqual(response.Metadata.FeatureNames.Val, expectedFeatureNamesResp))
}
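
The way the test derives its expected columns (latest source row per entity, then one column of values in request order) can be sketched roughly as below. The row type, latestByDriver, and convRateColumn are hypothetical stand-ins; the actual test.ReadParquet and test.GetLatestFeatures helpers are not shown in this diff, so their behaviour is assumed here.

```go
package main

import "fmt"

// row is a hypothetical, simplified stand-in for one record of the
// driver stats source file.
type row struct {
	DriverID       int64
	EventTimestamp int64 // Unix seconds
	ConvRate       float32
}

// latestByDriver keeps, for each requested driver, the row with the
// newest event timestamp.
func latestByDriver(rows []row, wanted map[int64]bool) map[int64]row {
	latest := make(map[int64]row)
	for _, r := range rows {
		if !wanted[r.DriverID] {
			continue
		}
		if cur, ok := latest[r.DriverID]; !ok || r.EventTimestamp > cur.EventTimestamp {
			latest[r.DriverID] = r
		}
	}
	return latest
}

// convRateColumn builds a single column: one value per entity, in the
// same order as the entities appear in the request.
func convRateColumn(latest map[int64]row, order []int64) []float32 {
	col := make([]float32, 0, len(order))
	for _, id := range order {
		col = append(col, latest[id].ConvRate)
	}
	return col
}

func main() {
	rows := []row{
		{DriverID: 1001, EventTimestamp: 10, ConvRate: 0.1},
		{DriverID: 1001, EventTimestamp: 20, ConvRate: 0.2},
		{DriverID: 1003, EventTimestamp: 5, ConvRate: 0.3},
		{DriverID: 1005, EventTimestamp: 7, ConvRate: 0.4},
		{DriverID: 1005, EventTimestamp: 3, ConvRate: 0.5},
		{DriverID: 1002, EventTimestamp: 9, ConvRate: 0.9},
	}
	order := []int64{1001, 1003, 1005}
	wanted := map[int64]bool{1001: true, 1003: true, 1005: true}
	fmt.Println(convRateColumn(latestByDriver(rows, wanted), order))
}
```

Iterating over an explicit order slice rather than over the map matters: Go map iteration order is unspecified, and the expected column has to line up with the entity order of the request.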
7 changes: 3 additions & 4 deletions go/internal/feast/featurestore.go
@@ -198,6 +198,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
}
result := make([]*FeatureVector, 0)
arrowMemory := memory.NewGoAllocator()

for _, groupRef := range groupedRefs {
featureData, err := fs.readFromOnlineStore(ctx, groupRef.entityKeys, groupRef.featureViewNames, groupRef.featureNames)
if err != nil {
@@ -348,7 +349,6 @@ func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hid
fvs := make(map[string]*FeatureView)
requestFvs := make(map[string]*RequestFeatureView)
odFvs := make(map[string]*OnDemandFeatureView)

featureViews, err := fs.listFeatureViews(hideDummyEntity)
if err != nil {
return nil, nil, nil, nil, err
@@ -405,7 +405,6 @@ func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hid
" feature view %s and that you have registered it by running \"apply\"", featureViewName, featureViewName)
}
}

return fvs, fvsToUse, requestFvsToUse, odFvsToUse, nil
}

@@ -561,9 +560,9 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p
requestedFeatureNames []string,
) ([][]FeatureData, error) {
numRows := len(entityRows)
entityRowsValue := make([]prototypes.EntityKey, numRows)
entityRowsValue := make([]*prototypes.EntityKey, numRows)
for index, entityKey := range entityRows {
entityRowsValue[index] = prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
entityRowsValue[index] = &prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
}
return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames)
}
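
The difference between a slice of structs and a slice of pointers, which is what this hunk switches between, can be illustrated with a small sketch. The entityKey type and toPointers are hypothetical stand-ins for the generated EntityKey message and the conversion loop; unlike the diff, which builds a new EntityKey per element, this sketch takes the address of the existing elements.

```go
package main

import "fmt"

// entityKey is a hypothetical stand-in for the generated EntityKey message.
type entityKey struct {
	JoinKeys []string
	Values   []int64
}

// toPointers returns pointers to the elements of keys, so that callees
// receive references to the existing structs rather than copies.
func toPointers(keys []entityKey) []*entityKey {
	out := make([]*entityKey, len(keys))
	for i := range keys {
		// Index into the slice so each pointer refers to the element itself.
		out[i] = &keys[i]
	}
	return out
}

func main() {
	keys := []entityKey{{JoinKeys: []string{"driver_id"}, Values: []int64{1001}}}
	ptrs := toPointers(keys)

	// A write through the pointer is visible in the original element,
	// which shows that no struct copy was made.
	ptrs[0].JoinKeys = []string{"customer_id"}
	fmt.Println(keys[0].JoinKeys[0])
}
```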
8 changes: 5 additions & 3 deletions go/internal/feast/onlinestore.go
@@ -3,6 +3,7 @@ package feast
import (
"context"
"fmt"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/golang/protobuf/ptypes/timestamp"
@@ -33,7 +34,7 @@ type OnlineStore interface {
// Feature object as pointers in GetOnlineFeaturesResponse)
// => allocate memory for each field once in OnlineRead
// and reuse them in GetOnlineFeaturesResponse?
OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
Member:
Why are we changing the interface here?

Collaborator Author:
I don't think it makes sense to copy the EntityKey around. Changing it to a pointer doesn't change the functionality; it just makes it more efficient.

// Destruct must be called once the user is done using OnlineStore
// This is to comply with the Connector since we have to close the plugin
Destruct()
@@ -51,12 +52,13 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool)
func NewOnlineStore(config *RepoConfig) (OnlineStore, error) {
onlineStoreType, ok := getOnlineStoreType(config.OnlineStore)
if !ok {
return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore)
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
Comment on lines 54 to +56
Member:
This seems wrong. Don't we need ok && onlineStoreType == "sqlite" before we take this code path?

Collaborator Author:
In the Python implementation, an online store config with no type is automatically assumed to be sqlite. I believe that when I tried adding type = sqlite there were some small config collisions, so I decided it makes sense to adhere to the Python online store configuration.

}
if onlineStoreType == "redis" {
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
return onlineStore, err
} else {
return nil, fmt.Errorf("%s online store type is currently not supported; only Redis is supported", onlineStoreType)
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
}
}
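
The review thread above is about how a missing type should be resolved. A minimal sketch of one way to structure that decision follows; it is not the PR's implementation, and storeKind is a hypothetical helper. Assuming getOnlineStoreType reports ok for any string type, the hunk as shown would seem to send an explicit "sqlite" to the final error branch; the sketch instead treats a missing type as sqlite, accepts an explicit "sqlite", and separates a malformed type from an unsupported one.

```go
package main

import "fmt"

// storeKind is a hypothetical helper that resolves the online store type
// from a config map. A missing "type" key defaults to "sqlite", following
// the Python behaviour described in the review thread.
func storeKind(cfg map[string]interface{}) (string, error) {
	raw, present := cfg["type"]
	if !present {
		return "sqlite", nil
	}
	kind, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("online store type must be a string, got %T", raw)
	}
	switch kind {
	case "sqlite", "redis":
		return kind, nil
	default:
		return "", fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", kind)
	}
}

func main() {
	for _, cfg := range []map[string]interface{}{
		{},
		{"type": "redis"},
		{"type": "dynamodb"},
	} {
		kind, err := storeKind(cfg)
		fmt.Println(kind, err)
	}
}
```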
11 changes: 6 additions & 5 deletions go/internal/feast/redisonlinestore.go
@@ -5,15 +5,16 @@ import (
"encoding/binary"
"errors"
"fmt"
"sort"
"strconv"
"strings"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"github.com/spaolacci/murmur3"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"sort"
"strconv"
"strings"
)

type redisType int
@@ -117,7 +118,7 @@ func getRedisType(onlineStoreConfig map[string]interface{}) (redisType, error) {
return t, nil
}

func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
featureCount := len(featureNames)
index := featureCount
featureViewIndices := make(map[string]int)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.En
redisKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {

var key, err = buildRedisKey(r.project, &entityKeys[i])
var key, err = buildRedisKey(r.project, entityKeys[i])
if err != nil {
return nil, err
}